Tag Archives: Data Science

A Recap of the Data Engineering Open Forum at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/a-recap-of-the-data-engineering-open-forum-at-netflix-6b4d4410b88f

A summary of sessions at the first Data Engineering Open Forum at Netflix on April 18th, 2024

The Data Engineering Open Forum at Netflix on April 18th, 2024.

At Netflix, we aspire to entertain the world, and our data engineering teams play a crucial role in this mission by enabling data-driven decision-making at scale. Netflix is not the only place where data engineers are solving challenging problems with creative solutions. On April 18th, 2024, we hosted the inaugural Data Engineering Open Forum at our Los Gatos office, bringing together data engineers from various industries to share, learn, and connect.

At the conference, our speakers share their unique perspectives on modern developments, immediate challenges, and future prospects of data engineering. We are excited to share the recordings of talks from the conference with the rest of the world.

Opening Remarks

Recording

Speaker: Max Schmeiser (Vice President of Studio and Content Data Science & Engineering)

Summary: Max Schmeiser extends a warm welcome to all attendees, marking the beginning of our inaugural Data Engineering Open Forum.

Evolving from Rule-based Classifier: Machine Learning Powered Auto Remediation in Netflix Data Platform

Recording

Speakers:

Summary: At Netflix, hundreds of thousands of workflows and millions of jobs are running every day on our big data platform, but diagnosing and remediating job failures can impose considerable operational burdens. To handle errors efficiently, Netflix developed a rule-based classifier for error classification called “Pensive.” However, as the system has increased in scale and complexity, Pensive has been facing challenges due to its limited support for operational automation, especially for handling memory configuration errors and unclassified errors. To address these challenges, we have developed a new feature called “Auto Remediation,” which integrates the rules-based classifier with an ML service.

Automating the Data Architect: Generative AI for Enterprise Data Modeling

Recording

Speaker: Jide Ogunjobi (Founder & CTO at Context Data)

Summary: As organizations accumulate ever-larger stores of data across disparate systems, efficiently querying and gaining insights from enterprise data remain ongoing challenges. To address this, we propose developing an intelligent agent that can automatically discover, map, and query all data within an enterprise. This “Enterprise Data Model/Architect Agent” employs generative AI techniques for autonomous enterprise data modeling and architecture.

Tulika Bhatt, Senior Data Engineer at Netflix, shared how her team manages impression data at scale.

Real-Time Delivery of Impressions at Scale

Recording

Speaker: Tulika Bhatt (Senior Data Engineer at Netflix)

Summary: Netflix generates approximately 18 billion impressions daily. These impressions significantly influence a viewer’s browsing experience, as they are essential for powering video ranker algorithms and computing adaptive pages, With the evolution of user interfaces to be more responsive to in-session interactions, coupled with the growing demand for real-time adaptive recommendations, it has become highly imperative that these impressions are provided on a near real-time basis. This talk will delve into the creative solutions Netflix deploys to manage this high-volume, real-time data requirement while balancing scalability and cost.

Reflections on Building a Data Platform From the Ground Up in a Post-GDPR World

Recording

Speaker: Jessica Larson (Data Engineer & Author of “Snowflake Access Control”)

Summary: The requirements for creating a new data warehouse in the post-GDPR world are significantly different from those of the pre-GDPR world, such as the need to prioritize sensitive data protection and regulatory compliance over performance and cost. In this talk, Jessica Larson shares her takeaways from building a new data platform post-GDPR.

Unbundling the Data Warehouse: The Case for Independent Storage

Recording

Speaker: Jason Reid (Co-founder & Head of Product at Tabular)

Summary: Unbundling a data warehouse means splitting it into constituent and modular components that interact via open standard interfaces. In this talk, Jason Reid discusses the pros and cons of both data warehouse bundling and unbundling in terms of performance, governance, and flexibility, and he examines how the trend of data warehouse unbundling will impact the data engineering landscape in the next 5 years.

Clark Wright, Staff Analytics Engineer at Airbnb, talked about the concept of Data Quality Score at Airbnb.

Data Quality Score: How We Evolved the Data Quality Strategy at Airbnb

Recording

Speaker: Clark Wright (Staff Analytics Engineer at Airbnb)

Summary: Recently, Airbnb published a post to their Tech Blog called Data Quality Score: The next chapter of data quality at Airbnb. In this talk, Clark Wright shares the narrative of how data practitioners at Airbnb recognized the need for higher-quality data and then proposed, conceptualized, and launched Airbnb’s first Data Quality Score.

Data Productivity at Scale

Recording

Speaker: Iaroslav Zeigerman (Co-Founder and Chief Architect at Tobiko Data)

Summary: The development and evolution of data pipelines are hindered by outdated tooling compared to software development. Creating new development environments is cumbersome: Populating them with data is compute-intensive, and the deployment process is error-prone, leading to higher costs, slower iteration, and unreliable data. SQLMesh, an open-source project born from our collective experience at companies like Airbnb, Apple, Google, and Netflix, is designed to handle the complexities of evolving data pipelines at an internet scale. In this talk, Iaroslav Zeigerman discusses challenges faced by data practitioners today and how core SQLMesh concepts solve them.

Last but not least, thank you to the organizers of the Data Engineering Open Forum: Chris Colburn, Xinran Waibel, Jai Balani, Rashmi Shamprasad, and Patricia Ho.

Until next time!

If you are interested in attending a future Data Engineering Open Forum, we highly recommend you join our Google Group to stay tuned to event announcements.


A Recap of the Data Engineering Open Forum at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Round 2: A Survey of Causal Inference Applications at Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/round-2-a-survey-of-causal-inference-applications-at-netflix-fd78328ee0bb

At Netflix, we want to ensure that every current and future member finds content that thrills them today and excites them to come back for more. Causal inference is an essential part of the value that Data Science and Engineering adds towards this mission. We rely heavily on both experimentation and quasi-experimentation to help our teams make the best decisions for growing member joy.

Building off of our last successful Causal Inference and Experimentation Summit, we held another week-long internal conference this year to learn from our stunning colleagues. We brought together speakers from across the business to learn about methodological developments and innovative applications.

We covered a wide range of topics and are excited to share five talks from that conference with you in this post. This will give you a behind the scenes look at some of the causal inference research happening at Netflix!

Metrics Projection for Growth A/B Tests

Mihir Tendulkar, Simon Ejdemyr, Dhevi Rajendran, David Hubbard, Arushi Tomar, Steve Beckett, Judit Lantos, Cody Chapman, Ayal Chen-Zion, Apoorva Lal, Ekrem Kocaguneli, Kyoko Shimada

Experimentation is in Netflix’s DNA. When we launch a new product feature, we use — where possible — A/B test results to estimate the annualized incremental impact on the business.

Historically, that estimate has come from our Finance, Strategy, & Analytics (FS&A) partners. For each test cell in an experiment, they manually forecast signups, retention probabilities, and cumulative revenue on a one year horizon, using monthly cohorts. The process can be repetitive and time consuming.

We decided to build out a faster, automated approach that boils down to estimating two pieces of missing data. When we run an A/B test, we might allocate users for one month, and monitor results for only two billing periods. In this simplified example, we have one member cohort, and we have two billing period treatment effects (𝜏.cohort1,period1 and 𝜏.cohort1,period2, which we will shorten to 𝜏.1,1 and 𝜏.1,2, respectively).

To measure annualized impact, we need to estimate:

  1. Unobserved billing periods. For the first cohort, we don’t have treatment effects (TEs) for their third through twelfth billing periods (𝜏.1,j , where j = 3…12).
  2. Unobserved sign up cohorts. We only observed one monthly signup cohort, and there are eleven more cohorts in a year. We need to know both the size of these cohorts, and their TEs (𝜏.i,j, where i = 2…12 and j = 1…12).

For the first piece of missing data, we used a surrogate index approach. We make a standard assumption that the causal path from the treatment to the outcome (in this case, Revenue) goes through the surrogate of retention. We leverage our proprietary Retention Model and short-term observations — in the above example, 𝜏.1,2 — to estimate 𝜏.1,j , where j = 3…12.

For the second piece of missing data, we assume transportability: that each subsequent cohort’s billing-period TE is the same as the first cohort’s TE. Note that if you have long-running A/B tests, this is a testable assumption!

Fig. 1: Monthly cohort-based activity as measured in an A/B test. In green, we show the allocation window throughout January, while blue represents the January cohort’s observation window. From this, we can directly observe 𝜏.1 and 𝜏.2, and we can project later 𝜏.j forward using the surrogate-based approach. We can transport values from observed cohorts to unobserved cohorts.

Now, we can put the pieces together. For the first cohort, we project TEs forward. For unobserved cohorts, we transport the TEs from the first cohort and collapse our notation to remove the cohort index: 𝜏.1,1 is now written as just 𝜏.1. We estimate the annualized impact by summing the values from each cohort.

We empirically validated our results from this method by comparing to long-running AB tests and prior results from our FS&A partners. Now we can provide quicker and more accurate estimates of the longer term value our product features are delivering to members.

A Systematic Framework for Evaluating Game Events

Claire Willeck, Yimeng Tang

In Netflix Games DSE, we are asked many causal inference questions after an intervention has been implemented. For example, how did a product change impact a game’s performance? Or how did a player acquisition campaign impact a key metric?

While we would ideally conduct AB tests to measure the impact of an intervention, it is not always practical to do so. In the first scenario above, A/B tests were not planned before the intervention’s launch, so we needed to use observational causal inference to assess its effectiveness. In the second scenario, the campaign is at the country level, meaning everyone in the country is in the treatment group, which makes traditional A/B tests inviable.

To evaluate the impacts of various game events and updates and to help our team scale, we designed a framework and package around variations of synthetic control.

For most questions in Games, we have game-level or country-level interventions and relatively little data. This means most pre-existing packages that rely on time-series forecasting, unit-level data, or instrumental variables are not useful.

Our framework utilizes a variety of synthetic control (SC) models, including Augmented SC, Robust SC, Penalized SC, and synthetic difference-in-differences, since different approaches can work best in different cases. We utilize a scale-free metric to evaluate the performance of each model and select the one that minimizes pre-treatment bias. Additionally, we conduct robustness tests like backdating and apply inference measures based on the number of control units.

Fig. 2: Example of Augmented Synthetic Control model used to reduce pre-treatment bias by fitting the model in the training period and evaluating performance in the validation period. In this example, the Augmented Synthetic Control model reduced the pre-treatment bias in the validation period more than the other synthetic control variations.

This framework and package allows our team, and other teams, to tackle a broad set of causal inference questions using a consistent approach.

Double Machine Learning for Weighing Metrics Tradeoffs

Apoorva Lal, Winston Chou, Jordan Schafer

As Netflix expands into new business verticals, we’re increasingly seeing examples of metric tradeoffs in A/B tests — for example, an increase in games metrics may occur alongside a decrease in streaming metrics. To help decision-makers navigate scenarios where metrics disagree, we developed a method to compare the relative importance of different metrics (viewed as “treatments”) in terms of their causal effect on the north-star metric (Retention) using Double Machine Learning (DML).

In our first pass at this problem, we found that ranking treatments according to their Average Treatment Effects using DML with a Partially Linear Model (PLM) could yield an incorrect ranking when treatments have different marginal distributions. The PLM ranking would be correct if treatment effects were constant and additive. However, when treatment effects are heterogeneous, PLM upweights the effects for members whose treatment values are most unpredictable. This is problematic for comparing treatments with different baselines.

Instead, we discretized each treatment into bins and fit a multiclass propensity score model. This lets us estimate multiple Average Treatment Effects (ATEs) using Augmented Inverse-Propensity-Weighting (AIPW) to reflect different treatment contrasts, for example the effect of low versus high exposure.

We then weight these treatment effects by the baseline distribution. This yields an “apples-to-apples” ranking of treatments based on their ATE on the same overall population.

Fig. 3: Comparison of PLMs vs. AIPW in estimating treatment effects. Because PLMs do not estimate average treatment effects when effects are heterogeneous, they do not rank metrics by their Average Treatment Effects, whereas AIPW does.

In the example above, we see that PLM ranks Treatment 1 above Treatment 2, while AIPW correctly ranks the treatments in order of their ATEs. This is because PLM upweights the Conditional Average Treatment Effect for units that have more unpredictable treatment assignment (in this example, the group defined by x = 1), whereas AIPW targets the ATE.

Survey AB Tests with Heterogeneous Non-Response Bias

Andreas Aristidou, Carolyn Chu

To improve the quality and reach of Netflix’s survey research, we leverage a research-on-research program that utilizes tools such as survey AB tests. Such experiments allow us to directly test and validate new ideas like providing incentives for survey completion, varying the invitation’s subject-line, message design, time-of-day to send, and many other things.

In our experimentation program we investigate treatment effects on not only primary success metrics, but also on guardrail metrics. A challenge we face is that, in many of our tests, the intervention (e.g. providing higher incentives) and success metrics (e.g. percent of invited members who begin the survey) are upstream of guardrail metrics such as answers to specific questions designed to measure data quality (e.g. survey straightlining).

In such a case, the intervention may (and, in fact, we expect it to) distort upstream metrics (especially sample mix), the balance of which is a necessary component for the identification of our downstream guardrail metrics. This is a consequence of non-response bias, a common external validity concern with surveys that impacts how generalizable the results can be.

For example, if one group of members — group X — responds to our survey invitations at a significantly lower rate than another group — group Y — , then average treatment effects will be skewed towards the behavior of group Y. Further, in a survey AB test, the type of non-response bias can differ between control and treatment groups (e.g. different groups of members may be over/under represented in different cells of the test), thus threatening the internal validity of our test by introducing a covariate imbalance. We call this combination heterogeneous non-response bias.

To overcome this identification problem and investigate treatment effects on downstream metrics, we leverage a combination of several techniques. First, we look at conditional average treatment effects (CATE) for particular sub-populations of interest where confounding covariates are balanced in each strata.

In order to examine the average treatment effects, we leverage a combination of propensity scores to correct for internal validity issues and iterative proportional fitting to correct for external validity issues. With these techniques, we can ensure that our surveys are of the highest quality and that they accurately represent our members’ opinions, thus helping us build products that they want to see.

Design: The Intersection of Humans and Technology

Rina Chang

A design talk at a causal inference conference? Why, yes! Because design is about how a product works, it is fundamentally interwoven into the experimentation platform at Netflix. Our product serves the huge variety of internal users at Netflix who run — and consume the results of — A/B tests. Thus, choosing how to enable our users to take action and how we present data in the product is critical to decision-making via experimentation.

If you were to display some numbers and text, you might opt to show it in a tabular format.

While there is nothing inherently wrong with this presentation, it is not as easily digested as something more visual.

If your goal is to illustrate that those three numbers add up to 100%, and thus are parts of a whole, then you might choose a pie chart.

If you wanted to show how these three numbers combine to illustrate progress toward a goal, then you might choose a stacked bar chart.

Alternatively, if your goal was to compare these three numbers against each other, then you might choose a bar chart instead.

All of these show the same information, but the choice of presentation changes how easily a consumer of an infographic understands the “so what?” of the point you’re trying to convey. Note that there is no “right” solution here; rather, it depends on the desired takeaway.

Thoughtful design applies not only to static representations of data, but also to interactive experiences. In this example, a single item within a long form could be represented by having a pre-filled value.

Alternatively, the same functionality could be achieved by displaying a default value in text, with the ability to edit it.

While functionally equivalent, this UI change shifts the user’s narrative from “Is this value correct?” to “Do I need to do something that is not ‘normal’?” — which is a much easier question to answer. Zooming out even more, thoughtful design addresses product-level choices like if a person knows where to go to accomplish a task. In general, thoughtful design influences product strategy.

Design permeates all aspects of our experimentation product at Netflix, from small choices like color to strategic choices like our roadmap. By thoughtfully approaching design, we can ensure that tools help the team learn the most from our experiments.

External Speaker: Kosuke Imai

In addition to the amazing talks by Netflix employees, we also had the privilege of hearing from Kosuke Imai, Professor of Government and Statistics at Harvard, who delivered our keynote talk. He introduced the “cram method,” a powerful and efficient approach to learning and evaluating treatment policies using generic machine learning algorithms.

Measuring causality is a large part of the data science culture at Netflix, and we are proud to have many stunning colleagues who leverage both experimentation and quasi-experimentation to drive member impact. The conference was a great way to celebrate each other’s work and highlight the ways in which causal methodology can create value for the business.

To stay up to date on our work, follow the Netflix Tech Blog, and if you are interested in joining us, we are currently looking for new stunning colleagues to help us entertain the world!


Round 2: A Survey of Causal Inference Applications at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Ensuring data reliability and observability in risk systems

Post Syndicated from Grab Tech original https://engineering.grab.com/data-observability

Grab has an in-house Risk Management platform called GrabDefence which relies on ingesting large amounts of data gathered from upstream services to power our heuristic risk rules and data science models in real time.

Fig 1. GrabDefence aggregates data from different upstream services

As Grab’s business grows, so does the amount of data. It becomes imperative that the data which fuels our risk systems is of reliable quality as any data discrepancy or missing data could impact fraud detection and prevention capabilities.

We need to quickly detect any data anomalies, which is where data observability comes in.

Data observability as a solution

Data observability is a type of data operation (DataOps; similar to DevOps) where teams build visibility over the health and quality of their data pipelines. This enables teams to be notified of data quality issues, and allows teams to investigate and resolve these issues faster.

We needed a solution that addresses the following issues:

  1. Alerts for any data quality issues as soon as possible – so this means the observability tool had to work in real time.
  2. With hundreds of data points to observe, we needed a neat and scalable solution which allows users to quickly pinpoint which data points were having issues.
  3. A consistent way to compare, analyse, and compute data that might have different formats.

Hence, we decided to use Flink to standardise data transformations, compute, and observe data trends quickly (in real time) and scalably.

Flink SQL is a powerful, flexible tool for performing real-time analytics on streaming data. It allows users to query continuous data streams using standard SQL syntax, enabling complex event processing and data transformation within the Apache Flink ecosystem, which is particularly useful for scenarios requiring low-latency insights and decisions.

In Grab, data comes from multiple sources and while most of the data is in JSON format, the actual JSON structure differs between services. Because of JSON’s nested and dynamic data structure, it is difficult to consistently analyse the data – posing a significant challenge for real-time analysis.

To help address this issue, Apache Flink SQL has the capability to manage such intricacies with ease. It offers specialised functions tailored for parsing and querying JSON data, ensuring efficient processing.

Another standout feature of Flink SQL is the use of custom table functions, such as JSONEXPLOAD, which serves to deconstruct and flatten nested JSON structures into tabular rows. This transformation is crucial as it enables subsequent aggregation operations. By implementing a 5-minute tumbling window, Flink SQL can easily aggregate these now-flattened data streams. This technique is pivotal for monitoring, observing, and analysing data patterns and metrics in near real-time.

Now that data is aggregated by Flink for easy analysis, we still needed a way to incorporate comprehensive monitoring so that teams could be notified of any data anomalies or discrepancies in real time.

How we interfaced the output with Datadog 

Datadog is the observability tool of choice in Grab, with many teams using Datadog for their service reliability observations and alerts. By aggregating data from Apache Flink and integrating it with Datadog, we can harness the synergy of real-time analytics and comprehensive monitoring. Flink excels in processing and aggregating data streams, which, when pushed to Datadog, can be further analysed and visualised. Datadog also provides seamless integration with collaboration tools like Slack, which enables teams to receive instant notifications and alerts.

With Datadog’s out-of-the-box features such as anomaly detection, teams can identify and be alerted to unusual patterns or outliers in their data streams. Taking a proactive approach to monitoring is crucial in maintaining system health and performance as teams can be alerted, then collaborate quickly to diagnose and address anomalies.

This integrated pipeline—from Flink’s real-time data aggregation to Datadog’s monitoring and Slack’s communication capabilities—creates a robust framework for real-time data operations. It ensures that any potential issues are quickly traced and brought to the team’s attention, facilitating a rapid response. Such an ecosystem empowers organisations to maintain high levels of system reliability and performance, ultimately enhancing the overall user experience.

Organising monitors and alerts using out-of-the-box solutions from Datadog

Once we integrated Flink data into Datadog, we realised that it could become unwieldy to try to identify the data point with issues from hundreds of other counters.

Fig 2. Hundreds of data points on a graph make it hard to decipher which ones have issues

We decided to organise the counters according to the service stream it was coming from, and create individual monitors for each service stream. We used Datadog’s Monitor Summary tool to help visualise the total number of service streams we are reading from and the number of underlying data points within each stream.  

Fig 3. Data is grouped according to their source stream

Within each individual stream, we used Datadog’s Anomaly Detection feature to create an alert whenever a data point from the stream exceeds a predefined threshold. This can be configured by the service teams on Datadog.

Fig 4. Datadog’s built-in Anomaly Detection function triggers alerts whenever a data point exceeds a threshold

These alerts are then sent to a Slack channel where the Data team is informed when a data point of interest starts throwing anomalous values.

Fig 5. Datadog integration with Slack to help alert users

Impact

Since the deployment of this data observability tool, we have seen significant improvement in the detection of anomalous values. If there are any anomalies or issues, we now get alerts within the same day (or hour) instead of days to weeks later.

Organising the alerts according to source streams have also helped simplify the monitoring load and allows users to quickly narrow down and identify which pipeline has failed.

What’s next?

At the moment, this data observability tool is only implemented on selected checkpoints in GrabDefence. We plan to expand the observability tool’s coverage to include more checkpoints, and continue to refine the workflows to detect and resolve these data issues.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Grab Experiment Decision Engine – a Unified Toolkit for Experimentation

Post Syndicated from Grab Tech original https://engineering.grab.com/grabx-decision-engine

Introduction

This article introduces the GrabX Decision Engine, an internal open-source package that offers a comprehensive framework for designing and analysing experiments conducted on online experiment platforms. The package encompasses a wide range of functionalities, including a pre-experiment advisor, a post-experiment analysis toolbox, and other advanced tools. In this article, we explore the motivation behind the development of these functionalities, their integration into the unique ecosystem of Grab’s multi-sided marketplace, and how these solutions strengthen the culture and calibre of experimentation at Grab.

Background

Today, Grab’s Experimentation (GrabX) platform orchestrates the testing of thousands of experimental variants each week. As the platform continues to expand and manage a growing volume of experiments, the need for dependable, scalable, and trustworthy experimentation tools becomes increasingly critical for data-driven and evidence-based
decision-making.

In our previous article, we presented the Automated Experiment Analysis application, a tool designed to automate data pipelines for analyses. However, during the development of this application for Grab’s experimenter community, we noticed a prevailing trend: experiments were predominantly analysed on a one-by-one, manual basis. While such a federated approach may be needed in a few cases, it presents numerous challenges at
the organisational level:

  • Lack of a contextual toolkit: GrabX facilitates executing a diverse range of experimentation designs, catering to the varied needs and contexts of different tech teams across the organisation. However, experimenters may often rely on generic online tools for experiment configurations (e.g. sample size calculations), which were not specifically designed to cater to the nuances of GrabX experiments or the recommended evaluation method, given the design. This is exacerbated by the fact
    that most online tutorials or courses on experimental design do not typically address the nuances of multi-sided marketplaces, and cannot consider the nature or constraints of specific experiments.
  • Lack of standards: In this federated model, the absence of standardised and vetted practices can lead to reliability issues. In some cases, these can include poorly designed experiments, inappropriate evaluation methods, suboptimal testing choices, and unreliable inferences, all of which are difficult to monitor and rectify.
  • Lack of scalability and efficiency: Experimenters, coming from varied backgrounds and possessing distinct skill sets, may adopt significantly different approaches to experimentation and inference. This diversity, while valuable, often impedes the transferability and sharing of methods, hindering a cohesive and scalable experimentation framework. Additionally, this variance in methods can extend the lifecycle of experiment analysis, as disagreements over approaches may give rise to
    repeated requests for review or modification.

Solution

To address these challenges, we developed the GrabX Decision Engine, a Python package open-sourced internally across all of Grab’s development platforms. Its central objective is to institutionalise best practices in experiment efficiency and analytics, thereby ensuring the derivation of precise and reliable conclusions from each experiment.

In particular, this unified toolkit significantly enhances our end-to-end experimentation processes by:

  • Ensuring compatibility with GrabX and Automated Experiment Analysis: The package is fully integrated with the Automated Experiment Analysis app, and provides analytics and test results tailored to the designs supported by GrabX. The outcomes can be further used for other downstream jobs, e.g. market modelling, simulation-based calibrations, or auto-adaptive configuration tuning.
  • Standardising experiment analytics: By providing a unified framework, the package ensures that the rationale behind experiment design and the interpretation of analysis results adhere to a company-wide standard, promoting consistency and ease of review across different teams.
  • Enhancing collaboration and quality: As an open-source package, it not only fosters a collaborative culture but also upholds quality through peer reviews. It invites users to tap into a rich pool of features while encouraging contributions that refine and expand the toolkit’s capabilities.

The package is designed for everyone involved in the experimentation process, with data scientists and product analysts being the primary users. Referred to as experimenters in this article, these key stakeholders can not only leverage the existing capabilities of the package to support their projects, but can also contribute their own innovations. Eventually, the experiment results and insights generated from the package via the Automated Experiment Analysis app have an even wider reach to stakeholders across all functions.

In the following section, we go deeper into the key functionalities of the package.

Feature details

The package comprises three key components:

  • An experimentation trusted advisor
  • A comprehensive post-experiment analysis toolbox
  • Advanced tools

These have been built taking into account the type of experiments we typically run at Grab. To understand their functionality, it’s useful to first discuss the key experimental designs supported by GrabX.

A note on experimental designs

While there is a wide variety of specific experimental designs implemented, they can be bucketed into two main categories: a between-subject design and a within-subject design.

In a between-subject design, participants — like our app users, driver-partners, and merchant-partners — are split into experimental groups, and each group gets exposed to a distinct condition throughout the experiment. One challenge in this design is that each participant may provide multiple observations to our experimental analysis sample, causing a high within-subject correlation among observations and deviations between the randomisation and session unit. This can affect the accuracy of
pre-experiment power analysis, and post-experiment inference, since it necessitates adjustments, e.g. clustering of standard errors when conducting hypothesis testing.

Conversely, a within-subject design involves every participant experiencing all conditions. Marketplace-level switchback experiments are a common GrabX use case, where a timeslice becomes the experimental unit. This design not only faces the aforementioned challenges, but also creates other complications that need to be accounted for, such as spillover effects across timeslices.

Designing and analysing the results of both experimental approaches requires careful nuanced statistical tools. Ensuring proper duration, sample size, controlling for confounders, and addressing potential biases are important considerations to enhance the validity of the results.

Trusted Advisor

The first key component of the Decision Engine is the Trusted Advisor, which provides a recommendation to the experimenter on key experiment attributes to be considered when preparing the experiment. This is dependent on the design; at a minimum, the experimenter needs to define whether the experiment design is between- or within-subject.

The between-subject design: We strongly recommend that experimenters utilise the “Trusted Advisor” feature in the Decision Engine for estimating their required sample size. This is designed to account for the multiple observations per user the experiment is expected to generate and adjusts for the presence of clustered errors (Moffatt, 2020; List, Sadoff, & Wagner, 2011). This feature allows users to input their data, either as a PySpark or Pandas dataframe. Alternatively, a function is
provided to extract summary statistics from their data, which can then be inputted into the Trusted Advisor. Obtaining the data beforehand is actually not mandatory; users have the option to directly query the recommended sample size based on common metrics derived from a regular data pipeline job. These functionalities are illustrated in the flowchart below.

Trusted Advisor functionalities

Furthermore, the Trusted Advisor feature can identify the underlying characteristics of the data, whether it’s passed directly, or queried from our common metrics database. This enables it to determine the appropriate power analysis for the experiment, without further guidance. For instance, it can detect if the target metric is a binary decision variable, and will adapt the power analysis to the correct context.

The within-subject design: In this case, we instead provide a best practices guideline to follow. Through our experience supporting various Tech Families running switchback experiments, we have observed various challenges highly dependent on the use case. This makes it difficult to create a one-size-fits-all solution.

For instance, an important factor affecting the final sample size requirement is how frequently treatments switch, which is also tied to what data granularity is appropriate to use in the post-experiment analysis. These considerations are dependent on, among other factors, how quickly a given treatment is expected to cause an effect. Some treatments may take effect relatively quickly (near-instantly, e.g. if applied to price checks), while others may take significantly longer (e.g. 15-30 minutes because they may require a trip to be completed). This has further consequences, e.g. autocorrelation between observations within a treatment window, spillover effects between different treatment windows, requirements for cool-down windows when treatments switch, etc.

Another issue we have identified from analysing the history of experiments on our platform is that a significant portion is prone to issues related to sample ratio mismatch (SRM). We therefore also heavily emphasise the post-experiment analysis corrections and robustness checks that are needed in switchback experiments, and do not simply rely on pre-experiment guidance such as power analysis.

Post-experiment analysis

Upon completion of the experiment, a comprehensive toolbox for post-experiment analysis is available. This toolbox consists of a wide range of statistical tests, ranging from normality tests to non-parametric and parametric tests. Here is an overview of the different types of tests included in the toolbox for different experiment setups:

Tests supported by the post-experiment analysis component

Though we make all the relevant tests available, the package sets a default list of output. With just two lines of code specifying the desired experiment design, experimenters can easily retrieve the recommended results, as summarised in the following table.

Types Details
Basic statistics The mean, variance, and sample size of Treatment and Control
Uplift tests Welch’s t-test;
Non-parametric tests, such as Wilcoxon signed-rank test and Mann-Whitney U Test
Misc tests Normality tests such as the Shapiro-Wilk test, Anderson-Darling test, and Kolmogorov-Smirnov test;
Levene test which assesses the equality of variances between groups
Regression models A standard OLS/Logit model to estimate the treatment uplift;
Recommended regression models
Warning Provides a warning or notification related to the statistical analysis or results, for example:
– Lack of variation in the variables
– Sample size is too small
– Too few randomisation units which will lead to under-estimated standard errors

Besides reporting relevant statistical test results, we adopt regression models to leverage their flexibility in controlling for confounders, fixed effects and heteroskedasticity, as is commonly observed in our experiments. As mentioned in the section “A note on experimental design”, each approach has different implications on the achieved randomisation, and hence requires its own customised regression models.

Between-subject design: the observations are not independent and identically distributed (i.i.d) but clustered due to repeated observations of the same experimental units. Therefore, we set the default clustering level at the participant level in our regression models, considering that most of our between-subject experiments only take a small portion of the population (Abadie et al., 2022).

Within-subject design: this has further challenges, including spillover effects and randomisation imbalances. As a result, they often require better control of confounding factors. We adopt panel data methods and impose time fixed effects, with no option to remove them. Though users have the flexibility to define these themselves, we use hourly fixed effects as our default as we have found that these match the typical seasonality we observe in marketplace metrics. Similar to between-subject
designs, we use standard error corrections for clustered errors, and small number of clusters, as the default. Our API is flexible for users to include further controls, as well as further fixed effects to adapt the estimator to geo-timeslice designs.

Advanced tools

Apart from the pre-experiment Trusted Advisor and the post-experiment Analysis Toolbox, we have enriched this package by providing more advanced tools. Some of them are set as a default feature in the previous two components, while others are ad-hoc capabilities which the users can utilise via calling the functions directly.

Variance reduction

We bring in multiple methods to reduce variance and improve the power and sensitivity of experiments:

  • Stratified sampling: recognised for reducing variance during assignment
  • Post stratification: a post-assignment variance reduction technique
  • CUPED: utilises ANCOVA to decrease variances
  • MLRATE: an extension of CUPED that allows for the use of non-linear / machine learning models

These approaches offer valuable ways to mitigate variance and improve the overall effectiveness of experiments. The experimenters can directly access these ad hoc capabilities via the package.

Multiple comparisons problem

A multiple comparisons problem occurs when multiple hypotheses are simultaneously tested, leading to a higher likelihood of false positives. To address this, we implement various statistical correction techniques in this package, as illustrated below.

Statistical correction techniques

Experimenters can specify if they have concerns about the dependency of the tests and whether the test results are expected to be negatively related. This capability will adopt the following procedures and choose the relevant tests to mitigate the risk of false positives accordingly:

  • False Discovery Rate (FDR) procedures, which control the expected rate of false discoveries.
  • Family-wise Error Rate (FWER) procedures, which control the probability of making at least one false discovery within a set of related tests referred to as a family.

Multiple treatments and unequal treatment sizes

We developed a capability to deal with experiments where there are multiple treatments. This capability employs a conservative approach to ensure that the size reaches a minimum level where any pairwise comparison between the control and treatment groups has a sufficient sample size.

Heterogeneous treatment effects

Heterogeneous treatment effects refer to a situation where the treatment effect varies across different groups or subpopulations within a larger population. For instance, it may be of interest to examine treatment effects specifically on rainy days compared to non-rainy days. We have incorporated this functionality into the tests for both experiment designs. By enabling this feature, we facilitate a more nuanced analysis that accounts for potential variations in treatment effects based on different factors or contexts.

Maintenance and support

The package is available across all internal DS/Machine Learning platforms and individual local development environments within Grab. Its source code is openly accessible to all developers within Grab and its release adheres to a semantic release standard.

In addition to the technical maintenance efforts, we have introduced a dedicated committee and a workspace to address issues that may extend beyond the scope of the package’s current capabilities.

Experiment Council

Within Grab, there is a dedicated committee known as the ‘Experiment Council’. This committee includes data scientists, analysts, and economists from various functions. One of their responsibilities is to collaborate to enhance and maintain the package, as well as guide users in effectively utilising its functionalities. The Experiment Council plays a crucial role in enhancing the overall operational excellence of conducting experiments and deriving meaningful insights from them.

GrabCausal Methodology Bank

Experimenters frequently encounter challenges regarding the feasibility of conducting experiments for causal problems. To address this concern, we have introduced an alternative workspace called GrabCausal Methodology Bank. Similar to the internal open-source nature of this project, the GrabCausal Methodology bank is open to contributions from all users within Grab. It provides a collaborative space where users can readily share their code, case studies, guidelines, and suggestions related to
causal methodologies. By fostering an open and inclusive environment, this workspace encourages knowledge sharing and promotes the advancement of causal research methods.

The workspace functions as a platform, which now exhibits a wide range of commonly used methods, including Diff-in-Diff, Event studies, Regression Discontinuity Designs (RDD), Instrumental Variables (IV), Bayesian structural time series, and Bunching. Additionally, we are dedicated to incorporating more, such as Synthetic control, Double ML (Chernozhukov et al. 2018), DAG discovery/validation, etc., to further enhance our offerings in this space.

Learnings

Over the past few years, we have invested in developing and expanding this package. Our initial motivation was humble yet motivating – to contribute to improving the quality of experimentation at Grab, helping it develop from its initial start-up modus operandi to a more consolidated, rigorous, and guided approach.

Throughout this journey, we have learned that prioritisation holds the utmost significance in open-source projects of this nature; the majority of user demands can be met through relatively small yet pivotal efforts. By focusing on these core capabilities, we avoid spreading resources too thinly across all areas at the initial stage of planning and development.

Meanwhile, we acknowledge that there is still a significant journey ahead. While the package now focuses solely on individual experiments, an inherent challenge in online-controlled experimentation platforms is the interference between experiments (Gupta, et al, 2019). A recent development in the field is to embrace simultaneous tests (Microsoft, Google, Spotify and booking.com and Optimizely), and to carefully consider the tradeoff between accuracy and velocity.

The key to overcoming this challenge will be a close collaboration between the community of experimenters, the teams developing this unified toolkit, and the GrabX platform engineers. In particular, the platform developers will continue to enrich the experimentation SDK by providing diverse assignment strategies, sampling mechanisms, and user interfaces to manage potential inference risks better. Simultaneously, the community of experimenters can coordinate among themselves effectively to
avoid severe interference, which will also be monitored by GrabX. Last but not least, the development of this unified toolkit will also focus on monitoring, evaluating, and managing inter-experiment interference.

In addition, we are committed to keeping this package in sync with industry advancements. Many existing tools in this package, despite being labelled as “advanced” in the earlier discussions, are still relatively simplified. For instance,

  • Incorporating standard errors clustering based on the diverse assignment and sampling strategies requires attention (Abadie, et al, 2023).
  • Sequential testing will play a vital role in detecting uplifts earlier and safely, avoiding p-hacking. One recent innovation is the “always valid inference” (Johari, et al., 2022)
  • The advancements in investigating heterogeneous effects, such as Causal Forest (Athey and Wager, 2019), have extended beyond linear approaches, now incorporating nonlinear and more granular analyses.
  • Estimating the long-term treatment effects observed from short-term follow-ups is also a long-term objective, and one approach is using a Surrogate Index (Athey, et al 2019).
  • Continuous effort is required to stay updated and informed about the latest advancements in statistical testing methodologies, to ensure accuracy and effectiveness.

This article marks the beginning of our journey towards automating the experimentation and product decision-making process among the data scientist community. We are excited about the prospect of expanding the toolkit further in these directions. Stay tuned for more updates and posts.

References

  • Abadie, Alberto, et al. “When should you adjust standard errors for clustering?.” The Quarterly Journal of Economics 138.1 (2023): 1-35.

  • Athey, Susan, et al. “The surrogate index: Combining short-term proxies to estimate long-term treatment effects more rapidly and precisely.” No. w26463. National Bureau of Economic Research, 2019.

  • Athey, Susan, and Stefan Wager. “Estimating treatment effects with causal forests: An application.” Observational studies 5.2 (2019): 37-51.

  • Chernozhukov, Victor, et al. “Double/debiased machine learning for treatment and structural parameters.” (2018): C1-C68.

  • Facure, Matheus. Causal Inference in Python. O’Reilly Media, Inc., 2023.

  • Gupta, Somit, et al. “Top challenges from the first practical online controlled experiments summit.” ACM SIGKDD Explorations Newsletter 21.1 (2019): 20-35.

  • Huntington-Klein, Nick. The Effect: An Introduction to Research Design and Causality. CRC Press, 2021.

  • Imbens, Guido W. and Donald B. Rubin. Causal Inference for Statistics, Social, and Biomedical Sciences: An Introduction. Cambridge University Press, 2015.

  • Johari, Ramesh, et al. “Always valid inference: Continuous monitoring of a/b tests.” Operations Research 70.3 (2022): 1806-1821.

  • List, John A., Sally Sadoff, and Mathis Wagner. “So you want to run an experiment, now what? Some simple rules of thumb for optimal experimental design.” Experimental Economics 14 (2011): 439-457.

  • Moffatt, Peter. Experimetrics: Econometrics for Experimental Economics. Bloomsbury Publishing, 2020.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Iris – Turning observations into actionable insights for enhanced decision making

Post Syndicated from Grab Tech original https://engineering.grab.com/iris

Introduction

Iris (/ˈaɪrɪs/), a name inspired by the Olympian mythological figure who personified the rainbow and served as the messenger of the gods, is a comprehensive observability platform for Extract, Transform, Load (ETL) jobs. Just as the mythological Iris connected the gods to humanity, our Iris platform bridges the gap between raw data and meaningful insights, serving the needs of data-driven organisations. Specialising in meticulous monitoring and tracking of Spark and Presto jobs, Iris stands as a transformative tool for peak observability and effective decision-making.

  • Iris captures critical job metrics right at the Java Virtual Machine (JVM) level, including but not limited to runtime, CPU and memory utilisation rates, garbage collection statistics, stage and task execution details, and much more.
  • Iris not only regularly records these metrics but also supports real-time monitoring and offline analytics of metrics in the data lake. This gives you multi-faceted control and insights into the operational aspects of your workloads.
  • Iris gives you an overview of your jobs, predicts if your jobs are over or under-provisioned, and provides suggestions on how to optimise resource usage and save costs.

Understanding the needs

When examining ETL job monitoring across various platforms, a common deficiency became apparent. Existing tools could only provide CPU and memory usage data at the instance level, where an instance could refer to an EC2 unit or a Kubernetes pod with resources bound to the container level.

However, this CPU and memory usage data included usage from the operating system and other background tasks, making it difficult to isolate usage specific to Spark jobs (JVM level). A sizeable fraction of resource consumption, thus, could not be attributed directly to our ETL jobs. This lack of granularity posed significant challenges when trying to perform effective resource optimisation for individual jobs.

Gap between total instance and JVM provisioned resources

The situation was further complicated when compute instances were shared among various jobs. In such cases, determining the precise resource consumption for a specific job became nearly impossible. This made in-depth analysis and performance optimisation of specific jobs a complex and often ineffective process.

In the initial stages of my career in Spark, I took the reins of handling SEGP ETL jobs deployed in Chimera. Then, Chimera did not possess any tool for observing and understanding SEGP jobs. The lack of an efficient tool for close-to-real-time visualisation of Spark cluster/job metrics, profiling code class/function runtime durations, and investigating deep-level job metrics to assess CPU and memory usage, posed a significant challenge even back then.

In the quest for solutions within Grab, I found no tool that could fulfill all these needs. This prompted me to extend my search beyond the organisation, leading me to discover that Uber had an exceptional tool known as the JVM Profiler. This tool could collect JVM metrics and profile the job. Further research also led me to sparkMeasure, a standalone tool known for its ability to measure Spark metrics on-the-fly without any code changes.

This personal research and journey highlights the importance of a comprehensive, in-depth observability tool – emphasising the need that Iris aims to fulfill in the world of ETL job monitoring. Through this journey, Iris was ideated, named after the Greek deity, encapsulating the mission to bridge the gap between the realm of raw ETL job metrics and the world of actionable insights.

Observability with Iris

Platform architecture

Platform architecture of Iris

Iris’s robust architecture is designed to smartly deliver observability into Spark jobs with high reliability. It consists of three main modules: Metrics Collector, Kafka Queue, and Telegraf, InfluxDB, and Grafana (TIG) Stack.

Metrics Collector: This module listens to Spark jobs, collects metrics, and funnels them to the Kafka queue. What sets this apart is its unobstructive nature – there is no need for end-users to update their application code or notebook.

Kafka Queue: Serving as an asynchronous deliverer of metrics messages, Kafka is leveraged to prevent Iris from becoming another bottleneck slowing down user jobs. By functioning as a message queue, it enables the efficient processing of metric data.

TIG Stack: This component is utilised for real-time monitoring, making visualising performance metrics a cinch. The TIG stack proves to be an effective solution for real-time data visualisation.

For offline analytics, Iris pushes metrics data from Kafka into our data lake. This creates a wealth of historical data that can be utilised for future research, analysis, and predictions. The strategic combination of real-time monitoring and offline analysis forms the basis of Iris’s ability to provide valuable insights.

Next, we will delve into how Iris collects the metrics.

Data collection

Iris’s metrics is now primarily driven by two tools that operate under the Metrics Collector module: JVM Profiler and sparkMeasure.

JVM Profiler

As mentioned earlier, JVM Profiler is an exceptional tool that helps to collect and profile metrics at JVM level.

Java process for the JVM Profiler tool

Uber JVM Profiler supports the following features:

  • Debug memory usage for all your Spark application executors, including java heap memory, non-heap memory, native memory (VmRSS, VmHWM), memory pool, and buffer pool (directed/mapped buffer).
  • Debug CPU usage, garbage collection time for all Spark executors.
  • Debug arbitrary Java class methods (how many times they run, how long they take), also called Duration Profiling.
  • Debug arbitrary Java class method call and trace its argument value, also known as Argument Profiling.
  • Do Stacktrack Profiling and generate flamegraph to visualise CPU time spent for the Spark application.
  • Debug I/O metrics (disk read/write bytes for the application, CPU iowait for the machine).
  • Debug JVM Thread Metrics like Count of Total Threads, Peak Threads, Live/Active Threads, and newThreads.

Example metrics (Source code)

{
        "nonHeapMemoryTotalUsed": 11890584.0,
        "bufferPools": [
                {
                        "totalCapacity": 0,
                        "name": "direct",
                        "count": 0,
                        "memoryUsed": 0
                },
                {
                        "totalCapacity": 0,
                        "name": "mapped",
                        "count": 0,
                        "memoryUsed": 0
                }
        ],
        "heapMemoryTotalUsed": 24330736.0,
        "epochMillis": 1515627003374,
        "nonHeapMemoryCommitted": 13565952.0,
        "heapMemoryCommitted": 257425408.0,
        "memoryPools": [
                {
                        "peakUsageMax": 251658240,
                        "usageMax": 251658240,
                        "peakUsageUsed": 1194496,
                        "name": "Code Cache",
                        "peakUsageCommitted": 2555904,
                        "usageUsed": 1173504,
                        "type": "Non-heap memory",
                        "usageCommitted": 2555904
                },
                {
                        "peakUsageMax": -1,
                        "usageMax": -1,
                        "peakUsageUsed": 9622920,
                        "name": "Metaspace",
                        "peakUsageCommitted": 9830400,
                        "usageUsed": 9622920,
                        "type": "Non-heap memory",
                        "usageCommitted": 9830400
                },
                {
                        "peakUsageMax": 1073741824,
                        "usageMax": 1073741824,
                        "peakUsageUsed": 1094160,
                        "name": "Compressed Class Space",
                        "peakUsageCommitted": 1179648,
                        "usageUsed": 1094160,
                        "type": "Non-heap memory",
                        "usageCommitted": 1179648
                },
                {
                        "peakUsageMax": 1409286144,
                        "usageMax": 1409286144,
                        "peakUsageUsed": 24330736,
                        "name": "PS Eden Space",
                        "peakUsageCommitted": 67108864,
                        "usageUsed": 24330736,
                        "type": "Heap memory",
                        "usageCommitted": 67108864
                },
                {
                        "peakUsageMax": 11010048,
                        "usageMax": 11010048,
                        "peakUsageUsed": 0,
                        "name": "PS Survivor Space",
                        "peakUsageCommitted": 11010048,
                        "usageUsed": 0,
                        "type": "Heap memory",
                        "usageCommitted": 11010048
                },
                {
                        "peakUsageMax": 2863661056,
                        "usageMax": 2863661056,
                        "peakUsageUsed": 0,
                        "name": "PS Old Gen",
                        "peakUsageCommitted": 179306496,
                        "usageUsed": 0,
                        "type": "Heap memory",
                        "usageCommitted": 179306496
                }
        ],
        "processCpuLoad": 0.0008024004394748531,
        "systemCpuLoad": 0.23138430784607697,
        "processCpuTime": 496918000,
        "appId": null,
        "name": "24103@machine01",
        "host": "machine01",
        "processUuid": "3c2ec835-749d-45ea-a7ec-e4b9fe17c23a",
        "tag": "mytag",
        "gc": [
                {
                        "collectionTime": 0,
                        "name": "PS Scavenge",
                        "collectionCount": 0
                },
                {
                        "collectionTime": 0,
                        "name": "PS MarkSweep",
                        "collectionCount": 0
                }
        ]
}

A list of all metrics and information corresponding to them can be found here.

sparkMeasure

Complementing the JVM Profiler is sparkMeasure, a standalone tool that was built to robustly capture Spark job-specific metrics.

Architecture of Spark Task Metrics, Listener Bus, and sparkMeasure (Source)

It is registered as a custom listener and operates by collection built-in metrics that Spark exchanges between the driver node and executor nodes. Its standout feature is the ability to collect all metrics supported by Spark, as defined in Spark’s official documentation here.

Example stage metrics collected by sparkMeasure (Source code)

Scheduling mode = FIFO

Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:

numStages => 3
numTasks => 17
elapsedTime => 1291 (1 s)
stageDuration => 1058 (1 s)
executorRunTime => 2774 (3 s)
executorCpuTime => 2004 (2 s)
executorDeserializeTime => 2868 (3 s)
executorDeserializeCpuTime => 1051 (1 s)
resultSerializationTime => 5 (5 ms)
jvmGCTime => 88 (88 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 16 (16 ms)
resultSize => 16091 (15.0 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
recordsRead => 2000
bytesRead => 0 (0 Bytes)
recordsWritten => 0
bytesWritten => 0 (0 Bytes)
shuffleRecordsRead => 8
shuffleTotalBlocksFetched => 8
shuffleLocalBlocksFetched => 8
shuffleRemoteBlocksFetched => 0
shuffleTotalBytesRead => 472 (472 Bytes)
shuffleLocalBytesRead => 472 (472 Bytes)
shuffleRemoteBytesRead => 0 (0 Bytes)
shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8

Stages and their duration:
Stage 0 duration => 593 (0.6 s)
Stage 1 duration => 416 (0.4 s)
Stage 3 duration => 49 (49 ms)

Data organisation

The architecture of Iris is designed to efficiently route metrics to two key destinations:

  • Real-time datasets: InfluxDB
  • Offline datasets: GrabTech Datalake in AWS

Real-time dataset

Freshness/latency: 5 to 10 seconds

All metrics flowing in through Kafka topics are instantly wired into InfluxDB. A crucial part of this process is accomplished by Telegraf, a plugin-driven server agent used for collecting and sending metrics. Acting as a Kafka consumer, Telegraf listens to each Kafka topic according to its corresponding metrics profiling. It parses the incoming JSON messages and extracts crucial data points (such as role, hostname, jobname, etc.). Once the data is processed, Telegraf writes it into the InfluxDB.

InfluxDB organises the stored data in what we call ‘measurements’, which could analogously be considered as tables in traditional relational databases.

In Iris’s context, we have structured our real-time data into the following crucial measurements:

  1. CpuAndMemory: This measures CPU and memory-related metrics, giving us insights into resource utilisation by Spark jobs.
  2. I/O: This records input/output metrics, providing data on the reading and writing operations happening during the execution of jobs.
  3. ThreadInfo: This measurement holds data related to job threading, allowing us to monitor concurrency and synchronisation aspects.
  4. application_started and application_ended: These measurements allow us to track Spark application lifecycles, from initiation to completion.
  5. executors_started and executors_removed: These measurements give us a look at the executor dynamics during Spark application execution.

  1. jobs_started and jobs_ended: These provide vital data points relating to the lifecycle of individual Spark jobs within applications.
  2. queries_started and queries_ended: These measurements are designed to track the lifecycle of individual Spark SQL queries.
  3. stage_metrics, stages_started, and stages_ended: These measurements help monitor individual stages within Spark jobs, a valuable resource for tracking the job progress and identifying potential bottlenecks.

The real-time data collected in these measurements form the backbone of the monitoring capabilities of Iris, providing an accurate and current picture of Spark job performances.

Offline dataset

Freshness/latency: 1 hour

In addition to real-time data management with InfluxDB, Iris is also responsible for routing metrics to our offline data storage in the Grab Tech Datalake for long-term trend studies, pattern analysis, and anomaly detection.

The metrics from Kafka are periodically synchronised to the Amazon S3 tables under the iris schema in the Grab Tech AWS catalogue. This valuable historical data from Kafka is meticulously organised with a one-to-one mapping between the platform or Kafka topic to the table in the iris schema. For example: iris.chimera_jvmprofiler_cpuandmemory map with prd-iris-chimera-jvmprofiler-cpuandmemory Kafka topic.


This streamlined organisation means you can write queries to retrieve information from the AWS dataset very similarly to how you would do it from InfluxDB. Whether it’s CPU and memory usage, I/O, thread info, or spark metrics, you can conveniently fetch historical data for your analysis.

Data visualisation

A well-designed visual representation makes it easier to see patterns, trends, and outliers in groups of data. Iris employs different visualisation tools based on whether the data is real-time or historical.

Real-Time data visualisation – Grafana

Iris uses Grafana for showcasing real-time data. For each platform, two primary dashboards have been set up: JVM metrics and Spark metrics.

JVM metrics dashboard: This dashboard is designed to display information related to the JVM.
Spark metrics dashboard: This dashboard primarily focuses on visualising Spark-specific elements.

Offline data visualisation

While real-time visualisation is crucial for immediate awareness and decision-making, visualising historical data provides invaluable insights about long-term trends, patterns, and anomalies. Developers can query the raw or aggregated data from the Iris tables for their specific analyses.

Moreover, to assist platform owners and end-users in obtaining a quick summary of their job data, we provide built-in dashboards with pre-aggregated visuals. These dashboards contain a wealth of information expressed in an easy-to-understand format. Key metrics include:

  • Total instances
  • Total CPU cores
  • Total memory
  • CPU and memory utilisation
  • Total machine runtimes

  • Besides visualisations for individual jobs, we have designed an overview dashboard providing a comprehensive summary of all resources consumed by all ETL jobs. This is particularly useful for platform owners and tech leads, allowing them to have an all-encompassing visibility of the performance and resource usage across the ETL jobs.

    Dashboard for monitoring ETL jobs

    These dashboards’ visuals effectively turn the historical metrics data into clear, comprehensible, and insightful information, guiding users towards objective-driven decision-making.

    Transforming observations into insights

    While our journey with Iris is just in the early stages, we’ve already begun harnessing its ability to transform raw data into concrete insights. The strength of Iris lies not just in its data collection capabilities but also in its potential to analyse and infer patterns from the collated data.

    Currently, we’re experimenting with a job classification model that aims to predict resource allocation efficiency (i.e. identifying jobs as over or under-provisioned). This information, once accurately predicted, can help optimise the usage of resources by fine-tuning the provisions for each job. While this model is still in its early stages of testing and lacks sufficient validation data, it exemplifies the direction we’re heading – integrating advanced analytics with operational observability.

    As we continue to refine Iris and develop more models, our aim is to empower users with deep insights into their Spark applications. These insights can potentially identify bottlenecks, optimise resource allocation and ultimately, enhance overall performance. In the long run, we see Iris evolving from being a data collection tool to a platform that can provide actionable recommendations and enable data-driven decision-making.

    Job classification feature set

    At the core of our job classification model, there are two carefully selected metrics:

    1. CPU cores per hour: This represents the number of tasks a job can handle concurrently in a given hour. A higher number would mean more tasks being processed simultaneously.

    2. Total Terabytes of data input per core: This considers only the input from the underlying HDFS/S3 input, excluding shuffle data. It represents the volume of data one CPU core needs to process. A larger input would mean more CPUs are required to complete the job in a reasonable timeframe.

    The choice of these two metrics for building feature sets is based on a nuanced understanding of Spark job dynamics:

  • Allocating the right CPU cores is crucial as a higher number of cores means more tasks being processed concurrently. This is especially important for jobs with larger input data and more partitioned files, as they often require more concurrent processing capacity, hence, more CPU cores.
  • The total data input helps to estimate the data processing load of a job. A job tasked with processing a high volume of input data but assigned low CPU cores might be under-provisioned and result in an extended runtime.

  • As for CPU and memory utilisation, while it could offer useful insights, we’ve found it may not always contribute to predicting if a job is over or under-provisioned because utilisation can vary run-to-run. Thus, to keep our feature set robust and consistent, we primarily focus on CPU cores per hour and total terabytes of input data.

    With these metrics as our foundation, we are developing models that can classify jobs into over-provisioned or under-provisioned, helping us optimise resource allocation and improve job performance in the long run.

    As always, treat any information related to our job classification feature set and the insights derived from it with utmost care for data confidentiality and integrity.

    We’d like to reiterate that these models are still in the early stages of testing and we are constantly working to enhance their predictive accuracy. The true value of this model will be unlocked as it is refined and as we gather more validation data.

    Model training and optimisation

    Choosing the right model is crucial for deriving meaningful insights from datasets. We decided to start with a simple, yet powerful algorithm – K-means clustering, for job classification. K-means is a type of unsupervised machine learning algorithm used to classify items into groups (or clusters) based on their features.

    Here is our process:

    1. Model exploration: We began by exploring the K-means algorithm using a small dataset for validation.
    2. Platform-specific cluster numbers: To account for the uniqueness of every platform, we ran a Score Test (an evaluation method to determine the optimal number of clusters) for each platform. The derived optimal number of clusters is then used in the monthly job for that respective platform’s data.
    3. Set up a scheduled job: After ensuring the code was functioning correctly, we set up a job to run the model on a monthly schedule. Monthly re-training was chosen to encapsulate possible changes in the data patterns over time.
    4. Model saving and utilisation: The trained model is saved to our S3 bucket and used to classify jobs as over-provisioned or under-provisioned based on the daily job runs.

    This iterative learning approach, through which our model learns from an ever-increasing pool of historical data, helps maintain its relevance and improve its accuracy over time.

    Here is an example output from Databricks train run:

  • Blue green group: Input per core is too large but the CPU per hour is small, so the job may take a lot of time to complete.
  • Purple group: Input per core is too small but the CPU per hour is too high. There may be a lot of wasted CPU here.
  • Yellow group: I think this is the ideal group where input per core and CPU per hour is not high.

  • Keep in mind that classification insights provided by our K-means model are still in the experimental stage. As we continue to refine the approach, the reliability of these insights is expected to grow, providing increasingly valuable direction for resource allocation optimisation.

    Seeing Iris in action

    This section provides practical examples and real-case scenarios that demonstrate Iris’s capacity for delivering insights from ETL job observations.

    Case study 1: Spark benchmarking

    From August to September 2023, we carried out a Spark benchmarking exercise to measure and compare the cost and performance of Grab’s Spark platforms: Open Source Spark on Kubernetes (Chimera), Databricks and AWS EMR. Since each platform has its own way to measure a job’s performance and cost, Iris was used to collect the necessary Spark metrics in order to calculate the cost for each job. Furthermore, many other metrics were collected by Iris in order to compare the platforms’ performances like CPU and memory utilisation, runtime, etc.

    Case study 2: Improving Databricks Infra Cost Unit (DBIU) Accuracy with Iris

    Being able to accurately calculate and fairly distribute Databricks infrastructure costs has always been a challenge, primarily due to difficulties in distinguishing between on-demand and Spot instance usage. This was further complicated by two conditions:

    • Fallback to on-demand instances: Databricks has a feature that automatically falls back to on-demand instances when Spot instances are not readily available. While beneficial for job execution, this feature has traditionally made it difficult to accurately track per-job Spot vs. on-demand usage.
    • User configurable hybrid policy: Users can specify a mix of on-demand and Spot instances for their jobs. This flexible, hybrid approach often results in complex, non-uniform usage patterns, further complicating cost categorisation.

    Iris has made a key difference in resolving these dilemmas. By providing granular, instance-level metrics including whether each instance is on-demand or Spot, Iris has greatly improved our visibility into per-job instance usage.

    This precise data enables us to isolate the on-demand instance usage, which was previously bundled in the total cost calculation. Similarly, it allows us to accurately gauge and consider the usage ratio of on-demand instances in hybrid policy scenarios.

    The enhanced transparency provided by Iris metrics allows us to standardise DBIU cost calculations, making them fairer for users who majorly or only use Spot instances. In other words, users need to pay more if they intentionally choose or fall back to on-demand instances for their jobs.

    The practical application of Iris in enhancing DBIU accuracy illustrates its potential in driving data-informed decisions and fostering fairness in resource usage and cost distribution.

    Case study 3: Optimising job configuration for better performance and cost efficiency

    One of the key utilities of iris is its potential to assist with job optimisation. For instance, we have been able to pinpoint jobs that were consistently over-provisioned and work with end-users to tune their job configurations.

    Through this exercise and continuous monitoring, we’ve seen substantial results from the job optimisations:

  • Cost reductions ranging from 20% to 50% for most jobs.
  • Positive feedback from users about improvements in job performance and cost efficiency.

  • By the way, interestingly, our analysis led us to identify certain the following patterns. These patterns could be leveraged to widen the impact of our optimisation efforts across multiple use-cases in our platforms:

    Pattern Recommendation
  • Job duration < 20 minutes
  • Input per core < 1GB
  • Total used instance is 2x/3x of max worker nodes
  • Use fixed number of workers nodes potentially speeding up performance and certainly reducing costs.
  • CPU utilisation < 25%
  • Cut max worker in half. E.g: 10 to 5 workers
  • Downgrade instance size a half. E.g: 4xlarge -> 2xlarge
  • Job has much shuffle
  • Bump the instance size and reduce the number of workers. E.g. bump 2xlarge -> 4xlarge and reduce number of workers from 100 -> 50
  • However, we acknowledge that these findings may not apply uniformly to every instance. The optimisation recommendations derived from these patterns might not yield the desired outcomes in all cases.

    The future of Iris

    Building upon its firm foundation as a robust Spark observability tool, we envision a future for Iris wherein it not only monitors metrics but provides actionable insights, discerns usage patterns, and drives predictions.

    Our plans to make Iris more accessible include developing APIs endpoint for platform teams to query performance by job names. Another addition we’re aiming for is the ability for Iris to provide resource tuning recommendations. By making platform-specific and job-specific recommendations easily accessible, we hope to assist platform teams in making informed, data-driven decisions on resource allocation and cost efficiency.

    We’re also looking to expand Iris’s capabilities with the development of a listener for Presto jobs, similar to the sparkMeasure tool currently used for Spark jobs. The listener would provide valuable metrics and insights into the performance of Presto jobs, opening up new avenues for optimisation and cost management.

    Another major focus will be building a feedback loop for Iris to further enhance accuracy, continually refine its models, and improve insights provided. This effort would greatly benefit from the close collaboration and inputs from platform teams and other tech leads, as their expertise aids in interpreting Iris’s metrics and predictions and validating its meaningfulness.

    In conclusion, as Iris continues to develop and mature, we foresee it evolving into a crucial tool for data-driven decision-making and proactive management of Spark applications, playing a significant role in the efficient usage of cloud computing resources.

    Conclusion

    The role of Iris as an observability tool for Spark jobs in the world of Big Data is rapidly evolving. Iris has proven to be more than a simple data collection tool; it is a platform that integrates advanced analytics with operational observability.

    Even though Iris is in its early stages, it’s already been instrumental in creating detailed visualisations of both real-time and historical data from varied platforms. Besides that, Iris has started making strides in its journey towards using machine learning models like K-means clustering to classify jobs, demonstrating its potential in helping operators fine-tune resource allocation.

    Using instance-level metrics, Iris is helping improve cost distribution fairness and accuracy, making it a potent tool for resource optimisation. Furthermore, the successful case study of reducing job costs and enhancing performance through resource reallocation provides a promising outlook into Iris’s future applicability.

    With ongoing development plans, such as the Presto listener and the creation of endpoints for broader accessibility, Iris is poised to become an integral tool for data-informed decision-making. As we strive to enhance Iris, we will continue to collaborate with platform teams and tech leads whose feedback is invaluable in fulfilling Iris’s potential.

    Our journey with Iris is a testament to Grab’s commitment to creating a data-informed and efficient cloud computing environment. Iris, with its observed and planned capabilities, is on its way to revolutionising the way resource allocation is managed and optimised.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Supporting Diverse ML Systems at Netflix

    Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/supporting-diverse-ml-systems-at-netflix-2d2e6b6d205d

    David J. Berg, Romain Cledat, Kayla Seeley, Shashank Srikanth, Chaoying Wang, Darin Yu

    Netflix uses data science and machine learning across all facets of the company, powering a wide range of business applications from our internal infrastructure and content demand modeling to media understanding. The Machine Learning Platform (MLP) team at Netflix provides an entire ecosystem of tools around Metaflow, an open source machine learning infrastructure framework we started, to empower data scientists and machine learning practitioners to build and manage a variety of ML systems.

    Since its inception, Metaflow has been designed to provide a human-friendly API for building data and ML (and today AI) applications and deploying them in our production infrastructure frictionlessly. While human-friendly APIs are delightful, it is really the integrations to our production systems that give Metaflow its superpowers. Without these integrations, projects would be stuck at the prototyping stage, or they would have to be maintained as outliers outside the systems maintained by our engineering teams, incurring unsustainable operational overhead.

    Given the very diverse set of ML and AI use cases we support — today we have hundreds of Metaflow projects deployed internally — we don’t expect all projects to follow the same path from prototype to production. Instead, we provide a robust foundational layer with integrations to our company-wide data, compute, and orchestration platform, as well as various paths to deploy applications to production smoothly. On top of this, teams have built their own domain-specific libraries to support their specific use cases and needs.

    In this article, we cover a few key integrations that we provide for various layers of the Metaflow stack at Netflix, as illustrated above. We will also showcase real-life ML projects that rely on them, to give an idea of the breadth of projects we support. Note that all projects leverage multiple integrations, but we highlight them in the context of the integration that they use most prominently. Importantly, all the use cases were engineered by practitioners themselves.

    These integrations are implemented through Metaflow’s extension mechanism which is publicly available but subject to change, and hence not a part of Metaflow’s stable API yet. If you are curious about implementing your own extensions, get in touch with us on the Metaflow community Slack.

    Let’s go over the stack layer by layer, starting with the most foundational integrations.

    Data: Fast Data

    Our main data lake is hosted on S3, organized as Apache Iceberg tables. For ETL and other heavy lifting of data, we mainly rely on Apache Spark. In addition to Spark, we want to support last-mile data processing in Python, addressing use cases such as feature transformations, batch inference, and training. Occasionally, these use cases involve terabytes of data, so we have to pay attention to performance.

    To enable fast, scalable, and robust access to the Netflix data warehouse, we have developed a Fast Data library for Metaflow, which leverages high-performance components from the Python data ecosystem:

    As depicted in the diagram, the Fast Data library consists of two main interfaces:

    • The Table object is responsible for interacting with the Netflix data warehouse which includes parsing Iceberg (or legacy Hive) table metadata, resolving partitions and Parquet files for reading. Recently, we added support for the write path, so tables can be updated as well using the library.
    • Once we have discovered the Parquet files to be processed, MetaflowDataFrame takes over: it downloads data using Metaflow’s high-throughput S3 client directly to the process’ memory, which often outperforms reading of local files.

    We use Apache Arrow to decode Parquet and to host an in-memory representation of data. The user can choose the most suitable tool for manipulating data, such as Pandas or Polars to use a dataframe API, or one of our internal C++ libraries for various high-performance operations. Thanks to Arrow, data can be accessed through these libraries in a zero-copy fashion.

    We also pay attention to dependency issues: (Py)Arrow is a dependency of many ML and data libraries, so we don’t want our custom C++ extensions to depend on a specific version of Arrow, which could easily lead to unresolvable dependency graphs. Instead, in the style of nanoarrow, our Fast Data library only relies on the stable Arrow C data interface, producing a hermetically sealed library with no external dependencies.

    Example use case: Content Knowledge Graph

    Our knowledge graph of the entertainment world encodes relationships between titles, actors and other attributes of a film or series, supporting all aspects of business at Netflix.

    A key challenge in creating a knowledge graph is entity resolution. There may be many different representations of slightly different or conflicting information about a title which must be resolved. This is typically done through a pairwise matching procedure for each entity which becomes non-trivial to do at scale.

    This project leverages Fast Data and horizontal scaling with Metaflow’s foreach construct to load large amounts of title information — approximately a billion pairs — stored in the Netflix Data Warehouse, so the pairs can be matched in parallel across many Metaflow tasks.

    We use metaflow.Table to resolve all input shards which are distributed to Metaflow tasks which are responsible for processing terabytes of data collectively. Each task loads the data using metaflow.MetaflowDataFrame, performs matching using Pandas, and populates a corresponding shard in an output Table. Finally, when all matching is done and data is written the new table is committed so it can be read by other jobs.

    Compute: Titus

    Whereas open-source users of Metaflow rely on AWS Batch or Kubernetes as the compute backend, we rely on our centralized compute-platform, Titus. Under the hood, Titus is powered by Kubernetes, but it provides a thick layer of enhancements over off-the-shelf Kubernetes, to make it more observable, secure, scalable, and cost-efficient.

    By targeting @titus, Metaflow tasks benefit from these battle-hardened features out of the box, with no in-depth technical knowledge or engineering required from the ML engineers or data scientist end. However, in order to benefit from scalable compute, we need to help the developer to package and rehydrate the whole execution environment of a project in a remote pod in a reproducible manner (preferably quickly). Specifically, we don’t want to ask developers to manage Docker images of their own manually, which quickly results in more problems than it solves.

    This is why Metaflow provides support for dependency management out of the box. Originally, we supported only @conda, but based on our work on Portable Execution Environments, open-source Metaflow gained support for @pypi a few months ago as well.

    Example use case: Building model explainers

    Here’s a fascinating example of the usefulness of portable execution environments. For many of our applications, model explainability matters. Stakeholders like to understand why models produce a certain output and why their behavior changes over time.

    There are several ways to provide explainability to models but one way is to train an explainer model based on each trained model. Without going into the details of how this is done exactly, suffice to say that Netflix trains a lot of models, so we need to train a lot of explainers too.

    Thanks to Metaflow, we can allow each application to choose the best modeling approach for their use cases. Correspondingly, each application brings its own bespoke set of dependencies. Training an explainer model therefore requires:

    1. Access to the original model and its training environment, and
    2. Dependencies specific to building the explainer model.

    This poses an interesting challenge in dependency management: we need a higher-order training system, “Explainer flow” in the figure below, which is able to take a full execution environment of another training system as an input and produce a model based on it.

    Explainer flow is event-triggered by an upstream flow, such Model A, B, C flows in the illustration. The build_environment step uses the metaflow environment command provided by our portable environments, to build an environment that includes both the requirements of the input model as well as those needed to build the explainer model itself.

    The built environment is given a unique name that depends on the run identifier (to provide uniqueness) as well as the model type. Given this environment, the train_explainer step is then able to refer to this uniquely named environment and operate in an environment that can both access the input model as well as train the explainer model. Note that, unlike in typical flows using vanilla @conda or @pypi, the portable environments extension allows users to also fetch those environments directly at execution time as opposed to at deploy time which therefore allows users to, as in this case, resolve the environment right before using it in the next step.

    Orchestration: Maestro

    If data is the fuel of ML and the compute layer is the muscle, then the nerves must be the orchestration layer. We have talked about the importance of a production-grade workflow orchestrator in the context of Metaflow when we released support for AWS Step Functions years ago. Since then, open-source Metaflow has gained support for Argo Workflows, a Kubernetes-native orchestrator, as well as support for Airflow which is still widely used by data engineering teams.

    Internally, we use a production workflow orchestrator called Maestro. The Maestro post shares details about how the system supports scalability, high-availability, and usability, which provide the backbone for all of our Metaflow projects in production.

    A hugely important detail that often goes overlooked is event-triggering: it allows a team to integrate their Metaflow flows to surrounding systems upstream (e.g. ETL workflows), as well as downstream (e.g. flows managed by other teams), using a protocol shared by the whole organization, as exemplified by the example use case below.

    Example use case: Content decision making

    One of the most business-critical systems running on Metaflow supports our content decision making, that is, the question of what content Netflix should bring to the service. We support a massive scale of over 260M subscribers spanning over 190 countries representing hugely diverse cultures and tastes, all of whom we want to delight with our content slate. Reflecting the breadth and depth of the challenge, the systems and models focusing on the question have grown to be very sophisticated.

    We approach the question from multiple angles but we have a core set of data pipelines and models that provide a foundation for decision making. To illustrate the complexity of just the core components, consider this high-level diagram:

    In this diagram, gray boxes represent integrations to partner teams downstream and upstream, green boxes are various ETL pipelines, and blue boxes are Metaflow flows. These boxes encapsulate hundreds of advanced models and intricate business logic, handling massive amounts of data daily.

    Despite its complexity, the system is managed by a relatively small team of engineers and data scientists autonomously. This is made possible by a few key features of Metaflow:

    The team has also developed their own domain-specific libraries and configuration management tools, which help them improve and operate the system.

    Deployment: Cache

    To produce business value, all our Metaflow projects are deployed to work with other production systems. In many cases, the integration might be via shared tables in our data warehouse. In other cases, it is more convenient to share the results via a low-latency API.

    Notably, not all API-based deployments require real-time evaluation, which we cover in the section below. We have a number of business-critical applications where some or all predictions can be precomputed, guaranteeing the lowest possible latency and operationally simple high availability at the global scale.

    We have developed an officially supported pattern to cover such use cases. While the system relies on our internal caching infrastructure, you could follow the same pattern using services like Amazon ElasticCache or DynamoDB.

    Example use case: Content performance visualization

    The historical performance of titles is used by decision makers to understand and improve the film and series catalog. Performance metrics can be complex and are often best understood by humans with visualizations that break down the metrics across parameters of interest interactively. Content decision makers are equipped with self-serve visualizations through a real-time web application built with metaflow.Cache, which is accessed through an API provided with metaflow.Hosting.

    A daily scheduled Metaflow job computes aggregate quantities of interest in parallel. The job writes a large volume of results to an online key-value store using metaflow.Cache. A Streamlit app houses the visualization software and data aggregation logic. Users can dynamically change parameters of the visualization application and in real-time a message is sent to a simple Metaflow hosting service which looks up values in the cache, performs computation, and returns the results as a JSON blob to the Streamlit application.

    Deployment: Metaflow Hosting

    For deployments that require an API and real-time evaluation, we provide an integrated model hosting service, Metaflow Hosting. Although details have evolved a lot, this old talk still gives a good overview of the service.

    Metaflow Hosting is specifically geared towards hosting artifacts or models produced in Metaflow. This provides an easy to use interface on top of Netflix’s existing microservice infrastructure, allowing data scientists to quickly move their work from experimentation to a production grade web service that can be consumed over a HTTP REST API with minimal overhead.

    Its key benefits include:

    • Simple decorator syntax to create RESTFull endpoints.
    • The back-end auto-scales the number of instances used to back your service based on traffic.
    • The back-end will scale-to-zero if no requests are made to it after a specified amount of time thereby saving cost particularly if your service requires GPUs to effectively produce a response.
    • Request logging, alerts, monitoring and tracing hooks to Netflix infrastructure

    Consider the service similar to managed model hosting services like AWS Sagemaker Model Hosting, but tightly integrated with our microservice infrastructure.

    Example use case: Media

    We have a long history of using machine learning to process media assets, for instance, to personalize artwork and to help our creatives create promotional content efficiently. Processing large amounts of media assets is technically non-trivial and computationally expensive, so over the years, we have developed plenty of specialized infrastructure dedicated for this purpose in general, and infrastructure supporting media ML use cases in particular.

    To demonstrate the benefits of Metaflow Hosting that provides a general-purpose API layer supporting both synchronous and asynchronous queries, consider this use case involving Amber, our feature store for media.

    While Amber is a feature store, precomputing and storing all media features in advance would be infeasible. Instead, we compute and cache features in an on-demand basis, as depicted below:

    When a service requests a feature from Amber, it computes the feature dependency graph and then sends one or more asynchronous requests to Metaflow Hosting, which places the requests in a queue, eventually triggering feature computations when compute resources become available. Metaflow Hosting caches the response, so Amber can fetch it after a while. We could have built a dedicated microservice just for this use case, but thanks to the flexibility of Metaflow Hosting, we were able to ship the feature faster with no additional operational burden.

    Future Work

    Our appetite to apply ML in diverse use cases is only increasing, so our Metaflow platform will keep expanding its footprint correspondingly and continue to provide delightful integrations to systems built by other teams at Netlfix. For instance, we have plans to work on improvements in the versioning layer, which wasn’t covered by this article, by giving more options for artifact and model management.

    We also plan on building more integrations with other systems that are being developed by sister teams at Netflix. As an example, Metaflow Hosting models are currently not well integrated into model logging facilities — we plan on working on improving this to make models developed with Metaflow more integrated with the feedback loop critical in training new models. We hope to do this in a pluggable manner that would allow other users to integrate with their own logging systems.

    Additionally we want to supply more ways Metaflow artifacts and models can be integrated into non-Metaflow environments and applications, e.g. JVM based edge service, so that Python-based data scientists can contribute to non-Python engineering systems easily. This would allow us to better bridge the gap between the quick iteration that Metaflow provides (in Python) with the requirements and constraints imposed by the infrastructure serving Netflix member facing requests.

    If you are building business-critical ML or AI systems in your organization, join the Metaflow Slack community! We are happy to share experiences, answer any questions, and welcome you to contribute to Metaflow.

    Acknowledgements:

    Thanks to Wenbing Bai, Jan Florjanczyk, Michael Li, Aliki Mavromoustaki, and Sejal Rai for help with use cases and figures. Thanks to our OSS contributors for making Metaflow a better product.


    Supporting Diverse ML Systems at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

    Enabling near real-time data analytics on the data lake

    Post Syndicated from Grab Tech original https://engineering.grab.com/enabling-near-realtime-data-analytics

    Introduction

    In the domain of data processing, data analysts run their ad hoc queries on the data lake. The lake serves as an interface between our analytics and production environment, preventing downstream queries from impacting upstream data ingestion pipelines. To ensure efficient data processing in the data lake, choosing appropriate storage formats is crucial.

    The vanilla data lake solution is built on top of cloud object storage with Hive metastore, where data files are written in Parquet format. Although this setup is optimised for scalable analytics query patterns, it struggles to handle frequent updates to the data due to two reasons:

    1. The Hive table format requires us to rewrite the Parquet files with the latest data. For instance, to update one record in a Hive unpartitioned table, we would need to read all the data, update the record, and write back the entire data set.
    2. Writing Parquet files is expensive due to the overhead of organising the data to a compressed columnar format, which is more complex than a row format.

    The issue is further exacerbated by the scheduled downstream transformations. These necessary steps, which clean and process the data for use, increase the latency because the total delay now includes the combined scheduled intervals of these processing jobs.

    Fortunately, the introduction of the Hudi format, which supports fast writes by allowing Avro and Parquet files to co-exist on a Merge On Read (MOR) table, opens up the possibility of having a data lake with minimal data latency. The concept of a commit timeline further allows data to be served with Atomicity, Consistency, Isolation, and Durability (ACID) guarantees.

    We employ different sets of configurations for the different characteristics of our input sources:

    1. High or low throughput. A high-throughput source refers to one that has a high level of activity. One example of this can be our stream of booking events generated from each customer transaction. On the other hand, a low-throughput source would be one that has a relative low level of activity. An example of this can be transaction events generated from reconciliation happening on a nightly basis.
    2. Kafka (unbounded) or Relational Database Sources (bounded). Our sinks have sources that can be broadly categorised into unbounded and bounded sources. Unbounded sources are usually related to transaction events materialised as Kafka topics, representing user-generated events as they interact with the Grab superapp. Bounded sources usually refer to Relational Database (RDS) sources, whose size is bound to storage provisioned.

    The following sections will delve into the differences between each source and our corresponding configurations optimised for them.

    High throughput source

    For our data sources with high throughput, we have chosen to write the files in MOR format since the writing of files in Avro format allows for fast writes to meet our latency requirements.

    Figure 1 Architecture for MOR tables

    As seen in Figure 1, we use Flink to perform the stream processing and write out log files in Avro format in our setup. We then set up a separate Spark writer which periodically converts the Avro files into Parquet format in the Hudi compaction process.

    We have further simplified the coordination between the Flink and Spark writers by enabling asynchronous services on the Flink writer so it can generate the compaction plans for Spark writers to act on. During the Spark job runs, it checks for available compaction plans and acts on them, placing the burden of orchestrating the writes solely on the Flink writer. This approach could help minimise potential concurrency problems that might otherwise arise, as there would be a single actor
    orchestrating the associated Hudi table services.

    Low throughput source

    Figure 2 Architecture for COW tables

    For low throughput sources, we gravitate towards the choice of Copy On Write (COW) tables given the simplicity of its design, since it only involves one component, which is the Flink writer. The downside is that it has higher data latency because this setup only generates Parquet format data snapshots at each checkpoint interval, which is typically about 10-15 minutes.

    Connecting to our Kafka (unbounded) data source

    Grab uses Protobuf as our central data format in Kafka, ensuring schema evolution compatibility. However, the derivation of the schema of these topics still requires some transformation to make it compatible with Hudi’s accepted schema. Some of these transformations include ensuring that Avro record fields do not contain just a single array field, and handling logical decimal schemas to transform them to fixed byte schema for Spark compatibility.

    Given the unbounded nature of the source, we decided to partition it by Kafka event time up to the hour level. This ensured that our Hudi operations would be faster. Parquet file writes would be faster since they would only affect files within the same partition, and each Parquet file within the same event time partition would have a bounded size given the monotonically increasing nature of Kafka event time.

    By partitioning tables by Kafka event time, we can further optimise compaction planning operations, since the amount of file lookups required is now reduced with the use of BoundedPartitionAwareCompactionStrategy. Only log files in recent partitions would be selected for compaction and the job manager need not list every partition to figure out which log files to select for compaction during the planning phase anymore.

    Connecting to our RDS (bounded) data source

    For our RDS, we decided to use the Flink Change Data Capture (CDC) connectors by Veverica to obtain the binlog streams. The RDS would then treat the Flink writer as a replication server and start streaming its binlog data to it for each MySQL change. The Flink CDC connector presents the data as a Kafka Connect (KC) Source record, since it uses the Debezium connector under the hood. It is then a straightforward task to deserialise these records and transform them into Hudi records, since
    the Avro schema and associated data changes are already captured within the KC source record.

    The obtained binlog timestamp is also emitted as a metric during consumption for us to monitor the observed data latency at the point of ingestion.

    Optimising for these sources involves two phases:

    1. First, assigning more resources for the cold start incremental snapshot process where Flink takes a snapshot of the current data state in the RDS and loads the Hudi table with that snapshot. This phase is usually resource-heavy as there are a lot of file writes and data ingested during this process.
    2. Once the snapshotting is completed, Flink would then start to process the binlog stream and the observed throughput would drop to a level similar to the DB write throughput. The resources required by the Flink writer at this stage would be much lower than in the snapshot phase.

    Indexing for Hudi tables

    Indexing is important for upserting Hudi tables when the writing engine performs updates, allowing it to efficiently locate the file groups of the data to be updated.

    As of version 0.14, the Flink engine only supports Bucket Index or Flink State Index. Bucket Index performs indexing of the file record by hashing the record key and matching it to a specific bucket of files indicated by the naming convention of the written data files. Flink State Index on the other hand stores the index map of record keys to files in memory.

    Given that our tables include unbounded Kafka sources, there is a possibility for our state indexes to grow indefinitely. Furthermore, the requirement of state preservation for Flink State Index across version deployments and configuration updates adds complexity to the overall solution.

    Thus, we opted for the simple Bucket Index for its simplicity and the fact that our Hudi table size per partition does not change drastically across the week. However, this comes with a limitation whereby the number of buckets cannot be updated easily and imposes a parallelism limit at which our Flink pipelines can scale. Thus, as traffic grows organically, we would find ourselves in a situation whereby our configuration grows obsolete and cannot handle the increased load.

    To resolve this going forward, using consistent hashing for the Bucket Index would be something to explore to optimise our Parquet file sizes and allow the number of buckets to grow seamlessly as traffic grows.

    Impact

    Fresh business metrics

    Post creation of our Hudi Data Ingestion solution, we have enabled various users such as our data analysts to perform ad hoc queries much more easily on data that has lower latency. Furthermore, Hudi tables can be seamlessly joined with Hive tables in Trino for additional context. This enabled the construction of operational dashboards reflecting fresh business metrics to our various operators, empowering them with the necessary information to quickly respond to any abnormalities (such as high-demand events like F1 or seasonal holidays).

    Quicker fraud detection

    Another significant user of our solution is our fraud detection analysts. This enabled them to rapidly access fresh transaction events and analyse them for fraudulent patterns, particularly during the emergence of a new attack pattern that hadn’t been detected by their rules engine. Our solution also allowed them to perform multiple ad hoc queries that involve lookbacks of various days’ worth of data without impacting our production RDS and Kafka clusters by using the data lake as the data interface, reducing the data latency to the minute level and, in turn, empowering them to respond more quickly to attacks.

    What’s next?

    As the landscape of data storage solutions evolves rapidly, we are eager to test and integrate new features like Record Level Indexing and the creation of Pre Join tables. This evolution extends beyond the Hudi community to other table formats such as IceBerg and DeltaLake. We remain ready to adapt ourselves to these changes and incorporate the advantages of each format into our data lake within Grab.

    References

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Sequential A/B Testing Keeps the World Streaming Netflix Part 1: Continuous Data

    Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/sequential-a-b-testing-keeps-the-world-streaming-netflix-part-1-continuous-data-cba6c7ed49df

    Michael Lindon, Chris Sanden, Vache Shirikian, Yanjun Liu, Minal Mishra, Martin Tingley

    Using sequential anytime-valid hypothesis testing procedures to safely release software

    1. Spot the Difference

    Can you spot any difference between the two data streams below? Each observation is the time interval between a Netflix member hitting the play button and playback commencing, i.e., play-delay. These observations are from a particular type of A/B test that Netflix runs called a software canary or regression-driven experiment. More on that below — for now, what’s important is that we want to quickly and confidently identify any difference in the distribution of play-delay — or conclude that, within some tolerance, there is no difference.

    In this blog post, we will develop a statistical procedure to do just that, and describe the impact of these developments at Netflix. The key idea is to switch from a “fixed time horizon” to an “any-time valid” framing of the problem.

    Sequentially comparing two streams of measurements from treatment and control
    Figure 1. An example data stream for an A/B test where each observation represents play-delay for the control (left) and treatment (right). Can you spot any differences in the statistical distributions between the two data streams?

    2. Safe software deployment, canary testing, and play-delay

    Software engineering readers of this blog are likely familiar with unit, integration and load testing, as well as other testing practices that aim to prevent bugs from reaching production systems. Netflix also performs canary tests — software A/B tests between current and newer software versions. To learn more, see our previous blog post on Safe Updates of Client Applications.

    The purpose of a canary test is twofold: to act as a quality-control gate that catches bugs prior to full release, and to measure performance of the new software in the wild. This is carried out by performing a randomized controlled experiment on a small subset of users, where the treatment group receives the new software update and the control group continues to run the existing software. If any bugs or performance regressions are observed in the treatment group, then the full-scale release can be prevented, limiting the “impact radius” among the user base.

    One of the metrics Netflix monitors in canary tests is how long it takes for the video stream to start when a title is requested by a user. Monitoring this “play-delay” metric throughout releases ensures that the streaming performance of Netflix only ever improves as we release newer versions of the Netflix client. In Figure 1, the left side shows a real-time stream of play-delay measurements from users running the existing version of the Netflix client, while the right side shows play-delay measurements from users running the updated version. We ask ourselves: Are users of the updated client experiencing longer play-delays?

    We consider any increase in play-delay to be a serious performance regression and would prevent the release if we detect an increase. Critically, testing for differences in means or medians is not sufficient and does not provide a complete picture. For example, one situation we might face is that the median or mean play-delay is the same in treatment and control, but the treatment group experiences an increase in the upper quantiles of play-delay. This corresponds to the Netflix experience being degraded for those who already experience high play delays — likely our members on slow or unstable internet connections. Such changes should not be ignored by our testing procedure.

    For a complete picture, we need to be able to reliably and quickly detect an upward shift in any part of the play-delay distribution. That is, we must do inference on and test for any differences between the distributions of play-delay in treatment and control.

    To summarize, here are the design requirements of our canary testing system:

    1. Identify bugs and performance regressions, as measured by play-delay, as quickly as possible. Rationale: To minimize member harm, if there is any problem with the streaming quality experienced by users in the treatment group we need to abort the canary and roll back the software change as quickly as possible.
    2. Strictly control false positive (false alarm) probabilities. Rationale: This system is part of a semi-automated process for all client deployments. A false positive test unnecessarily interrupts the software release process, reducing the velocity of software delivery and sending developers looking for bugs that do not exist.
    3. This system should be able to detect any change in the distribution. Rationale: We care not only about changes in the mean or median, but also about changes in tail behaviour and other quantiles.

    We now build out a sequential testing procedure that meets these design requirements.

    3. Sequential Testing: The Basics

    Standard statistical tests are fixed-n or fixed-time horizon: the analyst waits until some pre-set amount of data is collected, and then performs the analysis a single time. The classic t-test, the Kolmogorov-Smirnov test, and the Mann-Whitney test are all examples of fixed-n tests. A limitation of fixed-n tests is that they can only be performed once — yet in situations like the above, we want to be testing frequently to detect differences as soon as possible. If you apply a fixed-n test more than once, then you forfeit the Type-I error or false positive guarantee.

    Here’s a quick illustration of how fixed-n tests fail under repeated analysis. In the following figure, each red line traces out the p-value when the Mann-Whitney test is repeatedly applied to a data set as 10,000 observations accrue in both treatment and control. Each red line shows an independent simulation, and in each case, there is no difference between treatment and control: these are simulated A/A tests.

    The black dots mark where the p-value falls below the standard 0.05 rejection threshold. An alarming 70% of simulations declare a significant difference at some point in time, even though, by construction, there is no difference: the actual false positive rate is much higher than the nominal 0.05. Exactly the same behaviour would be observed for the Kolmogorov-Smirnov test.

    increased false positives when peeking at mann-whitney test
    Figure 2. 100 Sample paths of the p-value process simulated under the null hypothesis shown in red. The dotted black line indicates the nominal alpha=0.05 level. Black dots indicate where the p-value process dips below the alpha=0.05 threshold, indicating a false rejection of the null hypothesis. A total of 66 out of 100 A/A simulations falsely rejected the null hypothesis.

    This is a manifestation of “peeking”, and much has been written about the downside risks of this practice (see, for example, Johari et al. 2017). If we restrict ourselves to correctly applied fixed-n statistical tests, where we analyze the data exactly once, we face a difficult tradeoff:

    • Perform the test early on, after a small amount of data has been collected. In this case, we will only be powered to detect larger regressions. Smaller performance regressions will not be detected, and we run the risk of steadily eroding the member experience as small regressions accrue.
    • Perform the test later, after a large amount of data has been collected. In this case, we are powered to detect small regressions — but in the case of large regressions, we expose members to a bad experience for an unnecessarily long period of time.

    Sequential, or “any-time valid”, statistical tests overcome these limitations. They permit for peeking –in fact, they can be applied after every new data point arrives– while providing false positive, or Type-I error, guarantees that hold throughout time. As a result, we can continuously monitor data streams like in the image above, using confidence sequences or sequential p-values, and rapidly detect large regressions while eventually detecting small regressions.

    Despite relatively recent adoption in the context of digital experimentation, these methods have a long academic history, with initial ideas dating back to Abraham Wald’s Sequential Tests of Statistical Hypotheses from 1945. Research in this area remains active, and Netflix has made a number of contributions in the last few years (see the references in these papers for a more complete literature review):

    In this and following blogs, we will describe both the methods we’ve developed and their applications at Netflix. The remainder of this post discusses the first paper above, which was published at KDD ’22 (and available on ArXiV). We will keep it high level — readers interested in the technical details can consult the paper.

    4. A sequential testing solution

    Differences in Distributions

    At any point in time, we can estimate the empirical quantile functions for both treatment and control, based on the data observed so far.

    empirical quantile functions for treatment and control data
    Figure 3: Empirical quantile function for control (left) and treatment (right) at a snapshot in time after starting the canary experiment. This is from actual Netflix data, so we’ve suppressed numerical values on the y-axis.

    These two plots look pretty close, but we can do better than an eyeball comparison — and we want the computer to be able to continuously evaluate if there is any significant difference between the distributions. Per the design requirements, we also wish to detect large effects early, while preserving the ability to detect small effects eventually — and we want to maintain the false positive probability at a nominal level while permitting continuous analysis (aka peeking).

    That is, we need a sequential test on the difference in distributions.

    Obtaining “fixed-horizon” confidence bands for the quantile function can be achieved using the DKWM inequality. To obtain time-uniform confidence bands, however, we use the anytime-valid confidence sequences from Howard and Ramdas (2022) [arxiv version]. As the coverage guarantee from these confidence bands holds uniformly across time, we can watch them become tighter without being concerned about peeking. As more data points stream in, these sequential confidence bands continue to shrink in width, which means any difference in the distribution functions — if it exists — will eventually become apparent.

    Anytime-valid confidence bands on treatment and control quantile functions
    Figure 4: 97.5% Time-Uniform Confidence bands on the quantile function for control (left) and treatment (right)

    Note each frame corresponds to a point in time after the experiment began, not sample size. In fact, there is no requirement that each treatment group has the same sample size.

    Differences are easier to see by visualizing the difference between the treatment and control quantile functions.

    Confidence sequences on quantile differences and sequential p-value
    Figure 5: 95% Time-Uniform confidence band on the quantile difference function Q_b(p) — Q_a(p) (left). The sequential p-value (right).

    As the sequential confidence band on the treatment effect quantile function is anytime-valid, the inference procedure becomes rather intuitive. We can continue to watch these confidence bands tighten, and if at any point the band no longer covers zero at any quantile, we can conclude that the distributions are different and stop the test. In addition to the sequential confidence bands, we can also construct a sequential p-value for testing that the distributions differ. Note from the animation that the moment the 95% confidence band over quantile treatment effects excludes zero is the same moment that the sequential p-value falls below 0.05: as with fixed-n tests, there is consistency between confidence intervals and p-values.

    There are many multiple testing concerns in this application. Our solution controls Type-I error across all quantiles, all treatment groups, and all joint sample sizes simultaneously (see our paper, or Howard and Ramdas for details). Results hold for all quantiles, and for all times.

    5. Impact at Netflix

    Releasing new software always carries risk, and we always want to reduce the risk of service interruptions or degradation to the member experience. Our canary testing approach is another layer of protection for preventing bugs and performance regressions from slipping into production. It’s fully automated and has become an integral part of the software delivery process at Netflix. Developers can push to production with peace of mind, knowing that bugs and performance regressions will be rapidly caught. The additional confidence empowers developers to push to production more frequently, reducing the time to market for upgrades to the Netflix client and increasing our rate of software delivery.

    So far this system has successfully prevented a number of serious bugs from reaching our end users. We detail one example.

    Case study: Safe Rollout of Netflix Client Application

    Figures 3–5 are taken from a canary test in which the behaviour of the client application was modified application (actual numerical values of play-delay have been suppressed). As we can see, the canary test revealed that the new version of the client increases a number of quantiles of play-delay, with the median and 75% percentile of play experiencing relative increases of at least 0.5% and 1% respectively. The timeseries of the sequential p-value shows that, in this case, we were able to reject the null of no change in distribution at the 0.05 level after about 60 seconds. This provides rapid feedback in the software delivery process, allowing developers to test the performance of new software and quickly iterate.

    6. What’s next?

    If you are curious about the technical details of the sequential tests for quantiles developed here, you can learn all about the math in our KDD paper (also available on arxiv).

    You might also be wondering what happens if the data are not continuous measurements. Errors and exceptions are critical metrics to log when deploying software, as are many other metrics which are best defined in terms of counts. Stay tuned — our next post will develop sequential testing procedures for count data.


    Sequential A/B Testing Keeps the World Streaming Netflix
    Part 1: Continuous Data
    was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

    Kafka on Kubernetes: Reloaded for fault tolerance

    Post Syndicated from Grab Tech original https://engineering.grab.com/kafka-on-kubernetes

    Introduction

    Coban – Grab’s real-time data streaming platform – has been operating Kafka on Kubernetes with Strimzi in
    production for about two years. In a previous article (Zero trust with Kafka), we explained how we leveraged Strimzi to enhance the security of our data streaming offering.

    In this article, we are going to describe how we improved the fault tolerance of our initial design, to the point where we no longer need to intervene if a Kafka broker is unexpectedly terminated.

    Problem statement

    We operate Kafka in the AWS Cloud. For the Kafka on Kubernetes design described in this article, we rely on Amazon Elastic Kubernetes Service (EKS), the managed Kubernetes offering by AWS, with the worker nodes deployed as self-managed nodes on Amazon Elastic Compute Cloud (EC2).

    To make our operations easier and limit the blast radius of any incidents, we deploy exactly one Kafka cluster for each EKS cluster. We also give a full worker node to each Kafka broker. In terms of storage, we initially relied on EC2 instances with non-volatile memory express (NVMe) instance store volumes for
    maximal I/O performance. Also, each Kafka cluster is accessible beyond its own Virtual Private Cloud (VPC) via a VPC Endpoint Service.

    Fig. 1 Initial design of a 3-node Kafka cluster running on Kubernetes.

    Fig. 1 shows a logical view of our initial design of a 3-node Kafka on Kubernetes cluster, as typically run by Coban. The Zookeeper and Cruise-Control components are not shown for clarity.

    There are four Kubernetes services (1): one for the initial connection – referred to as “bootstrap” – that redirects incoming traffic to any Kafka pods, plus one for each Kafka pod, for the clients to target each Kafka broker individually (a requirement to produce or consume from/to a partition that resides on any particular Kafka broker). Four different listeners on the Network Load Balancer (NLB) listening on four different TCP ports, enable the Kafka clients to target either the bootstrap
    service or any particular Kafka broker they need to reach. This is very similar to what we previously described in Exposing a Kafka Cluster via a VPC Endpoint Service.

    Each worker node hosts a single Kafka pod (2). The NVMe instance store volume is used to create a Kubernetes Persistent Volume (PV), attached to a pod via a Kubernetes Persistent Volume Claim (PVC).

    Lastly, the worker nodes belong to Auto-Scaling Groups (ASG) (3), one by Availability Zone (AZ). Strimzi adds in node affinity to make sure that the brokers are evenly distributed across AZs. In this initial design, ASGs are not for auto-scaling though, because we want to keep the size of the cluster under control. We only use ASGs – with a fixed size – to facilitate manual scaling operation and to automatically replace the terminated worker nodes.

    With this initial design, let us see what happens in case of such a worker node termination.

    Fig. 2 Representation of a worker node termination. Node C is terminated and replaced by node D. However the Kafka broker 3 pod is unable to restart on node D.

    Fig. 2 shows the worker node C being terminated along with its NVMe instance store volume C, and replaced (by the ASG) by a new worker node D and its new, empty NVMe instance store volume D. On start-up, the worker node D automatically joins the Kubernetes cluster. The Kafka broker 3 pod that was running on the faulty worker node C is scheduled to restart on the new worker node D.

    Although the NVMe instance store volume C is terminated along with the worker node C, there is no data loss because all of our Kafka topics are configured with a minimum of three replicas. The data is poised to be copied over from the surviving Kafka brokers 1 and 2 back to Kafka broker 3, as soon as Kafka broker 3 is effectively restarted on the worker node D.

    However, there are three fundamental issues with this initial design:

    1. The Kafka clients that were in the middle of producing or consuming to/from the partition leaders of Kafka broker 3 are suddenly facing connection errors, because the broker was not gracefully demoted beforehand.
    2. The target groups of the NLB for both the bootstrap connection and Kafka broker 3 still point to the worker node C. Therefore, the network communication from the NLB to Kafka broker 3 is broken. A manual reconfiguration of the target groups is required.
    3. The PVC associating the Kafka broker 3 pod with its instance store PV is unable to automatically switch to the new NVMe instance store volume of the worker node D. Indeed, static provisioning is an intrinsic characteristic of Kubernetes local volumes. The PVC is still in Bound state, so Kubernetes does not take any action. However, the actual storage beneath the PV does not exist anymore. Without any storage, the Kafka broker 3 pod is unable to start.

    At this stage, the Kafka cluster is running in a degraded state with only two out of three brokers, until a Coban engineer intervenes to reconfigure the target groups of the NLB and delete the zombie PVC (this, in turn, triggers its re-creation by Strimzi, this time using the new instance store PV).

    In the next section, we will see how we have managed to address the three issues mentioned above to make this design fault-tolerant.

    Solution

    Graceful Kafka shutdown

    To minimise the disruption for the Kafka clients, we leveraged the AWS Node Termination Handler (NTH). This component provided by AWS for Kubernetes environments is able to cordon and drain a worker node that is going to be terminated. This draining, in turn, triggers a graceful shutdown of the Kafka
    process by sending a polite SIGTERM signal to all pods running on the worker node that is being drained (instead of the brutal SIGKILL of a normal termination).

    The termination events of interest that are captured by the NTH are:

    • Scale-in operations by an ASG.
    • Manual termination of an instance.
    • AWS maintenance events, typically EC2 instances scheduled for upcoming retirement.

    This suffices for most of the disruptions our clusters can face in normal times and our common maintenance operations, such as terminating a worker node to refresh it. Only sudden hardware failures (AWS issue events) would fall through the cracks and still trigger errors on the Kafka client side.

    The NTH comes in two modes: Instance Metadata Service (IMDS) and Queue Processor. We chose to go with the latter as it is able to capture a broader range of events, widening the fault tolerance capability.

    Scale-in operations by an ASG

    Fig. 3 Architecture of the NTH with the Queue Processor.

    Fig. 3 shows the NTH with the Queue Processor in action, and how it reacts to a scale-in operation (typically triggered manually, during a maintenance operation):

    1. As soon as the scale-in operation is triggered, an Auto Scaling lifecycle hook is invoked to pause the termination of the instance.
    2. Simultaneously, an Auto Scaling lifecycle hook event is issued to an Amazon Simple Queue Service (SQS) queue. In Fig. 3, we have also materialised EC2 events (e.g. manual termination of an instance, AWS maintenance events, etc.) that transit via Amazon EventBridge to eventually end up in the same SQS queue. We will discuss EC2 events in the next two sections.
    3. The NTH, a pod running in the Kubernetes cluster itself, constantly polls that SQS queue.
    4. When a scale-in event pertaining to a worker node of the Kubernetes cluster is read from the SQS queue, the NTH sends to the Kubernetes API the instruction to cordon and drain the impacted worker node.
    5. On draining, Kubernetes sends a SIGTERM signal to the Kafka pod residing on the worker node.
    6. Upon receiving the SIGTERM signal, the Kafka pod gracefully migrates the leadership of its leader partitions to other brokers of the cluster before shutting down, in a transparent manner for the clients. This behaviour is ensured by the controlled.shutdown.enable parameter of Kafka, which is enabled by default.
    7. Once the impacted worker node has been drained, the NTH eventually resumes the termination of the instance.

    Strimzi also comes with a terminationGracePeriodSeconds parameter, which we have set to 180 seconds to give the Kafka pods enough time to migrate all of their partition leaders gracefully on termination. We have verified that this is enough to migrate all partition leaders on our Kafka clusters (about 60 seconds for 600 partition leaders).

    Manual termination of an instance

    The Auto Scaling lifecycle hook that pauses the termination of an instance (Fig. 3, step 1) as well as the corresponding resuming by the NTH (Fig. 3, step 7) are invoked only for ASG scaling events.

    In case of a manual termination of an EC2 instance, the termination is captured as an EC2 event that also reaches the NTH. Upon receiving that event, the NTH cordons and drains the impacted worker node. However, the instance is immediately terminated, most likely before the leadership of all of its Kafka partition leaders has had the time to get migrated to other brokers.

    To work around this and let a manual termination of an EC2 instance also benefit from the ASG lifecycle hook, the instance must be terminated using the terminate-instance-in-auto-scaling-group AWS CLI command.

    AWS maintenance events

    For AWS maintenance events such as instances scheduled for upcoming retirement, the NTH acts immediately when the event is first received (typically adequately in advance). It cordons and drains the soon-to-be-retired worker node, which in turn triggers the SIGTERM signal and the graceful termination of Kafka as described above. At this stage, the impacted instance is not terminated, so the Kafka partition leaders have plenty of time to complete their migration to other brokers.

    However, the evicted Kafka pod has nowhere to go. There is a need for spinning up a new worker node for it to be able to eventually restart somewhere.

    To make this happen seamlessly, we doubled the maximum size of each of our ASGs and installed the Kubernetes Cluster Autoscaler. With that, when such a maintenance event is received:

    • The worker node scheduled for retirement is cordoned and drained by the NTH. The state of the impacted Kafka pod becomes Pending.
    • The Kubernetes Cluster Autoscaler comes into play and triggers the corresponding ASG to spin up a new EC2 instance that joins the Kubernetes cluster as a new worker node.
    • The impacted Kafka pod restarts on the new worker node.
    • The Kubernetes Cluster Autoscaler detects that the previous worker node is now under-utilised and terminates it.

    In this scenario, the impacted Kafka pod only remains in Pending state for about four minutes in total.

    In case of multiple simultaneous AWS maintenance events, the Kubernetes scheduler would honour our PodDisruptionBudget and not evict more than one Kafka pod at a time.

    Dynamic NLB configuration

    To automatically map the NLB’s target groups with a newly spun up EC2 instance, we leveraged the AWS Load Balancer Controller (LBC).

    Let us see how it works.

    Fig. 4 Architecture of the LBC managing the NLB’s target groups via TargetGroupBinding custom resources.

    Fig. 4 shows how the LBC automates the reconfiguration of the NLB’s target groups:

    1. It first retrieves the desired state described in Kubernetes custom resources (CR) of type TargetGroupBinding. There is one such resource per target group to maintain. Each TargetGroupBinding CR associates its respective target group with a Kubernetes service.
    2. The LBC then watches over the changes of the Kubernetes services that are referenced in the TargetGroupBinding CRs’ definition, specifically the private IP addresses exposed by their respective Endpoints resources.
    3. When a change is detected, it dynamically updates the corresponding NLB’s target groups with those IP addresses as well as the TCP port of the target containers (containerPort).

    This automated design sets up the NLB’s target groups with IP addresses (targetType: ip) instead of EC2 instance IDs (targetType: instance). Although the LBC can handle both target types, the IP address approach is actually more straightforward in our case, since each pod has a routable private IP address in the AWS subnet, thanks to the AWS Container Networking Interface (CNI) plug-in.

    This dynamic NLB configuration design comes with a challenge. Whenever we need to update the Strimzi CR, the rollout of the change to each Kafka pod in a rolling update fashion is happening too fast for the NLB. This is because the NLB inherently takes some time to mark each target as healthy before enabling it. The Kafka brokers that have just been rolled out start advertising their broker-specific endpoints to the Kafka clients via the bootstrap service, but those
    endpoints are actually not immediately available because the NLB is still checking their health. To mitigate this, we have reduced the HealthCheckIntervalSeconds and HealthyThresholdCount parameters of each target group to their minimum values of 5 and 2 respectively. This reduces the maximum delay for the NLB to detect that a target has become healthy to 10 seconds. In addition, we have configured the LBC with a Pod Readiness Gate. This feature makes the Strimzi rolling deployment wait for the health check of the NLB to pass, before marking the current pod as Ready and proceeding with the next pod.

    Fig. 5 Steps for a Strimzi rolling deployment with a Pod Readiness Gate. Only one Kafka broker and one NLB listener and target group are shown for simplicity.

    Fig. 5 shows how the Pod Readiness Gate works during a Strimzi rolling deployment:

    1. The old Kafka pod is terminated.
    2. The new Kafka pod starts up and joins the Kafka cluster. Its individual endpoint for direct access via the NLB is immediately advertised by the Kafka cluster. However, at this stage, it is not reachable, as the target group of the NLB still points to the IP address of the old Kafka pod.
    3. The LBC updates the target group of the NLB with the IP address of the new Kafka pod, but the NLB health check has not yet passed, so the traffic is not forwarded to the new Kafka pod just yet.
    4. The LBC then waits for the NLB health check to pass, which takes 10 seconds. Once the NLB health check has passed, the NLB resumes forwarding the traffic to the Kafka pod.
    5. Finally, the LBC updates the pod readiness gate of the new Kafka pod. This informs Strimzi that it can proceed with the next pod of the rolling deployment.

    Data persistence with EBS

    To address the challenge of the residual PV and PVC of the old worker node preventing Kubernetes from mounting the local storage of the new worker node after a node rotation, we adopted Elastic Block Store (EBS) volumes instead of NVMe instance store volumes. Contrary to the latter, EBS volumes can conveniently be attached and detached. The trade-off is that their performance is significantly lower.

    However, relying on EBS comes with additional benefits:

    • The cost per GB is lower, compared to NVMe instance store volumes.
    • Using EBS decouples the size of an instance in terms of CPU and memory from its storage capacity, leading to further cost savings by independently right-sizing the instance type and its storage. Such a separation of concerns also opens the door to new use cases requiring disproportionate amounts of storage.
    • After a worker node rotation, the time needed for the new node to get back in sync is faster, as it only needs to catch up the data that was produced during the downtime. This leads to shorter maintenance operations and higher iteration speed. Incidentally, the associated inter-AZ traffic cost is also lower, since there is less data to transfer among brokers during this time.
    • Increasing the storage capacity is an online operation.
    • Data backup is supported by taking snapshots of EBS volumes.

    We have verified with our historical monitoring data that the performance of EBS General Purpose 3 (gp3) volumes is significantly above our maximum historical values for both throughput and I/O per second (IOPS), and we have successfully benchmarked a test EBS-based Kafka cluster. We have also set up new monitors to be alerted in case we need to
    provision either additional throughput or IOPS, beyond the baseline of EBS gp3 volumes.

    With that, we updated our instance types from storage optimised instances to either general purpose or memory optimised instances. We added the Amazon EBS Container Storage Interface (CSI) driver to the Kubernetes cluster and created a new Kubernetes storage class to let the cluster dynamically provision EBS gp3 volumes.

    We configured Strimzi to use that storage class to create any new PVCs. This makes Strimzi able to automatically create the EBS volumes it needs, typically when the cluster is first set up, but also to attach/detach the volumes to/from the EC2 instances whenever a Kafka pod is relocated to a different worker node.

    Note that the EBS volumes are not part of any ASG Launch Template, nor do they scale automatically with the ASGs.

    Fig. 6 Steps for the Strimzi Operator to create an EBS volume and attach it to a new Kafka pod.

    Fig. 6 illustrates how this works when Strimzi sets up a new Kafka broker, for example the first broker of the cluster in the initial setup:

    1. The Strimzi Cluster Operator first creates a new PVC, specifying a volume size and EBS gp3 as its storage class. The storage class is configured with the EBS CSI Driver as the volume provisioner, so that volumes are dynamically provisioned [1]. However, because it is also set up with volumeBindingMode: WaitForFirstConsumer, the volume is not yet provisioned until a pod actually claims the PVC.
    2. The Strimzi Cluster Operator then creates the Kafka pod, with a reference to the newly created PVC. The pod is scheduled to start, which in turn claims the PVC.
    3. This triggers the EBS CSI Controller. As the volume provisioner, it dynamically creates a new EBS volume in the AWS VPC, in the AZ of the worker node where the pod has been scheduled to start.
    4. It then attaches the newly created EBS volume to the corresponding EC2 instance.
    5. After that, it creates a Kubernetes PV with nodeAffinity and claimRef specifications, making sure that the PV is reserved for the Kafka broker 1 pod.
    6. Lastly, it updates the PVC with the reference of the newly created PV. The PVC is now in Bound state and the Kafka pod can start.

    One important point to take note of is that EBS volumes can only be attached to EC2 instances residing in their own AZ. Therefore, when rotating a worker node, the EBS volume can only be re-attached to the new instance if both old and new instances reside in the same AZ. A simple way to guarantee this is to set up one ASG per AZ, instead of a single ASG spanning across 3 AZs.

    Also, when such a rotation occurs, the new broker only needs to synchronise the recent data produced during the brief downtime, which is typically an order of magnitude faster than replicating the entire volume (depending on the overall retention period of the hosted Kafka topics).

    Table 1 Comparison of the resynchronization of the Kafka data after a broker rotation between the initial design and the new design with EBS volumes.
    Initial design (NVMe instance store volumes) New design (EBS volumes)
    Data to synchronise All of the data Recent data produced during the brief downtime
    Function of (primarily) Retention period Downtime
    Typical duration Hours Minutes

    Outcome

    With all that, let us revisit the initial scenario, where a malfunctioning worker node is being replaced by a fresh new node.

    Fig. 7 Representation of a worker node termination after implementing the solution. Node C is terminated and replaced by node D. This time, the Kafka broker 3 pod is able to start and serve traffic.

    Fig. 7 shows the worker node C being terminated and replaced (by the ASG) by a new worker node D, similar to what we have described in the initial problem statement. The worker node D automatically joins the Kubernetes cluster on start-up.

    However, this time, a seamless failover takes place:

    1. The Kafka clients that were in the middle of producing or consuming to/from the partition leaders of Kafka broker 3 are gracefully redirected to Kafka brokers 1 and 2, where Kafka has migrated the leadership of its leader partitions.
    2. The target groups of the NLB for both the bootstrap connection and Kafka broker 3 are automatically updated by the LBC. The connectivity between the NLB and Kafka broker 3 is immediately restored.
    3. Triggered by the creation of the Kafka broker 3 pod, the Amazon EBS CSI driver running on the worker node D re-attaches the EBS volume 3 that was previously attached to the worker node C, to the worker node D instead. This enables Kubernetes to automatically re-bind the corresponding PV and PVC to Kafka broker 3 pod. With its storage dependency resolved, Kafka broker 3 is able to start successfully and re-join the Kafka cluster. From there, it only needs to catch up with the new data that was produced
      during its short downtime, by replicating it from Kafka brokers 1 and 2.

    With this fault-tolerant design, when an EC2 instance is being retired by AWS, no particular action is required from our end.

    Similarly, our EKS version upgrades, as well as any operations that require rotating all worker nodes of the cluster in general, are:

    • Simpler and less error-prone: We only need to rotate each instance in sequence, with no need for manually reconfiguring the target groups of the NLB and deleting the zombie PVCs anymore.
    • Faster: The time between each instance rotation is limited to the short amount of time it takes for the restarted Kafka broker to catch up with the new data.
    • More cost-efficient: There is less data to transfer across AZs (which is charged by AWS).

    It is worth noting that we have chosen to omit Zookeeper and Cruise Control in this article, for the sake of clarity and simplicity. In reality, all pods in the Kubernetes cluster – including Zookeeper and Cruise Control – now benefit from the same graceful stop, triggered by the AWS termination events and the NTH. Similarly, the EBS CSI driver improves the fault tolerance of any pods that use EBS volumes for persistent storage, which includes the Zookeeper pods.

    Challenges faced

    One challenge that we are facing with this design lies in the EBS volumes’ management.

    On the one hand, the size of EBS volumes cannot be increased consecutively before the end of a cooldown period (minimum of 6 hours and can exceed 24 hours in some cases [2]). Therefore, when we need to urgently extend some EBS volumes because the size of a Kafka topic is suddenly growing, we need to be relatively generous when sizing the new required capacity and add a comfortable security margin, to make sure that we are not running out of storage in the short run.

    On the other hand, shrinking a Kubernetes PV is not a supported operation. This can affect the cost efficiency of our design if we overprovision the storage capacity by too much, or in case the workload of a particular cluster organically diminishes.

    One way to mitigate this challenge is to tactically scale the cluster horizontally (ie. adding new brokers) when there is a need for more storage and the existing EBS volumes are stuck in a cooldown period, or when the new storage need is only temporary.

    What’s next?

    In the future, we can improve the NTH’s capability by utilising webhooks. Upon receiving events from SQS, the NTH can also forward the events to the specified webhook URLs.

    This can potentially benefit us in a few ways, e.g.:

    • Proactively spinning up a new instance without waiting for the old one to be terminated, whenever a termination event is received. This would shorten the rotation time even further.
    • Sending Slack notifications to Coban engineers to keep them informed of any actions taken by the NTH.

    We would need to develop and maintain an application that receives webhook events from the NTH and performs the necessary actions.

    In addition, we are also rolling out Karpenter to replace the Kubernetes Cluster Autoscaler, as it is able to spin up new instances slightly faster, helping reduce the four minutes delay a Kafka pod remains in Pending state during a node rotation. Incidentally, Karpenter also removes the need for setting up one ASG by AZ, as it is able to deterministically provision instances in a specific AZ, for example where a particular EBS volume resides.

    Lastly, to ensure that the performance of our EBS gp3 volumes is both sufficient and cost-efficient, we want to explore autoscaling their throughput and IOPS beyond the baseline, based on the usage metrics collected by our monitoring stack.

    References

    [1] Dynamic Volume Provisioning | Kubernetes

    [2] Troubleshoot EBS volume stuck in Optimizing state during modification | AWS re:Post

    We would like to thank our team members and Grab Kubernetes gurus that helped review and improve this blog before publication: Will Ho, Gable Heng, Dewin Goh, Vinnson Lee, Siddharth Pandey, Shi Kai Ng, Quang Minh Tran, Yong Liang Oh, Leon Tay, Tuan Anh Vu.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Our First Netflix Data Engineering Summit

    Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/our-first-netflix-data-engineering-summit-f326b0589102

    Holden Karau Elizabeth Stone Pedro Duarte Chris Stephens Pallavi Phadnis Lee Woodridge Mark Cho Guil Pires Sujay Jain Tristan Reid Senthilnathan Athinarayanan Bharath Mummadisetty Abhinaya Shetty Judit Lantos Amanuel Kahsay Dao Mi Mick Dreeling Chris Colburn and Agata Gryzbek

    Introduction

    Earlier this summer Netflix held our first-ever Data Engineering Forum. Engineers from across the company came together to share best practices on everything from Data Processing Patterns to Building Reliable Data Pipelines. The result was a series of talks which we are now sharing with the rest of the Data Engineering community!

    You can find each of the talks below with a short description of each, or you can go straight to the playlist on YouTube here.

    The Talks

    The Netflix Data Engineering Stack

    Chris Stephens, Data Engineer, Content & Studio and Pedro Duarte, Software Engineer, Consolidated Logging walk engineers new to Netflix through the building blocks of the Netflix Data Engineering stack. Learn more about how batch and streaming data pipelines are built at Netflix.

    Data Processing Patterns

    Lee Woodridge and Pallavi Phadnis, Data Engineers at Netflix, talk about how you can apply different processing strategies for your batch pipelines by implementing generic abstractions to help scale, be more efficient, handle late-arriving data, and be more fault tolerant.

    Streaming SQL on Data Mesh using Apache Flink

    Mark Cho, Guil Pires and Sujay Jain, Engineers from the Netflix Data Platform talk about how a managed Streaming SQL using Apache Flink can help unlock new Stream Processing use cases at Netflix. You can read more about Data Mesh, Netflix’s next-generation stream processing platform, here

    Building Reliable Data Pipelines

    Holden Karau, OSS Engineer, Data Platform Engineering, talks about the importance of reliable data pipelines and how to build them covering tools from testing to validation and auditing. The talk uses Apache Spark as an example, but the concepts generalize regardless of your specific tools.

    Knowledge Management — Leveraging Institutional Data

    Tristan Reid, software engineer, shares experiences about the Knowledge Management project at Netflix, which seeks to leverage language modeling techniques and metadata from internal systems to improve the impact of the >100K memos that circulate within the company.

    Psyberg, An Incremental ETL Framework Using Iceberg

    Abhinaya Shetty and Bharath Mummadisetty, Data Engineers from Netflix’s Membership Data Engineering team, introduce Psyberg, an incremental ETL framework. Learn about how Psyberg leverages Iceberg metadata to handle late-arriving data, and improves data pipelines while simplifying on-call life!

    Start/Stop/Continue for optimizing complex ETL jobs

    Judit Lantos, Data Engineer, Member Experience Data Engineering, shares a case study to demonstrate an effective approach for optimizing complex ETL jobs.

    Media Data for ML Studio Creative Production

    In the last 2 decades, Netflix has revolutionized the way video content is consumed, however, there is significant work to be done in revolutionizing how movies and tv shows are made. In this video, Sr. Data Engineers Amanual Kahsay and Dao Mi showcase how data and insights are being utilized to accomplish such a vision.

    We hope that our fellow members of the Data Engineering Community find these videos useful and engaging. Please follow our Netflix Data Twitter account for updates and notifications of future Data Engineering Summits!

    Mick Dreeling, Chris Colburn


    Our First Netflix Data Engineering Summit was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

    An elegant platform

    Post Syndicated from Grab Tech original https://engineering.grab.com/an-elegant-platform

    Coban is Grab’s real-time data streaming platform team. As a platform team, we thrive on providing our internal users from all verticals with self-served data-streaming resources, such as Kafka topics, Flink and Change Data Capture (CDC) pipelines, various kinds of Kafka-Connect connectors, as well as Apache Zeppelin notebooks, so that they can effortlessly leverage real-time data to build intelligent applications and services.

    In this article, we present our journey from pure Infrastructure-as-Code (IaC) towards a more sophisticated control plane that has revolutionised the way data streaming resources are self-served at Grab. This change also leads to improved scalability, stability, security, and user adoption of our data streaming platform.

    Problem statement

    In the early ages of public cloud, it was a common practice to create virtual resources by clicking through the web console of a cloud provider, which is sometimes referred to as ClickOps.

    ClickOps has many downsides, such as:

    • Inability to review, track, and audit changes to the infrastructure.
    • Inability to massively scale the infrastructure operations.
    • Inconsistencies between environments, e.g. staging and production.
    • Inability to quickly recover from a disaster by re-creating the infrastructure at a different location.

    That said, ClickOps has one tremendous advantage; it makes creating resources using a graphical User Interface (UI) fairly easy for anyone like Infrastructure Engineers, Software Engineers, Data Engineers etc. This also leads to a high iteration speed towards innovation in general.

    IaC resolved many of the limitations of ClickOps, such as:

    • Changes are committed to a Version Control System (VCS) like Git: They can be reviewed by peers before being merged. The full history of all changes is available for investigating issues and for audit.
    • The infrastructure operations scale better: Code for similar pieces of infrastructure can be modularised. Changes can be rolled out automatically by Continuous Integration (CI) pipelines in the VCS system, when a change is merged to the main branch.
    • The same code can be used to deploy the staging and production environments consistently.
    • The infrastructure can be re-created anytime from its source code, in case of a disaster.

    However, IaC unwittingly posed a new entry barrier too, requiring the learning of new tools like Ansible, Puppet, Chef, Terraform, etc.

    Some organisations set up dedicated Site Reliability Engineer (SRE) teams to centrally manage, operate, and support those tools and the infrastructure as a whole, but that soon created the potential of new bottlenecks in the path to innovation.

    On the other hand, others let engineering teams manage their own infrastructure, and Grab adopted that same approach. We use Terraform to manage infrastructure, and all teams are expected to have select engineers who have received Terraform training and have a clear understanding of it.

    In this context, Coban’s platform initially started as a handful of Git repositories where users had to submit their Merge Requests (MR) of Terraform code to create their data streaming resources. Once reviewed by a Coban engineer, those Terraform changes would be applied by a CI pipeline running Atlantis.

    While this was a meaningful first step towards self-service and platformisation of Coban’s offering within Grab, it had several significant downsides:

    • Stability: Due to the lack of control on the Terraform changes, the CI pipeline was prone to human errors and frequent failures. For example, users would initiate a new Terraform project by duplicating an existing one, but then would forget to change the location of the remote Terraform state, leading to the in-place replacement of an existing resource.
    • Scalability: The Coban team needed to review all MRs and provide ad hoc support whenever the pipeline failed.
    • Security: In the absence of Identity and Access Management (IAM), MRs could potentially contain changes pertaining to other teams’ resources, or even changes to Coban’s core infrastructure, with code review as the only guardrail.
    • Limited user growth: We could only acquire users who were well-versed in Terraform.

    It soon became clear that we needed to build a layer of abstraction between our users and the Terraform code, to increase the level of control and lower the entry barrier to our platform, while still retaining all of the benefits of IaC under the hood.

    Solution

    We designed and built an in-house three-tier control plane made of:

    • Coban UI, a front-end web interface, providing our users with a seamless ClickOps experience.
    • Heimdall, the Go back-end of the web interface, transforming ClickOps into IaC.
    • Khone, the storage and provisioner layer, a Git repository storing Terraform code and metadata of all resources as well as the CI pipelines to plan and apply the changes.

    In the next sections, we will deep dive in those three components.

    Fig. 1 Simplified architecture of a request flowing from the user to the Coban infrastructure, via the three components of the control plane: the Coban UI, Heimdall, and Khone.

    Although we designed the user journey to start from the Coban UI, our users can still opt to communicate with Heimdall and with Khone directly, e.g. for batch changes, or just because many engineers love Git and we want to encourage broad adoption. To make sure that data is eventually consistent across the three systems, we made Khone the only persistent storage layer. Heimdall regularly fetches data from Khone, caches it, and presents it to the Coban UI upon each query.

    We also continued using Terraform for all resources, instead of mixing various declarative infrastructure approaches (e.g. Kubernetes Custom Resource Definition, Helm charts), for the sake of consistency of the logic in Khone’s CI pipelines.

    Coban UI

    The Coban UI is a React Single Page Application (React SPA) designed by our partner team Chroma, a dedicated team of front-end engineers who thrive on building legendary UIs and reusable components for platform teams at Grab.

    It serves as a comprehensive self-service portal, enabling users to effortlessly create data streaming resources by filling out web forms with just a few clicks.

    Fig. 2 Screen capture of a new Kafka topic creation in the Coban UI.

    In addition to facilitating resource creation and configuration, the Coban UI is seamlessly integrated with multiple monitoring systems. This integration allows for real-time monitoring of critical metrics and health status for Coban infrastructure components, including Kafka clusters, Kafka topic bytes in/out rates, and more. Under the hood, all this information is exposed by Heimdall APIs.

    Fig. 3 Screen capture of the metrics of a Kafka cluster in the Coban UI.

    In terms of infrastructure, the Coban UI is hosted in AWS S3 website hosting. All dynamic content is generated by querying the APIs of the back-end: Heimdall.

    Heimdall

    Heimdall is the Go back-end of the Coban UI. It serves a collection of APIs for:

    • Managing the data streaming resources of the Coban platform with Create, Read, Update and Delete (CRUD) operations, treating the Coban UI as a first-class citizen.
    • Exposing the metadata of all Coban resources, so that they can be used by other platforms or searched in the Coban UI.

    All operations are authenticated and authorised. Read more about Heimdall’s access control in Migrating from Role to Attribute-based Access Control.

    In the next sections, we are going to dive deeper into these two features.

    Managing the data streaming resources

    First and foremost, Heimdall enables our users to self-manage their data streaming resources. It primarily relies on Khone as its storage and provisioner layer for actual resource management via Git CI pipelines. Therefore, we designed Heimdall’s resource management workflow to leverage the underlying Git flow.

    Fig. 4 Diagram flow of a request in Heimdall.

    Fig. 4 shows the diagram flow of a typical request in Heimdall to create, update, or delete a resource.

    1. An authenticated user initiates a request, either by navigating in the Coban UI or by calling the Heimdall API directly. At this stage, the request state is Initiated on Heimdall.
    2. Heimdall validates the request against multiple validation rules. For example, if an ongoing change request exists for the same resource, the request fails. If all tests succeed, the request state moves to Ongoing.
    3. Heimdall then creates an MR in Khone, which contains the Terraform files describing the desired state of the resource, as well as an in-house metadata file describing the key attributes of both resource and requester.
    4. After the MR has been created successfully, Heimdall notifies the requester via Slack and shares the MR URL.
    5. After that, Heimdall starts polling the status of the MR in a loop.
    6. For changes pertaining to production resources, an approver who is code owner in the repository of the resource has to approve the MR. Typically, the approver is an immediate teammate of the requester. Indeed, as a platform team, we empower our users to manage their own resources in a self-service fashion. Ultimately, the requester would merge the MR to trigger the CI pipeline applying the actual Terraform changes. Note that for staging resources, this entire step 6 is automatically performed by Heimdall.
    7. Depending on the MR status and the status of its CI pipeline in Khone, the final state of the request can be:
      • Failed if the CI pipeline has failed in Khone.
      • Completed if the CI pipeline has succeeded in Khone.
      • Cancelled if the MR was closed in Khone.

    Heimdall exposes APIs to let users track the status of their requests. In the Coban UI, a page queries those APIs to elegantly display the requests.

    Fig. 5 Screen capture of the Coban UI showing all requests.

    Exposing the metadata

    Apart from managing the data streaming resources, Heimdall also centralises and exposes the metadata pertaining to those resources so other Grab systems can fetch and use it. They can make various queries, for example, listing the producers and consumers of a given Kafka topic, or determining if a database (DB) is the data source for any CDC pipeline.

    To make this happen, Heimdall not only retains the metadata of all of the resources that it creates, but also regularly ingests additional information from a variety of upstream systems and platforms, to enrich and make this metadata comprehensive.

    Fig. 6 Diagram showing some of Heimdall’s upstreams (on the left) and downstreams (on the right) for metadata collection, enrichment, and serving. The arrows show the data flow. The network connection (client -> server) is actually the other way around.

    On the left side of Fig. 6, we illustrate Heimdall’s ingestion mechanism with several examples (step 1):

    • The metadata of all Coban resources is ingested from Khone. This means the metadata of the resources that were created directly in Khone is also available in Heimdall.
    • The list of Kafka producers is retrieved from our monitoring platform, where most of them emit metrics.
    • The list of Kafka consumers is retrieved directly from the respective Kafka clusters, by listing the consumer groups and respective Client IDs of each partition.
    • The metadata of all DBs, that are used as a data source for CDC pipelines, is fetched from Grab’s internal DB management platform.
    • The Kafka stream schemas are retrieved from the Coban schema repository.
    • The Kafka stream configuration of each stream is retrieved from Grab Universal Configuration Management platform.

    With all of this ingested data, Heimdall can provide comprehensive and accurate information about all data streaming resources to any other Grab platforms via a set of dedicated APIs.

    The right side of Fig. 6 shows some examples (step 2) of Heimdall’s serving mechanism:

    • As a downstream of Heimdall, the Coban UI enables our direct users to conveniently browse their data streaming resources and access their attributes.
    • The entire resource inventory is ingested into the broader Grab inventory platform, based on backstage.io.
    • The Kafka streams are ingested into Grab’s internal data discovery platform, based on DataHub, where users can discover and trace the lineage of any piece of data.
    • The CDC connectors pertaining to DBs are ingested by Grab internal DB management platform, so that they are made visible in that platform when users are browsing their DBs.

    Note that the downstream platforms that ingest data from Heimdall each expose a particular view of the Coban inventory that serves their purpose, but the Coban platform remains the only source of truth for any data streaming resource at Grab.

    Lastly, Heimdall leverages an internal MySQL DB to support quick data query and exploration. The corresponding API is called by the Coban UI to let our users conveniently search globally among all resources’ attributes.

    Fig. 7 Screen capture of the global search feature in the Coban UI.

    Khone

    Khone is the persistent storage layer of our platform, as well as the executor for actual resource creation, changes, and deletion. Under the hood, it is actually a GitLab repository of Terraform code in typical GitOps fashion, with CI pipelines to plan and apply the Terraform changes automatically. In addition, it also stores a metadata file for each resource.

    Compared to letting the platform create the infrastructure directly and keep track of the desired state in its own way, relying on a standard IaC tool like Terraform for the actual changes to the infrastructure presents two major advantages:

    • The Terraform code can directly be used for disaster recovery. In case of a disaster, any entitled Cobaner with a local copy of the main branch of the Khone repository is able to recreate all our platform resources directly from their machine. There is no need to rebuild the entire platform’s control plane, thus reducing our Recovery Time Objective (RTO).
    • Minimal effort required to follow the API changes of our infrastructure ecosystem (AWS, Kubernetes, Kafka, etc.). When such a change happens, all we need to do is to update the corresponding Terraform provider.

    If you’d like to read more about Khone, check out Securing GitOps pipelines. In this section, we will only focus on Khone’s features that are relevant from the platform perspective.

    Lightweight Terraform

    In Khone, each resource is stored as a Terraform definition. There are two major differences from a normal Terraform project:

    • No Terraform environment, such as the required Terraform providers and the location of the remote Terraform state file. They are automatically generated by the CI pipeline via a simple wrapper.
    • Only vetted Khone Terraform modules can be used. This is controlled and enforced by the CI pipeline via code inspection. There is one such Terraform module for each kind of supported resource of our platform (e.g. Kafka topic, Flink pipeline, Kafka Connect mirror source connector etc.). Furthermore, those in-house Terraform modules are designed to automatically derive their key variables (e.g. resource name, cluster name, environment) from the relative path of the parent Terraform project in the Khone repository.

    Those characteristics are designed to limit the risk and blast radius of human errors. They also make sure that all resources created in Khone are supported by our platform, so that they can also be discovered and managed in Heimdall and the Coban UI. Lastly, by generating the Terraform environment on the fly, we can destroy resources simply by deleting the directory of the project in the code base – this would not be possible otherwise.

    Resource metadata

    All resource metadata is stored in a YAML file that is present in the Terraform directory of each resource in the Khone repository. This is mainly used for ownership and cost attribution.

    With this metadata, we can:

    • Better communicate with our users whenever their resources are impacted by an incident or an upcoming maintenance operation.
    • Help teams understand the costs of their usage of our platform, a significant step towards cost efficiency.

    There are two different ways resource metadata can be created:

    • Automatically through Heimdall: The YAML metadata file is automatically generated by Heimdall.
    • Through Khone by a human user: The user needs to prepare the YAML metadata file and include it in the MR. This file is then verified by the CI pipeline.

    Outcome

    The initial version of the three-tier Coban platform, as described in this article, was internally released in March 2022, supporting only Kafka topic management at the time. Since then, we have added support for Flink pipelines, four kinds of Kafka Connect connectors, CDC pipelines, and more recently, Apache Zeppelin notebooks. At the time of writing, the Coban platform manages about 5000 data streaming resources, all described as IaC under the hood.

    Our platform also exposes enriched metadata that includes the full data lineage from Kafka producers to Kafka consumers, as well as ownership information, and cost attribution.

    With that, our monthly active users have almost quadrupled, truly moving the needle towards democratising the usage of real-time data within all Grab verticals.

    In spite of that user growth, the end-to-end workflow success rate for self-served resource creation, change or deletion, remained well above 90% in the first half of 2023, while the Heimdall API uptime was above 99.95%.

    Challenges faced

    A common challenge for platform teams resides in the misalignment between the Service Level Objective (SLO) of the platform, and the various environments (e.g. staging, production) of the managed resources and upstream/downstream systems and platforms.

    Indeed, the platform aims to guarantee the same level of service, regardless of whether it is used to create resources in the staging or the production environment. From the platform team’s perspective, the platform as a whole is considered production-grade, as soon as it serves actual users.

    A naive approach to address this challenge is to let the production version of the platform manage all resources regardless of their respective environments. However, doing so does not permit a hermetic segregation of the staging and production environments across the organisation, which is a good security practice, and often a requirement for compliance. For example, the production version of the platform would have to connect to upstream systems in the staging environment, e.g. staging Kafka clusters to collect their consumer groups, in the case of Heimdall. Conversely, the staging version of certain downstreams would have to connect to the production version of Heimdall, to fetch the metadata of relevant staging resources.

    The alternative approach, generally adopted across Grab, is to instantiate all platforms in each environment (staging and production), while still considering both instances as production-grade and guaranteeing tight SLOs in both environments.

    Fig. 8 Architecture of the Coban platform, broken down by environment.

    In Fig. 8, both instances of Heimdall have equivalent SLOs. The caveat is that all upstream systems and platforms must also guarantee a strict SLO in both environments. This obviously comes with a cost, for example, tighter maintenance windows for the operations pertaining to the Kafka clusters in the staging environment.

    A strong “platform” culture is required for platform teams to fully understand that their instance residing in the staging environment is not their own staging environment and should not be used for testing new features.

    What’s next?

    Currently, users creating, updating, or deleting production resources in the Coban UI (or directly by calling Heimdall API) receive the URL of the generated GitLab MR in a Slack message. From there, they must get the MR approved by a code owner, typically another team member, and finally merge the MR, for the requested change to be actually implemented by the CI pipeline.

    Although this was a fairly easy way to implement a maker/checker process that was immediately compliant with our regulatory requirements for any changes in production, the user experience is not optimal. In the near future, we plan to bring the approval mechanism into Heimdall and the Coban UI, while still providing our more advanced users with the option to directly create, approve, and merge MRs in GitLab. In the longer run, we would also like to enhance the Coban UI with the output of the Khone CI jobs that include the Terraform plan and apply results.

    There is another aspect of the platform that we want to improve. As Heimdall regularly polls the upstream platforms to collect their metadata, this introduces a latency between a change in one of those platforms and its reflection in the Coban platform, which can hinder the user experience. To refresh resource metadata in Heimdall in near real time, we plan to leverage an existing Grab-wide event stream, where most of the configuration and code changes at Grab are produced as events. Heimdall will soon be able to consume those events and update the metadata of the affected resources immediately, without waiting for the next periodic refresh.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Road localisation in GrabMaps

    Post Syndicated from Grab Tech original https://engineering.grab.com/road-localisation-grabmaps

    Introduction

    In 2022, Grab achieved self-sufficiency in its Geo services. As part of this transition, one crucial step was moving towards using an internally-developed map tailored specifically to the market in which Grab operates. Now that we have full control over the map layer, we can add more data to it or improve it according to the needs of the services running on top. One key aspect that this transition unlocked for us was the possibility of creating hyperlocal data at map level.

    For instance, by determining the country to which a road belongs, we can now automatically infer the official language of that country and display the street name in that language. In another example, knowing the country for a specific road, we can automatically infer the driving side (left-handed or right-handed) leading to an improved navigation experience. Furthermore, this capability also enables us to efficiently handle various scenarios. For example, if we know that a road is part of a gated community, an area where our driver partners face restricted access, we can prevent the transit through that area.

    These are just some examples of the possibilities from having full control over the map layer. By having an internal map, we can align our maps with specific markets and provide better experiences for our driver-partners and customers.

    Background

    For all these to be possible, we first needed to localise the roads inside the map. Our goal was to include hyperlocal data into the map, which refers to data that is specific to a certain area, such as a country, city, or even a smaller part of the city like a gated community. At the same time, we aimed to deliver our map with a high cadence, thus, we needed to find the right way to process this large amount of data while continuing to create maps in a cost-effective manner.

    Solution

    In the following sections of this article, we will use an extract from the Southeast Asia map to provide visual representations of the concepts discussed.

    In Figure 1, Image 1 shows a visualisation of the road network, the roads belonging to this area. The coloured lines in Image 2 represent the borders identifying the countries in the same area. Overlapping the information from Image 1 and Image 2, we can extrapolate and say that the entire surface included in a certain border could have the same set of common properties as shown in Image 3. In Image 4, we then proceed with adding localised roads for each area.

    Figure 1 – Map of Southeast Asia

    For this to be possible, we have to find a way to localise each road and identify its associated country. Once this localisation process is complete, we can replicate all this information specific to a given border onto each individual road. This information includes details such as the country name, driving side, and official language. We can go even further and infer more information, and add hyperlocal data. For example, in Vietnam, we can automatically prevent motorcycle access on the motorways.

    Assigning each road on the map to a specific area, such as a country, service area, or subdivision, presents a complex task. So, how can we efficiently accomplish this?

    Implementation

    The most straightforward approach would be to test the inclusion of each road into each area boundary, but that is easier said than done. With close to 30 million road segments in the Southeast Asia map and over 10 thousand areas, the computational cost of determining inclusion or intersection between a polyline and a polygon is expensive.

    Our solution to this challenge involves replacing the expensive yet precise operation with a decent approximation. We introduce a proxy entity, the geohash, and we use it to approximate the areas and also to localise the roads.

    We replace the geometrical inclusion with a series of simpler and less expensive operations. First, we conduct an inexpensive precomputation where we identify all the geohases that belong to a certain area or within a defined border. We then identify the geohashes to which the roads belong to. Finally, we use these precomputed values to assign roads to their respective areas. This process is also computationally inexpensive.

    Given the large area we process, we leverage big data techniques to distribute the execution across multiple nodes and thus speed up the operation. We want to deliver the map daily and this is one of the many operations that are part of the map-making process.

    What is a geohash?

    To further understand our implementation we will first explain the geohash concept. A geohash is a unique identifier of a specific region on the Earth. The basic idea is that the Earth is divided into regions of user-defined size and each region is assigned a unique id, which is known as its geohash. For a given location on earth, the geohash algorithm converts its latitude and longitude into a string.

    Geohashes uses a Base-32 alphabet encoding system comprising characters ranging from 0 to 9 and A to Z, excluding “A”, “I”, “L” and “O”. Imagine dividing the world into a grid with 32 cells. The first character in a geohash identifies the initial location of one of these 32 cells. Each of these cells are then further subdivided into 32 smaller cells.This subdivision process continues and refines to specific areas in the world. Adding characters to the geohash sub-divides a cell, effectively zooming in to a more detailed area.

    The precision factor of the geohash determines the size of the cell. For instance, a precision factor of one creates a cell 5,000 km high and 5,000 km wide. A precision factor of six creates a cell 0.61km high and 1.22 km wide. Furthermore, a precision factor of nine creates a cell 4.77 m high and 4.77 m wide. It is important to note that cells are not always square and can have varying dimensions.

    In Figure 2, we have exemplified a geohash 6 grid and its code is wsdt33.

    Figure 2 – An example of geohash code wsdt33

    Using less expensive operations

    Calculating the inclusion of the roads inside a certain border is an expensive operation. However, quantifying the exact expense is challenging as it depends on several factors. One factor is the complexity of the border. Borders are usually irregular and very detailed, as they need to correctly reflect the actual border. The complexity of the road geometry is another factor that plays an important role as roads are not always straight lines.

    Figure 3 – Roads to localise

    Since this operation is expensive both in terms of cloud cost and time to run, we need to identify a cheaper and faster way that would yield similar results. Knowing that the complexity of the border lines is the cause of the problem, we tried using a different alternative, a rectangle. Calculating the inclusion of a polyline inside a rectangle is a cheaper operation.

    Figure 4 – Roads inside a rectangle

    So we transformed this large, one step operation, where we test each road segment for inclusion in a border, into a series of smaller operations where we perform the following steps:

    1. Identify all the geohashes that are part of a certain area or belong to a certain border. In this process we include additional areas to make sure that we cover the entire surface inside the border.
    2. For each road segment, we identify the list of geohashes that it belongs to. A road, depending on its length or depending on its shape, might belong to multiple geohashes.

    In Figure 5, we identify that the road belongs to two geohashes and that the two geohashes are part of the border we use.

    Figure 5 – Geohashes as proxy

    Now, all we need to do is join the two data sets together. This kind of operation is a great candidate for a big data approach, as it allows us to run it in parallel and speed up the processing time.

    Precision tradeoff

    We mentioned earlier that, for the sake of argument, we replace precision with a decent approximation. Let’s now delve into the real tradeoff by adopting this approach.

    The first thing that stands out with this approach is that we traded precision for cost. We are able to reduce the cost as this approach uses less hardware resources and computation time. However, this reduction in precision suffers, particularly for roads located near the borders as they might be wrongly classified.

    Going back to the initial example, let’s take the case of the external road, on the left side of the area. As you can see in Figure 6, it is clear that the road does not belong to our border. But when we apply the geohash approach it gets included into the middle geohash.

    Figure 6 – Wrong road localisation

    Given that just a small part of the geohash falls inside the border, the entire geohash will be classified as belonging to that area, and, as a consequence, the road that belongs to that geohash will be wrongly localised and we’ll end up adding the wrong localisation information to that road. This is clearly a consequence of the precision tradeoff. So, how can we solve this?

    Geohash precision

    One option is to increase the geohash precision. By using smaller and smaller geohashes, we can better reflect the actual area. As we go deeper and we further split the geohash, we can accurately follow the border. However, a high geohash precision also equates to a computationally intensive operation bringing us back to our initial situation. Therefore, it is crucial to find the right balance between the geohash size and the complexity of operations.

    Figure 7 – Geohash precision

    Geohash coverage percentage

    To find a balance between precision and data loss, we looked into calculating the geohash coverage percentage. For example, in Figure 8, the blue geohash is entirely within the border. Here we can say that it has a 100% geohash coverage.

    Figure 8 – Geohash inside the border

    However, take for example the geohash in Figure 9. It touches the border and has only around 80% of its surface inside the area. Given that most of its surface is within the border, we still can say that it belongs to the area.

    Figure 9 – Geohash partially inside the border

    Let’s look at another example. In Figure 10, only a small part of the geohash is within the border. We can say that the geohash coverage percentage here is around 5%. For these cases, it becomes difficult for us to determine whether the geohash does belong to the area. What would be a good tradeoff in this case?

    Figure 10 – Geohash barely inside the border

    Border shape

    To go one step further, we can consider a mixed solution, where we use the border shape but only for the geohashes touching the border. This would still be an intensive computational operation but the number of roads located in these geohashes will be much smaller, so it is still a gain.

    For the geohashes with full coverage inside the area, we’ll use the geohash for the localisation, the simpler operation. For the geohashes that are near the border, we’ll use a different approach. To increase the precision around the borders, we can cut the geohash following the border’s shape. Instead of having a rectangle, we’ll use a more complex shape which is still simpler than the initial border shape.

    Figure 11 – Geohash following a border’s shape

    Result

    We began with a simple approach and we enhanced it to improve precision. This also increased the complexity of the operation. We then asked, what are the actual gains? Was it worthwhile to go through all this process? In this section, we put this to the test.

    We first created a benchmark by taking a small sample of the data and ran the localisation process on a laptop. The sample comprised approximately 2% of the borders and 0.0014% of the roads. We ran the localisation process using two approaches.

    • With the first approach, we calculated the intersection between all the roads and borders. The entire operation took around 38 minutes.
    • For the second approach, we optimised the operation using geohashes. In this approach, the runtime was only 78 seconds (1.3 minutes).

    However, it is important to note that this is not an apples-to-apples comparison. The operation that we measured was the localisation of the roads but we did not include the border filling operation where we fill the borders with geohashes. This is because this operation does not need to be run every time. It can be run once and reused multiple times.

    Though not often required, it is still crucial to understand and consider the operation of precomputing areas and filling borders with geohashes. The precomputation process depends on several factors:

    • Number and shape of the borders – The more borders and the more complex the borders are, the longer the operation will take.
    • Geohash precision – How accurate do we need our localisation to be? The more accurate it needs to be, the longer it will take.
    • Hardware availability

    Going back to our hypothesis, although this precomputation might be expensive, it is rarely run as the borders don’t change often and can be triggered only when needed. However, regular computation, where we find the area to which each road belongs to, is often run as the roads change constantly. In our system, we run this localisation for each map processing.

    We can also further optimise this process by applying the opposite approach. Geohashes that have full coverage inside a border can be merged together into larger geohashes thus simplifying the computation inside the border. In the end, we can have a solution that is fully optimised for our needs with the best cost-to-performance ratio.

    Figure 12 – Optimised geohashes

    Conclusion

    Although geohashes seem to be the right solution for this kind of problem, we also need to monitor their content. One consideration is the road density inside a geohash. For example, a geohash inside a city centre usually has a lot of roads while one in the countryside may have much less. We need to consider this aspect to have a balanced computation operation and take full advantage of the big data approach. In our case, we achieve this balance by considering the number of road kilometres within a geohash.

    Figure 13 – Unbalanced data

    Additionally, the resources that we choose also matter. To optimise time and cost, we need to find the right balance between the running time and resource cost. As shown in Figure 14, based on a sample data we ran, sometimes, we get the best result when using smaller machines.

    Figure 14 – Cost vs runtime

    The achievements and insights showcased in this article are indebted to the contributions made by Mihai Chintoanu. His expertise and collaborative efforts have profoundly enriched the content and findings presented herein.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Building In-Video Search

    Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/building-in-video-search-936766f0017c

    Boris Chen, Ben Klein, Jason Ge, Avneesh Saluja, Guru Tahasildar, Abhishek Soni, Juan Vimberg, Elliot Chow, Amir Ziai, Varun Sekhri, Santiago Castro, Keila Fong, Kelli Griggs, Mallia Sherzai, Robert Mayer, Andy Yao, Vi Iyengar, Jonathan Solorzano-Hamilton, Hossein Taghavi, Ritwik Kumar

    Introduction

    Today we’re going to take a look at the behind the scenes technology behind how Netflix creates great trailers, Instagram reels, video shorts and other promotional videos.

    Suppose you’re trying to create the trailer for the action thriller The Gray Man, and you know you want to use a shot of a car exploding. You don’t know if that shot exists or where it is in the film, and you have to look for it it by scrubbing through the whole film.

    Exploding cars — The Gray Man (2022)

    Or suppose it’s Christmas, and you want to create a great instagram piece out all the best scenes across Netflix films of people shouting “Merry Christmas”! Or suppose it’s Anya Taylor Joy’s birthday, and you want to create a highlight reel of all her most iconic and dramatic shots.

    Making these comes down to finding the right video clips amongst hundreds of thousands movies and TV shows to find the right line of dialogue or the right visual elements (objects, scenes, emotions, actions, etc.). We have built an internal system that allows someone to perform in-video search across the entire Netflix video catalog, and we’d like to share our experience in building this system.

    Building in-video search

    To build such a visual search engine, we needed a machine learning system that can understand visual elements. Our early attempts included object detection, but found that general labels were both too limiting and too specific, yet not specific enough. Every show has special objects that are important (e.g. Demogorgon in Stranger Things) that don’t translate to other shows. The same was true for action recognition, and other common image and video tasks.

    The Approach

    We found that contrastive learning between images and text pairs work well for our goals because these models are able to learn joint embedding spaces between the two modalities. This approach is also able to learn about objects, scenes, emotions, actions, and more in a single model. We also found that extending contrastive learning to videos and text provided a substantial improvement over frame-level models.

    In order to train the model on internal training data (video clips with aligned text descriptions), we implemented a scalable version on Ray Train and switched to a more performant video decoding library. Lastly, the embeddings from the video encoder exhibit strong zero or few-shot performance on multiple video and content understanding tasks at Netflix and are used as a starting point in those applications.

    The recent success of large-scale models that jointly train image and text embeddings has enabled new use cases around multimodal retrieval. These models are trained on large amounts of image-caption pairs via in-batch contrastive learning. For a (large) batch of N examples, we wish to maximize the embedding (cosine) similarity of the N correct image-text pairs, while minimizing the similarity of the other N²-N paired embeddings. This is done by treating the similarities as logits and minimizing the symmetric cross-entropy loss, which gives equal weighting to the two settings (treating the captions as labels to the images and vice versa).

    Consider the following two images and captions:

    Images are from Glass Onion: A Knives Out Mystery (2022)

    Once properly trained, the embeddings for the corresponding images and text (i.e. captions) will be close to each other and farther away from unrelated pairs.

    Typically embedding spaces are hundred/thousand dimensional.

    At query time, the input text query can be mapped into this embedding space, and we can return the closest matching images.

    The query may have not existed in the training set. Cosine similarity can be used as a similarity measure.

    While these models are trained on image-text pairs, we have found that they are an excellent starting point to learning representations of video units like shots and scenes. As videos are a sequence of images (frames), additional parameters may need to be introduced to compute embeddings for these video units, although we have found that for shorter units like shots, an unparameterized aggregation like averaging (mean-pooling) can be more effective. To train these parameters as well as fine-tune the pretrained image-text model weights, we leverage in-house datasets that pair shots of varying durations with rich textual descriptions of their content. This additional adaptation step improves performance by 15–25% on video retrieval tasks (given a text prompt), depending on the starting model used and metric evaluated.

    On top of video retrieval, there are a wide variety of video clip classifiers within Netflix that are trained specifically to find a particular attribute (e.g. closeup shots, caution elements). Instead of training from scratch, we have found that using the shot-level embeddings can give us a significant head start, even beyond the baseline image-text models that they were built on top of.

    Lastly, shot embeddings can also be used for video-to-video search, a particularly useful application in the context of trailer and promotional asset creation.

    Engineering and Infrastructure

    Our trained model gives us a text encoder and a video encoder. Video embeddings are precomputed on the shot level, stored in our media feature store, and replicated to an elastic search cluster for real-time nearest neighbor queries. Our media feature management system automatically triggers the video embedding computation whenever new video assets are added, ensuring that we can search through the latest video assets.

    The embedding computation is based on a large neural network model and has to be run on GPUs for optimal throughput. However, shot segmentation from a full-length movie is CPU-intensive. To fully utilize the GPUs in the cloud environment, we first run shot segmentation in parallel on multi-core CPU machines, store the result shots in S3 object storage encoded in video formats such as mp4. During GPU computation, we stream mp4 video shots from S3 directly to the GPUs using a data loader that performs prefetching and preprocessing. This approach ensures that the GPUs are efficiently utilized during inference, thereby increasing the overall throughput and cost-efficiency of our system.

    At query time, a user submits a text string representing what they want to search for. For visual search queries, we use the text encoder from the trained model to extract an text embedding, which is then used to perform appropriate nearest neighbor search. Users can also select a subset of shows to search over, or perform a catalog wide search, which we also support.

    If you’re interested in more details, see our other post covering the Media Understanding Platform.

    Conclusion

    Finding a needle in a haystack is hard. We learned from talking to video creatives who make trailers and social media videos that being able to find needles was key, and a big pain point. The solution we described has been fruitful, works well in practice, and is relatively simple to maintain. Our search system allows our creatives to iterate faster, try more ideas, and make more engaging videos for our viewers to enjoy.

    We hope this post has been interesting to you. If you are interested in working on problems like this, Netflix is always hiring great researchers, engineers and creators.


    Building In-Video Search was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

    LLM-powered data classification for data entities at scale

    Post Syndicated from Grab Tech original https://engineering.grab.com/llm-powered-data-classification

    Editor’s note: This post was originally published in October 2023 and has been updated to reflect Grab’s partnership with the Infocomm Media Development Authority as part of its Privacy Enhancing Technology Sandbox that concluded in March 2024.

    Introduction

    At Grab, we deal with PetaByte-level data and manage countless data entities ranging from database tables to Kafka message schemas. Understanding the data inside is crucial for us, as it not only streamlines the data access management to safeguard the data of our users, drivers and merchant-partners, but also improves the data discovery process for data analysts and scientists to easily find what they need.

    The Caspian team (Data Engineering team) collaborated closely with the Data Governance team on automating governance-related metadata generation. We started with Personal Identifiable Information (PII) detection and built an orchestration service using a third-party classification service. With the advent of the Large Language Model (LLM), new possibilities dawned for metadata generation and sensitive data identification at Grab. This prompted the inception of the project, which aimed to integrate LLM classification into our existing service. In this blog, we share insights into the transformation from what used to be a tedious and painstaking process to a highly efficient system, and how it has empowered the teams across the organisation.

    For ease of reference, here’s a list of terms we’ve used and their definitions:

    • Data Entity: An entity representing a schema that contains rows/streams of data, for example, database tables, stream messages, data lake tables.
    • Prediction: Refers to the model’s output given a data entity, unverified manually.
    • Data Classification: The process of classifying a given data entity, which in the context of this blog, involves generating tags that represent sensitive data or Grab-specific types of data.
    • Metadata Generation: The process of generating the metadata for a given data entity. In this blog, since we limit the metadata to the form of tags, we often use this term and data classification interchangeably.
    • Sensitivity: Refers to the level of confidentiality of data. High sensitivity means that the data is highly confidential. The lowest level of sensitivity often refers to public-facing or publicly-available data.

    Background

    When we first approached the data classification problem, we aimed to solve something more specific – Personal Identifiable Information (PII) detection. Initially, to protect sensitive data from accidental leaks or misuse, Grab implemented manual processes and campaigns targeting data producers to tag schemas with sensitivity tiers. These tiers ranged from Tier 1, representing schemas with highly sensitive information, to Tier 4, indicating no sensitive information at all. As a result, half of all schemas were marked as Tier 1, enforcing the strictest access control measures.

    The presence of a single Tier 1 table in a schema with hundreds of tables justifies classifying the entire schema as Tier 1. However, since Tier 1 data is rare, this implies that a large volume of non-Tier 1 tables, which ideally should be more accessible, have strict access controls.

    Shifting access controls from the schema-level to the table-level could not be done safely due to the lack of table classification in the data lake. We could have conducted more manual classification campaigns for tables, however this was not feasible for two reasons:

    1. The volume, velocity, and variety of data had skyrocketed within the organisation, so it took significantly more time to classify at table level compared to schema level. Hence, a programmatic solution was needed to streamline the classification process, reducing the need for manual effort.
    2. App developers, despite being familiar with the business scope of their data, interpreted internal data classification policies and external data regulations differently, leading to inconsistencies in understanding.

    A service called Gemini (named before Google announced the Gemini model!) was built internally to automate the tag generation process using a third party data classification service. Its purpose was to scan the data entities in batches and generate column/field level tags. These tags would then go through a review process by the data producers. The data governance team provided classification rules and used regex classifiers, alongside the third-party tool’s own machine learning classifiers, to discover sensitive information.

    After the implementation of the initial version of Gemini, a few challenges remained.

    1. The third-party tool did not allow customisations of its machine learning classifiers, and the regex patterns produced too many false positives during our evaluation.
    2. Building in-house classifiers would require a dedicated data science team to train a customised model. They would need to invest a significant amount of time to understand data governance rules thoroughly and prepare datasets with manually labelled training data.

    LLM came up on our radar following its recent “iPhone moment” with ChatGPT’s explosion onto the scene. It is trained using an extremely large corpus of text and contains trillions of parameters. It is capable of conducting natural language understanding tasks, writing code, and even analysing data based on requirements. The LLM naturally solves the mentioned pain points as it provides a natural language interface for data governance personnel. They can express governance requirements through text prompts, and the LLM can be customised effortlessly without code or model training.

    Methodology

    In this section, we dive into the implementation details of the data classification workflow. Please refer to the diagram below for a high-level overview:

    Figure 1 – Overview of data classification workflow

    This diagram illustrates how data platforms, the metadata generation service (Gemini), and data owners work together to classify and verify metadata. Data platforms trigger scan requests to the Gemini service to initiate the tag classification process. After the tags are predicted, data platforms consume the predictions, and the data owners are notified to verify these tags.

    Orchestration

    Figure 2 – Architecture diagram of the orchestration service Gemini

    Our orchestration service, Gemini, manages the data classification requests from data platforms. From the diagram, the architecture contains the following components:

    1. Data platforms: These platforms are responsible for managing data entities and initiating data classification requests.
    2. Gemini: This orchestration service communicates with data platforms, schedules and groups data classification requests.
    3. Classification engines: There are two available engines (a third-party classification service and GPT3.5) for executing the classification jobs and return results. Since we are still in the process of evaluating two engines, both of the engines are working concurrently.

    When the orchestration service receives requests, it helps aggregate the requests into reasonable mini-batches. Aggregation is achievable through the message queue at fixed intervals. In addition, a rate limiter is attached at the workflow level. It allows the service to call the Cloud Provider APIs with respective rates to prevent the potential throttling from the service providers.

    Specific to LLM orchestration, there are two limits to be mindful of. The first one is the context length. The input length cannot surpass the context length, which was 4000 tokens for GPT3.5 at the time of development (or around 3000 words). The second one is the overall token limit (since both the input and output share the same token limit for a single request). Currently, all Azure OpenAI model deployments share the same quota under one account, which is set at 240K tokens per minute.

    Classification

    In this section, we focus on LLM-powered column-level tag classification. The tag classification process is defined as follows:

    Given a data entity with a defined schema, we want to tag each field of the schema with metadata classifications that follow an internal classification scheme from the data governance team. For example, the field can be tagged as a <particular kind of business metric> or a <particular type of personally identifiable information (PII). These tags indicate that the field contains a business metric or PII.

    We ask the language model to be a column tag generator and to assign the most appropriate tag to each column. Here we showcase an excerpt of the prompt we use:

    You are a database column tag classifier, your job is to assign the most appropriate tag based on table name and column name. The database columns are from a company that provides ride-hailing, delivery, and financial services. Assign one tag per column. However not all columns can be tagged and these columns should be assigned <None>. You are precise, careful and do your best to make sure the tag assigned is the most appropriate.
    
    The following is the list of tags to be assigned to a column. For each line, left hand side of the : is the tag and right hand side is the tag definition
    
    …
    <Personal.ID> : refers to government-provided identification numbers that can be used to uniquely identify a person and should be assigned to columns containing "NRIC", "Passport", "FIN", "License Plate", "Social Security" or similar. This tag should absolutely not be assigned to columns named "id", "merchant id", "passenger id", “driver id" or similar since these are not government-provided identification numbers. This tag should be very rarely assigned.
    
    <None> : should be used when none of the above can be assigned to a column.
    …
    
    Output Format is a valid json string, for example:
    
    [{
            "column_name": "",
            "assigned_tag": ""
    }]
    
    Example question
    
    `These columns belong to the "deliveries" table
    
            1. merchant_id
            2. status
            3. delivery_time`
    
    Example response
    
    [{
            "column_name": "merchant_id",
            "assigned_tag": "<Personal.ID>"
    },{
            "column_name": "status",
            "assigned_tag": "<None>"
    },{
            "column_name": "delivery_time",
            "assigned_tag": "<None>"
    }]
    

    We also curated a tag library for LLM to classify. Here is an example:

    Column-level Tag Definition
    Personal.ID Refers to external identification numbers that can be used to uniquely identify a person and should be assigned to columns containing “NRIC”, “Passport”, “FIN”, “License Plate”, “Social Security” or similar.
    Personal.Name Refers to the name or username of a person and should be assigned to columns containing “name”, “username” or similar.
    Personal.Contact_Info Refers to the contact information of a person and should be assigned to columns containing “email”, “phone”, “address”, “social media” or similar.
    Geo.Geohash Refers to a geohash and should be assigned to columns containing “geohash” or similar.
    None Should be used when none of the above can be assigned to a column.

    The output of the language model is typically in free text format, however, we want the output in a fixed format for downstream processing. Due to this nature, prompt engineering is a crucial component to make sure downstream workflows can process the LLM’s output.

    Here are some of the techniques we found useful during our development:

    1. Articulate the requirements: The requirement of the task should be as clear as possible, LLM is only instructed to do what you ask it to do.
    2. Few-shot learning: By showing the example of interaction, models understand how they should respond.
    3. Schema Enforcement: Leveraging its ability of understanding code, we explicitly provide the DTO (Data Transfer Object) schema to the model so that it understands that its output must conform to it.
    4. Allow for confusion: In our prompt we specifically added a default tag – the LLM is instructed to output the default <None> tag when it cannot make a decision or is confused.

    Regarding classification accuracy, we found that it is surprisingly accurate with its great semantic understanding. For acknowledged tables, users on average change less than one tag. Also, during an internal survey done among data owners at Grab in September 2023, 80% reported that this new tagging process helped them in tagging their data entities.

    Publish and verification

    The predictions are published to the Kafka queue to downstream data platforms. The platforms inform respective users weekly to verify the classified tags to improve the model’s correctness and to enable iterative prompt improvement. Meanwhile, we plan to remove the verification mandate for users once the accuracy reaches a certain level.

    Figure 3 – Verification message shown in the data platform for user to verify the tags

    Impact

    Since the new system was rolled out, we have successfully integrated this with Grab’s metadata management platform and production database management platform. Within a month since its rollout, we have scanned more than 20,000 data entities, averaging around 300-400 entities per day.

    Using a quick back-of-the-envelope calculation, we can see the significant time savings achieved through automated tagging. Assuming it takes a data owner approximately 2 minutes to classify each entity, we are saving approximately 360 man-days per year for the company. This allows our engineers and analysts to focus more on their core tasks of engineering and analysis rather than spending excessive time on data governance.

    The classified tags pave the way for more use cases downstream. These tags, in combination with rules provided by data privacy office in Grab, enable us to determine the sensitivity tier of data entities, which in turn will be leveraged for enforcing the Attribute-based Access Control (ABAC) policies and enforcing Dynamic Data Masking for downstream queries. To learn more about the benefits of ABAC, readers can refer to another engineering blog posted earlier.

    Cost wise, for the current load, it is extremely affordable contrary to common intuition. This affordability enables us to scale the solution to cover more data entities in the company.

    What’s next?

    Prompt improvement

    We are currently exploring feeding sample data and user feedback to greatly increase accuracy. Meanwhile, we are experimenting on outputting the confidence level from LLM for its own classification. With confidence level output, we would only trouble users when the LLM is uncertain of its answers. Hopefully this can remove even more manual processes in the current workflow.

    Prompt evaluation

    To track the performance of the prompt given, we are building analytical pipelines to calculate the metrics of each version of the prompt. This will help the team better quantify the effectiveness of prompts and iterate better and faster.

    Scaling out

    We are also planning to scale out this solution to more data platforms to streamline governance-related metadata generation to more teams. The development of downstream applications using our metadata is also on the way. These exciting applications are from various domains such as security, data discovery, etc.

    Acknowledgements

    Grab recently participated in the Singapore government’s regulatory sandbox, where we successfully demonstrated how LLMs can efficiently and effectively perform data classification, allowing Grab to compound the value of its data for innovative use cases while safeguarding sensitive information such as PII.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Scaling marketing for merchants with targeted and intelligent promos

    Post Syndicated from Grab Tech original https://engineering.grab.com/scaling-marketing-for-merchants

    Introduction

    A promotional campaign is a marketing effort that aims to increase sales, customer engagement, or brand awareness for a product, service, or company. The target is to have more orders and sales by assigning promos to consumers within a given budget during the campaign period.

    Figure 1 – Merchant feedback on marketing

    From our research, we found that merchants have specific goals for the promos they are willing to offer. They want a simple and cost-effective way to achieve their specific business goals by providing well-designed offers to target the correct customers. From Grab’s perspective, we want to help merchants set up and run campaigns efficiently, and help them achieve their specific business goals.

    Problem statement

    One of Grab’s platform offerings for merchants is the ability to create promotional campaigns. With the emergence of AI technologies, we found that there are opportunities for us to further optimise the platform. The following are the gaps and opportunities we identified:

    • Globally assigned promos without smart targeting: The earlier method targeted every customer, so everyone could redeem until the promo reached the redemption limits. However, this method did not accurately meet business goals or optimise promo spending. The promotional campaign should intelligently target the best promo for each customer to increase sales and better utilise promo spending.
    • No customised promos for every merchant: To better optimise sales for each merchant, merchants should offer customised promos based on their historical consumer trends, not just a general offer set. For example, for a specific merchant, a 27% discount may be the appropriate offer to uplift revenue and sales based on user bookings. However, merchants do not always have the expertise to decide which offer to select to increase profit.
    • No AI-driven optimisation: Without AI models, it was harder for merchants to assign the right promos at scale to each consumer and optimise their business goals.

    As shown in the following figure, AI-driven promotional campaigns are expected to bring higher sales with more promo spend than heuristic ones. Hence, at Grab we looked to introduce an automated, AI-driven tool that helps merchants intelligently target consumers with appropriate promos, while optimising sales and promo spending. That’s where Bullseye comes in.

    Figure 2 – Graph showing the sales expectations for AI-driven pomotional campaigns

    Solution

    Bullseye is an automated, AI-driven promo assignment system that leverages the following capabilities:

    • Automated user segmentation: Enables merchants to target new, churned, and active users or all users.
    • Automatic promo design: Enables a merchant-level promo design framework to customise promos for each merchant or merchant group according to their business goals.
    • Assign each user the optimal promo: Users will receive promos selected from an array of available promos based on the merchant’s business objective.
    • Achieve different Grab and merchant objectives: Examples of objectives are to increase merchant sales and decrease Grab promo spend.
    • Flexibility to optimise for an individual merchant brand or group of merchant brands: For promotional campaigns, targeting and optimisation can be performed for a single or group of merchants (e.g. enabling GrabFood to run cuisine-oriented promo campaigns).

    Architecture

    Figure 3 – Bullseye architecture

    The Bullseye architecture consists of a user interface (UI) and a backend service to handle requests. To use Bullseye, our operations team inputs merchant information into the Bullseye UI. The backend service will then interact with APIs to process the information using the AI model. As we work with a large customer population, data is stored in S3 and the API service triggering Chimera Spark job is used to run the prediction model and generate promo assignments. During the assignment, the Spark job parses the input parameters, pre-validates the input, makes some predictions, and then returns the promo assignment results to the backend service.

    Implementation

    The key components in Bullseye are shown in the following figure:

    Figure 4 – Key components of Bullseye
    • Eater Segments Identifier: Identifies each user as active, churned, or new based on their historical orders from target merchants.
    • Promo Designer: We constructed a promo variation design framework to adaptively design promo variations for each campaign request as shown in the diagram below.
      • Offer Content Candidate Generation: Generates variant settings of promos based on the promo usage history.
      • Campaign Impact Simulator: Predicts business metrics such as revenue, sales, and cost based on the user and merchant profiles and offer features.
      • Optimal Promo Selection: Selects the optimal offer based on the predicted impact and the given campaign objective. The optimal would be based on how you define optimal. For example, if the goal is to maximise merchant sales, the model selects the top candidate which can bring the highest revenue. Finally, with the promo selection, the service returns the promo set to be used in the target campaign.

        Figure 5 – Optimal Promo Selection
    • Customer Response Model: Predicts customer responses such as order value, redemption, and take-up rate if assigning a specific promo. Bullseye captures various user attributes and compares it with an offer’s attributes. Examples of attributes are cuisine type, food spiciness, and discount amount. When there is a high similarity in the attributes, there is a higher probability that the user will take up the offer.

      Figure 6 – Customer Response Model

    • Hyper-parameter Selection: Optimises toward multiple business goals. Tuning of hyper-parameters allows the AI assignment model to learn how to meet success criteria such as cost per merchant sales (cpSales) uplift and sales uplift. The success criteria is the achieving of business goals. For example, the merchant wants the sales uplift after assigning promo, but cpSales uplift cannot be higher than 10%. With tuning, the optimiser can find optimal points to meet business goals and use AI models to search for better settings with high efficiency compared to manual specification. We need to constantly tune and iterate models and hyper-parameters to adapt to ever-evolving business goals and the local landscape.

      As shown in the image below, AI assignments without hyper-parameter tuning (HPT) leads to a high cpSales uplift but low sales uplift (red dot). So the hyper-parameters would help to fine-tune the assignment result to be in the optimal space such as the blue dot, which may have lower sales than the red dot but meet the success criteria.

      Figure 7 – Graph showing the impact of using AI assignments with HPT

    Impact

    We started using Bullseye in 2021. From its use we found that:

    • Hyper-parameters tuning and auto promo design can increase sales and reduce promo spend for food campaigns.
    • Promo Designer optimises budget utilisation and increases the number of promo redemptions for food campaigns.
    • The Customer Response Model reduced promo spending for Mart promotional campaigns.

    Conclusion

    We have seen positive results with the implementation of Bullseye such as reduced promo spending and maximised budget spending returns. In our efforts to serve our merchants better and help them achieve their business goals, we will continue to improve Bullseye. In the next phase, we plan to implement a more intelligent service, enabling reinforcement learning, and online assignment. We also aim to scale AI adoption by onboarding regional promotional campaigns as much as possible.

    Special thanks to William Wu, Rui Tan, Rahadyan Pramudita, Krishna Murthy, and Jiesin Chia for making this project a success.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Stepping up marketing for advertisers: Scalable lookalike audience

    Post Syndicated from Grab Tech original https://engineering.grab.com/scalable-lookalike-audiences

    The advertising industry is constantly evolving, driven by advancements in technology and changes in consumer behaviour. One of the key challenges in this industry is reaching the right audience, reaching people who are most likely to be interested in your product or service. This is where the concept of a lookalike audience comes into play. By identifying and targeting individuals who share similar characteristics with an existing customer base, businesses can significantly improve the effectiveness of their advertising campaigns.

    However, as the scale of Grab advertisements grows, there are several optimisations needed to maintain the efficacy of creating lookalike audiences such as high service level agreement (SLA), high cost of audience creation, and unstable data ingestion.

    The need for an even more efficient and scalable solution for creating lookalike audiences was the motivation behind the development of the scalable lookalike audience platform. By developing a high-performance in-memory lookalike audience retrieval service and embedding-based lookalike audience creation and updating pipelines, t​his improved platform builds on the existing system and provides an even more effective tool for advertisers to reach their target audience.

    Constant optimisation for greater precision

    In the dynamic world of digital advertising, the ability to quickly and efficiently reach the right audience is paramount and a key strategy is targeted advertising. As such, we have to constantly find ways to improve our current approach to creating lookalike audiences that impacts both advertisers and users. Some of the gaps we identified included:

    • Long SLA for audience creation. Earlier, the platform stored results on Segmentation Platform (SegP) and it took two working days to generate a lookalike audience list. This is because inserting a single audience into SegP took three times longer than generating the audience. Extended creation times impacted the effectiveness of advertising campaigns, as it limited the ability of advertisers to respond quickly to changing market dynamics.

    • Low scalability. As the number of onboarded merchant-partners increased, the time and cost of generating lookalike audiences also increased proportionally. This limited the availability of lookalike audience generation for all advertisers, particularly those with large customer bases or rapidly changing audience profiles.

    • Low updating frequency of lookalike audiences. With automated updates only occurring on a weekly basis, this increased the likelihood that audiences may become outdated and ineffective. This meant there was scope to further improve to help advertisers more effectively reach their campaign goals, by targeting individuals who fit the desired audience profile.

    • High cost of creation. The cost of producing one segment can add up quickly for advertisers who need to generate multiple audiences. This could impact scalability for advertisers as they could hesitate to effectively use multiple lookalike audiences in their campaigns.

    Solution

    To efficiently identify the top N lookalike audiences for each Grab user from our pool of millions of users, we developed a solution that leverages user and audience representations in the form of embeddings. Embeddings are vector representations of data that utilise linear distances to capture structure from the original datasets. With embeddings, large sets of data are compressed and easily processed without affecting data integrity. This approach ensures high accuracy, low latency, and low cost in retrieving the most relevant audiences.

    Our solution takes into account the fact that representation drift varies among entities as data is added. For instance, merchant-partner embeddings are more stable than passenger embeddings. By acknowledging this reality, we optimised our process to minimise cost while maintaining a desirable level of accuracy. Furthermore, we believe that having a strong representation learning strategy in the early stages reduced the need for complex models in the following stages.

    Our solution comprises two main components:

    1. Real-time lookalike audience retrieving: We developed an in-memory high-performance retrieving service that stores passenger embeddings, audience embeddings, and audience score thresholds. To further reduce cost, we designed a passenger embedding compression algorithm that reduces the memory needs of passenger embeddings by around 90%.

    2. Embedding-based audience creation and updating: The output of this part of the project is an online retrieving model that includes passenger embeddings, audience embeddings, and thresholds. To minimise costs, we leverage the passenger embeddings that are also utilised by other projects within Grab, beyond advertising, thus sharing the cost. The audience embeddings and thresholds are produced with a low-cost small neural network.

    In summary, our approach to creating scalable lookalike audiences is designed to be cost-effective, accurate, and efficient, leveraging the power of embeddings and smart computational strategies to deliver the best possible audiences for our advertisers.

    Solution architecture

    • The advertiser creates a campaign with a custom audience, which triggers the audience creation process. During this process, the audience service stores the audience metadata provided by advertisers in a message queue.
    • A scheduled Data Science (DS) job then retrieves the pending audience metadata, creates the audience, and updates the TensorFlow Serving (TFS) model.
    • During the serving period, the Backend (BE) service calls the DS service to retrieve all audiences that include the target user. Ads that are targeting these audiences are then selected by the Click-Through Rate (CTR) model to be displayed to the user.

    Implementation

    To ensure the efficiency of the lookalike audience retrieval model and minimise the costs associated with audience creation and serving, we’ve trained the user embedding model using billions of user actions. This extensive training allows us to employ straightforward methods for audience creation and serving, while still maintaining high levels of accuracy.

    Creating lookalike audiences

    The Audience Creation Job retrieves the audience metadata from the online audience service, pulls the passenger embeddings, and then averages these embeddings to generate the audience embedding.

    We use the cosine score of a user and the audience embedding to identify the audiences the user belongs to. Hence, it’s sufficient to store only the audience embedding and score threshold. Additionally, a global target-all-pax Audience list is stored to return these audiences for each online request.

    Serving lookalike audiences

    The online audience service is also tasked with returning all the audiences to which the current user belongs. This is achieved by utilising the cosine score of the user embedding and audience embeddings, and filtering out all audiences that surpass the audience thresholds.

    To adhere to latency requirements, we avoid querying any external feature stores like Redis and instead, store all the embeddings in memory. However, the embeddings of all users are approximately 20 GB, which could affect model loading. Therefore, we devised an embedding compression method based on hash tricks inspired by Bloom Filter.

    • We utilise hash functions to obtain the hash64 value of the paxID, which is then segmented into four 16-bit values. Each 16-bit value corresponds to a 16-dimensional embedding block, and the compressed embedding is the concatenation of these four 16-dimensional embeddings.
    • For each paxID, we have both the original user embedding and the compressed user embedding. The compressed user embeddings are learned by minimising the Mean Square Error loss.
    • We can balance the storage cost and the accuracy by altering the number of hash functions used.

    Impact

    • Users can see advertisements targeting a new audience within 15 mins after the advertiser creates a campaign.
    • This new system doubled the impressions and clicks, while also improving the CTR, conversion rate, and return on investment.
    • Costs for generating lookalike audiences decreased by 98%.

    Learnings/Conclusion

    To evaluate the effectiveness of our new scalable system besides addressing these issues, we conducted an A/B test to compare it with the earlier system. The results revealed that this new system effectively doubled the number of impressions and clicks while also enhancing the CTR, conversion rate, and return on investment.

    Over the years, we have amassed over billions of user actions, which have been instrumental in training the model and creating a comprehensive representation of user interests in the form of embeddings.

    What’s next?

    While this scalable system has proved its effectiveness and demonstrated impressive results in CTR, conversion rate, and return on investment, there is always room for improvement.  

    In the next phase, we plan to explore more advanced algorithms, refine our feature engineering process, and conduct more extensive hyperparameter tuning. Additionally, we will continue to monitor the system’s performance and make necessary adjustments to ensure it remains robust and effective in serving our advertisers’ needs.

    References

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Building hyperlocal GrabMaps

    Post Syndicated from Grab Tech original https://engineering.grab.com/building-hyperlocal-grabmaps

    Introduction

    Southeast Asia (SEA) is a dynamic market, very different from other parts of the world. When travelling on the road, you may experience fast-changing road restrictions, new roads appearing overnight, and high traffic congestion. To address these challenges, GrabMaps has adapted to the SEA market by leveraging big data solutions. One of the solutions is the integration of hyperlocal data in GrabMaps.

    Hyperlocal information is oriented around very small geographical communities and obtained from the local knowledge that our map team gathers. The map team is spread across SEA, enabling us to define clear specifications (e.g. legal speed limits), and validate that our solutions are viable.

    Figure 1 – Map showing detections from images and probe data, and hyperlocal data.

    Hyperlocal inputs make our mapping data even more robust, adding to the details collected from our image and probe detection pipelines. Figure 1 shows how data from our detection pipeline is overlaid with hyperlocal data, and then mapped across the SEA region. If you are curious and would like to check out the data yourself, you can download it here.

    Processing hyperlocal data

    Now let’s go through the process of detecting hyperlocal data.

    Download data

    GrabMaps is based on OpenStreetMap (OSM). The first step in the process is to download the .pbf file for Asia from geofabrick.de. This .pbf file contains all the data that is available on OSM, such as details of places, trees, and roads. Take for example a park, the .pbf file would contain data on the park name, wheelchair accessibility, and many more.

    For this article, we will focus on hyperlocal data related to the road network. For each road, you can obtain data such as the type of road (residential or motorway), direction of traffic (one-way or more), and road name.

    Convert data

    To take advantage of big data computing, the next step in the process is to convert the .pbf file into Parquet format using a Parquetizer. This will convert the binary data in the .pbf file into a table format. Each road in SEA is now displayed as a row in a table as shown in Figure 2.

    Figure 2 – Road data in Parquet format.

    Identify hyperlocal data

    After the data is prepared, GrabMaps then identifies and inputs all of our hyperlocal data, and delivers a consolidated view to our downstream services. Our hyperlocal data is obtained from various sources, either by looking at geometry, or other attributes in OSM such as the direction of travel and speed limit. We also apply customised rules defined by our local map team, all in a fully automated manner. This enhances the map together with data obtained from our rides and deliveries GPS pings and from KartaView, Grab’s product for imagery collection.

    Figure 3 – Architecture diagram showing how hyperlocal data is integrated into GrabMaps.

    Benefit of our hyperlocal GrabMaps

    GrabNav, a turn-by-turn navigation tool available on the Grab driver app, is one of our products that benefits from having hyperlocal data. Here are some hyperlocal data that are made available through our approach:

    • Localisation of roads: The country, state/county, or city the road is in
    • Language spoken, driving side, and speed limit
    • Region-specific default speed regulations
    • Consistent name usage using language inference
    • Complex attributes like intersection links

    To further explain the benefits of this hyperlocal feature, we will use intersection links as an example. In the next section, we will explain how intersection links data is used and how it impacts our driver-partners and passengers.

    An intersection link is when two or more roads meet. Figure 4 and 5 illustrates what an intersection link looks like in a GrabMaps mock and in OSM.

    Figure 4 – Mock of an intersection link.
    Figure 5 – Intersection link illustration from a real road network in OSM.

    To locate intersection links in a road network, there are computations involved. We would first combine big data processing (which we do using Spark) with graphs. We use geohash as the unit of processing, and for each geohash, a bi-directional graph is created.

    From such resulting graphs, we can determine intersection links if:

    • Road segments are parallel
    • The roads have the same name
    • The roads are one way roads
    • Angles and the shape of the road are in the intervals or requirements we seek

    Each intersection link we identify is tagged in the map as intersection_links. Our downstream service teams can then identify them by searching for the tag.

    Impact

    The impact we create with our intersection link can be explained through the following example.

    Figure 6 – Longer route, without GrabMaps intersection link feature. The arrow indicates where the route should have suggested a U-turn.
    Figure 7 – Shorter route using GrabMaps by taking a closer link between two main roads.

    Figure 6 and Figure 7 show two different routes for the same origin and destination. However, you can see that Figure 7 has a shorter route and this is made available by taking an intersection link early on in the route. The highlighted road segment in Figure 7 is an intersection link, tagged by the process we described earlier. The route is now much shorter making GrabNav more efficient in its route suggestion.

    There are numerous factors that can impact a driver-partner’s trip, and intersection links are just one example. There are many more features that GrabMaps offers across Grab’s services that allow us to “outserve” our partners.

    Conclusion

    GrabMaps and GrabNav deliver enriched experiences to our driver-partners. By integrating certain hyperlocal data features, we are also able to provide more accurate pricing for both our driver-partners and passengers. In our mission towards sustainable growth, this is an area that we will keep on improving by leveraging scalable tech solutions.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Unsupervised graph anomaly detection – Catching new fraudulent behaviours

    Post Syndicated from Grab Tech original https://engineering.grab.com/graph-anomaly-model

    Earlier in this series, we covered the importance of graph networks, graph concepts, graph visualisation, and graph-based fraud detection methods. In this article, we will discuss how to automatically detect new types of fraudulent behaviour and swiftly take action on them.

    One of the challenges in fraud detection is that fraudsters are incentivised to always adversarially innovate their way of conducting frauds, i.e., their modus operandi (MO in short). Machine learning models trained using historical data may not be able to pick up new MOs, as they are new patterns that are not available in existing training data. To enhance Grab’s existing security defences and protect our users from these new MOs, we needed a machine learning model that is able to detect them quickly without the need for any label supervision, i.e., an unsupervised learning model rather than the regular supervised learning model.

    To address this, we developed an in-house machine learning model for detecting anomalous patterns in graphs, which has led to the discovery of new fraud MOs. Our focus was initially on GrabFood and GrabMart verticals, where we monitored the interactions between consumers and merchants. We modelled these interactions as a bipartite graph (a type of graph for modelling interactions between two groups) and then performed anomaly detection on the graph. Our in-house anomaly detection model was also presented at the International Joint Conference on Neural Networks (IJCNN) 2023, a premier academic conference in the area of neural networks, machine learning, and artificial intelligence.

    In this blog, we discuss the model and its application within Grab. For avid audiences that want to read the details of our model, you can access it here. Note that even though we implemented our model for anomaly detection in GrabFood and GrabMart, the model is designed for general purposes and is applicable to interaction graphs between any two groups.

    Interaction-Focused Anomaly Detection on Bipartite Node-and-Edge-Attributed Graphs
    By Rizal Fathony, Jenn Ng, Jia Chen
    Presented at International Joint Conference on Neural Networks (IJCNN) 2023

    Before we dive into how our model works, it is important to understand the process of graph construction in our application as the model assumes the availability of the graphs in a standardised format.

    Graph construction 

    We modelled the interactions between consumers and merchants in GrabFood and GrabMart platforms as bipartite graphs (G), where the first group of nodes (U) represents the consumers, the second group of nodes (V) represents the merchants, and the edges (E) connecting them means that the consumers have placed some food/mart orders to the merchants. The graph is also supplied with rich transactional information about the consumers and the merchants in the form of node features (Xu and Xv), as well as order information in the form of edge features (Xe).

    Fig 1. Graph construction process

    The goal of our anomaly model is to detect anomalous and suspicious behaviours from the consumers or merchants (node-level anomaly detection), as well as anomalous order interactions (edge-level anomaly detection). As mentioned, this detection needs to be done without any label supervision.

    Model architecture

    We designed our graph anomaly model as a type of autoencoder, with an encoder and two decoders – a feature decoder and a structure decoder. The key feature of our model is that it accepts a bipartite graph with both node and edge attributes as the input. This is important as both node and edge attributes encode essential information for determining if certain behaviours are suspicious. Many previous works on graph anomaly detection only support node attributes. In addition, our model can produce both node and edge level anomaly scores, unlike most of the previous works that produce node-level scores only. We named our model GraphBEAN, which is short for Bipartite Node-and-Edge-Attributed Networks.

    From the input, the encoder then processes the attributed bipartite graph into a series of graph convolution layers to produce latent representations for both node groups. Our graph convolution layers produce new representations for each node in both node groups (U and V), as well as for each edge in the graph. Note that the last convolution layer in the encoder only produces the latent representations for nodes, without producing edge representations. The reason for this design is that we only put the latent representations for the active actors, the nodes representing consumers and merchants, but not their interactions.

    Fig 2. GraphBEAN architecture

    From the nodes’ latent representations, the feature decoder is tasked to reconstruct the original graph with both node and edge attributes via a series of graph convolution layers. As the graph structure is provided by the feature decoder, we task the structure decoder to learn the graph structure by predicting if there exists an edge connecting two nodes. This edge prediction, as well as the graph reconstructed by the feature decoder, are then compared to the original input graph via a reconstruction loss function.

    The model is then trained using the bipartite graph constructed from GrabFood and GrabMart transactions. We use a reconstruction-based loss function as the training objective of the model. After the training is completed, we compute the anomaly score of each node and edge in the graph using the trained model.

    Anomaly score computation

    Our anomaly scores are reconstruction-based. The score design assumes that normal behaviours are common in the dataset and thus, can be easily reconstructed by the model. On the other hand, anomalous behaviours are rare. Therefore the model will have a hard time reconstructing them, hence producing high errors.

    Fig 3. Edge-level and node-level anomaly scores computation

    The model produces two types of anomaly scores. First, the edge-level anomaly scores, which are calculated from the edge reconstruction error. Second, the node-level anomaly scores, which are calculated from node reconstruction error plus an aggregate over the edge scores from the edges connected to the node. This aggregate could be a mean or max aggregate.

    Actioning system

    In our implementation of GraphBEAN within Grab, we designed a full pipeline of anomaly detection and actioning systems. It is a fully-automated system for constructing a bipartite graph from GrabFood and GrabMart transactions, training a GraphBEAN model using the graph, and computing anomaly scores. After computing anomaly scores for all consumers and merchants (node-level), as well as all of their interactions (edge-level), it automatically passes the scores to our actioning system. But before that, it also passes them through a system we call fraud type tagger. This is also a fully-automated heuristic-based system that tags some of the detected anomalies with some fraud tags. The purpose of this tagging is to provide some context in general, like the types of detected anomalies. Some examples of these tags are promo abuse or possible collusion.

    Fig 4. Pipeline in our actioning system

    Both the anomaly scores and the fraud type tags are then forwarded to our actioning system. The system consists of two subsystems:

    • Human expert actioning system: Our fraud experts analyse the detected anomalies and perform certain actioning on them, like suspending certain transaction features from suspicious merchants.
    • Automatic actioning system: Combines the anomaly scores and fraud type tags with other external signals to automatically do actioning on the detected anomalies, like preventing promos from being used by fraudsters or preventing fraudulent transactions from occurring. These actions vary depending on the type of fraud and the scores.

    What’s next?

    The GraphBEAN model enables the detection of suspicious behaviour on graph data without the need for label supervision. By implementing the model on GrabFood and GrabMart platforms, we learnt that having such a system enables us to quickly identify new types of fraudulent behaviours and then swiftly perform action on them. This also allows us to enhance Grab’s defence against fraudulent activity and actively protect our users.

    We are currently working on extending the model into more generic heterogeneous (multi-entity) graphs. In addition, we are also working on implementing it to more use cases within Grab.

    Join us

    Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

    Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

    Zabbix in: exploratory data analysis rehearsal – Part 3

    Post Syndicated from Paulo R. Deolindo Jr. original https://blog.zabbix.com/zabbix-in-exploratory-data-analysis-rehearsal-part-2-2/26266/

    Abstract

    This will be the last blog post of the “Zabbix in… exploratory data analysis rehearsal” series. To continue our initial proposal, we’ll close the third and fourth data distribution moments concept. This time, we’ll talk about Skewness and Kurtosis.

    Remember the first and second articles of the series to be aware of what we are discussing here.

    The four moments for a data distribution

    While the first moment helps us with the location estimate for some data distribution, the second moment works with its variance. The third moment, called asymmetry, allows us to understand the value trends and the degree of the asymmetry. The fourth moment is called kurtosis and is about the probability of the peak’s existence (outliers).

    These four moments are not the final study about the data distribution. There is so much to learn and apply to data science when considering statistical concepts, but for now we must finish the initial proposal and bring forward some insights for decision makers.

    Let’s get started!

    Asymmetry

    Based on our web application scenario, we can see a certain asymmetry in response time in most cases. This is normal and expected – so far, no problems. But it is also true that some symmetry is also possible in certain cases. Again, no problems here.

    So, where is the problem? When does it happen?

    Sometimes, the web application response time can be too different from the previous one, and we have no control over it. In these cases, the outliers must be found, and the correct interpretation must be applied. At that point, we must consider anomalies in the environment. Sometimes, the outliers are just a deviation. In all cases, we must pay attention and monitor the metrics that can make the difference.

    Speaking of asymmetry – why is this topic so special? One of the possible answers is that we need to understand the degree of the asymmetry – whether it is high or moderated and whether the values were in most cases smaller than or bigger than the mean or median. In other words, what does the asymmetry say about the web application performance?

    Let’s check some implementations.

    The key skewness

    From version 6.0, Zabbix introduced the item key skewness.

    For example, it can be used like this:

    skewness(/host/key,1h) # the skewness for the last hour until now

    Now, let’s see how this formula could be applied to our scenario:

    skewness(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)

    Using skewness and time shift “1h:now/h”, we are looking for some web application response time asymmetry at the previous hour.

    The asymmetry can be negative (left skew), zero, positive (right skew) or undefined.

    Definition: a left-skewed distribution is longer on the left side of its peak than on its right. In other words, a left-skewed distribution has a long tail on its left side.

    Considering a left skew, it is possible to state that at the previous hour, the web application had more higher values than smaller values. This means that our web application does not perform as well as it should.

    Look at the graph above. You can see some bars on the left side of the mean and other bars on the right side of the mean, with the same size as a mirror. We can consider this a normal distribution for web application response time, but it does not mean that the response times were good or bad – it only means that they had some balance and suggests more investigation.

    Definition: a right-skewed distribution is longer on the right side of its peak than on its left. In other words, a right-skewed distribution has a long tail on its right side.

    Considering a right skew, it is possible to state that at the previous hour, the web application had more smaller values than higher values. This means our web application performs as expected.

    Value Map for Skewness

    You must create in your template the following value map:

    If you wish, the value map can also be as below:

    “is greater than or equals”                                 0.1              à Mais tempos bons, se comparados à média

    “equals”                                                               0               à Tempos de resposta simétricos ou bem distribuídos em bons e ruins

    “is less than or equals”                                       0                à Mais tempos ruins, se comparados à média

    Pearson Skewness Coefficient

    The Pearson’s Coefficient is a very interesting indicator. Considering some skewness for a data distribution, it tells us if the asymmetry is strong or only moderate.

    We can create a calculated item for the Pearson’s Coefficient:

    (3*(avg-median))/stddevpop

    In Zabbix, we need:

    • One item for the response time average considering the previous hour:
      • Key: resp.time.previous.hour
        • Formula: trendavg(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)
      • One item for calculating the median (for percentile, see the previous blog post):
        • Key: response.time.previous.hour
          • Formula: (last(//p51.previous.hour)+last(//p50.previous.hour))/2
        • One item for the standard deviation calculation:
          • Key:
            • Formula: response.time.previous.hour stddevpop(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)

    Finally, a calculated item for a Pearson’s Coefficient:

    • Key: coefficient.requests.previous.hour
      • Formula:
        ((last(//trendavg.requests.per.minute)-
        last(//median.access.previous.hour))*3)
        /
        last(//stddevpop.requests.previous.hour)

    To finish the exercise, create a value map:

    Curtose

    Kurtosis is the fourth data distribution and can indicate if the values are prone to peaks.

    In Zabbix, you can implement or calculate kurtosis with other calculated items:

    kurtosis(/host/key,1h)

    Um valor negativo de curtose nos diz que a distribuição não está propensa ou produziu poucos outliers. Já um valor positivo de curtose nos diz que a distribuição está propensa ou produziu muitos outliers. Tudo gira em torno da média da distribuição de dados. Já um valor neutro, ou zero, nos diz que a distribuição é considerada simétrica.

    Para nosso cenário web, temos:

    kurtosis(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)

    Explanatory Dashboard

    Considering the image above, the data distribution for the web application response time at the previous hour has a left skew. This suggests that, when considering the response time mean at the previous hour, there were more higher values than smallest values. That’s sad – our web application performed poorly. Why?

    However, because the skewness was moderated (considering the Pearson’s Coefficient), it means that the response time values were not so different from the others at the same hour.

    As for kurtosis, we can say that the data distribution is prone to peaks because we have positive kurtosis. In basic statistics, the values are near the mean and there is a high probability of producing outliers.

    This is a point to pay attention to if you are looking at a critical service.

    Looking at other metrics

    To check and validate our interpretation, let’s visualize the collected values on a simple Zabbix graph, using a graph widget.

    Please consider using the following “time period” configuration:

    The graph will show only the data collected at the previous hour – in this case, from 11:00 to 11:59.

    The graph in Zabbix allows us to visualize the 50th percentile as well. We do not display the mean on the graph because it wouldn’t be well represented visually as it was collected only once, thus lacking an interesting visual trend line like the 50th percentile. However, notice that the mean and the 50th percentile values are very close, which will give us an idea of the data distribution around this measure.

    Partial Conclusion

    Skewness and Kurtosis, respectively, are the third and fourth moments of a data distribution. They help us understand the environment’s behavior and allow us to gain insight into a lot of things (in this case, we simply applied these concepts to IT infrastructure monitoring and focused on our web application to analyze its performance).

    In most cases, the asymmetry will exist – it’s normal and expected. However, knowing some properties of the skewness can help us understand the response times, indicating good or poor performance. The skewness coefficient allows us to know if the asymmetry was strong or just moderated. Meanwhile, kurtosis helps us to understand if the data distribution produced some peaks considering an observation period or whether it is prone to produce peaks. We can then create some triggers for that and avoid some undesirable behaviors in the future, based on our data distribution observation. This is applied data science at its best.

    Conclusion

    Data science can be easily applied to (and bring up insights about) Zabbix and its Aggregate functions. It’s true that there are some special functions such as skewness, kurtosis, stddevpop, stddevsamp, mad, and so on, but there are old functions to help us too, such as percentile, forecast, timeleft, etc. All these functions must be used in calculated item formulas.

    One of the interesting advantages of using Zabbix in data science and performing an exploratory data analysis is the fact that Zabbix can monitor everything. This means that the database already exists with the relevant date to analyze, in real time.

    In the blog posts in this series, we took as a basis some data referring to the previous hour, the previous day, and so on, but we did nothing regarding the current hour. If we applied the concepts studied in real time data, we would have other “live” results, including support for decision-making. This is because we will not only study historical data, but instead will have the opportunity to change the course of events.

    Zabbix is improving dashboards significantly. From Zabbix 6.4, we have many new out-of-the-box widgets and the possibility to create our own. However, there is a concern – Zabbix administrators sometimes show unnecessary data in dashboards, which can cloud the decision-making process. Zabbix administrators, in general, might want to learn storytelling techniques to rectify this situation. Maybe in a perfect world!

    I hope you have enjoyed this blog post series.

    Keep studying!

     

    The post Zabbix in: exploratory data analysis rehearsal – Part 3 appeared first on Zabbix Blog.

    Zabbix in: exploratory data analysis rehearsal – Part 2

    Post Syndicated from Paulo R. Deolindo Jr. original https://blog.zabbix.com/zabbix-in-exploratory-data-analysis-rehearsal-part-2/26151/

    Abstract

    In the previous blog post, we just explored some of the basic statistic concepts to estimate KPIs for a web application response time: in that case, average, median and percentile. Additionally, we improved the nginx out-of-the-box template and showed some results in simple dashboards. Now we must continue our work but this time, analyzing some variances of the collected metrics, considering a certain period.

    Please, read the previous blog post to understand the context in a better way. I wish you a good reading.

    A little about basic statistic

    In basic statistic, a data distribution has at least four moments:

    • Location estimate
    • Variance
    • Skewness
    • Kurtosis

    In the previous blog post, we introduced the 1st moment, knowing some estimates of our data distribution. It means that we have analyzed some values of our web application response time. It reveals that the response time can have minimum and maximum values, average, a value that can represent a central value of the distribution and so on. Some metrics, such as average, can be influenced by outliers, but other metrics do not, such as 50th percentile or median. To conclude, now we know something about the variance of those values, but it isn’t enough. Let’s check the 2nd moment of the data distribution: Variance.

    Variance

    So, we have some notion about the variance of the web application response time, meaning that it can have some asymmetry (in most cases) and we also know that some KPIs must be considered but, which of them?

    In exploratory data analysis, we can discover some key metrics but, in most cases, we won’t use all of them, so we have to know each one’s relevance to choose properly which metric can represent the reality of our scenario.

    Yes! There are some cases when some metrics must be added to other metrics so that they make sense otherwise, we can discard them: we must create and understand the context for all those metrics.

    Let’s check some concepts of the variance:

    • Variance
    • Standard deviation
    • Median absolute deviation (MAD)
    • Amplitude
    • IQR – Interquartile range

    Amplitude

    This concept is simple and its formula too: it is the difference between the maximum and the minimum value in a data distribution. In this case, we are talking about data distribution at the previous hour (,1h:now/h). We are interested in knowing the range of variation in response times in that period.

    Let’s create a Calculated item to Amplitude metric in “Nginx by HTTP modified” template.

    • trendmax(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)
    • trendmin(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)

    In other terms, it could be:

    • max(/host/key)-min(/host/key)

    However, we are analysing a data distribution based on the previous hour, so…

    • trendmax(/host/key,1h:now/h)-trendmin(/host/key,1h:now/h)

    Modifying our dashboard, we’ll see something like this:

     

    This result interpretation could be: between the worst and the best response times, the variance is too small. It means, during that hour the response times had no significant differences.

    However, amplitude itself is not enough to get some web application diagnosis at that moment. It’s necessary to combine this result with other results and we’ll see how to do it.

    To complement, we can create some triggers based on it:

    • Fire if the response time amplitude was bigger than 5 seconds at the previous hour. It means that the web application did not perform as expected considering the web application requests.
      • Expression = last(/Nginx by HTTP modified/amplitude.previous.hour)>5
      • Level = Information
    • Fire if the response time amplitude reaches 5 seconds at least 3 consecutive times. It means at the last 3 hours, there was too much variance among the web application response times and it is not the expected.
      • Expression = max(/Nginx by HTTP modified/amplitude.previous.hour,#3)>5
      • Level = Warning

    Remember, we are evaluating the previous hour and it makes no sense to generate this metric every single minute. Let’s create a Custom interval period for it.

     

    By doing it, we are avoiding flapping on triggers environment.

    IQR – Interquartile range

    Consider these values below:

    3, 5, 2, 1, 3, 3, 2, 6, 7, 8, 6, 7, 6

    Open the shell environment. Create the file “vaules.txt” and insert each one, one per line. Now, read the file:

    # cat values.txt

    3
    5
    2
    1
    3
    3
    2
    6
    7
    8
    6
    7
    6

    Now, send the value to Zabbix using Zabbix sender:

    # for x in `cat values.txt`; do zabbix_sender -z 127.0.0.1 -s “Web server A” -k input.values -o $x; done

    Look at the historical data using Zabbix frontend.

     

    Now, let’s create some Calculated items to 75th  percentile and 25th  percentile.

    • Key: iqr.test.75
      Formula: percentile(//input.values,#13,75)
      Type: Numeric (float)
    • Key: iqr.test.25
      Formula: percentile(//input.values,#13,25)
      Type: Numeric (float)

    If we apply it on a Linux terminal the command “sort values.txt”, we’ll get the same values ordered by size. Let’s check:

    # sort values.txt

     

    We’ll use the same concept here.

    From the left to the right, go to the 25th percentile. You will get the number 3.

    Do it again, but this time go to the 50th percentile. You will get the number 5.

    And again, go to the 75th percentile. You will get the number 6.

    The IQR is the difference between the 75th percentile (Q3) and the 25th percentile (Q1). So, we are excluding the outliers (the smallest values on the left and the biggest values on the right).

    To calculate the IQR, you can create the following Calculate item:

    • key: iqr.test
      Formula: last(//iqr.test.75)-last(//iqr.test.25)

    Now, we’ll apply this concept in Web Application Response Time.

    The Calculated Item for the 75th percentile:

    key: percentile.75.response.time.previous.hour
    Formula: percentile(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h,75)

    The Calculated item for the 25th percentile:

    key: percentile.25.response.time.previous.hour
    Formula: percentile(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h,25)

    The Calculate item for the IQR:

    key: iqr.response.time.previous.hour
    Formula: last(//percentile.75.response.time.previous.hour)-last(//percentile.25.response.time.previous.hour)

    Keep the monitoring schedule at the 1st minute for each hour, to avoid repetition (it’s very important) and adjust the dashboard.

    Considering the worst web response time and the best web response time at the previous hour, the AMPLITUDE returns a big value in comparison to the IQR and it happens because the outliers were discarded in IQR calculation. So, just as the mean is a location estimate that is influenced by outliers and the median is not, so are the RANGE and IQR. The IQR is a robust indicator and allows us to know the difference between the web response time variance in a central position.

    P.S.: we are considering only the previous hour, however, you can apply the IQR concept to an entire period, such as the previous day, or the previous week, the previous month and so on, using the correct time shift notation. You can use it to compare the web application response time variance between the periods you wish to observe and then, get some insights about the web application behavior at different times and situations.

    Variance

    The variance is a way to calculate the dispersion of data, considering its average. In Zabbix, calculating the variance is simple, since there is a specific formula for that, through a Calculated item.

    The formula is the following:

    • Key: varpop.response.time.previous.hour
      Formula: varpop(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)

    In this case, the formula returns the dispersion of data, however, there is a characteristic: at some point, the data is squared and then, the data scale changes.

    Let’s check the steps for calculating the variance of the data:

    1st) Calculate the mean;
    2nd) Subtract the mean of each value;
    3rd) Square each subtraction result;
    4th) Perform the sum of all squares;
    5th) Divide the result of the sum by the total observations.

    At the 3rd step, we have the scale change. This new data can be used to other calculations in the future.

    Standard Deviation

    The root square of the variance.

    Calculating the root square of the variance, the data can come back to its original scale!

    There are at least two ways to do it:

    1. Using the root square key and formula in Zabbix:
      1. Key: varpop.previous.hour
      2. Formula: sqrt(last(//varpop.response.time.previous.hour))
    2. Using the standard deviation key and formula in Zabbix:
      1. Key:previous.hour
      2. Formula: stddevpop(//host,key,1h:now/h) # an example for the previous hour

    A simple way to understand the standard deviation concept is: a way of knowing how “far” values are from the average. So, applying the specific formula, we’ll get that indicator.

    Look at this:

    The image above is a common image that can be found on the Internet, and it can help us understand some results. The standard deviation value must be near “zero”, otherwise, we’ll have serious deviations.

    Let’s check the following Calculated item:

    • stddevpop(//net.tcp.service.perf[http,”{HOST.CONN}”,”{$NGINX.STUB_STATUS.PORT}”],1h:now/h)

    We are calculating the standard deviation based on the collected values at the previous hour. Let’s check the Test item on the frontend:

    The test returned some value less than 1, at about 0.000446. If this value is less than 1, we don’t have a complete deviation and it means that the collected values at the previous hour are near the average.

    For a web application response time, it can represent a good behavior, with no significant variances and as expected, off course, other indicators must be checked to a complete and reliable diagnosis.

    Important notes about standard deviation:

    • Sensitive to outliers
    • Can be calculated based on the population of a data distribution or based on its sample.
      • Using this formula: stddevsamp. In this case, it can return a different value from the the previous one.

    Median Absolute Deviation (MAD)

    While the Standard Deviation is a simple way to understand if the data of a distribution are far from its mean, MAD help us understand if these values are far from its median. So, MAD can be considered a robust estimate, because is not sensitive to outliers.

    Warning: If you need to identify outliers or consider them in your analysis, the MAD function is not recommended, because it ignores them.

    Let’s check our dashboard and compare different deviation calculations for the same data distribution:

    Note that the last one is based on MAD function, and it is less than the other items, just because it is not considering outliers.

    In this particular case, the web application is stable, and its response times are near to the mean or median (considering the MAD algorithm).

    Exploratory Dashboard

    Partial conclusion

    In this post, we have just introduced the data distribution moments, presenting the variability or variance concept and then, we learned some techniques to achieve some KPIs or indicators.

    What do we know? The response time for a web application can be different from the previous one and so on, so the knowledge of the variance can help us understand about the application behaviors using some extraordinary data. Then, we can decide if an application had a good or poor performance, for example.

    Of course, it was a didactic example for some data distribution and the location estimate and variance concepts can be applied to other data exploratory analysis considering a long period, such as days, weeks, months, years, and so on. In those case, is very important consider using trends instead of history data.

    Our goal is bringing to the light some extraordinary data and insights, instead of common data, allow us knowing better our application.

    In the next posts, we’ll talk about Skewness and Kurtosis, the 3rd and 4th moments for a data distribution, respectively.

    The post Zabbix in: exploratory data analysis rehearsal – Part 2 appeared first on Zabbix Blog.