Android App Size at Scale with Project Bonsai

Post Syndicated from Grab Tech original https://engineering.grab.com/project-bonsai

Grab is Southeast Asia’s leading superapp, providing a suite of services that brings essential needs to users throughout the region. Its offerings include ride-hailing, food delivery, parcel delivery, mobile payments, and more. With safety, efficiency, and user-centered design at heart, Grab remains dedicated to solving everyday issues and improving the lives of millions.

As the app continues to expand with more features, Grab identified the need for a consistent, high-quality experience for new users who may have limited storage space or restricted internet bandwidth. Read on to find out more about Project Bonsai and how it reduced app download size and app disk size.

Introduction

In 2020, Google conducted research that highlighted the negative impact of app sizes on conversion rates, revealing a 1% decrease for every 6MB expansion of the app APK size. This finding prompted Grab to ensure new and existing users had a consistently excellent Grab superapp experience, given the prevalence of low-end devices and disparate internet infrastructure in Southeast Asian regions. As a result, Grab initiated Project Bonsai in Q3 2021, with the goal of reducing and optimising the app size while enhancing user experience, reducing installation barriers, and boosting user acquisition.

Understanding the problem

The Grab superapp, with over 4 million lines of code and integration with hundreds of third-party libraries, had a significant app size. Given the prevalence of low-end devices and disparate internet infrastructure in our target region, it is crucial for us to proactively and constantly ensure we are delivering excellence in app-based user experience.

Objectives of the Bonsai project

The Bonsai project focused on these two key metrics:

  • App Download Size: This represents the total size of the compressed APK file that users need to download from Google Play when performing a fresh installation.
  • App Disk Size: This encompasses the total storage space occupied by the app on user devices, including both the binary and data generated by the app.

In this article, we will share the strategy and solutions that resulted in a successful 26% reduction in App Download Size, while also reducing the App Disk Size.

Status quo

Prior to the Bonsai project, the Grab app project had implemented various measures to achieve optimal app size. Here are some notable highlights:

  • Leveraging App Bundle: Since 2019, Grab has been using the app bundle approach to optimise app delivery. This approach generates smaller APKs tailored to specific device configurations, ensuring users receive optimised APKs. This helps reduce the overall app size and improve installation efficiency.

  • Monitoring: With a team of over 100 Android engineers and multiple collaborative teams, the Grab app undergoes a weekly release process involving hundreds of commits for each release. Closely monitoring app size changes with every commit is essential for our team. The team established debug build (APK file size) monitoring for every commit merged to the master branch. Regular weekly reviews are conducted to stay updated on the app size and identify commits that might lead to changes in app size. However, occasional mismatches may occur due to discrepancies between the debug and release builds.

Monitoring the changes in APK size
  • R8 Integration: R8/Proguard, known as the code shrinker, obfuscator, and optimiser, has been enabled since the beginning. This powerful tool helps reduce the app’s bytecode and resources, leading to further size optimisation and improved app performance.

  • Resource Optimisation: The team diligently pursued resource optimisation strategies, including:

    • Images: Engineers were encouraged to use vector images whenever possible, as they usually have smaller file sizes than raster images. In exceptional cases where raster images were necessary, Grab adopted the WebP format instead of PNG, taking advantage of its better compression to minimise app size.
    • Language resource configuration: Grab enabled resource configuration filtering (resConfigs) to package only the languages actively used by the Grab app, reducing unnecessary resource overhead and enhancing app efficiency (a configuration sketch follows this list).
  • Third-Party Libraries Review: The team established a review process for third-party libraries, assessing their size impact on the app. This practice ensured that only essential libraries were included, preventing unnecessary bloating of the app size.
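
To make the first two measures concrete, here is a minimal build.gradle.kts sketch of the app bundle splits and the language resource configuration mentioned above; the locale list and values are illustrative, not Grab’s actual configuration.

// build.gradle.kts (app module), illustrative sketch only
android {
    defaultConfig {
        // Package only the locales the app actually supports; resources for
        // other languages are stripped from the final artifact.
        resConfigs("en", "id", "ms", "th", "vi")
    }
    bundle {
        // Let Google Play generate per-device split APKs from the app bundle.
        language { enableSplit = true }
        density { enableSplit = true }
        abi { enableSplit = true }
    }
}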

Despite these measures and solutions aimed at managing the app size, there was still potential for significant growth.

Strategy

The Bonsai project revolves around strategic pillars, namely Measurement, Reduction, and Containment.

Project Bonsai’s three strategic pillars for continuous app size reduction

In the Measurement phase, the focus is on providing accurate information on the app’s binary composition and how individual features, modules, and libraries impact the overall app size. This allows teams to make informed decisions and gain insights into their components’ influence on the app’s size.

The insights from the Measurement phase gave us a list of actionable items for our backlog. In the Reduction phase, we work through this backlog strategically to keep the app size optimal.

Optimising the app size is not a one-time endeavour, especially as more features are added over time, potentially increasing the project’s size. While there may be limited solutions to manage app size, it’s important to find a balance between size and functionality; otherwise, the effort and trade-offs required may become overwhelming. Therefore, in the Containment phase, we intend to introduce effective long-term strategies and solutions designed to manage the app’s size.

In the remainder of this blog post, we explore the strategic pillars and actions taken to contain the download size.

Measure

The Grab Passenger App Core team actively engages in optimisation projects, such as improving app startup time, pipeline time, and build time, and recognises the importance of measurement as the foundation for improvement.

In every optimisation endeavour, we adhere to a crucial principle: “MEASURE” – the first and most critical step for any improvement project. As the famous quote goes, “If you can’t measure it, you can’t improve it.” This emphasises the significance of accurate and comprehensive measurement as the foundation for driving successful optimisation efforts.

In the third quarter of 2021, our team initiated an investigation into existing tools provided by both Google and the broader community. The intention was to employ tools such as APK Analyzer or Android Studio to conduct a thorough analysis of the app binary. However, it soon became evident that these tools were not well-suited to accommodate the extensive scope of our project.

To address this gap, we developed a custom analytics tool called App Sizer, specifically designed to analyse app binaries from bundle files. Our primary goal was to build a solution tailored to our unique needs.

The tool is seamlessly integrated into Grab’s CI system and sends data to a Grafana instance, collating and transmitting daily analytics data from the release candidate branch. It monitors the following key aspects:

Device-specific App Download Size: Precise information about the app download size for specific devices, focusing on optimising the App Download Size.

Trends for app download size by device type

Comprehensive Size Breakdown: A breakdown of the app’s size, including the proportion attributed to the Kotlin/Java codebase, Kotlin/Java-based libraries, native libraries, resources, and other relevant factors.

Comprehensive breakdown of app download size by component

Size Contribution by Teams: Insights into the size contributed by each individual team within the project’s scope.

Breakdown of Grab’s codebase by TF

Module-wise Size Contribution: Insights into the size impacted by each module, categorised by team.

Breakdown of the codebase by TF modules

Size Contribution by Third-Party Libraries: Information about the size attributed to each third-party library incorporated within the app.

App download size contribution by external libraries and SDK breakdown

List of Large Files: A categorised list of large files (file size exceeding X value), organised by each respective team.

Large file categories broken down by TF

It’s important to note that all the size values presented within these dashboards specifically pertain to the download size, representing the contribution of each item to the overall app download size.

As part of our commitment to the developer community, we plan to open-source this tool in the near future, allowing others to benefit from its capabilities as well.

Reduce

To optimise the app based on the analysis data obtained from the measuring step, we focused on applying common solutions from Google and suggestions from the community; we did not invent any fancy solutions. Our efforts centered on optimising the dex file size, refining resources, and eliminating duplication and redundancy.

dex file optimisation (Java/Kotlin)

In our initial findings, it became evident that Java/Kotlin code was the major contributor to app size. Recognising this, we made it our top priority for optimisation.

R classes

During our investigation, we discovered that a proportion of the overall app size was attributable to R classes. Further research unveiled two primary reasons behind this phenomenon:

  1. Transitive R classes: R classes contained ID references not only to their own resources but also to resources from their transitive dependencies. This meant that if Module A depended on Module B, and Module B in turn depended on Module C (Module A -> Module B -> Module C), then Module A’s R class included ID references to resources from Modules B and C, even if Module A didn’t directly utilise these resources. This explained why R classes in a modularised project could accumulate millions of lines of code.
  2. A spread of Modules and Third-Party Libraries: Our Grab project comprises over 1,500 modules and integrates hundreds of third-party libraries, leading to the generation of significantly large R classes within the project. Furthermore, this discovery also explained instances where our app size monitor exhibited spikes during certain commits despite no significant additions of resources, libraries, or code within those commits. These fluctuations were linked to changes in the dependency graph, further emphasising the impact of transitive R classes.

It is worth noting that the team had long been cognisant of the challenges posed by Transitive R classes, especially in terms of optimising build times. Consequently, we had already undertaken various initiatives to address this specific challenge related to build times.

However, it wasn’t long before we started wondering why R8 wasn’t removing unused fields from the R classes, which would have resulted in a size reduction for these classes. It turned out that back in mid-2021, we were using Android Gradle Plugin 4.0 along with the default R8 rules. One of these rules was preserving all fields in the R classes:

-keepclassmembers class **.R$* {
    public static <fields>;
}

This rule was the root cause of why unused fields in the R classes were persisting. Google removed this rule in AGP 4.1, and the solution was straightforward: updating AGP to version 4.1.1 (or newer) helped us resolve the issue.

However, due to the project’s unusual size, there was a risk that R class fields which appeared unused were actually being accessed through reflection, either within our codebase or in third-party libraries, and would be removed inadvertently. Since our automation testing did not yet support R8, conducting a full test of the entire project was possible but would have demanded significant effort from the team. To avoid this substantial effort, we developed a script to search the entire codebase and identify instances where reflection was used, allowing us to assess their usage. For third-party libraries, we decompiled the libraries and applied the same script to the decompiled code.
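
The scan itself can be as simple as a text search for reflective access patterns. Below is a rough Kotlin sketch of that idea; the patterns and source root are illustrative examples, not the actual script we used.

import java.io.File

// Illustrative reflection scan: flag source files that access resources or R
// classes reflectively, so they can be reviewed before letting R8 strip
// unused R fields. Patterns and the root path are examples only.
fun main() {
    val root = File("app/src") // hypothetical source root
    val suspiciousPatterns = listOf(
        Regex("""Class\.forName\("""),   // generic reflection
        Regex("""getIdentifier\("""),    // Resources.getIdentifier lookups
        Regex("""getDeclaredField\(""")  // field reflection, possibly on R classes
    )

    root.walkTopDown()
        .filter { it.isFile && it.extension in setOf("kt", "java") }
        .forEach { file ->
            file.readLines().forEachIndexed { index, line ->
                if (suspiciousPatterns.any { it.containsMatchIn(line) }) {
                    println("${file.path}:${index + 1}: ${line.trim()}")
                }
            }
        }
}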

Fix & Optimise R8 Rules

Subsequently, we revised the R8 configuration rules. This involved assessing the compiled R8 configuration file and paying specific attention to any ‘keep’ rules that contained package wildcards. It was crucial to understand the purpose behind each rule and why it existed; any rules identified as redundant were recommended for removal. After this thorough scrutiny of the R8 rules, we raised request tickets asking the respective teams to remove or optimise these rules.

Enable more aggressive optimisations

In 2019, Google began recommending the use of the proguard-android-optimize.txt configuration with code optimisation enabled. However, our project’s origins predate the introduction of Google’s R8, a time when Proguard was the primary tool for code obfuscation and size reduction. Prior to the release of Android Gradle Plugin 3.4.0, there were no explicit recommendations for enabling code optimisations during the minification process. As a result, our project had persisted in using the proguard-android.txt configuration without activating the code optimisation feature.
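
For reference, switching to the optimised defaults would be a small change in the release build type. A minimal sketch in Gradle Kotlin DSL (illustrative only, not Grab’s actual build script):

// build.gradle.kts (app module), illustrative sketch only
android {
    buildTypes {
        getByName("release") {
            isMinifyEnabled = true   // run R8 on release builds
            isShrinkResources = true // drop resources left unreferenced after shrinking
            proguardFiles(
                // The "-optimize" variant of the default rules enables code
                // optimisations, unlike the plain proguard-android.txt.
                getDefaultProguardFile("proguard-android-optimize.txt"),
                "proguard-rules.pro"
            )
        }
    }
}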

Our team has considered adopting a more aggressive approach towards optimisation, ranging from enabling the optimisation mode to incorporating R8 full mode. This includes the substantial effort required for testing and addressing issues arising from the introduction of these new modes. We encountered a particular challenge wherein R8 optimisation exhibits instability, an issue that has been reported to Google; a definitive solution remains a work in progress.

At present, we have decided to postpone the implementation of a more aggressive R8 mode. However, this remains a high-priority item on our agenda, and we intend to address it in the near future.

Resources optimisation

In addition to optimising the dex file, we also address resource optimisation.

Handling large resources

During the Measure phase, we use the List Of Large Files dashboard to identify large files categorised by teams. For each team, we create request tickets with straightforward guidance. These guidelines encourage the following actions:

  • Explore the possibility of removing unnecessary resources.
  • Consider offloading the resource to a server when feasible. Within Grab, we have the Asset Delivery Kit, which facilitates hosting resources remotely and downloading them on the client side.
  • Optimise files by converting them to alternative formats or reducing their size. For instance, for images, we recommend utilising vector images and the WebP format, among other optimisations.

Convert PNG to WebP

The Grab app project has a long history, and while the team has recently established guidelines and implemented CI processes to promote the use of vector and WebP images, there are still existing images that have not been optimised. The team has undertaken an initiative to address these images and has converted PNG images to WebP format wherever a reduction in file size is achievable.
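
The conversion itself can be scripted around Google’s cwebp encoder, keeping the WebP only when it is actually smaller than the original. The following Kotlin sketch illustrates the idea; the paths, quality setting, and keep-if-smaller rule are assumptions for illustration, not the exact tooling we used.

import java.io.File

// Walk a resource tree, convert each PNG to WebP with cwebp, and keep the
// WebP only if it is smaller than the original. Nine-patch (*.9.png) files
// are skipped because they are not supported as WebP.
fun main() {
    val resRoot = File("app/src/main/res") // hypothetical resource root
    resRoot.walkTopDown()
        .filter { it.isFile && it.extension == "png" && !it.name.endsWith(".9.png") }
        .forEach { png ->
            val webp = File(png.parentFile, png.nameWithoutExtension + ".webp")
            val exitCode = ProcessBuilder("cwebp", "-q", "85", png.path, "-o", webp.path)
                .inheritIO()
                .start()
                .waitFor()
            if (exitCode == 0 && webp.length() in 1 until png.length()) {
                png.delete()  // conversion pays off: keep the WebP, drop the PNG
            } else {
                webp.delete() // no saving (or a failure): keep the original PNG
            }
        }
}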

Fonts

Fonts are another group of files that have a notable impact on the project’s size. We collaborate with the teams to:

  • Remove fonts that are rarely used in the project.
  • Eliminate duplicate fonts.

While the project still contains numerous fonts, we have a project to unify all features and transition to a single font. Our recommendation is to explore the use of one primary font, deriving the different typeface variations programmatically rather than bundling a separate font file for each style.

Remove stale features and replace large library

Based on the data, we discovered that a single library was contributing approximately 8% to the overall app size; this library has since been removed from the project. Moreover, by analysing the Size Contribution by Third-Party Libraries dashboard, we identified libraries with duplicated functionality and have worked to eliminate these redundancies.

At Grab, we also use remotely controlled feature flags to enable or disable features, which is very useful for running experiments or turning a feature off if it causes problems. Many features in the project are therefore controlled by a feature flag. In certain cases, even when a feature is deactivated, the corresponding code remains included in the binary. We identify these cases and collaborate with teams to remove the redundant code.

After six months of working on the above initiatives, the Bonsai team managed to reduce the Grab app download size by 26%. This is particularly noteworthy, considering that prior to the commencement of the Bonsai Project, the average app size exhibited a monthly increase of approximately 1%.

Containment

After dedicating over a semester to the Reduce phase, we started the transition to the Containment phase. The first step for this phase involved setting up an App Growth Rate dashboard that presents the growth rate of app download size per release. Our goal is to keep this rate as low as possible.

The team has been exploring a few solutions, such as introducing common UI design components to prevent duplication, and experimenting with dynamic feature delivery. This phase of exploration is still ongoing and we are optimistic that it will help maintain a manageable app download size, or perhaps even contribute to further optimisation.

Considering alternative initiatives, the team is contemplating treating app size as a constrained resource of our application. We believe it should be the responsibility of every team to maintain an optimal app size. Based on the measurements we have, which provide insight into each team’s impact on the total app download size, it could be advantageous to allocate an ‘app size budget’ to each team. Each team would then take responsibility for managing and maintaining the size influenced by their work.

Conclusion

Grab’s Project Bonsai demonstrated the company’s commitment to optimising the app experience for users in Southeast Asia. By prioritising code optimisation, resource management, modularisation, and asset bundling, we achieved a substantial reduction in app size while enhancing user experience. These efforts not only addressed the challenges we outlined, but also contributed to increased user acquisition and improved user retention rates.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Enabling near real-time data analytics on the data lake

Post Syndicated from Grab Tech original https://engineering.grab.com/enabling-near-realtime-data-analytics

Introduction

In the domain of data processing, data analysts run their ad hoc queries on the data lake. The lake serves as an interface between our analytics and production environment, preventing downstream queries from impacting upstream data ingestion pipelines. To ensure efficient data processing in the data lake, choosing appropriate storage formats is crucial.

The vanilla data lake solution is built on top of cloud object storage with Hive metastore, where data files are written in Parquet format. Although this setup is optimised for scalable analytics query patterns, it struggles to handle frequent updates to the data due to two reasons:

  1. The Hive table format requires us to rewrite the Parquet files with the latest data. For instance, to update one record in a Hive unpartitioned table, we would need to read all the data, update the record, and write back the entire data set.
  2. Writing Parquet files is expensive due to the overhead of organising the data to a compressed columnar format, which is more complex than a row format.

The issue is further exacerbated by the scheduled downstream transformations. These necessary steps, which clean and process the data for use, increase the latency because the total delay now includes the combined scheduled intervals of these processing jobs.

Fortunately, the introduction of the Hudi format, which supports fast writes by allowing Avro and Parquet files to co-exist on a Merge On Read (MOR) table, opens up the possibility of having a data lake with minimal data latency. The concept of a commit timeline further allows data to be served with Atomicity, Consistency, Isolation, and Durability (ACID) guarantees.

We employ different sets of configurations for the different characteristics of our input sources:

  1. High or low throughput. A high-throughput source refers to one that has a high level of activity. One example of this is our stream of booking events generated from each customer transaction. On the other hand, a low-throughput source would be one that has a relatively low level of activity, such as transaction events generated from reconciliation happening on a nightly basis.
  2. Kafka (unbounded) or Relational Database Sources (bounded). Our sinks have sources that can be broadly categorised into unbounded and bounded sources. Unbounded sources are usually related to transaction events materialised as Kafka topics, representing user-generated events as they interact with the Grab superapp. Bounded sources usually refer to Relational Database (RDS) sources, whose size is bounded by the storage provisioned.

The following sections will delve into the differences between each source and our corresponding configurations optimised for them.

High throughput source

For our data sources with high throughput, we have chosen MOR tables, since writing log files in Avro format allows for fast writes that meet our latency requirements.

Figure 1 Architecture for MOR tables

As seen in Figure 1, we use Flink to perform the stream processing and write out log files in Avro format in our setup. We then set up a separate Spark writer which periodically converts the Avro files into Parquet format in the Hudi compaction process.

We have further simplified the coordination between the Flink and Spark writers by enabling asynchronous services on the Flink writer so it can generate the compaction plans for Spark writers to act on. During the Spark job runs, it checks for available compaction plans and acts on them, placing the burden of orchestrating the writes solely on the Flink writer. This approach could help minimise potential concurrency problems that might otherwise arise, as there would be a single actor orchestrating the associated Hudi table services.
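
To make the setup concrete, a simplified sketch of how such a Hudi MOR sink could be declared through Flink’s Table API is shown below (written in Kotlin here). The table name, schema, path, and option values are placeholders; only the pattern of letting Flink schedule compaction while leaving execution to the Spark compactor reflects the description above.

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment

// Illustrative Flink writer for a Hudi MOR table: Flink schedules compaction
// plans but does not execute them; a separate Spark job picks up the pending
// plans asynchronously.
fun main() {
    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

    tableEnv.executeSql(
        """
        CREATE TABLE booking_events_hudi (
          booking_id STRING,
          event_time TIMESTAMP(3),
          payload STRING,
          PRIMARY KEY (booking_id) NOT ENFORCED
        ) WITH (
          'connector' = 'hudi',
          'path' = 's3://example-bucket/hudi/booking_events',
          'table.type' = 'MERGE_ON_READ',
          'compaction.schedule.enabled' = 'true',  -- Flink generates compaction plans
          'compaction.async.enabled' = 'false'     -- ...but leaves execution to Spark
        )
        """.trimIndent()
    )

    // The streaming INSERT INTO from the upstream source table is omitted here.
}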

Low throughput source

Figure 2 Architecture for COW tables

For low throughput sources, we gravitate towards the choice of Copy On Write (COW) tables given the simplicity of its design, since it only involves one component, which is the Flink writer. The downside is that it has higher data latency because this setup only generates Parquet format data snapshots at each checkpoint interval, which is typically about 10-15 minutes.

Connecting to our Kafka (unbounded) data source

Grab uses Protobuf as our central data format in Kafka, ensuring schema evolution compatibility. However, the derivation of the schema of these topics still requires some transformation to make it compatible with Hudi’s accepted schema. Some of these transformations include ensuring that Avro record fields do not contain just a single array field, and handling logical decimal schemas to transform them to fixed byte schema for Spark compatibility.

Given the unbounded nature of the source, we decided to partition it by Kafka event time up to the hour level. This ensured that our Hudi operations would be faster. Parquet file writes would be faster since they would only affect files within the same partition, and each Parquet file within the same event time partition would have a bounded size given the monotonically increasing nature of Kafka event time.

By partitioning tables by Kafka event time, we can further optimise compaction planning operations, since the number of file lookups required is reduced with the use of BoundedPartitionAwareCompactionStrategy. Only log files in recent partitions are selected for compaction, and the job manager no longer needs to list every partition to figure out which log files to select during the planning phase.

Connecting to our RDS (bounded) data source

For our RDS, we decided to use the Flink Change Data Capture (CDC) connectors by Ververica to obtain the binlog streams. The RDS would then treat the Flink writer as a replication server and start streaming its binlog data to it for each MySQL change. The Flink CDC connector presents the data as a Kafka Connect (KC) Source record, since it uses the Debezium connector under the hood. It is then a straightforward task to deserialise these records and transform them into Hudi records, since the Avro schema and associated data changes are already captured within the KC source record.

The obtained binlog timestamp is also emitted as a metric during consumption for us to monitor the observed data latency at the point of ingestion.
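
A minimal sketch of registering such a CDC source in Flink (via the Table API in Kotlin) is shown below. Hostnames, credentials, and the database, table, and field names are placeholders rather than actual configuration, and the downstream INSERT INTO the Hudi table is omitted.

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment

// Illustrative MySQL CDC source table: the connector first takes an initial
// snapshot of the table, then switches to continuous binlog reading, matching
// the two phases described below.
fun main() {
    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

    tableEnv.executeSql(
        """
        CREATE TABLE transactions_cdc (
          id BIGINT,
          amount DECIMAL(18, 2),
          updated_at TIMESTAMP(3),
          PRIMARY KEY (id) NOT ENFORCED
        ) WITH (
          'connector' = 'mysql-cdc',
          'hostname' = 'example-rds-host',
          'port' = '3306',
          'username' = 'replication_user',
          'password' = 'example-password',
          'database-name' = 'payments',
          'table-name' = 'transactions',
          'scan.startup.mode' = 'initial'  -- snapshot first, then stream the binlog
        )
        """.trimIndent()
    )
}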

Optimising for these sources involves two phases:

  1. First, assigning more resources for the cold start incremental snapshot process where Flink takes a snapshot of the current data state in the RDS and loads the Hudi table with that snapshot. This phase is usually resource-heavy as there are a lot of file writes and data ingested during this process.
  2. Once the snapshotting is completed, Flink would then start to process the binlog stream and the observed throughput would drop to a level similar to the DB write throughput. The resources required by the Flink writer at this stage would be much lower than in the snapshot phase.

Indexing for Hudi tables

Indexing is important for upserting Hudi tables when the writing engine performs updates, allowing it to efficiently locate the file groups of the data to be updated.

As of Hudi version 0.14, the Flink engine only supports the Bucket Index or the Flink State Index. The Bucket Index indexes file records by hashing the record key and matching it to a specific bucket of files indicated by the naming convention of the written data files. The Flink State Index, on the other hand, stores the index map of record keys to files in memory.

Given that our tables include unbounded Kafka sources, there is a possibility for our state indexes to grow indefinitely. Furthermore, the requirement of state preservation for Flink State Index across version deployments and configuration updates adds complexity to the overall solution.

Thus, we opted for the Bucket Index for its simplicity and the fact that our Hudi table size per partition does not change drastically across the week. However, this comes with a limitation: the number of buckets cannot be updated easily and imposes a ceiling on the parallelism at which our Flink pipelines can scale. As traffic grows organically, we could find ourselves in a situation where our configuration grows obsolete and cannot handle the increased load.
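
For reference, the relevant writer options for a Bucket Index, expressed here as a small Kotlin map of Hudi Flink configuration keys, look roughly like the following; the bucket count is purely illustrative and, as noted above, cannot be changed easily once chosen.

// Illustrative Hudi Flink writer options for a Bucket Index. The fixed bucket
// count also acts as a ceiling on the writer parallelism.
val bucketIndexOptions = mapOf(
    "index.type" to "BUCKET",
    "hoodie.bucket.index.num.buckets" to "64"
)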

To resolve this going forward, using consistent hashing for the Bucket Index would be something to explore to optimise our Parquet file sizes and allow the number of buckets to grow seamlessly as traffic grows.

Impact

Fresh business metrics

Post creation of our Hudi Data Ingestion solution, we have enabled various users such as our data analysts to perform ad hoc queries much more easily on data that has lower latency. Furthermore, Hudi tables can be seamlessly joined with Hive tables in Trino for additional context. This enabled the construction of operational dashboards reflecting fresh business metrics to our various operators, empowering them with the necessary information to quickly respond to any abnormalities (such as high-demand events like F1 or seasonal holidays).

Quicker fraud detection

Another significant user of our solution is our fraud detection analysts. This enabled them to rapidly access fresh transaction events and analyse them for fraudulent patterns, particularly during the emergence of a new attack pattern that hadn’t been detected by their rules engine. Our solution also allowed them to perform multiple ad hoc queries that involve lookbacks of various days’ worth of data without impacting our production RDS and Kafka clusters by using the data lake as the data interface, reducing the data latency to the minute level and, in turn, empowering them to respond more quickly to attacks.

What’s next?

As the landscape of data storage solutions evolves rapidly, we are eager to test and integrate new features like Record Level Indexing and the creation of Pre Join tables. This evolution extends beyond the Hudi community to other table formats such as Iceberg and Delta Lake. We remain ready to adapt to these changes and incorporate the advantages of each format into our data lake within Grab.


The journey of building a comprehensive attribution platform

Post Syndicated from Grab Tech original https://engineering.grab.com/attribution-platform

The Grab superapp offers a comprehensive array of services from ride-hailing and food delivery to financial services. This creates multifaceted user journeys, traversing homepages, product pages, checkouts, and interactions with diverse content, including advertisements and promo codes.

Background: Why ads and attribution matter in our superapp

Ads are crucial for Grab in driving user engagement and supporting our ecosystem by seamlessly connecting users with our services. In the ever-evolving world of advertising, the ability to gauge the impact of marketing investments takes on pivotal significance. Advertisers dedicate substantial resources to promote their businesses, necessitating a clear understanding of the return on ad spend (ROAS) for each campaign. In this context, attribution plays a central role, serving as the guiding compass for advertisers and marketers, elucidating the effectiveness of touchpoints within campaigns.

For instance, a merchant-partner seeks to enhance its reach by advertising on the Grab food delivery homepage. With the assistance of our attribution system, the merchant-partner can now precisely gauge the impact of their homepage ads on Grab. This involves tracking user engagement and monitoring the resulting orders that stem from these interactions. This level of granularity not only highlights the value of attribution but also demonstrates its capability in providing detailed insights into the effectiveness of advertising campaigns and enabling merchant-partners to optimise their campaigns with more precision.
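
To make the idea of attribution concrete, the snippet below sketches a deliberately simplified last-touch rule in Kotlin: an order is credited to the same user’s most recent ad touchpoint within an attribution window. This is only an illustration of the concept; the production engine operates on streams and handles promos, deduplication, and other attribution models discussed later in this post.

import java.time.Duration
import java.time.Instant

// Simplified last-touch attribution: credit an order to the most recent ad
// touchpoint by the same user within the attribution window, if any.
data class Touchpoint(val userId: String, val campaignId: String, val at: Instant)
data class Order(val userId: String, val orderId: String, val at: Instant)

fun attributeLastTouch(
    order: Order,
    touchpoints: List<Touchpoint>,
    window: Duration = Duration.ofDays(1) // illustrative window length
): Touchpoint? =
    touchpoints
        .filter { it.userId == order.userId }
        .filter { !it.at.isAfter(order.at) && it.at.isAfter(order.at.minus(window)) }
        .maxByOrNull { it.at }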

In this blog, we delve into the technical intricacies, software architecture, challenges, and solutions involved in crafting a state-of-the-art engineering solution for the attribution platform.

Genesis: Pre-project landscape

When our journey began in 2020, Grab’s marketing efforts had limited attribution capabilities, and data analytics was predominantly reliant on ad hoc queries conducted by business and data analysts. Before the introduction of a standardised approach, we had to manage inconsistent results and a time-consuming manual process of data preparation, cleansing, and storage across teams. When issues arose in the analytical pipeline, resolution took longer and the same issues kept recurring. We needed a comprehensive engineering solution that would address the identified gaps and significantly enhance metrics related to ROI, attribution accuracy, and data-handling efficiency.

Inception: The pure ads attribution engine (Kappa architecture)

We chose Kappa architecture due to its imperative role in achieving near real-time attribution, especially in support of our new pricing model, cost per order (CPO). With this solution, we aimed to drastically reduce data latency from 2-3 days to just a few minutes. Traditional ETL (Extract, Transform, and Load) based batch processing methods were evaluated but quickly found to be inadequate for our purposes, mainly because they were too slow.

In the advertising industry, rapid decision-making is critical. Traditional batch processing solutions would introduce significant latency, hampering our ability to make real-time, data-driven decisions. With its architecture’s inherent capability for real-time stream processing, Kappa emerged as the logical choice. Additionally, Kappa offers the agility required to empower our ad-serving team for real-time decision support, and better ad ranking and selection, enabling dynamic and effective targeting decisions without delay.

The first step on this journey was to create a pure, near real-time stream processing Ads Attribution Engine. This engine was based on the Kappa architecture to provide advertisers with quick insights into their ROAS, offering real-time attribution and enabling advertisers to optimise their campaigns efficiently.

High-level workflow of the Ads Attribution Engine

In this solution, we used the following tools in our tech stack:

  • Kafka for event streams
  • Amazon DynamoDB (DDB) for event storage
  • Amazon S3 as the data lake
  • An in-house stream processing framework similar to Keystone
  • Redis for caching events
  • ScyllaDB for storing ad metadata
  • Amazon Relational Database Service (RDS) for analytics

Architecture of the near real-time stream processing Ads Attribution Engine

Evolution: Merging marketing levers – Ads and promos

We began to envision a world where we could merge various marketing levers into a unified Attribution Engine, starting with ads and promos. This evolved vision also aimed to prevent order double counting (when a user interacts with both ads and promos in the same checkout), which would provide a more holistic attribution solution.

With the unified Attribution Engine, we would also enable more sophisticated personalisation through machine learning models and drive higher conversions.

The unified Attribution Engine workflow, which included Promo touch points

The unified attribution engine used mostly the same tech stack, except for analytics where Druid was used instead of RDS.

Architecture of the unified Attribution Engine

Introspection: Identifying shortcomings and the path to improvement

While the unified attribution engine was a step in the right direction, it wasn’t without its challenges. There were challenges related to real-time data processing costs, scalability for longer attribution windows, latency and lag issues, out-of-order events leading to misattribution, and the complexity of implementing multi-touch attribution models. To truly empower advertisers and enhance the attribution process, we knew we needed to evolve further.

Rebirth: The birth of a full-fledged attribution platform (Lambda architecture)

This journey eventually led us to build a full-fledged attribution platform using Lambda architecture, which blended both batch and real-time stream processing methods. With this change, our platform could rapidly and accurately process data and attribute the impact of ads and promos on user behaviour.

Why Lambda architecture?

This choice was a strategic one – real-time processing is vital for tracking events as they occur, but it offers only a current snapshot of user behaviour. This means we would not be able to analyse historical data, which is a crucial aspect of accurate attribution and exploring multiple attribution models. Historical data allows us to identify trends, patterns, and correlations not evident in real-time data alone.

High level workflow for the full-fledged attribution platform with Lambda architecture

In this system’s tech stack, the key components are:

  • Coban, an in-house stream processing framework used for real-time data processing
  • Spark-based ETL jobs for batch processing
  • Amazon S3 as the data warehouse
  • An offline layer that is capable of providing historical context, handling large data volumes, performing complex analytics, and so on.

Key benefits of the offline layer

  • Provides historical context: The offline layer enriches the attribution process by providing a historical perspective on user interactions, essential for precise attribution analysis spanning extended time periods.
  • Handles enormous data volumes: This layer efficiently manages and processes extensive data generated by advertising campaigns, ensuring that attribution seamlessly accommodates large-scale data sets.
  • Performs complex analytics: Enables more intricate computations and data analysis than real-time processing alone, the offline layer is instrumental in fine-tuning attribution models and enhancing their accuracy.
  • Ensures reliability in the face of challenges: By providing fault tolerance and resilience against system failures, the offline layer ensures the continuous and dependable operation of the attribution system, even during unexpected events.
  • Optimises data storage and serving: Relying on Amazon S3 as the storage layer for raw data keeps storage optimised, while interactive reporting APIs are built on top of it for serving.

Architecture of our comprehensive offline attribution platform

Challenges with Lambda and mitigation

Lambda architecture allows us to have the accuracy and robustness of batch processing along with real-time stream processing. However, we noticed some drawbacks that may lead to complexity due to maintaining both batch and stream processing:

  • Operating two parallel systems for batch and stream processing can lead to increased complexity in production environments.
  • Lambda architecture requires two sets of business logic – one for the batch layer and another for the stream layer.
  • Synchronisation across both layers can make system alterations more challenging.
  • This dual implementation could also lead to inconsistencies and introduce potential bugs into the system.

To mitigate these complications, we’re establishing an optimisation strategy for our current system. By distinctly separating the responsibilities of our real-time pipelines from those of our offline jobs, we intend to harness the full potential of each approach, while simultaneously curbing the added complexity.

Hence, we are redefining the way we utilise Lambda architecture, striking an efficient balance between real-time responsiveness and sturdy accuracy with the proposal below.

Vanguard: Enhancements in the future

In the coming months, we will be implementing the optimisation strategy and improving our attribution platform solution. This strategy can be broken down into the following sections.

Real-time pipeline handling time-sensitive data: Real-time pipelines can process and deliver time-sensitive metrics like CPO-related data in near real-time, allowing for budget capping and immediate adjustments to marketing spend. This can provide us with actionable insights that can help with areas like real-time bidding, real-time marketing, or dynamic pricing. By limiting the volume of data through the real-time path, we can ensure it’s more manageable and focused on immediate actionable data.

Batch jobs handling all other reporting data: Batch processing is best suited for computations that are not time-bound and where completeness is more important. By dedicating more time to the processing phase, batch processing can handle larger volumes and more complex computations, providing more comprehensive and accurate reporting.

This approach will simplify our Lambda architecture, as the batch and real-time pipelines will have clear separation of duties. It may also reduce the chance of discrepancies between the real-time and batch-processing datasets and lower the operational load of our real-time system.

Conclusion: A holistic attribution picture

Through our journey of building a comprehensive attribution platform, we can now deliver a holistic and dependable view of user behaviour and empower merchant-partners to use insights from advertisements and promotions. This journey has been a long one, but we were able to improve our attribution solution in several ways:

  • Attribution latency: Successfully reduced attribution latency from 2-3 days to just a few minutes, ensuring that advertisers can access real-time insights and feedback.
  • Data accuracy: Through improved data collection and processing, we achieved data discrepancies of less than 1%, enhancing the accuracy and reliability of attribution data.
  • Conversion rate: Advertisers witnessed a significant increase in conversion rates, a direct result of our real-time attribution capabilities.
  • Cost efficiency: Embracing the Lambda architecture led to a ~25% reduction in real-time data processing costs, allowing for more efficient campaign optimisations.
  • Operational resilience: Building an offline layer provided fault tolerance and resilience against system failures, ensuring that our attribution system continued to operate seamlessly, even during unexpected events.


Managing dynamic marketplace content at scale: Grab’s approach to content moderation

Post Syndicated from Grab Tech original https://engineering.grab.com/dynamic-marketplace

In the fast-paced world of on-demand delivery, maintaining safe marketplaces is a complex undertaking. Grab, a leading superapp in Southeast Asia, operates GrabFood and GrabMart, two popular marketplaces that connect consumers with a wide range of food and daily necessities. With more than 100k listings for different items updated daily by our merchants across eight different countries, Grab is rising to the challenge of ensuring that its marketplaces remain compliant with its own policies, government regulations as well as platform policies.

This article provides an overview of how Grab employs a combination of automated and manual content moderation to manage its dynamic marketplace content efficiently, while also collaborating with Google to ensure marketplace safety. Stay tuned for future articles that will delve deeper into the technology and solutions used for content moderation.

Dynamic Marketplace Landscape

Marketplaces like GrabFood and GrabMart are at the forefront of connecting merchants and consumers. These marketplaces provide an avenue for merchants to showcase their offerings, enabling consumers to conveniently access a plethora of on-demand options. However, in an environment characterised by rapid changes as well as evolving regulatory frameworks, maintaining the integrity of these marketplaces becomes a formidable task.

Scale and Flexibility: A Dual Challenge

The cornerstone of Grab’s success lies in its ability to adapt to the unique regulations and requirements of each country it operates in. This necessitates a nuanced and multifaceted approach to content moderation. To achieve both scale and flexibility, Grab employs a proactive strategy that combines and leverages automated and manual moderation processes.

Automated Moderation

Automated moderation plays a pivotal role in efficiently managing the high volume of listings that undergo daily updates. Grab utilises advanced algorithms and machine learning technologies, built in-house, to scan listings every day for potential violations of its own policies, government regulations, and platform policies. This automation not only speeds up the process of putting eligible listings on the Grab platform, but also ensures consistent adherence to predefined guidelines. However, automated moderation is not without its limitations, as contextual understanding and subjective judgment often require human intervention.

Manual Moderation

Recognising the nuanced nature of content moderation, Grab employs a team of human moderators who possess the cultural awareness and contextual understanding necessary to assess complex cases. These moderators review listings flagged by algorithms and machine learning technologies that require human judgment, ensuring that content aligns with Grab’s policies, local regulations as well as platform policies. Manual moderation adds a layer of human insight that automated systems may lack, contributing to a more accurate and contextually sensitive approach.

In its commitment to ensuring marketplace safety, Grab has also established a strong collaboration with Google. Grab works hand in hand with Google to collectively ensure adherence to Play Store policies and guidelines.

Grab

  • Programme Management: Poonam Gambhire, Shuyang Sun
  • Product: Chris Collard
  • Engineering: Shuya Ding, Kirubakaran Duraisamy, Xu Chen

Google

  • Play Policy: Siddhartha Paul Tiwari
  • Business Development: Mika Igarashi


Rethinking Stream Processing: Data Exploration

Post Syndicated from Grab Tech original https://engineering.grab.com/rethinking-streaming-processing-data-exploration

Introduction

In this digital age, companies collect multitudes of data that enable the tracking of business metrics and performance. Over the years, data analytics tools for data storage and processing have evolved from the days of Excel sheets and macros to more advanced tools based on the MapReduce model, like Spark, Hadoop, and Hive. This evolution has allowed companies, including Grab, to perform modern analytics on the data ingested into the Data Lake, empowering them to make better data-driven business decisions. This form of data will be referenced within this document as “Offline Data”.

With innovations in stream processing technology like Spark and Flink, there is now more interest in unlocking value from streaming data. This form of continuously generated, high-volume data will be referenced within this document as “Online Data”. In the context of Grab, streaming data is usually materialised as Kafka topics (“Kafka Stream”) as the result of stream processing in its framework. This data is largely unexplored until it is eventually sunk into the Data Lake as Offline Data, as part of the data journey (see Figure 1 below). This induces some data latency before the data can be used by data analysts to inform decisions.

Figure 1. Simplified data journey for Offline Data vs. Online Data, from data generation to data analysis.

As seen in Figure 1 above, the Time to Value (“TTV”) of Online Data is shorter compared to that of Offline Data in a simplified data journey from data generation to data analysis, where the complexities of data cleaning and transformation have been removed. This is because the role of the data analyst or data scientist (“Data End User”) has been shifted forward to the Kafka stage for Online Data, instead of the Data Lake stage for Offline Data. We recognise that allowing earlier data exploration on Online Data lets Data End Users build context around the data inputs they are using at an earlier stage, which can help them process Offline Data more meaningfully in subsequent stages. We are interested in opening up the possibility for Data End Users to at least explore the Online Data before they architect a full solution to clean and/or process the data, directly or more efficiently, post-ingestion into the Data Lake. After their data exploration, users would have more information to decide whether to spin up a stream processing pipeline for Online Data, or to continue processing Offline Data with their current solution, but with a more refined understanding and logic strategy against their source data inputs. Of course, we acknowledge that not all analysis on Online Data can be done in this manner.

Problem statement

Online Data is underutilised within Grab mainly because of, among other reasons, difficulty in performing data exploration on data that is not yet properly stored in the Data Lake.

For the purpose of this blog post, we will focus only on the problem of exploration of Online Data because this problem is the precursor to allowing us to fully democratise such data.

The problem of data exploration manifests itself when Data End Users need to find the proper data inputs to base and develop their data models. These users would then often need to parse through a multitude of documentation and connect with multiple upstream data producers, to know the range of data signals that are currently available and understand what each data signal is trying to measure.

Given the ephemeral nature of Online Data, the lack of the right tools to seamlessly perform quick tests of application logic on Online Data disincentivises Data End Users from working with it. Testing such logic on Offline Data is generally much easier, since iterative testing on the exact same dataset is possible.

This difficulty in performing data exploration including ad hoc queries on Online Data has therefore made development of stream processing applications hard for Data End Users, creating headwinds in Grab’s aim to evolve from making data-driven business decisions to also making data-driven operation decisions. Doing both would allow Grab to react much quicker to abrupt changes in its business landscape.

Adoption of Zeppelin notebook environment

To address the difficulty in performing data exploration on Online Data, we have adopted Apache Zeppelin, a web-based notebook that enables data-driven, interactive data analytics with the support of multiple interpreters for various data processing backends, e.g. Spark and Flink. The full solution of the adopted Zeppelin notebook environment is enabled seamlessly within our internal data-streaming platform, through its control plane. If you are interested, you may check out our previous blog post titled An elegant platform for more details on the abovementioned streaming platform and its control plane.

Figure 2. Zeppelin login page via web-based notebook environment.

As seen from Figure 2 above, after successful creation of the Zeppelin cluster, users can log in with their generated credentials delivered to them via the integrated instant messenger, and start using the notebook environment.

Figure 3. Zeppelin programme flow in the notebook environment.

Figure 3 above explains the Zeppelin notebook programme flow as follows:

  • The users enter their queries into the notebook session and run querying statements interactively with the established web-based notebook session.
  • The queries are passed to the Flink interpreter within the cluster to generate the Flink job as a Jar file, to be then submitted to a Flink session cluster.
  • When the Flink session cluster job manager receives the job, it would spin up the corresponding Flink task managers (workers) to run the application and retrieve the results.
  • The query results would then be piped back to the notebook session, to be displayed back to the user on the notebook session.

Data query and visualisation

Figure 4. Example of simple select query of data on Kafka.
Note: All variable names, schema, values, and other details used in this article are only created for use as examples.

Flink has a planned roadmap to create a unified streaming language for both stream processing and data analytics. In line with the roadmap, we have based our Zeppelin solution on supporting Structured Query Language (“SQL”) as the query language of choice as seen in Figure 4 above. Data End Users can now write queries in SQL, which is a language that they are comfortable with, and perform adequate data exploration.

As discussed in this section, data exploration on streaming data at the Kafka stage, by adopting the right tool, enables Data End Users to quickly understand the current schema of a Kafka topic (explained more in the next section). This kind of data exploration also enables Data End Users to understand the type of data the Kafka topic represents, such as determining whether a country code field is in alpha-2 or alpha-3 format while the data is still streaming. This might seem inconsequential and immediately identifiable even in Offline Data, but by enabling data exploration at an earlier stage in the data journey for Online Data, Data End Users have the opportunity to react much more quickly. For example, a change in the expected country code format from the data producer would usually lead to errors in downstream joins or other stream processing pipelines due to incompatible parsing or filtering of the modified country codes. Instead of waiting for the data to be ingested as Offline Data, users can investigate the issue with Online Data retrieved from Kafka.
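
Outside the notebook, the same kind of exploratory check can be expressed with Flink’s Table API; in Zeppelin, the CREATE TABLE and SELECT would simply be two SQL paragraphs. The sketch below, written in Kotlin, uses placeholder topic, broker, and field names, and the JSON format is only for illustration (Grab’s topics are Protobuf).

import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.table.api.TableEnvironment

// Illustrative exploration query against a Kafka topic: a quick sanity check
// on the country_code values mentioned above. The result keeps updating as
// new events arrive.
fun main() {
    val tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode())

    tableEnv.executeSql(
        """
        CREATE TABLE booking_events (
          booking_id STRING,
          country_code STRING,
          event_time TIMESTAMP(3)
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'booking-events',
          'properties.bootstrap.servers' = 'kafka-broker:9092',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """.trimIndent()
    )

    tableEnv.executeSql(
        "SELECT country_code, COUNT(*) AS events FROM booking_events GROUP BY country_code"
    ).print()
}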

Figure 5. Simple visualisation of queried data on Zeppelin’s notebook environment.
Note: All variable names, schema, values, and other details used in this article are only created for use as examples.

Besides query features, Zeppelin notebook provides simple visualisation and analytics of the off-the-shelf data as presented above in Figure 5. Furthermore, users are now able to perform interactive ad hoc queries on Online Data. These queries will eventually become much more advanced and/or effective SQL queries to be deployed as a streaming pipeline later on in the data journey. This reduces the inertia in setting up a separate development environment or learning other programming languages like Java or Scala during the development of streaming pipelines. With Zeppelin’s notebook environment, our Data End Users are more empowered to quickly derive value from Online Data.

Need for a more dynamic table schema derivation process

For the Data End Users performing data exploration on Online Data, we see a need for these users to derive the Data Definition Language (“DDL”) associated with a Kafka stream at an earlier stage of the data journey. Within Grab, even though Kafka streams are transmitted in Protobuf format and are thus structured, both the schema and the corresponding DDL change over time as new fields are added. Typically, the data producer (service owners) and the data engineers responsible for the data ingestion pipeline coordinate to perform such updates. Since the Data End Users are not involved in such schema update processes, nor do they directly interact with the data producers, many of them find it hard to discover changes in the current Kafka stream schema. Granted, this is an issue our metadata platform is actively solving using Datahub; we also hope to reduce friction by deriving the DDL more dynamically within the tooling for data exploration on Online Data.

Figure 6. Common functions to derive DDL of a Kafka Stream in SQL.
Note: All variable names, schema, values, and other details used in this article are only created for use as examples.

As seen from Figure 6 above, we have an integrated tooling for Data End Users to derive the DDL associated with a Kafka stream using SQL language. A Kafka stream in Grab’s context is a logical concept describing a Kafka topic, associating it with its metadata like Kafka bootstrap servers and associated Java class created by Protoc. This tool maps the Protobuf schema definition of a Kafka stream to a DDL, allowing it to be expressed and used in SQL language. This reduces the manual effort involved in creating these table definitions from scratch based on the associated Protobuf schema. Users can now derive the DDL associated with a Kafka stream more easily.

Mitigating risks arising from data exploration on Online Data – data access authorisation/audit

While we rethink stream processing and remain open to options that enable data exploration on Online Data as mentioned above, we realised that new security requirements related to data access authorisation and maintaining a proper audit trail have emerged. Even with Personally Identifiable Information (PII) obfuscation enforced by our streaming pipelines, we need stricter guardrails and audit trails to ensure users only have access to what they are allowed to, and that this access can be revoked in a break-glass scenario. If you are interested, you may check out our previous blog post titled PII masking for privacy-grade machine learning for more details about how we enforce PII masking on machine learning data streaming pipelines.

To enable data access authorisation, we utilised Strimzi, the operator for running Kafka on Kubernetes. We integrated Open Policy Agent (OPA) with Kafka through Strimzi to define policies that authorise specific read-only user access to specific Kafka topics. Users are identified via a mutual TLS (mTLS) connection to our Kafka clusters, with their user details carried in the SSL certificate used for authentication.
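As an illustration of how a client presents its identity over mTLS, here is a minimal sketch using the Go Sarama client. The certificate paths, broker address, and the choice of Sarama itself are assumptions for the example, not Coban's actual client code.

  package main

  import (
      "crypto/tls"
      "crypto/x509"
      "log"
      "os"

      "github.com/IBM/sarama"
  )

  func main() {
      // Load the client certificate; the user identity that the OPA policies
      // authorise is derived from the details of this certificate.
      cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
      if err != nil {
          log.Fatal(err)
      }

      // Trust the Kafka cluster's certificate authority.
      caPEM, err := os.ReadFile("ca.crt")
      if err != nil {
          log.Fatal(err)
      }
      caPool := x509.NewCertPool()
      caPool.AppendCertsFromPEM(caPEM)

      cfg := sarama.NewConfig()
      cfg.Net.TLS.Enable = true
      cfg.Net.TLS.Config = &tls.Config{
          Certificates: []tls.Certificate{cert},
          RootCAs:      caPool,
      }

      // The broker authenticates the client with its certificate (mTLS),
      // then OPA decides whether this identity may read the topic.
      consumer, err := sarama.NewConsumer([]string{"kafka.example.com:9094"}, cfg)
      if err != nil {
          log.Fatal(err)
      }
      defer consumer.Close()
  }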

With these tools in place, each user’s request to explore Online Data would be properly logged, and each data access can be controlled by an OPA policy managed by a central team.

If you are interested, you may check out our previous post Zero trust with Kafka where we discussed our efforts to continue strengthening the security of our data-streaming platform.

Impact

With the proliferation of our data-streaming platform, we expect our data to become gradually more democratised. We have already been receiving use cases from Data End Users who are interested in validating a chain of events on Online Data, e.g. retrieving all events associated with a particular booking, which is currently not easy to do.

More importantly, the tools in place for data exploration on Online Data form the foundation required for our next step in the stream processing journey. With ad hoc queries possible in a notebook environment, developing and validating stream processing logic becomes much quicker: there is no need to set up a local developer environment, nor to go through the whole pipeline deployment process, just to validate the developed logic. We believe this will significantly reduce our lead time in creating stream processing pipelines.

What’s next?

Our next step is to rethink further how our stream processing pipelines are defined and start to provision SQL as the unified streaming language of our pipelines. This helps facilitate better discussion between upstream data producers, data engineers, and Data End Users, since SQL is the common language among these stakeholders.

We will also explore handling schema discovery in a more controlled manner by utilising a Hive catalogue to store our Kafka table definitions. This removes the need for users to retrieve and run the table DDL statement for every session, making the data exploration experience even more seamless.

References

[1] Apache Zeppelin | Web-based notebook that enables data-driven, interactive data analytics and collaborative documents with SQL, Scala, Python, R and more.

[2] An elegant platform | Grab engineering blog.

[3] Apache Flink | Roadmap on Unified SQL Platform.

[4] ISO | ISO 3166 Country Codes.

[5] Protobuf (Protocol Buffers) | Language-neutral, platform-neutral, extensible mechanisms for serializing structured data.

[6] Datahub | Extensible metadata platform that enables data discovery, data observability and federated governance to help tame the complexity of your data ecosystem.

[7] Protoc | Protocol buffer compiler installation.

[8] PII masking for privacy-grade machine learning | Grab engineering blog.

[9] Zero trust with Kafka | Grab engineering blog.

[10] Open Policy Agent (OPA) | Policy-based control for cloud native environments.

[11] Strimzi | Using Open Policy Agent with Strimzi and Apache Kafka.

[12] Confluent Documentation | Configure mTLS authentication and RBAC for kafka brokers.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Kafka on Kubernetes: Reloaded for fault tolerance

Post Syndicated from Grab Tech original https://engineering.grab.com/kafka-on-kubernetes

Introduction

Coban – Grab’s real-time data streaming platform – has been operating Kafka on Kubernetes with Strimzi in
production for about two years. In a previous article (Zero trust with Kafka), we explained how we leveraged Strimzi to enhance the security of our data streaming offering.

In this article, we are going to describe how we improved the fault tolerance of our initial design, to the point where we no longer need to intervene if a Kafka broker is unexpectedly terminated.

Problem statement

We operate Kafka in the AWS Cloud. For the Kafka on Kubernetes design described in this article, we rely on Amazon Elastic Kubernetes Service (EKS), the managed Kubernetes offering by AWS, with the worker nodes deployed as self-managed nodes on Amazon Elastic Compute Cloud (EC2).

To make our operations easier and limit the blast radius of any incidents, we deploy exactly one Kafka cluster for each EKS cluster. We also give a full worker node to each Kafka broker. In terms of storage, we initially relied on EC2 instances with non-volatile memory express (NVMe) instance store volumes for
maximal I/O performance. Also, each Kafka cluster is accessible beyond its own Virtual Private Cloud (VPC) via a VPC Endpoint Service.

Fig. 1 Initial design of a 3-node Kafka cluster running on Kubernetes.

Fig. 1 shows a logical view of our initial design of a 3-node Kafka on Kubernetes cluster, as typically run by Coban. The Zookeeper and Cruise-Control components are not shown for clarity.

There are four Kubernetes services (1): one for the initial connection – referred to as “bootstrap” – that redirects incoming traffic to any of the Kafka pods, plus one for each Kafka pod, so that clients can target each Kafka broker individually (a requirement to produce to or consume from a partition that resides on a particular Kafka broker). Four different listeners on the Network Load Balancer (NLB), listening on four different TCP ports, enable the Kafka clients to target either the bootstrap service or any particular Kafka broker they need to reach. This is very similar to what we previously described in Exposing a Kafka Cluster via a VPC Endpoint Service.
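To make the bootstrap-then-direct-broker flow more concrete, here is a hedged sketch using the Go Sarama client and a placeholder NLB address: the client first dials the bootstrap endpoint, then learns the advertised, broker-specific endpoints it must subsequently reach through the NLB.

  package main

  import (
      "fmt"
      "log"

      "github.com/IBM/sarama"
  )

  func main() {
      // The client only needs the bootstrap endpoint to start with.
      bootstrap := []string{"kafka-bootstrap.example.com:9094"} // placeholder NLB address

      client, err := sarama.NewClient(bootstrap, sarama.NewConfig())
      if err != nil {
          log.Fatal(err)
      }
      defer client.Close()

      // The metadata response advertises one endpoint per broker; producing to or
      // consuming from a given partition requires reaching its broker directly,
      // hence the per-broker Kubernetes services and NLB listeners.
      for _, b := range client.Brokers() {
          fmt.Printf("broker %d advertised at %s\n", b.ID(), b.Addr())
      }
  }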

Each worker node hosts a single Kafka pod (2). The NVMe instance store volume is used to create a Kubernetes Persistent Volume (PV), attached to a pod via a Kubernetes Persistent Volume Claim (PVC).

Lastly, the worker nodes belong to Auto-Scaling Groups (ASG) (3), one per Availability Zone (AZ). Strimzi adds node affinity to make sure that the brokers are evenly distributed across AZs. In this initial design, however, the ASGs are not used for auto-scaling, because we want to keep the size of the cluster under control. We only use ASGs – with a fixed size – to facilitate manual scaling operations and to automatically replace terminated worker nodes.

With this initial design, let us see what happens in case of such a worker node termination.

Fig. 2 Representation of a worker node termination. Node C is terminated and replaced by node D. However the Kafka broker 3 pod is unable to restart on node D.

Fig. 2 shows the worker node C being terminated along with its NVMe instance store volume C, and replaced (by the ASG) by a new worker node D and its new, empty NVMe instance store volume D. On start-up, the worker node D automatically joins the Kubernetes cluster. The Kafka broker 3 pod that was running on the faulty worker node C is scheduled to restart on the new worker node D.

Although the NVMe instance store volume C is terminated along with the worker node C, there is no data loss because all of our Kafka topics are configured with a minimum of three replicas. The data is poised to be copied over from the surviving Kafka brokers 1 and 2 back to Kafka broker 3, as soon as Kafka broker 3 is effectively restarted on the worker node D.

However, there are three fundamental issues with this initial design:

  1. The Kafka clients that were in the middle of producing or consuming to/from the partition leaders of Kafka broker 3 are suddenly facing connection errors, because the broker was not gracefully demoted beforehand.
  2. The target groups of the NLB for both the bootstrap connection and Kafka broker 3 still point to the worker node C. Therefore, the network communication from the NLB to Kafka broker 3 is broken. A manual reconfiguration of the target groups is required.
  3. The PVC associating the Kafka broker 3 pod with its instance store PV is unable to automatically switch to the new NVMe instance store volume of the worker node D. Indeed, static provisioning is an intrinsic characteristic of Kubernetes local volumes. The PVC is still in Bound state, so Kubernetes does not take any action. However, the actual storage beneath the PV does not exist anymore. Without any storage, the Kafka broker 3 pod is unable to start.

At this stage, the Kafka cluster is running in a degraded state with only two out of three brokers, until a Coban engineer intervenes to reconfigure the target groups of the NLB and delete the zombie PVC (this, in turn, triggers its re-creation by Strimzi, this time using the new instance store PV).

In the next section, we will see how we have managed to address the three issues mentioned above to make this design fault-tolerant.

Solution

Graceful Kafka shutdown

To minimise the disruption for the Kafka clients, we leveraged the AWS Node Termination Handler (NTH). This component provided by AWS for Kubernetes environments is able to cordon and drain a worker node that is going to be terminated. This draining, in turn, triggers a graceful shutdown of the Kafka
process by sending a polite SIGTERM signal to all pods running on the worker node that is being drained (instead of the brutal SIGKILL of a normal termination).

The termination events of interest that are captured by the NTH are:

  • Scale-in operations by an ASG.
  • Manual termination of an instance.
  • AWS maintenance events, typically EC2 instances scheduled for upcoming retirement.

This suffices for most of the disruptions our clusters can face in normal times and our common maintenance operations, such as terminating a worker node to refresh it. Only sudden hardware failures (AWS issue events) would fall through the cracks and still trigger errors on the Kafka client side.

The NTH comes in two modes: Instance Metadata Service (IMDS) and Queue Processor. We chose to go with the latter as it is able to capture a broader range of events, widening the fault tolerance capability.

Scale-in operations by an ASG

Fig. 3 Architecture of the NTH with the Queue Processor.

Fig. 3 shows the NTH with the Queue Processor in action, and how it reacts to a scale-in operation (typically triggered manually, during a maintenance operation):

  1. As soon as the scale-in operation is triggered, an Auto Scaling lifecycle hook is invoked to pause the termination of the instance.
  2. Simultaneously, an Auto Scaling lifecycle hook event is issued to an Amazon Simple Queue Service (SQS) queue. In Fig. 3, we have also materialised EC2 events (e.g. manual termination of an instance, AWS maintenance events, etc.) that transit via Amazon EventBridge to eventually end up in the same SQS queue. We will discuss EC2 events in the next two sections.
  3. The NTH, a pod running in the Kubernetes cluster itself, constantly polls that SQS queue.
  4. When a scale-in event pertaining to a worker node of the Kubernetes cluster is read from the SQS queue, the NTH sends to the Kubernetes API the instruction to cordon and drain the impacted worker node.
  5. On draining, Kubernetes sends a SIGTERM signal to the Kafka pod residing on the worker node.
  6. Upon receiving the SIGTERM signal, the Kafka pod gracefully migrates the leadership of its leader partitions to other brokers of the cluster before shutting down, in a transparent manner for the clients. This behaviour is ensured by the controlled.shutdown.enable parameter of Kafka, which is enabled by default.
  7. Once the impacted worker node has been drained, the NTH eventually resumes the termination of the instance.

Strimzi also comes with a terminationGracePeriodSeconds parameter, which we have set to 180 seconds to give the Kafka pods enough time to migrate all of their partition leaders gracefully on termination. We have verified that this is enough to migrate all partition leaders on our Kafka clusters (about 60 seconds for 600 partition leaders).

Manual termination of an instance

The Auto Scaling lifecycle hook that pauses the termination of an instance (Fig. 3, step 1) as well as the corresponding resuming by the NTH (Fig. 3, step 7) are invoked only for ASG scaling events.

In case of a manual termination of an EC2 instance, the termination is captured as an EC2 event that also reaches the NTH. Upon receiving that event, the NTH cordons and drains the impacted worker node. However, the instance is terminated immediately, most likely before the leadership of all of its partitions has had time to be migrated to other brokers.

To work around this and let a manual termination of an EC2 instance also benefit from the ASG lifecycle hook, the instance must be terminated using the terminate-instance-in-auto-scaling-group AWS CLI command.
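For completeness, the same can be done programmatically with the AWS SDK for Go. The instance ID below is a placeholder and this is only a sketch, not part of our tooling.

  package main

  import (
      "context"
      "log"

      "github.com/aws/aws-sdk-go-v2/aws"
      "github.com/aws/aws-sdk-go-v2/config"
      "github.com/aws/aws-sdk-go-v2/service/autoscaling"
  )

  func main() {
      cfg, err := config.LoadDefaultConfig(context.TODO())
      if err != nil {
          log.Fatal(err)
      }
      client := autoscaling.NewFromConfig(cfg)

      // Terminating through the ASG API (rather than the EC2 API) ensures the
      // lifecycle hook pauses the termination until the node has been drained.
      _, err = client.TerminateInstanceInAutoScalingGroup(context.TODO(),
          &autoscaling.TerminateInstanceInAutoScalingGroupInput{
              InstanceId:                     aws.String("i-0123456789abcdef0"), // placeholder
              ShouldDecrementDesiredCapacity: aws.Bool(false),                   // the ASG replaces the instance
          })
      if err != nil {
          log.Fatal(err)
      }
  }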

AWS maintenance events

For AWS maintenance events such as instances scheduled for upcoming retirement, the NTH acts immediately when the event is first received (typically adequately in advance). It cordons and drains the soon-to-be-retired worker node, which in turn triggers the SIGTERM signal and the graceful termination of Kafka as described above. At this stage, the impacted instance is not terminated, so the Kafka partition leaders have plenty of time to complete their migration to other brokers.

However, the evicted Kafka pod has nowhere to go. There is a need for spinning up a new worker node for it to be able to eventually restart somewhere.

To make this happen seamlessly, we doubled the maximum size of each of our ASGs and installed the Kubernetes Cluster Autoscaler. With that, when such a maintenance event is received:

  • The worker node scheduled for retirement is cordoned and drained by the NTH. The state of the impacted Kafka pod becomes Pending.
  • The Kubernetes Cluster Autoscaler comes into play and triggers the corresponding ASG to spin up a new EC2 instance that joins the Kubernetes cluster as a new worker node.
  • The impacted Kafka pod restarts on the new worker node.
  • The Kubernetes Cluster Autoscaler detects that the previous worker node is now under-utilised and terminates it.

In this scenario, the impacted Kafka pod only remains in Pending state for about four minutes in total.

In case of multiple simultaneous AWS maintenance events, Kubernetes would honour our PodDisruptionBudget and not evict more than one Kafka pod at a time.

Dynamic NLB configuration

To automatically map the NLB’s target groups with a newly spun up EC2 instance, we leveraged the AWS Load Balancer Controller (LBC).

Let us see how it works.

Fig. 4 Architecture of the LBC managing the NLB’s target groups via TargetGroupBinding custom resources.

Fig. 4 shows how the LBC automates the reconfiguration of the NLB’s target groups:

  1. It first retrieves the desired state described in Kubernetes custom resources (CR) of type TargetGroupBinding. There is one such resource per target group to maintain. Each TargetGroupBinding CR associates its respective target group with a Kubernetes service.
  2. The LBC then watches over the changes of the Kubernetes services that are referenced in the TargetGroupBinding CRs’ definition, specifically the private IP addresses exposed by their respective Endpoints resources.
  3. When a change is detected, it dynamically updates the corresponding NLB’s target groups with those IP addresses as well as the TCP port of the target containers (containerPort).

This automated design sets up the NLB’s target groups with IP addresses (targetType: ip) instead of EC2 instance IDs (targetType: instance). Although the LBC can handle both target types, the IP address approach is actually more straightforward in our case, since each pod has a routable private IP address in the AWS subnet, thanks to the AWS Container Networking Interface (CNI) plug-in.

This dynamic NLB configuration design comes with a challenge. Whenever we need to update the Strimzi CR, the rollout of the change to each Kafka pod in a rolling update fashion is happening too fast for the NLB. This is because the NLB inherently takes some time to mark each target as healthy before enabling it. The Kafka brokers that have just been rolled out start advertising their broker-specific endpoints to the Kafka clients via the bootstrap service, but those
endpoints are actually not immediately available because the NLB is still checking their health. To mitigate this, we have reduced the HealthCheckIntervalSeconds and HealthyThresholdCount parameters of each target group to their minimum values of 5 and 2 respectively. This reduces the maximum delay for the NLB to detect that a target has become healthy to 10 seconds. In addition, we have configured the LBC with a Pod Readiness Gate. This feature makes the Strimzi rolling deployment wait for the health check of the NLB to pass, before marking the current pod as Ready and proceeding with the next pod.

Fig. 5 Steps for a Strimzi rolling deployment with a Pod Readiness Gate. Only one Kafka broker and one NLB listener and target group are shown for simplicity.

Fig. 5 shows how the Pod Readiness Gate works during a Strimzi rolling deployment:

  1. The old Kafka pod is terminated.
  2. The new Kafka pod starts up and joins the Kafka cluster. Its individual endpoint for direct access via the NLB is immediately advertised by the Kafka cluster. However, at this stage, it is not reachable, as the target group of the NLB still points to the IP address of the old Kafka pod.
  3. The LBC updates the target group of the NLB with the IP address of the new Kafka pod, but the NLB health check has not yet passed, so the traffic is not forwarded to the new Kafka pod just yet.
  4. The LBC then waits for the NLB health check to pass, which takes 10 seconds. Once the NLB health check has passed, the NLB resumes forwarding the traffic to the Kafka pod.
  5. Finally, the LBC updates the pod readiness gate of the new Kafka pod. This informs Strimzi that it can proceed with the next pod of the rolling deployment.

Data persistence with EBS

To address the challenge of the residual PV and PVC of the old worker node preventing Kubernetes from mounting the local storage of the new worker node after a node rotation, we adopted Elastic Block Store (EBS) volumes instead of NVMe instance store volumes. Contrary to the latter, EBS volumes can conveniently be attached and detached. The trade-off is that their performance is significantly lower.

However, relying on EBS comes with additional benefits:

  • The cost per GB is lower, compared to NVMe instance store volumes.
  • Using EBS decouples the size of an instance in terms of CPU and memory from its storage capacity, leading to further cost savings by independently right-sizing the instance type and its storage. Such a separation of concerns also opens the door to new use cases requiring disproportionate amounts of storage.
  • After a worker node rotation, the time needed for the new node to get back in sync is faster, as it only needs to catch up the data that was produced during the downtime. This leads to shorter maintenance operations and higher iteration speed. Incidentally, the associated inter-AZ traffic cost is also lower, since there is less data to transfer among brokers during this time.
  • Increasing the storage capacity is an online operation.
  • Data backup is supported by taking snapshots of EBS volumes.

We have verified with our historical monitoring data that the performance of EBS General Purpose 3 (gp3) volumes is significantly above our maximum historical values for both throughput and I/O per second (IOPS), and we have successfully benchmarked a test EBS-based Kafka cluster. We have also set up new monitors to be alerted in case we need to
provision either additional throughput or IOPS, beyond the baseline of EBS gp3 volumes.

With that, we updated our instance types from storage optimised instances to either general purpose or memory optimised instances. We added the Amazon EBS Container Storage Interface (CSI) driver to the Kubernetes cluster and created a new Kubernetes storage class to let the cluster dynamically provision EBS gp3 volumes.

We configured Strimzi to use that storage class to create any new PVCs. This makes Strimzi able to automatically create the EBS volumes it needs, typically when the cluster is first set up, but also to attach/detach the volumes to/from the EC2 instances whenever a Kafka pod is relocated to a different worker node.

Note that the EBS volumes are not part of any ASG Launch Template, nor do they scale automatically with the ASGs.

Fig. 6 Steps for the Strimzi Operator to create an EBS volume and attach it to a new Kafka pod.

Fig. 6 illustrates how this works when Strimzi sets up a new Kafka broker, for example the first broker of the cluster in the initial setup:

  1. The Strimzi Cluster Operator first creates a new PVC, specifying a volume size and EBS gp3 as its storage class. The storage class is configured with the EBS CSI Driver as the volume provisioner, so that volumes are dynamically provisioned [1]. However, because it is also set up with volumeBindingMode: WaitForFirstConsumer, the volume is not yet provisioned until a pod actually claims the PVC.
  2. The Strimzi Cluster Operator then creates the Kafka pod, with a reference to the newly created PVC. The pod is scheduled to start, which in turn claims the PVC.
  3. This triggers the EBS CSI Controller. As the volume provisioner, it dynamically creates a new EBS volume in the AWS VPC, in the AZ of the worker node where the pod has been scheduled to start.
  4. It then attaches the newly created EBS volume to the corresponding EC2 instance.
  5. After that, it creates a Kubernetes PV with nodeAffinity and claimRef specifications, making sure that the PV is reserved for the Kafka broker 1 pod.
  6. Lastly, it updates the PVC with the reference of the newly created PV. The PVC is now in Bound state and the Kafka pod can start.

One important point to take note of is that EBS volumes can only be attached to EC2 instances residing in their own AZ. Therefore, when rotating a worker node, the EBS volume can only be re-attached to the new instance if both old and new instances reside in the same AZ. A simple way to guarantee this is to set up one ASG per AZ, instead of a single ASG spanning across 3 AZs.

Also, when such a rotation occurs, the new broker only needs to synchronise the recent data produced during the brief downtime, which is typically an order of magnitude faster than replicating the entire volume (depending on the overall retention period of the hosted Kafka topics).

Table 1 Comparison of the resynchronisation of the Kafka data after a broker rotation, between the initial design and the new design with EBS volumes.

  • Data to synchronise: all of the data (initial design, NVMe instance store volumes) vs. only the recent data produced during the brief downtime (new design, EBS volumes).
  • Primarily a function of: the retention period (initial design) vs. the downtime (new design).
  • Typical duration: hours (initial design) vs. minutes (new design).

Outcome

With all that, let us revisit the initial scenario, where a malfunctioning worker node is being replaced by a fresh new node.

Fig. 7 Representation of a worker node termination after implementing the solution. Node C is terminated and replaced by node D. This time, the Kafka broker 3 pod is able to start and serve traffic.

Fig. 7 shows the worker node C being terminated and replaced (by the ASG) by a new worker node D, similar to what we have described in the initial problem statement. The worker node D automatically joins the Kubernetes cluster on start-up.

However, this time, a seamless failover takes place:

  1. The Kafka clients that were in the middle of producing or consuming to/from the partition leaders of Kafka broker 3 are gracefully redirected to Kafka brokers 1 and 2, where Kafka has migrated the leadership of its leader partitions.
  2. The target groups of the NLB for both the bootstrap connection and Kafka broker 3 are automatically updated by the LBC. The connectivity between the NLB and Kafka broker 3 is immediately restored.
  3. Triggered by the creation of the Kafka broker 3 pod, the Amazon EBS CSI driver running on the worker node D re-attaches the EBS volume 3 that was previously attached to the worker node C, to the worker node D instead. This enables Kubernetes to automatically re-bind the corresponding PV and PVC to Kafka broker 3 pod. With its storage dependency resolved, Kafka broker 3 is able to start successfully and re-join the Kafka cluster. From there, it only needs to catch up with the new data that was produced
    during its short downtime, by replicating it from Kafka brokers 1 and 2.

With this fault-tolerant design, when an EC2 instance is being retired by AWS, no particular action is required from our end.

Similarly, our EKS version upgrades, as well as any operations that require rotating all worker nodes of the cluster in general, are:

  • Simpler and less error-prone: We only need to rotate each instance in sequence, with no need for manually reconfiguring the target groups of the NLB and deleting the zombie PVCs anymore.
  • Faster: The time between each instance rotation is limited to the short amount of time it takes for the restarted Kafka broker to catch up with the new data.
  • More cost-efficient: There is less data to transfer across AZs (which is charged by AWS).

It is worth noting that we have chosen to omit Zookeeper and Cruise Control in this article, for the sake of clarity and simplicity. In reality, all pods in the Kubernetes cluster – including Zookeeper and Cruise Control – now benefit from the same graceful stop, triggered by the AWS termination events and the NTH. Similarly, the EBS CSI driver improves the fault tolerance of any pods that use EBS volumes for persistent storage, which includes the Zookeeper pods.

Challenges faced

One challenge that we are facing with this design lies in the EBS volumes’ management.

On the one hand, the size of an EBS volume cannot be increased again before the end of a cooldown period (a minimum of 6 hours, which can exceed 24 hours in some cases [2]). Therefore, when we need to urgently extend some EBS volumes because the size of a Kafka topic is suddenly growing, we need to be relatively generous when sizing the new required capacity and add a comfortable security margin, to make sure that we do not run out of storage in the short run.

On the other hand, shrinking a Kubernetes PV is not a supported operation. This can affect the cost efficiency of our design if we overprovision the storage capacity by too much, or in case the workload of a particular cluster organically diminishes.

One way to mitigate this challenge is to tactically scale the cluster horizontally (i.e. adding new brokers) when there is a need for more storage and the existing EBS volumes are stuck in a cooldown period, or when the new storage need is only temporary.

What’s next?

In the future, we can improve the NTH’s capability by utilising webhooks. Upon receiving events from SQS, the NTH can also forward the events to the specified webhook URLs.

This can potentially benefit us in a few ways, e.g.:

  • Proactively spinning up a new instance without waiting for the old one to be terminated, whenever a termination event is received. This would shorten the rotation time even further.
  • Sending Slack notifications to Coban engineers to keep them informed of any actions taken by the NTH.

We would need to develop and maintain an application that receives webhook events from the NTH and performs the necessary actions.

In addition, we are also rolling out Karpenter to replace the Kubernetes Cluster Autoscaler, as it is able to spin up new instances slightly faster, helping reduce the four-minute delay during which a Kafka pod remains in Pending state during a node rotation. Incidentally, Karpenter also removes the need to set up one ASG per AZ, as it can deterministically provision instances in a specific AZ, for example the one where a particular EBS volume resides.

Lastly, to ensure that the performance of our EBS gp3 volumes is both sufficient and cost-efficient, we want to explore autoscaling their throughput and IOPS beyond the baseline, based on the usage metrics collected by our monitoring stack.

References

[1] Dynamic Volume Provisioning | Kubernetes

[2] Troubleshoot EBS volume stuck in Optimizing state during modification | AWS re:Post

We would like to thank our team members and Grab Kubernetes gurus that helped review and improve this blog before publication: Will Ho, Gable Heng, Dewin Goh, Vinnson Lee, Siddharth Pandey, Shi Kai Ng, Quang Minh Tran, Yong Liang Oh, Leon Tay, Tuan Anh Vu.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Championing CyberSecurity: Grab’s bug bounty programme in 2023

Post Syndicated from Grab Tech original https://engineering.grab.com/cybersec-bug

Launched in 2015, Grab’s security bug bounty programme has achieved remarkable success and forged strong partnerships within a thriving bounty community. Through quarterly campaigns with HackerOne, Grab has remained dedicated to security and to giving back to the global security community. Over the years, Grab has paid over $700,000 in cumulative rewards to committed security researchers, supporting their research.

Our journey doesn’t stop there – we’ve also expanded our internal bug bounty team, ensuring that we have the necessary resources to stay at the forefront of security challenges. As we continue to innovate and evolve, it’s critical that our team remains at the cutting edge of security developments.

Marking its eighth year in 2023, the initiative reached new milestones and set the stage for an even more successful ninth year. In 2023, this included a special campaign at Threatcon Nepal, aimed at increasing our bounty engagements. A key development was the enrichment of monetary incentives to honour our hacker community’s remarkable contributions to the programme’s success.

Let’s look at the key takeaways we gained from the bug bounty programme in 2023.

Highlights from 2023

This year, we had some of the highest participation and engagement rates we’ve seen since the programme launched.

  • We processed ~1,000 submissions through our HackerOne bug bounty programme.
  • We hit an impressive record of 400 submissions in the Q1 2023 campaign.
  • We maintained a consistent schedule of campaigns and innovative efforts to enhance hacker engagement.
  • We released a comprehensive report of our seven-year bug bounty journey – check out some key highlights in the image below.

What’s next?

As Grab expands and transforms its product and service portfolio, we are dedicated to ensuring that our bug bounty programme reflects this growth. In our rigorous pursuit of boosting security, we regularly introduce new areas of focus to our scope. In 2024, expect the inclusion of new scopes, enhanced response times, heightened engagement from the hacker community, and more competitive rewards.

In the past year, we have incorporated Joint Ventures and Acquisitions into the scope of our bug bounty programme. By doing so, we proactively address emerging security challenges, while fortifying the safety and integrity of our expanding ecosystem. We remain fully dedicated to embracing change and growth as integral parts of our journey to provide a secure and seamless experience for our users.

On top of that, we continue to improve our methods of motivating researchers through the bug bounty programme. One recent change is to diversify our reward methods by incorporating both financial rewards and recognition. This allows us to cater to different researcher motivations, cultivate stronger relationships, and acknowledge researchers’ contributions.

That said, we recognise that there’s always room for improvement and the bug bounty programme is uniquely poised for substantial expansion. In the near future, we will be:

  • Introducing more elements to the scope of our bug bounty programme
  • Enhancing feedback loops on the HackerOne platform

With these improvements, we can drive continuous improvement efforts to provide a secure experience for our users while strengthening our connection with the security research community.

A word of thanks

2023 has been an exhilarating year for our team. We’re grateful for the continued support from all the security researchers who’ve actively participated in our programme.

Here are the top three researchers in 2023:

  1. Damian89 
  2. Happy_csr 
  3. mclaren650sspider 

As we head into our ninth year, we know there are new opportunities and challenges that await us. We strive to remain dedicated to the values of collaboration and continuous improvement, working hand in hand with the security community to enhance our superapp’s security and deliver an even safer experience for our users.

We’re gearing up for another exciting year ahead in our programme, and looking forward to interesting submissions from our participants. We extend an open invitation to all researchers to submit reports to our bug bounty programme. Your contributions hold immense value and have a significant impact on the safety and security of our products, our users, and the broader security community. For comprehensive information about the programme scope, rules, and rewards, visit our website.

Until next year, keep up the great work, and happy hacking!

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Sliding window rate limits in distributed systems

Post Syndicated from Grab Tech original https://engineering.grab.com/frequency-capping

Like many other companies, Grab uses marketing communications to notify users of promotions or other news. If a user receives these notifications from multiple companies, it would be a form of information overload and they might even start considering these communications as spam. Over time, this could lead to some users revoking their consent to receive marketing communications altogether. Hence, it is important to find a rate-limited solution that sends the right amount of communications to our users.

Background

In Grab, marketing emails and push notifications are part of carefully designed campaigns to ensure that users get the right notifications (i.e. based on past orders or usage patterns). Trident is Grab’s in-house tool to compose these campaigns so that they run efficiently at scale. An example of a campaign is scheduling a marketing email blast to 10 million users at 4 pm. Read more about Trident’s architecture here.

Trident relies on Hedwig, another in-house service, to deliver the messages to users. Hedwig does the heavy lifting of delivering large amounts of emails and push notifications to users while maintaining a high query per second (QPS) rate and minimal delay. The following high-level architectural illustration demonstrates the interaction between Trident and Hedwig.

Diagram of data interaction between Trident and Hedwig

The aim is to regulate the number of marketing comms sent to users daily and weekly, tailored based on their interaction patterns with the Grab superapp.

Solution

Based on their interaction patterns with our superapp, we have clustered users into a few segments.

For example:

New: Users recently signed up to the Grab app but haven’t taken any rides yet.
Active: Users who took rides in the past month.

With these metrics, we came up with optimal daily and weekly frequency limit values for each clustered user segment. The solution discussed in this article ensures that the comms sent to a user do not exceed the daily and weekly thresholds for the segment. This is also called frequency capping.

However, frequency capping can be split into two sub-problems:

Efficient storage of clustered user data

With a huge customer base of over 270 million users, storing the user segment membership information has to be cost- and memory-efficient. Querying the segment to which a user belongs should also have minimal latency.

Persistent tracking of comms sent per user

To stay within the daily and weekly thresholds, we need to actively track the number of comms sent to each user, which can then be referred to when making rate limiting decisions. The rate limiting logic should also have minimal latency, be cost-efficient, and not take up too much memory.

Optimising storage of user segment data

The problem here is figuring out which segment a particular user belongs to and ensuring that the user doesn’t appear in more than one segment. There are two options that suit our needs and we’ll explain more about each option, as well as what was the best option for us.

Bloom filter 

A Bloom filter is a space-efficient probabilistic data structure that addresses this problem well. Simply put, Bloom filters internally use arrays to track memberships of the elements.

For our scenario, each user segment would need its own Bloom filter. We used this Bloom filter calculator to estimate the memory required for each Bloom filter. We found that we needed approximately 1 GB of memory and 23 hash functions to accurately represent the membership information of 270 million users in an array. Additionally, this method guarantees a false positive rate of 1.0E-7, which means about 1 in 10 million membership checks may return a wrong result because of hash collisions.
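For reference, these figures line up with the standard Bloom filter sizing formulas, where n is the number of elements (270 million users) and p is the target false positive rate. This is a back-of-the-envelope check, not output from the calculator itself:

  m = -n · ln(p) / (ln 2)^2  ≈  (2.7 × 10^8 × 16.12) / 0.4805  ≈  9.1 × 10^9 bits  ≈  1.1 × 10^9 bytes (roughly 1 GB)
  k = (m / n) · ln 2         ≈  33.5 × 0.693                   ≈  23 hash functions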

With Grab’s existing segments, this approach needs 4 GB of memory, which may increase as we add more segments in the future. Moreover, reducing the probability of hash collisions further requires even more memory and hash functions. Another thing to note is that Bloom filters do not support deletion, so every time a change is needed, a new version of the Bloom filter has to be created. Although Bloom filters have many advantages, these shortcomings led us to explore another approach.

Roaring bitmaps

Roaring bitmaps are sets of unsigned integers consisting of containers of disjoint subsets, which can store large amounts of data in a compressed form. Essentially, roaring bitmaps can reduce memory storage significantly and overcome the hash collision problem. To understand the intuition behind this, we first need to know how bitmaps work and their possible drawbacks.

To represent a list of numbers as a bitmap, we first need to create an array with a size equivalent to the largest element in the list. For every element in the list, we then mark the bit value as 1 in the corresponding index in the array. While bitmaps work very well for storing integers in closer intervals, they occupy more space and become sparse when storing integer ranges with uneven distribution, as shown in the image below.

Diagram of bitmaps with uneven distribution
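As an aside, a plain, uncompressed bitmap can be sketched in a few lines of Go. Note how the backing array must be sized to the largest element, regardless of how many elements are actually stored – the source of the sparseness problem illustrated above. This is only an illustration, not production code.

  // A plain bitmap over uint32 values: one bit per possible value.
  type Bitmap struct {
      words []uint64
  }

  func NewBitmap(maxValue uint32) *Bitmap {
      // The array is sized to the largest element, not to the number of
      // elements stored, which is why sparse data wastes memory.
      return &Bitmap{words: make([]uint64, maxValue/64+1)}
  }

  func (b *Bitmap) Add(x uint32) {
      b.words[x/64] |= 1 << (x % 64)
  }

  func (b *Bitmap) Contains(x uint32) bool {
      return b.words[x/64]&(1<<(x%64)) != 0
  }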

To reduce memory footprint and improve the performance of bitmaps, there are compression techniques such as Run-Length Encoding (RLE), and Word Aligned Hybrid (WAH). However, this would require additional effort to implement, whereas using roaring bitmaps would solve these issues.

Roaring bitmaps’ hybrid data storage approach offers the following advantages:

  • Faster set operations (union, intersection, differencing).
  • Better compression ratio when handling mixed datasets (both dense and sparse data distribution).
  • Ability to scale to large datasets without significant performance loss.

To summarise, roaring bitmaps can store positive integers from 0 to (2^32)-1. Each integer is converted to a 32-bit binary, where the 16 Most Significant Bits (MSB) are used as the key and the remaining 16 Least Significant Bits (LSB) are stored as the value. The values are kept in one of three container types: a sorted array, a bitmap, or a run container that uses RLE encoding.

If the number of integers mapped to a key is less than 4096, all of them are stored in a sorted array container, which is converted into a bitmap container at runtime once that size is exceeded. The roaring bitmap then analyses the distribution of set bits in the bitmap container: if a continuous interval of set bits is longer than a given threshold, the container can be represented more efficiently as an RLE container. Internally, the RLE container uses an array where the even indices store the beginning of the runs and the odd indices store the length of the runs. This enables the roaring bitmap to dynamically switch between containers to optimise storage and performance.

The following diagram shows how a set of elements with different distributions are stored in roaring bitmaps.

Diagram of how roaring bitmaps store elements with different distributions

In Grab, we developed a microservice that abstracts roaring bitmaps implementations and provides an API to check set membership and enumeration of elements in the sets. Check out this blog to learn more about it.
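For illustration, here is a minimal sketch of what such a membership check looks like underneath, using the open-source github.com/RoaringBitmap/roaring Go library. The user IDs and segment are made up, and this is not the microservice's actual code.

  package main

  import (
      "fmt"
      "log"

      "github.com/RoaringBitmap/roaring"
  )

  func main() {
      // One roaring bitmap per user segment, keyed by numeric user ID.
      activeSegment := roaring.New()
      activeSegment.Add(42)
      activeSegment.Add(1001)
      activeSegment.Add(270000000)
      activeSegment.RunOptimize() // let the library pick array/bitmap/run containers

      // Membership check, e.g. "does user 1001 belong to the Active segment?"
      fmt.Println(activeSegment.Contains(1001))   // true
      fmt.Println(activeSegment.GetCardinality()) // 3

      // The serialised form is what would be stored in, and later loaded from, S3.
      data, err := activeSegment.ToBytes()
      if err != nil {
          log.Fatal(err)
      }
      fmt.Printf("serialised size: %d bytes\n", len(data))
  }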

Distributed rate limiting

The second part of the problem involves rate limiting the number of communication messages sent to users on a daily or weekly basis and each segment has specific daily and weekly limits. By utilising roaring bitmaps, we can determine the segment to which a user belongs. After identifying the appropriate segment, we will apply the personalised limits to the user using a distributed rate limiter, which will be discussed in further detail in the following sections.

Choosing the right datastore

Based on our use case, Amazon ElastiCache for Redis and DynamoDB were two viable options for storing the count of communication messages sent per user. However, we decided to go with Redis due to a number of factors:

  • Higher throughput at lower latency – Redis shards data across nodes in the cluster.
  • Cost-effective – Usage of Lua script reduces unnecessary data transfer overheads.
  • Better at handling spiky rate limiting workloads at scale.

Distributed rate limiter

To appropriately limit the comms our users receive, we needed a rate limiting algorithm that could execute directly in the datastore cluster and return the result to the application logic for further processing. The two rate limiting algorithms we considered were the sliding window rate limiter and the sliding log rate limiter.

The sliding window rate limiter algorithm divides time into fixed-size windows (we defined this as 1 minute) and counts the number of requests within each window. In contrast, the sliding log maintains a log of each request’s timestamp and counts the number of requests between two timestamps, providing a more fine-grained method of rate limiting. Although the sliding log consumes more memory to store the log of request timestamps, we opted for this approach as the accuracy of the rate limiting was more important than memory consumption.

The sliding log rate limiter utilises a Redis sorted set data structure to efficiently track and organise request logs. Each timestamp in milliseconds is stored as a unique member in the set. The score assigned to each member represents the corresponding timestamp, allowing for easy sorting in ascending order. This design choice optimises the speed of search operations when querying for the total request count within specific time ranges.

Sliding log rate limiter algorithm, expressed as a Redis Lua script:

  -- KEYS[1] : user-specific Redis key where the request timestamp logs are stored as a sorted set
  -- ARGV[1] : limit_value, the limit that needs to be applied for the user
  -- ARGV[2] : start_time_in_millis, the starting point of the time window
  -- ARGV[3] : end_time_in_millis, the ending point of the time window
  -- ARGV[4] : current_time_in_millis, the time at which the current request is sent
  -- ARGV[5] : eviction_time_in_millis, members of the set with a score lower than this are evicted
  -- Returns 0 (not allowed) or 1 (allowed)

  local user_redis_key          = KEYS[1]
  local limit_value             = tonumber(ARGV[1])
  local start_time_in_millis    = ARGV[2]
  local end_time_in_millis      = ARGV[3]
  local current_time_in_millis  = ARGV[4]
  local eviction_time_in_millis = ARGV[5]

  -- ZCOUNT fetches the count of the request timestamp logs falling between the start and the end timestamps
  local request_count = redis.call('ZCOUNT', user_redis_key, start_time_in_millis, end_time_in_millis)

  local response = 0
  -- If the count of request logs is below the allowed limit, record the usage by adding the current timestamp to the sorted set
  if request_count < limit_value then
    redis.call('ZADD', user_redis_key, current_time_in_millis, current_time_in_millis)
    response = 1
  end

  -- ZREMRANGEBYSCORE removes the members of the sorted set whose score is lower than eviction_time_in_millis
  redis.call('ZREMRANGEBYSCORE', user_redis_key, '-inf', eviction_time_in_millis)

  return response

This algorithm has O(log n) time complexity, where n is the number of request logs stored in the sorted set. Individual members of a sorted set cannot be expired the way whole Redis keys can with a time-to-live (TTL). To prevent the size of the sorted set from growing over time, we pass a fixed eviction_time_in_millis argument to the script. The zremrangebyscore command then deletes the members of the sorted set whose score is lower than eviction_time_in_millis, also in O(log n) time complexity.

Lua script optimisations

In Redis Cluster mode, all Redis keys accessed by a Lua script must reside on the same node, and they must be passed in the KEYS input array of the script. If the script attempts to access keys located on different nodes within the cluster, a CROSSSLOT error is thrown. Since Redis keys – one per userID – are distributed across multiple nodes in the cluster, it is not feasible to rate limit a batch of userIDs within the same Lua script invocation, as this might result in a CROSSSLOT error.

Invoking a separate Lua script call for each user is a possible approach, but it incurs a significant number of network calls, which can be optimised further with the following approach:

  1. Upload the Lua script to the Redis server during server startup with the SCRIPT LOAD command; if the upload is successful, we get back the SHA1 hash of the script.
  2. The SHA1 hash can then be used to invoke the Lua script with the EVALSHA command passing the keys and arguments as script input.
  3. Redis pipelining takes in multiple EVALSHA commands that call the Lua script and each invocation corresponds to a userID for getting the rate limiting result.
  4. Redis pipelining groups the EVALSHA Redis commands with Redis keys located on the same nodes internally. It then sends the grouped commands in a single network call to the relevant nodes within the Redis cluster and provides the rate limiting outcome to the client.

Since Redis operates on a single thread, any long-running Lua script can cause other Redis commands to be blocked until the script completes execution. Thus, it’s optimal for the Lua script to execute in under 5 milliseconds. Additionally, the current time is passed as an argument to the script to account for potential variations in time when the script is executed on a node’s replica, which could be caused by clock drift.
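To illustrate the flow above, here is a minimal sketch with the go-redis client. The cluster address, user keys, timestamps, and the stand-in script body are placeholders, not our production code.

  package main

  import (
      "context"
      "fmt"
      "log"

      "github.com/redis/go-redis/v9"
  )

  // Stand-in for the sliding log Lua script shown in the previous section.
  const slidingLogScript = `return 1`

  func main() {
      ctx := context.Background()
      rdb := redis.NewClusterClient(&redis.ClusterOptions{
          Addrs: []string{"redis-cluster.example.com:6379"}, // placeholder address
      })

      // 1. Upload the script and keep its SHA1 hash. On a cluster, production code
      //    should also handle a NOSCRIPT error by re-loading the script on that node.
      sha, err := rdb.ScriptLoad(ctx, slidingLogScript).Result()
      if err != nil {
          log.Fatal(err)
      }

      // 2-4. One EVALSHA per userID, batched in a pipeline. The cluster client groups
      //      the commands by node internally, keeping the number of network calls low.
      userKeys := []string{"user:1", "user:2", "user:3"}
      pipe := rdb.Pipeline()
      cmds := make([]*redis.Cmd, len(userKeys))
      for i, key := range userKeys {
          cmds[i] = pipe.EvalSha(ctx, sha, []string{key},
              10,            // limit_value
              1700000000000, // start_time_in_millis
              1700086400000, // end_time_in_millis
              1700086400000, // current_time_in_millis
              1699900000000, // eviction_time_in_millis
          )
      }
      if _, err := pipe.Exec(ctx); err != nil {
          log.Fatal(err)
      }
      for i, cmd := range cmds {
          allowed, _ := cmd.Int64()
          fmt.Printf("%s allowed=%d\n", userKeys[i], allowed)
      }
  }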

By bringing together roaring bitmaps and the distributed rate limiter, this is what our final solution looks like:

Our final solution using roaring bitmaps and distributed rate limiter

The roaring bitmaps structure is serialised and stored in an AWS S3 bucket, which is downloaded by each instance during server startup. After that, a user segment membership check is simply a local method call. The configuration service manages the mapping between each segment and its allowed rate limiting values.

Whenever a marketing message needs to be sent to a user, we first find the segment to which the user belongs, retrieve the defined rate limiting values from the configuration service, then execute the Lua script to get the rate limiting decision. If there is enough quota available for the user, we send the comms.
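As a purely hypothetical sketch of that decision path – all helper names and types below are invented for illustration and are not our actual service code:

  package comms

  import "github.com/RoaringBitmap/roaring"

  // Limits holds the per-segment thresholds from the configuration service.
  type Limits struct {
      Daily  int
      Weekly int
  }

  // shouldSendComms wires together the local segment lookup and the distributed
  // rate limiter. limitsFor and allowSend are hypothetical hooks standing in for
  // the configuration service call and the Lua-script-based decision respectively.
  func shouldSendComms(
      userID uint32,
      segments map[string]*roaring.Bitmap, // segment name -> bitmap loaded from S3 at startup
      limitsFor func(segment string) Limits,
      allowSend func(userID uint32, l Limits) bool,
  ) bool {
      for name, bm := range segments {
          if bm.Contains(userID) { // local, in-memory membership check
              return allowSend(userID, limitsFor(name))
          }
      }
      return false // user not in any known segment: be conservative and do not send
  }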

The architecture of the messaging service looks something like this:

Architecture of the messaging service

Impact

In addition to decreasing the unsubscription rate, there was a significant enhancement in the latency of sending communications. Eliminating redundant communications also alleviated the system load, resulting in a reduction of the delay between the scheduled time and the actual send time of comms.

Conclusion

Applying rate limiters to safeguard our services is not only a standard practice but also a necessary process. Many times, this can be achieved by configuring the rate limiters at the instance level. The need for rate limiters for business logic may not be as common, but when you need it, the solution must be lightning-fast, and capable of seamlessly operating within a distributed environment.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

An elegant platform

Post Syndicated from Grab Tech original https://engineering.grab.com/an-elegant-platform

Coban is Grab’s real-time data streaming platform team. As a platform team, we thrive on providing our internal users from all verticals with self-served data-streaming resources, such as Kafka topics, Flink and Change Data Capture (CDC) pipelines, various kinds of Kafka-Connect connectors, as well as Apache Zeppelin notebooks, so that they can effortlessly leverage real-time data to build intelligent applications and services.

In this article, we present our journey from pure Infrastructure-as-Code (IaC) towards a more sophisticated control plane that has revolutionised the way data streaming resources are self-served at Grab. This change also leads to improved scalability, stability, security, and user adoption of our data streaming platform.

Problem statement

In the early ages of public cloud, it was a common practice to create virtual resources by clicking through the web console of a cloud provider, which is sometimes referred to as ClickOps.

ClickOps has many downsides, such as:

  • Inability to review, track, and audit changes to the infrastructure.
  • Inability to massively scale the infrastructure operations.
  • Inconsistencies between environments, e.g. staging and production.
  • Inability to quickly recover from a disaster by re-creating the infrastructure at a different location.

That said, ClickOps has one tremendous advantage: it makes creating resources via a graphical User Interface (UI) fairly easy for anyone – Infrastructure Engineers, Software Engineers, Data Engineers, etc. This also leads to a high overall iteration speed towards innovation.

IaC resolved many of the limitations of ClickOps, such as:

  • Changes are committed to a Version Control System (VCS) like Git: They can be reviewed by peers before being merged. The full history of all changes is available for investigating issues and for audit.
  • The infrastructure operations scale better: Code for similar pieces of infrastructure can be modularised. Changes can be rolled out automatically by Continuous Integration (CI) pipelines in the VCS system, when a change is merged to the main branch.
  • The same code can be used to deploy the staging and production environments consistently.
  • The infrastructure can be re-created anytime from its source code, in case of a disaster.

However, IaC unwittingly posed a new entry barrier too, requiring the learning of new tools like Ansible, Puppet, Chef, Terraform, etc.

Some organisations set up dedicated Site Reliability Engineer (SRE) teams to centrally manage, operate, and support those tools and the infrastructure as a whole, but that soon created the potential of new bottlenecks in the path to innovation.

On the other hand, others let engineering teams manage their own infrastructure, and Grab adopted that same approach. We use Terraform to manage infrastructure, and all teams are expected to have select engineers who have received Terraform training and have a clear understanding of it.

In this context, Coban’s platform initially started as a handful of Git repositories where users had to submit their Merge Requests (MR) of Terraform code to create their data streaming resources. Once reviewed by a Coban engineer, those Terraform changes would be applied by a CI pipeline running Atlantis.

While this was a meaningful first step towards self-service and platformisation of Coban’s offering within Grab, it had several significant downsides:

  • Stability: Due to the lack of control on the Terraform changes, the CI pipeline was prone to human errors and frequent failures. For example, users would initiate a new Terraform project by duplicating an existing one, but then would forget to change the location of the remote Terraform state, leading to the in-place replacement of an existing resource.
  • Scalability: The Coban team needed to review all MRs and provide ad hoc support whenever the pipeline failed.
  • Security: In the absence of Identity and Access Management (IAM), MRs could potentially contain changes pertaining to other teams’ resources, or even changes to Coban’s core infrastructure, with code review as the only guardrail.
  • Limited user growth: We could only acquire users who were well-versed in Terraform.

It soon became clear that we needed to build a layer of abstraction between our users and the Terraform code, to increase the level of control and lower the entry barrier to our platform, while still retaining all of the benefits of IaC under the hood.

Solution

We designed and built an in-house three-tier control plane made of:

  • Coban UI, a front-end web interface, providing our users with a seamless ClickOps experience.
  • Heimdall, the Go back-end of the web interface, transforming ClickOps into IaC.
  • Khone, the storage and provisioner layer, a Git repository storing Terraform code and metadata of all resources as well as the CI pipelines to plan and apply the changes.

In the next sections, we will dive deeper into these three components.

Fig. 1 Simplified architecture of a request flowing from the user to the Coban infrastructure, via the three components of the control plane: the Coban UI, Heimdall, and Khone.

Although we designed the user journey to start from the Coban UI, our users can still opt to communicate with Heimdall and with Khone directly, e.g. for batch changes, or just because many engineers love Git and we want to encourage broad adoption. To make sure that data is eventually consistent across the three systems, we made Khone the only persistent storage layer. Heimdall regularly fetches data from Khone, caches it, and presents it to the Coban UI upon each query.

We also continued using Terraform for all resources, instead of mixing various declarative infrastructure approaches (e.g. Kubernetes Custom Resource Definition, Helm charts), for the sake of consistency of the logic in Khone’s CI pipelines.

Coban UI

The Coban UI is a React Single Page Application (React SPA) designed by our partner team Chroma, a dedicated team of front-end engineers who thrive on building legendary UIs and reusable components for platform teams at Grab.

It serves as a comprehensive self-service portal, enabling users to effortlessly create data streaming resources by filling out web forms with just a few clicks.

Fig. 2 Screen capture of a new Kafka topic creation in the Coban UI.

In addition to facilitating resource creation and configuration, the Coban UI is seamlessly integrated with multiple monitoring systems. This integration allows for real-time monitoring of critical metrics and health status for Coban infrastructure components, including Kafka clusters, Kafka topic bytes in/out rates, and more. Under the hood, all this information is exposed by Heimdall APIs.

Fig. 3 Screen capture of the metrics of a Kafka cluster in the Coban UI.

In terms of infrastructure, the Coban UI is hosted on AWS S3 static website hosting. All dynamic content is generated by querying the APIs of the back-end: Heimdall.

Heimdall

Heimdall is the Go back-end of the Coban UI. It serves a collection of APIs for:

  • Managing the data streaming resources of the Coban platform with Create, Read, Update and Delete (CRUD) operations, treating the Coban UI as a first-class citizen.
  • Exposing the metadata of all Coban resources, so that they can be used by other platforms or searched in the Coban UI.

All operations are authenticated and authorised. Read more about Heimdall’s access control in Migrating from Role to Attribute-based Access Control.

In the next sections, we are going to dive deeper into these two features.

Managing the data streaming resources

First and foremost, Heimdall enables our users to self-manage their data streaming resources. It primarily relies on Khone as its storage and provisioner layer for actual resource management via Git CI pipelines. Therefore, we designed Heimdall’s resource management workflow to leverage the underlying Git flow.

Fig. 4 Diagram flow of a request in Heimdall.

Fig. 4 shows the diagram flow of a typical request in Heimdall to create, update, or delete a resource.

  1. An authenticated user initiates a request, either by navigating in the Coban UI or by calling the Heimdall API directly. At this stage, the request state is Initiated on Heimdall.
  2. Heimdall validates the request against multiple validation rules. For example, if an ongoing change request exists for the same resource, the request fails. If all tests succeed, the request state moves to Ongoing.
  3. Heimdall then creates an MR in Khone, which contains the Terraform files describing the desired state of the resource, as well as an in-house metadata file describing the key attributes of both resource and requester.
  4. After the MR has been created successfully, Heimdall notifies the requester via Slack and shares the MR URL.
  5. After that, Heimdall starts polling the status of the MR in a loop.
  6. For changes pertaining to production resources, an approver who is a code owner in the repository of the resource has to approve the MR. Typically, the approver is an immediate teammate of the requester; as a platform team, we empower our users to manage their own resources in a self-service fashion. Ultimately, the requester merges the MR to trigger the CI pipeline that applies the actual Terraform changes. Note that for staging resources, this entire step 6 is performed automatically by Heimdall.
  7. Depending on the MR status and the status of its CI pipeline in Khone, the final state of the request can be one of the following (a simplified sketch of this resolution follows the list):
    • Failed if the CI pipeline has failed in Khone.
    • Completed if the CI pipeline has succeeded in Khone.
    • Cancelled if the MR was closed in Khone.
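
As an illustration of how the final state in step 7 might be resolved, here is a minimal sketch mapping the MR status and the CI pipeline status to a request state. The status strings and the function are hypothetical, for illustration only; they are not Heimdall's actual types.

from enum import Enum


class RequestState(Enum):
    INITIATED = "Initiated"
    ONGOING = "Ongoing"
    FAILED = "Failed"
    COMPLETED = "Completed"
    CANCELLED = "Cancelled"


def resolve_request_state(mr_status: str, pipeline_status: str) -> RequestState:
    """Map hypothetical Khone MR and CI pipeline statuses to a request state."""
    if mr_status == "closed":          # MR closed without merging
        return RequestState.CANCELLED
    if mr_status == "merged":
        if pipeline_status == "success":
            return RequestState.COMPLETED
        if pipeline_status == "failed":
            return RequestState.FAILED
    return RequestState.ONGOING        # still waiting for approval, merge, or CI

# Example: a merged MR whose pipeline failed resolves to Failed.
print(resolve_request_state("merged", "failed"))  # RequestState.FAILED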

Heimdall exposes APIs to let users track the status of their requests. In the Coban UI, a page queries those APIs to elegantly display the requests.

Fig. 5 Screen capture of the Coban UI showing all requests.

Exposing the metadata

Apart from managing the data streaming resources, Heimdall also centralises and exposes the metadata pertaining to those resources so other Grab systems can fetch and use it. They can make various queries, for example, listing the producers and consumers of a given Kafka topic, or determining if a database (DB) is the data source for any CDC pipeline.

To make this happen, Heimdall not only retains the metadata of all of the resources that it creates, but also regularly ingests additional information from a variety of upstream systems and platforms, to enrich and make this metadata comprehensive.

Fig. 6 Diagram showing some of Heimdall’s upstreams (on the left) and downstreams (on the right) for metadata collection, enrichment, and serving. The arrows show the data flow. The network connection (client -> server) is actually the other way around.

On the left side of Fig. 6, we illustrate Heimdall’s ingestion mechanism with several examples (step 1):

  • The metadata of all Coban resources is ingested from Khone. This means the metadata of the resources that were created directly in Khone is also available in Heimdall.
  • The list of Kafka producers is retrieved from our monitoring platform, where most of them emit metrics.
  • The list of Kafka consumers is retrieved directly from the respective Kafka clusters, by listing the consumer groups and respective Client IDs of each partition.
  • The metadata of all DBs that are used as a data source for CDC pipelines is fetched from Grab's internal DB management platform.
  • The Kafka stream schemas are retrieved from the Coban schema repository.
  • The Kafka stream configuration of each stream is retrieved from Grab's Universal Configuration Management platform.

With all of this ingested data, Heimdall can provide comprehensive and accurate information about all data streaming resources to any other Grab platforms via a set of dedicated APIs.

The right side of Fig. 6 shows some examples (step 2) of Heimdall’s serving mechanism:

  • As a downstream of Heimdall, the Coban UI enables our direct users to conveniently browse their data streaming resources and access their attributes.
  • The entire resource inventory is ingested into the broader Grab inventory platform, based on backstage.io.
  • The Kafka streams are ingested into Grab’s internal data discovery platform, based on DataHub, where users can discover and trace the lineage of any piece of data.
  • The CDC connectors pertaining to DBs are ingested by Grab's internal DB management platform, so that they are made visible in that platform when users are browsing their DBs.

Note that the downstream platforms that ingest data from Heimdall each expose a particular view of the Coban inventory that serves their purpose, but the Coban platform remains the only source of truth for any data streaming resource at Grab.

Lastly, Heimdall leverages an internal MySQL DB to support quick data query and exploration. The corresponding API is called by the Coban UI to let our users conveniently search globally among all resources’ attributes.

Fig. 7 Screen capture of the global search feature in the Coban UI.

Khone

Khone is the persistent storage layer of our platform, as well as the executor for actual resource creation, changes, and deletion. Under the hood, it is actually a GitLab repository of Terraform code in typical GitOps fashion, with CI pipelines to plan and apply the Terraform changes automatically. In addition, it also stores a metadata file for each resource.

Compared to letting the platform create the infrastructure directly and keep track of the desired state in its own way, relying on a standard IaC tool like Terraform for the actual changes to the infrastructure presents two major advantages:

  • The Terraform code can directly be used for disaster recovery. In case of a disaster, any entitled Cobaner with a local copy of the main branch of the Khone repository is able to recreate all our platform resources directly from their machine. There is no need to rebuild the entire platform’s control plane, thus reducing our Recovery Time Objective (RTO).
  • Minimal effort required to follow the API changes of our infrastructure ecosystem (AWS, Kubernetes, Kafka, etc.). When such a change happens, all we need to do is to update the corresponding Terraform provider.

If you’d like to read more about Khone, check out Securing GitOps pipelines. In this section, we will only focus on Khone’s features that are relevant from the platform perspective.

Lightweight Terraform

In Khone, each resource is stored as a Terraform definition. There are two major differences from a normal Terraform project:

  • No Terraform environment boilerplate, such as the required Terraform providers and the location of the remote Terraform state file: these are automatically generated by the CI pipeline via a simple wrapper.
  • Only vetted Khone Terraform modules can be used. This is controlled and enforced by the CI pipeline via code inspection. There is one such Terraform module for each kind of supported resource of our platform (e.g. Kafka topic, Flink pipeline, Kafka Connect mirror source connector, etc.). Furthermore, those in-house Terraform modules are designed to automatically derive their key variables (e.g. resource name, cluster name, environment) from the relative path of the parent Terraform project in the Khone repository (a sketch of this derivation is shown below).

Those characteristics are designed to limit the risk and blast radius of human errors. They also make sure that all resources created in Khone are supported by our platform, so that they can also be discovered and managed in Heimdall and the Coban UI. Lastly, by generating the Terraform environment on the fly, we can destroy resources simply by deleting the directory of the project in the code base – this would not be possible otherwise.
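
As an illustration of how such a wrapper might derive variables from a project's relative path, here is a minimal sketch. The directory layout (resource kind / environment / cluster / resource name) and the variable names are assumptions made for the example, not Khone's actual repository structure.

from pathlib import PurePosixPath


def derive_variables(project_path: str) -> dict:
    """Derive Terraform variables from a (hypothetical) Khone project path.

    Assumed layout: <resource_kind>/<environment>/<cluster>/<resource_name>
    """
    parts = PurePosixPath(project_path).parts
    if len(parts) != 4:
        raise ValueError(f"unexpected project path layout: {project_path}")
    resource_kind, environment, cluster, resource_name = parts
    return {
        "resource_kind": resource_kind,
        "environment": environment,
        "cluster": cluster,
        "resource_name": resource_name,
    }

print(derive_variables("kafka-topic/production/cluster-a/orders"))
# {'resource_kind': 'kafka-topic', 'environment': 'production',
#  'cluster': 'cluster-a', 'resource_name': 'orders'}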

Resource metadata

All resource metadata is stored in a YAML file that is present in the Terraform directory of each resource in the Khone repository. This is mainly used for ownership and cost attribution.

With this metadata, we can:

  • Better communicate with our users whenever their resources are impacted by an incident or an upcoming maintenance operation.
  • Help teams understand the costs of their usage of our platform, a significant step towards cost efficiency.

There are two different ways resource metadata can be created:

  • Automatically through Heimdall: The YAML metadata file is automatically generated by Heimdall.
  • Through Khone by a human user: The user needs to prepare the YAML metadata file and include it in the MR. This file is then verified by the CI pipeline (a minimal sketch of such a check follows this list).
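
As a rough illustration of that CI verification, the sketch below checks that a metadata file contains a few required fields. The field names (owner, team, cost_centre) are hypothetical; the real file format is internal to Khone.

import sys
import yaml  # PyYAML

REQUIRED_FIELDS = {"owner", "team", "cost_centre"}  # hypothetical field names


def validate_metadata(path: str) -> list:
    """Return a list of validation errors for a resource metadata YAML file."""
    with open(path) as f:
        metadata = yaml.safe_load(f) or {}
    missing = REQUIRED_FIELDS - set(metadata)
    return [f"missing required field: {field}" for field in sorted(missing)]


if __name__ == "__main__":
    errors = validate_metadata(sys.argv[1])
    if errors:
        print("\n".join(errors))
        sys.exit(1)  # fail the CI job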

Outcome

The initial version of the three-tier Coban platform, as described in this article, was internally released in March 2022, supporting only Kafka topic management at the time. Since then, we have added support for Flink pipelines, four kinds of Kafka Connect connectors, CDC pipelines, and more recently, Apache Zeppelin notebooks. At the time of writing, the Coban platform manages about 5000 data streaming resources, all described as IaC under the hood.

Our platform also exposes enriched metadata that includes the full data lineage from Kafka producers to Kafka consumers, as well as ownership information, and cost attribution.

With that, our monthly active users have almost quadrupled, truly moving the needle towards democratising the usage of real-time data within all Grab verticals.

In spite of that user growth, the end-to-end workflow success rate for self-served resource creation, change or deletion, remained well above 90% in the first half of 2023, while the Heimdall API uptime was above 99.95%.

Challenges faced

A common challenge for platform teams resides in the misalignment between the Service Level Objective (SLO) of the platform, and the various environments (e.g. staging, production) of the managed resources and upstream/downstream systems and platforms.

Indeed, the platform aims to guarantee the same level of service, regardless of whether it is used to create resources in the staging or the production environment. From the platform team’s perspective, the platform as a whole is considered production-grade, as soon as it serves actual users.

A naive approach to address this challenge is to let the production version of the platform manage all resources regardless of their respective environments. However, doing so does not permit a hermetic segregation of the staging and production environments across the organisation, which is a good security practice, and often a requirement for compliance. For example, the production version of the platform would have to connect to upstream systems in the staging environment, e.g. staging Kafka clusters to collect their consumer groups, in the case of Heimdall. Conversely, the staging version of certain downstreams would have to connect to the production version of Heimdall, to fetch the metadata of relevant staging resources.

The alternative approach, generally adopted across Grab, is to instantiate all platforms in each environment (staging and production), while still considering both instances as production-grade and guaranteeing tight SLOs in both environments.

Fig. 8 Architecture of the Coban platform, broken down by environment.

In Fig. 8, both instances of Heimdall have equivalent SLOs. The caveat is that all upstream systems and platforms must also guarantee a strict SLO in both environments. This obviously comes with a cost, for example, tighter maintenance windows for the operations pertaining to the Kafka clusters in the staging environment.

A strong “platform” culture is required for platform teams to fully understand that their instance residing in the staging environment is not their own staging environment and should not be used for testing new features.

What’s next?

Currently, users creating, updating, or deleting production resources in the Coban UI (or directly by calling Heimdall API) receive the URL of the generated GitLab MR in a Slack message. From there, they must get the MR approved by a code owner, typically another team member, and finally merge the MR, for the requested change to be actually implemented by the CI pipeline.

Although this was a fairly easy way to implement a maker/checker process that was immediately compliant with our regulatory requirements for any changes in production, the user experience is not optimal. In the near future, we plan to bring the approval mechanism into Heimdall and the Coban UI, while still providing our more advanced users with the option to directly create, approve, and merge MRs in GitLab. In the longer run, we would also like to enhance the Coban UI with the output of the Khone CI jobs that include the Terraform plan and apply results.

There is another aspect of the platform that we want to improve. As Heimdall regularly polls the upstream platforms to collect their metadata, this introduces a latency between a change in one of those platforms and its reflection in the Coban platform, which can hinder the user experience. To refresh resource metadata in Heimdall in near real time, we plan to leverage an existing Grab-wide event stream, where most of the configuration and code changes at Grab are produced as events. Heimdall will soon be able to consume those events and update the metadata of the affected resources immediately, without waiting for the next periodic refresh.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Road localisation in GrabMaps

Post Syndicated from Grab Tech original https://engineering.grab.com/road-localisation-grabmaps

Introduction

In 2022, Grab achieved self-sufficiency in its Geo services. As part of this transition, one crucial step was moving towards using an internally-developed map tailored specifically to the market in which Grab operates. Now that we have full control over the map layer, we can add more data to it or improve it according to the needs of the services running on top. One key aspect that this transition unlocked for us was the possibility of creating hyperlocal data at map level.

For instance, by determining the country to which a road belongs, we can now automatically infer the official language of that country and display the street name in that language. In another example, knowing the country for a specific road, we can automatically infer the driving side (left-hand or right-hand traffic), leading to an improved navigation experience. Furthermore, this capability also enables us to efficiently handle various scenarios. For example, if we know that a road is part of a gated community, an area where our driver-partners face restricted access, we can prevent transit through that area.

These are just some examples of the possibilities from having full control over the map layer. By having an internal map, we can align our maps with specific markets and provide better experiences for our driver-partners and customers.

Background

For all these to be possible, we first needed to localise the roads inside the map. Our goal was to include hyperlocal data into the map, which refers to data that is specific to a certain area, such as a country, city, or even a smaller part of the city like a gated community. At the same time, we aimed to deliver our map with a high cadence, thus, we needed to find the right way to process this large amount of data while continuing to create maps in a cost-effective manner.

Solution

In the following sections of this article, we will use an extract from the Southeast Asia map to provide visual representations of the concepts discussed.

In Figure 1, Image 1 shows a visualisation of the road network, i.e. the roads belonging to this area. The coloured lines in Image 2 represent the borders identifying the countries in the same area. Overlapping the information from Image 1 and Image 2, we can extrapolate and say that the entire surface included within a certain border could share the same set of common properties, as shown in Image 3. In Image 4, we then proceed with adding localised roads for each area.

Figure 1 – Map of Southeast Asia

For this to be possible, we have to find a way to localise each road and identify its associated country. Once this localisation process is complete, we can replicate all this information specific to a given border onto each individual road. This information includes details such as the country name, driving side, and official language. We can go even further and infer more information, and add hyperlocal data. For example, in Vietnam, we can automatically prevent motorcycle access on the motorways.

Assigning each road on the map to a specific area, such as a country, service area, or subdivision, presents a complex task. So, how can we efficiently accomplish this?

Implementation

The most straightforward approach would be to test the inclusion of each road into each area boundary, but that is easier said than done. With close to 30 million road segments in the Southeast Asia map and over 10 thousand areas, the computational cost of determining inclusion or intersection between a polyline and a polygon is expensive.

Our solution to this challenge involves replacing the expensive yet precise operation with a decent approximation. We introduce a proxy entity, the geohash, and we use it to approximate the areas and also to localise the roads.

We replace the geometrical inclusion with a series of simpler and less expensive operations. First, we conduct an inexpensive precomputation where we identify all the geohashes that belong to a certain area or fall within a defined border. We then identify the geohashes to which the roads belong. Finally, we use these precomputed values to assign roads to their respective areas. This process is also computationally inexpensive.

Given the large area we process, we leverage big data techniques to distribute the execution across multiple nodes and thus speed up the operation. We want to deliver the map daily and this is one of the many operations that are part of the map-making process.

What is a geohash?

To further understand our implementation we will first explain the geohash concept. A geohash is a unique identifier of a specific region on the Earth. The basic idea is that the Earth is divided into regions of user-defined size and each region is assigned a unique id, which is known as its geohash. For a given location on earth, the geohash algorithm converts its latitude and longitude into a string.

Geohashes use a Base-32 alphabet encoding system comprising characters ranging from 0 to 9 and A to Z, excluding “A”, “I”, “L” and “O”. Imagine dividing the world into a grid of 32 cells. The first character in a geohash identifies one of these 32 cells. Each of these cells is then further subdivided into 32 smaller cells. This subdivision process continues, refining down to specific areas in the world. Adding characters to the geohash subdivides a cell, effectively zooming in to a more detailed area.

The precision factor of the geohash determines the size of the cell. For instance, a precision factor of one creates a cell 5,000 km high and 5,000 km wide. A precision factor of six creates a cell 0.61km high and 1.22 km wide. Furthermore, a precision factor of nine creates a cell 4.77 m high and 4.77 m wide. It is important to note that cells are not always square and can have varying dimensions.

Figure 2 shows an example of a precision 6 geohash, whose code is wsdt33.

Figure 2 – An example of geohash code wsdt33
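
To make the encoding concrete, below is a small, self-contained sketch of the standard geohash algorithm in Python: bits of longitude and latitude are interleaved, and every five bits map to one Base-32 character. This is an illustrative implementation only; in practice a dedicated geohash library would typically be used.

BASE32 = "0123456789bcdefghjkmnpqrstuvwxyz"  # geohash alphabet (no a, i, l, o)


def geohash_encode(lat: float, lon: float, precision: int = 6) -> str:
    """Encode a latitude/longitude pair into a geohash of the given precision."""
    lat_range, lon_range = [-90.0, 90.0], [-180.0, 180.0]
    bits, use_lon = [], True  # bits alternate: longitude first, then latitude
    while len(bits) < precision * 5:
        rng, value = (lon_range, lon) if use_lon else (lat_range, lat)
        mid = (rng[0] + rng[1]) / 2
        if value >= mid:
            bits.append(1)
            rng[0] = mid
        else:
            bits.append(0)
            rng[1] = mid
        use_lon = not use_lon
    chars = []
    for i in range(0, len(bits), 5):  # every 5 bits map to one Base-32 character
        index = 0
        for bit in bits[i:i + 5]:
            index = (index << 1) | bit
        chars.append(BASE32[index])
    return "".join(chars)

# A point in Southeast Asia encoded at precision 6 (a cell of roughly 0.61 km by 1.22 km).
print(geohash_encode(1.3521, 103.8198, precision=6))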

Using less expensive operations

Calculating the inclusion of the roads inside a certain border is an expensive operation. However, quantifying the exact expense is challenging as it depends on several factors. One factor is the complexity of the border. Borders are usually irregular and very detailed, as they need to correctly reflect the actual border. The complexity of the road geometry is another factor that plays an important role as roads are not always straight lines.

Figure 3 – Roads to localise

Since this operation is expensive both in terms of cloud cost and time to run, we need to identify a cheaper and faster way that would yield similar results. Knowing that the complexity of the border lines is the cause of the problem, we tried using a different alternative, a rectangle. Calculating the inclusion of a polyline inside a rectangle is a cheaper operation.

Figure 4 – Roads inside a rectangle

So we transformed this large, one-step operation, where we test each road segment for inclusion in a border, into a series of smaller operations where we perform the following steps:

  1. Identify all the geohashes that are part of a certain area or belong to a certain border. In this process, we include additional geohashes to make sure that we cover the entire surface inside the border.
  2. For each road segment, we identify the list of geohashes that it belongs to. A road, depending on its length or depending on its shape, might belong to multiple geohashes.

In Figure 5, we identify that the road belongs to two geohashes and that the two geohashes are part of the border we use.

Figure 5 – Geohashes as proxy

Now, all we need to do is join the two data sets together. This kind of operation is a great candidate for a big data approach, as it allows us to run it in parallel and speed up the processing time.
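
As a rough sketch of how this join could look in a distributed setting, here is a minimal PySpark example. The input paths, column names (geohash, road_id, area_id), and storage layout are assumptions made for illustration, not our actual pipeline.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("road-localisation").getOrCreate()

# Assumed pre-computed inputs (illustrative paths and schemas):
#   border_geohashes: columns [geohash, area_id] - geohashes covering each border
#   road_geohashes:   columns [geohash, road_id] - geohashes each road passes through
border_geohashes = spark.read.parquet("s3://example-bucket/border_geohashes/")
road_geohashes = spark.read.parquet("s3://example-bucket/road_geohashes/")

# Joining on the geohash assigns each road to the area(s) whose geohashes it shares.
localised_roads = (
    road_geohashes
    .join(border_geohashes, on="geohash", how="inner")
    .select("road_id", "area_id")
    .distinct()  # a road spanning several geohashes of the same area appears once
)

localised_roads.write.mode("overwrite").parquet("s3://example-bucket/localised_roads/")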

Precision tradeoff

We mentioned earlier that we replaced the precise operation with a decent approximation. Let's now delve into the real tradeoff of adopting this approach.

The first thing that stands out with this approach is that we traded precision for cost. We are able to reduce the cost as this approach uses fewer hardware resources and less computation time. However, precision suffers, particularly for roads located near borders, as they might be wrongly classified.

Going back to the initial example, let’s take the case of the external road, on the left side of the area. As you can see in Figure 6, it is clear that the road does not belong to our border. But when we apply the geohash approach it gets included into the middle geohash.

Figure 6 – Wrong road localisation

Given that just a small part of the geohash falls inside the border, the entire geohash will be classified as belonging to that area, and, as a consequence, the road that belongs to that geohash will be wrongly localised and we’ll end up adding the wrong localisation information to that road. This is clearly a consequence of the precision tradeoff. So, how can we solve this?

Geohash precision

One option is to increase the geohash precision. By using smaller and smaller geohashes, we can better reflect the actual area. As we go deeper and we further split the geohash, we can accurately follow the border. However, a high geohash precision also equates to a computationally intensive operation bringing us back to our initial situation. Therefore, it is crucial to find the right balance between the geohash size and the complexity of operations.

Figure 7 – Geohash precision

Geohash coverage percentage

To find a balance between precision and data loss, we looked into calculating the geohash coverage percentage. For example, in Figure 8, the blue geohash is entirely within the border. Here we can say that it has a 100% geohash coverage.

Figure 8 – Geohash inside the border

However, take for example the geohash in Figure 9. It touches the border and has only around 80% of its surface inside the area. Given that most of its surface is within the border, we still can say that it belongs to the area.

Figure 9 – Geohash partially inside the border

Let’s look at another example. In Figure 10, only a small part of the geohash is within the border. We can say that the geohash coverage percentage here is around 5%. For these cases, it becomes difficult for us to determine whether the geohash does belong to the area. What would be a good tradeoff in this case?

Figure 10 – Geohash barely inside the border
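
A simple way to estimate such a coverage percentage is to intersect the geohash cell with the border polygon and compare surface areas. The sketch below does this with Shapely in plain degrees, which is only a rough approximation; the coordinates and the 50% threshold are made-up values for illustration.

from shapely.geometry import Polygon, box

# Illustrative border polygon and geohash cell bounding box (coordinates in degrees).
border = Polygon([(103.6, 1.2), (104.1, 1.2), (104.1, 1.5), (103.6, 1.5)])
geohash_cell = box(103.58, 1.18, 103.63, 1.23)  # (min_lon, min_lat, max_lon, max_lat)

# Coverage = share of the geohash cell's surface that falls inside the border.
coverage = geohash_cell.intersection(border).area / geohash_cell.area

COVERAGE_THRESHOLD = 0.5  # assumed cut-off; the right value is a tuning decision
belongs_to_area = coverage >= COVERAGE_THRESHOLD
print(f"coverage={coverage:.0%}, belongs_to_area={belongs_to_area}")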

Border shape

To go one step further, we can consider a mixed solution, where we use the border shape but only for the geohashes touching the border. This is still a computationally intensive operation, but the number of roads located in these geohashes is much smaller, so it is still a net gain.

For the geohashes with full coverage inside the area, we’ll use the geohash for the localisation, the simpler operation. For the geohashes that are near the border, we’ll use a different approach. To increase the precision around the borders, we can cut the geohash following the border’s shape. Instead of having a rectangle, we’ll use a more complex shape which is still simpler than the initial border shape.

Figure 11 – Geohash following a border’s shape

Result

We began with a simple approach and we enhanced it to improve precision. This also increased the complexity of the operation. We then asked, what are the actual gains? Was it worthwhile to go through all this process? In this section, we put this to the test.

We first created a benchmark by taking a small sample of the data and ran the localisation process on a laptop. The sample comprised approximately 2% of the borders and 0.0014% of the roads. We ran the localisation process using two approaches.

  • With the first approach, we calculated the intersection between all the roads and borders. The entire operation took around 38 minutes.
  • For the second approach, we optimised the operation using geohashes. In this approach, the runtime was only 78 seconds (1.3 minutes).

However, it is important to note that this is not an apples-to-apples comparison. The operation that we measured was the localisation of the roads but we did not include the border filling operation where we fill the borders with geohashes. This is because this operation does not need to be run every time. It can be run once and reused multiple times.

Though not often required, it is still crucial to understand and consider the operation of precomputing areas and filling borders with geohashes. The precomputation process depends on several factors:

  • Number and shape of the borders – The more borders and the more complex the borders are, the longer the operation will take.
  • Geohash precision – How accurate do we need our localisation to be? The more accurate it needs to be, the longer it will take.
  • Hardware availability

Going back to our hypothesis, although this precomputation might be expensive, it is rarely run as the borders don't change often and it can be triggered only when needed. However, the regular computation, where we find the area to which each road belongs, runs often as the roads change constantly. In our system, we run this localisation for each map processing.

We can also further optimise this process by applying the opposite approach. Geohashes that have full coverage inside a border can be merged together into larger geohashes thus simplifying the computation inside the border. In the end, we can have a solution that is fully optimised for our needs with the best cost-to-performance ratio.

Figure 12 – Optimised geohashes

Conclusion

Although geohashes seem to be the right solution for this kind of problem, we also need to monitor their content. One consideration is the road density inside a geohash. For example, a geohash inside a city centre usually contains a lot of roads while one in the countryside may contain far fewer. We need to consider this aspect to keep the computation balanced and take full advantage of the big data approach. In our case, we achieve this balance by considering the number of road kilometres within a geohash.

Figure 13 – Unbalanced data

Additionally, the resources that we choose also matter. To optimise time and cost, we need to find the right balance between running time and resource cost. As shown in Figure 14, based on sample data we ran, we sometimes get the best result when using smaller machines.

Figure 14 – Cost vs runtime

The achievements and insights showcased in this article are indebted to the contributions made by Mihai Chintoanu. His expertise and collaborative efforts have profoundly enriched the content and findings presented herein.


Graph modelling guidelines

Post Syndicated from Grab Tech original https://engineering.grab.com/graph-modelling-guidelines

Introduction

Graph modelling is a highly effective technique for representing and analysing complex and interconnected data across various domains. By deciphering relationships between entities, graph modelling can reveal insights that might be otherwise difficult to identify using traditional data modelling approaches. In this article, we will explore what graph modelling is and guide you through a step-by-step process of implementing graph modelling to create a social network graph.

What is graph modelling?

Graph modelling is a method for representing real-world entities and their relationships using nodes, edges, and properties. It employs graph theory, a branch of mathematics that studies graphs, to visualise and analyse the structure and patterns within complex datasets. Common applications of graph modelling include social network analysis, recommendation systems, and biological networks.

Graph modelling process

Step 1: Define your domain

Before diving into graph modelling, it’s crucial to have a clear understanding of the domain you’re working with. This involves getting acquainted with the relevant terms, concepts, and relationships that exist in your specific field. To create a social network graph, familiarise yourself with terms like users, friendships, posts, likes, and comments.

Step 2: Identify entities and relationships

After defining your domain, you need to determine the entities (nodes) and relationships (edges) that exist within it. Entities are the primary objects in your domain, while relationships represent how these entities interact with each other. In a social network graph, users are entities, and friendships are relationships.

Step 3: Establish properties

Each entity and relationship may have a set of properties that provide additional information. In this step, identify relevant properties based on their significance to the domain. A user entity might have properties like name, age, and location. A friendship relationship could have a ‘since’ property to denote the establishment of the friendship.

Step 4: Choose a graph model

Once you’ve identified the entities, relationships, and properties, it’s time to choose a suitable graph model. Two common models are:

  • Property graph: A versatile model that easily accommodates properties on both nodes and edges. It’s well-suited for most applications.
  • Resource Description Framework (RDF): A World Wide Web Consortium (W3C) standard model, using triples of subject-predicate-object to represent data. It is commonly used in semantic web applications.

For a social network graph, a property graph model is typically suitable. This is because user entities have many attributes and features. Property graphs provide a clear representation of the relationships between people and their attribute profiles.

Figure 1 – Social network graph
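
As a small illustration of steps 2 to 4, the sketch below builds a tiny property graph for a social network using NetworkX, attaching properties directly to nodes and edges. The names and values are made up for the example.

import networkx as nx

# Property graph: nodes are users, edges are friendships, both carry properties.
G = nx.Graph()
G.add_node("alice", age=30, location="Singapore")
G.add_node("bob", age=28, location="Jakarta")
G.add_node("carol", age=35, location="Manila")

G.add_edge("alice", "bob", since=2019)   # friendship with a 'since' property
G.add_edge("bob", "carol", since=2021)

print(G.nodes["alice"])          # {'age': 30, 'location': 'Singapore'}
print(G.edges["alice", "bob"])   # {'since': 2019}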

Step 5: Develop a schema

Although not required, developing a schema can be helpful for large-scale projects and team collaborations. A schema defines the structure of your graph, including entity types, relationships, and properties. In a social network graph, you might have a schema that specifies the types of nodes (users, posts) and the relationships between them (friendships, likes, comments).

Step 6: Import or generate data

Next, acquire the data needed to populate your graph. This can come in the form of existing datasets or generated data from your application. For a social network graph, you can import user information from a CSV file and generate simulated friendships, posts, likes, and comments.

Step 7: Implement the graph using a graph database or other storage options

Finally, you need to store your graph data using a suitable graph database. Neo4j, Amazon Neptune, or Microsoft Azure Cosmos DB are examples of graph databases. Alternatively, depending on your specific requirements, you can use a non-graph database or an in-memory data structure to store the graph.

Step 8: Analyse and visualise the graph

After implementing the graph, you can perform various analyses using graph algorithms, such as shortest path, centrality, or community detection. In addition, visualising your graph can help you gain insights and facilitate communication with others.
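
Continuing the toy NetworkX example, the sketch below runs the kinds of analyses mentioned here: shortest path, degree centrality, and community detection. It is only an illustration of the algorithms, not a full analysis pipeline.

import networkx as nx
from networkx.algorithms import community

G = nx.Graph()
G.add_edges_from([
    ("alice", "bob"), ("bob", "carol"), ("carol", "dave"),
    ("dave", "erin"), ("erin", "carol"),
])

print(nx.shortest_path(G, "alice", "erin"))              # ['alice', 'bob', 'carol', 'erin']
print(nx.degree_centrality(G))                           # relative connectedness of each user
print(list(community.greedy_modularity_communities(G)))  # clusters of closely linked users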

Conclusion

By following these steps, you can effectively create and analyse graph models for your specific domain. Remember to adjust the steps according to your unique domain and requirements, and always ensure that confidential and sensitive data is properly protected.



LLM-powered data classification for data entities at scale

Post Syndicated from Grab Tech original https://engineering.grab.com/llm-powered-data-classification

Introduction

At Grab, we deal with petabyte-level data and manage countless data entities ranging from database tables to Kafka message schemas. Understanding this data is crucial for us: it not only streamlines data access management to safeguard the data of our users, drivers, and merchant-partners, but also improves the data discovery process for data analysts and scientists who need to easily find what they need.

The Caspian team (Data Engineering team) collaborated closely with the Data Governance team on automating governance-related metadata generation. We started with Personal Identifiable Information (PII) detection and built an orchestration service using a third-party classification service. With the advent of the Large Language Model (LLM), new possibilities dawned for metadata generation and sensitive data identification at Grab. This prompted the inception of the project, which aimed to integrate LLM classification into our existing service. In this blog, we share insights into the transformation from what used to be a tedious and painstaking process to a highly efficient system, and how it has empowered the teams across the organisation.

For ease of reference, here’s a list of terms we’ve used and their definitions:

  • Data Entity: An entity representing a schema that contains rows/streams of data, for example, database tables, stream messages, data lake tables.
  • Prediction: Refers to the model’s output given a data entity, unverified manually.
  • Data Classification: The process of classifying a given data entity, which in the context of this blog, involves generating tags that represent sensitive data or Grab-specific types of data.
  • Metadata Generation: The process of generating the metadata for a given data entity. In this blog, since we limit the metadata to the form of tags, we often use this term and data classification interchangeably.
  • Sensitivity: Refers to the level of confidentiality of data. High sensitivity means that the data is highly confidential. The lowest level of sensitivity often refers to public-facing or publicly-available data.

Background

When we first approached the data classification problem, we aimed to solve something more specific – Personal Identifiable Information (PII) detection. Initially, to protect sensitive data from accidental leaks or misuse, Grab implemented manual processes and campaigns targeting data producers to tag schemas with sensitivity tiers. These tiers ranged from Tier 1, representing schemas with highly sensitive information, to Tier 4, indicating no sensitive information at all. As a result, half of all schemas were marked as Tier 1, enforcing the strictest access control measures.

The presence of a single Tier 1 table in a schema with hundreds of tables justifies classifying the entire schema as Tier 1. However, since Tier 1 data is rare, this implies that a large volume of non-Tier 1 tables, which ideally should be more accessible, have strict access controls.

Shifting access controls from the schema-level to the table-level could not be done safely due to the lack of table classification in the data lake. We could have conducted more manual classification campaigns for tables, however this was not feasible for two reasons:

  1. The volume, velocity, and variety of data had skyrocketed within the organisation, so it took significantly more time to classify at table level compared to schema level. Hence, a programmatic solution was needed to streamline the classification process, reducing the need for manual effort.
  2. App developers, despite being familiar with the business scope of their data, interpreted internal data classification policies and external data regulations differently, leading to inconsistencies in understanding.

A service called Gemini (named before Google announced the Gemini model!) was built internally to automate the tag generation process using a third party data classification service. Its purpose was to scan the data entities in batches and generate column/field level tags. These tags would then go through a review process by the data producers. The data governance team provided classification rules and used regex classifiers, alongside the third-party tool’s own machine learning classifiers, to discover sensitive information.

After the implementation of the initial version of Gemini, a few challenges remained.

  1. The third-party tool did not allow customisations of its machine learning classifiers, and the regex patterns produced too many false positives during our evaluation.
  2. Building in-house classifiers would require a dedicated data science team to train a customised model. They would need to invest a significant amount of time to understand data governance rules thoroughly and prepare datasets with manually labelled training data.

LLMs came onto our radar following their recent “iPhone moment” with ChatGPT’s explosion onto the scene. An LLM is trained on an extremely large corpus of text and contains billions of parameters. It is capable of natural language understanding tasks, writing code, and even analysing data based on requirements. The LLM naturally addresses the pain points mentioned above as it provides a natural language interface for data governance personnel. They can express governance requirements through text prompts, and the LLM can be customised effortlessly without code or model training.

Methodology

In this section, we dive into the implementation details of the data classification workflow. Please refer to the diagram below for a high-level overview:

Figure 1 – Overview of data classification workflow

This diagram illustrates how data platforms, the metadata generation service (Gemini), and data owners work together to classify and verify metadata. Data platforms trigger scan requests to the Gemini service to initiate the tag classification process. After the tags are predicted, data platforms consume the predictions, and the data owners are notified to verify these tags.

Orchestration

Figure 2 – Architecture diagram of the orchestration service Gemini

Our orchestration service, Gemini, manages the data classification requests from data platforms. From the diagram, the architecture contains the following components:

  1. Data platforms: These platforms are responsible for managing data entities and initiating data classification requests.
  2. Gemini: This orchestration service communicates with data platforms, schedules and groups data classification requests.
  3. Classification engines: There are two available engines (a third-party classification service and GPT-3.5) for executing the classification jobs and returning results. Since we are still evaluating the two engines, both run concurrently.

When the orchestration service receives requests, it aggregates them into reasonably sized mini-batches. Aggregation is achieved through a message queue consumed at fixed intervals. In addition, a rate limiter is attached at the workflow level, allowing the service to call the Cloud Provider APIs at appropriate rates to prevent throttling by the service providers.

Specific to LLM orchestration, there are two limits to be mindful of. The first one is the context length. The input length cannot surpass the context length, which was 4000 tokens for GPT3.5 at the time of development (or around 3000 words). The second one is the overall token limit (since both the input and output share the same token limit for a single request). Currently, all Azure OpenAI model deployments share the same quota under one account, which is set at 240K tokens per minute.
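
To illustrate the kind of batching this implies, here is a rough sketch that groups column-classification requests into mini-batches that fit an assumed context budget. The four-characters-per-token estimate and the budget figures are simplifying assumptions for the example, not the actual accounting used by the service.

def estimate_tokens(text: str) -> int:
    """Very rough token estimate: assume about 4 characters per token."""
    return max(1, len(text) // 4)


def build_mini_batches(requests, prompt_tokens=1500, context_limit=4000):
    """Group requests so that prompt + batch stays under an assumed context limit."""
    budget = context_limit - prompt_tokens
    batches, current, used = [], [], 0
    for request in requests:
        cost = estimate_tokens(request)
        if current and used + cost > budget:
            batches.append(current)
            current, used = [], 0
        current.append(request)
        used += cost
    if current:
        batches.append(current)
    return batches

columns = [f"table_{i}.column_{i}" for i in range(500)]
print(len(build_mini_batches(columns)))  # number of classification calls needed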

Classification

In this section, we focus on LLM-powered column-level tag classification. The tag classification process is defined as follows:

Given a data entity with a defined schema, we want to tag each field of the schema with metadata classifications that follow an internal classification scheme from the data governance team. For example, a field can be tagged as a <business metric> or as a <particular type of personally identifiable information (PII)>. These tags indicate that the field contains a business metric or PII.

We ask the language model to be a column tag generator and to assign the most appropriate tag to each column. Here we showcase an excerpt of the prompt we use:

You are a database column tag classifier, your job is to assign the most appropriate tag based on table name and column name. The database columns are from a company that provides ride-hailing, delivery, and financial services. Assign one tag per column. However not all columns can be tagged and these columns should be assigned <None>. You are precise, careful and do your best to make sure the tag assigned is the most appropriate.

The following is the list of tags to be assigned to a column. For each line, left hand side of the : is the tag and right hand side is the tag definition

…
<Personal.ID> : refers to government-provided identification numbers that can be used to uniquely identify a person and should be assigned to columns containing "NRIC", "Passport", "FIN", "License Plate", "Social Security" or similar. This tag should absolutely not be assigned to columns named "id", "merchant id", "passenger id", “driver id" or similar since these are not government-provided identification numbers. This tag should be very rarely assigned.

<None> : should be used when none of the above can be assigned to a column.
…

Output Format is a valid json string, for example:

[{
        "column_name": "",
        "assigned_tag": ""
}]

Example question

`These columns belong to the "deliveries" table

        1. merchant_id
        2. status
        3. delivery_time`

Example response

[{
        "column_name": "merchant_id",
        "assigned_tag": "<Personal.ID>"
},{
        "column_name": "status",
        "assigned_tag": "<None>"
},{
        "column_name": "delivery_time",
        "assigned_tag": "<None>"
}]

We also curated a tag library for LLM to classify. Here is an example:

  • Personal.ID: Refers to external identification numbers that can be used to uniquely identify a person and should be assigned to columns containing “NRIC”, “Passport”, “FIN”, “License Plate”, “Social Security” or similar.
  • Personal.Name: Refers to the name or username of a person and should be assigned to columns containing “name”, “username” or similar.
  • Personal.Contact_Info: Refers to the contact information of a person and should be assigned to columns containing “email”, “phone”, “address”, “social media” or similar.
  • Geo.Geohash: Refers to a geohash and should be assigned to columns containing “geohash” or similar.
  • None: Should be used when none of the above can be assigned to a column.

The output of the language model is typically in free text format, however, we want the output in a fixed format for downstream processing. Due to this nature, prompt engineering is a crucial component to make sure downstream workflows can process the LLM’s output.

Here are some of the techniques we found useful during our development:

  1. Articulate the requirements: The requirements of the task should be as clear as possible; the LLM only does what it is instructed to do.
  2. Few-shot learning: By showing examples of the interaction, the model understands how it should respond.
  3. Schema Enforcement: Leveraging its ability to understand code, we explicitly provide the DTO (Data Transfer Object) schema to the model so that it understands that its output must conform to it.
  4. Allow for confusion: In our prompt, we specifically added a default tag: the LLM is instructed to output the default <None> tag when it cannot make a decision or is confused (a minimal parsing sketch follows this list).
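
The sketch below illustrates the downstream parsing these techniques enable: the model's JSON output is validated against the expected shape, and any missing or unknown tag falls back to the default <None> tag. The abbreviated tag list and the fallback policy are assumptions for illustration.

import json

KNOWN_TAGS = {"<Personal.ID>", "<Personal.Name>", "<Personal.Contact_Info>",
              "<Geo.Geohash>", "<None>"}  # abbreviated tag library for the example
DEFAULT_TAG = "<None>"


def parse_llm_output(raw: str, expected_columns: list) -> dict:
    """Parse the model's JSON response into {column_name: tag}, with safe fallbacks."""
    try:
        items = json.loads(raw)
    except json.JSONDecodeError:
        # Unparseable output: fall back to the default tag for every column.
        return {column: DEFAULT_TAG for column in expected_columns}

    tags = {item.get("column_name"): item.get("assigned_tag") for item in items}
    result = {}
    for column in expected_columns:
        tag = tags.get(column)
        result[column] = tag if tag in KNOWN_TAGS else DEFAULT_TAG
    return result

raw_response = '[{"column_name": "status", "assigned_tag": "<None>"}]'
print(parse_llm_output(raw_response, ["status", "delivery_time"]))
# {'status': '<None>', 'delivery_time': '<None>'}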

Regarding classification accuracy, we found the LLM to be surprisingly accurate thanks to its strong semantic understanding. For acknowledged tables, users change fewer than one tag on average. Also, in an internal survey of data owners at Grab in September 2023, 80% reported that this new tagging process helped them in tagging their data entities.

Publish and verification

The predictions are published to a Kafka queue for downstream data platforms to consume. The platforms prompt respective users weekly to verify the classified tags, which improves the model's correctness and enables iterative prompt improvement. Meanwhile, we plan to remove the verification mandate for users once the accuracy reaches a certain level.

Figure 3 – Verification message shown in the data platform for user to verify the tags

Impact

Since the new system was rolled out, we have successfully integrated this with Grab’s metadata management platform and production database management platform. Within a month since its rollout, we have scanned more than 20,000 data entities, averaging around 300-400 entities per day.

Using a quick back-of-the-envelope calculation, we can see the significant time savings achieved through automated tagging. Assuming it takes a data owner approximately 2 minutes to classify each entity, we are saving approximately 360 man-days per year for the company. This allows our engineers and analysts to focus more on their core tasks of engineering and analysis rather than spending excessive time on data governance.

The classified tags pave the way for more use cases downstream. These tags, in combination with rules provided by the data privacy office at Grab, enable us to determine the sensitivity tier of data entities, which in turn is leveraged for enforcing Attribute-based Access Control (ABAC) policies and Dynamic Data Masking for downstream queries. To learn more about the benefits of ABAC, readers can refer to another engineering blog posted earlier.

Cost-wise, for the current load, the solution is extremely affordable, contrary to common intuition. This affordability enables us to scale the solution to cover more data entities in the company.

What’s next?

Prompt improvement

We are currently exploring feeding sample data and user feedback into the prompt to further increase accuracy. Meanwhile, we are experimenting with having the LLM output a confidence level for its own classification. With a confidence level in the output, we would only need to involve users when the LLM is uncertain of its answers. Hopefully, this can remove even more manual steps from the current workflow.

Prompt evaluation

To track the performance of the prompt given, we are building analytical pipelines to calculate the metrics of each version of the prompt. This will help the team better quantify the effectiveness of prompts and iterate better and faster.

Scaling out

We are also planning to scale out this solution to more data platforms to streamline governance-related metadata generation to more teams. The development of downstream applications using our metadata is also on the way. These exciting applications are from various domains such as security, data discovery, etc.


Scaling marketing for merchants with targeted and intelligent promos

Post Syndicated from Grab Tech original https://engineering.grab.com/scaling-marketing-for-merchants

Introduction

A promotional campaign is a marketing effort that aims to increase sales, customer engagement, or brand awareness for a product, service, or company. The target is to have more orders and sales by assigning promos to consumers within a given budget during the campaign period.

Figure 1 – Merchant feedback on marketing

From our research, we found that merchants have specific goals for the promos they are willing to offer. They want a simple and cost-effective way to achieve their specific business goals by providing well-designed offers to target the correct customers. From Grab’s perspective, we want to help merchants set up and run campaigns efficiently, and help them achieve their specific business goals.

Problem statement

One of Grab’s platform offerings for merchants is the ability to create promotional campaigns. With the emergence of AI technologies, we found that there are opportunities for us to further optimise the platform. The following are the gaps and opportunities we identified:

  • Globally assigned promos without smart targeting: The earlier method targeted every customer, so everyone could redeem until the promo reached the redemption limits. However, this method did not accurately meet business goals or optimise promo spending. The promotional campaign should intelligently target the best promo for each customer to increase sales and better utilise promo spending.
  • No customised promos for every merchant: To better optimise sales for each merchant, merchants should offer customised promos based on their historical consumer trends, not just a general offer set. For example, for a specific merchant, a 27% discount may be the appropriate offer to uplift revenue and sales based on user bookings. However, merchants do not always have the expertise to decide which offer to select to increase profit.
  • No AI-driven optimisation: Without AI models, it was harder for merchants to assign the right promos at scale to each consumer and optimise their business goals.

As shown in the following figure, AI-driven promotional campaigns are expected to bring higher sales than heuristic ones as promo spend increases. Hence, at Grab, we looked to introduce an automated, AI-driven tool that helps merchants intelligently target consumers with appropriate promos, while optimising sales and promo spending. That's where Bullseye comes in.

Figure 2 – Graph showing the sales expectations for AI-driven promotional campaigns

Solution

Bullseye is an automated, AI-driven promo assignment system that leverages the following capabilities:

  • Automated user segmentation: Enables merchants to target new, churned, and active users or all users.
  • Automatic promo design: Enables a merchant-level promo design framework to customise promos for each merchant or merchant group according to their business goals.
  • Assign each user the optimal promo: Users will receive promos selected from an array of available promos based on the merchant’s business objective.
  • Achieve different Grab and merchant objectives: Examples of objectives are to increase merchant sales and decrease Grab promo spend.
  • Flexibility to optimise for an individual merchant brand or group of merchant brands: For promotional campaigns, targeting and optimisation can be performed for a single or group of merchants (e.g. enabling GrabFood to run cuisine-oriented promo campaigns).

Architecture

Figure 3 – Bullseye architecture

The Bullseye architecture consists of a user interface (UI) and a backend service to handle requests. To use Bullseye, our operations team inputs merchant information into the Bullseye UI. The backend service then interacts with APIs to process the information using the AI model. As we work with a large customer population, data is stored in S3, and an API service triggers a Chimera Spark job to run the prediction model and generate promo assignments. During the assignment, the Spark job parses the input parameters, pre-validates the input, runs the predictions, and then returns the promo assignment results to the backend service.

Implementation

The key components in Bullseye are shown in the following figure:

Figure 4 – Key components of Bullseye
  • Eater Segments Identifier: Identifies each user as active, churned, or new based on their historical orders from target merchants.
  • Promo Designer: We constructed a promo variation design framework to adaptively design promo variations for each campaign request as shown in the diagram below.
    • Offer Content Candidate Generation: Generates variant settings of promos based on the promo usage history.
    • Campaign Impact Simulator: Predicts business metrics such as revenue, sales, and cost based on the user and merchant profiles and offer features.
    • Optimal Promo Selection: Selects the optimal offer based on the predicted impact and the given campaign objective; what counts as optimal depends on how the objective is defined. For example, if the goal is to maximise merchant sales, the model selects the top candidate that can bring the highest revenue. Finally, with the promo selected, the service returns the promo set to be used in the target campaign (a simplified sketch of this selection step appears after this list).

      Figure 5 – Optimal Promo Selection
  • Customer Response Model: Predicts customer responses such as order value, redemption, and take-up rate if a specific promo is assigned. Bullseye captures various user attributes and compares them with an offer's attributes. Examples of attributes are cuisine type, food spiciness, and discount amount. When there is high similarity between the attributes, there is a higher probability that the user will take up the offer.

    Figure 6 – Customer Response Model

  • Hyper-parameter Selection: Optimises toward multiple business goals. Tuning the hyper-parameters allows the AI assignment model to learn how to meet success criteria such as cost per merchant sales (cpSales) uplift and sales uplift. The success criteria reflect the business goals; for example, a merchant may want a sales uplift after assigning promos, while keeping the cpSales uplift below 10%. With tuning, the optimiser can find operating points that meet the business goals, and AI models can search for better settings far more efficiently than manual specification. We need to constantly tune and iterate models and hyper-parameters to adapt to ever-evolving business goals and the local landscape.

    As shown in the image below, AI assignments without hyper-parameter tuning (HPT) lead to a high cpSales uplift but low sales uplift (red dot). The hyper-parameters help to fine-tune the assignment result into the optimal space, such as the blue dot, which may have lower sales than the red dot but meets the success criteria.

    Figure 7 – Graph showing the impact of using AI assignments with HPT
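
To make the selection step more concrete, the following Go sketch shows how an optimiser might pick the best offer for a user from a set of candidates scored by the Campaign Impact Simulator, subject to a cpSales constraint. The types, field names, and thresholds are illustrative assumptions, not Bullseye's actual interfaces.

package main

import "fmt"

// OfferCandidate is a hypothetical promo variant with impact figures
// predicted by the Campaign Impact Simulator.
type OfferCandidate struct {
    PromoID        string
    PredictedSales float64 // expected incremental merchant sales
    PredictedCost  float64 // expected promo spend
}

// pickOptimal returns the candidate with the highest predicted sales,
// subject to a cost-per-sales (cpSales) ceiling supplied by the campaign.
func pickOptimal(cands []OfferCandidate, maxCpSales float64) (OfferCandidate, bool) {
    best, found := OfferCandidate{}, false
    for _, c := range cands {
        if c.PredictedSales <= 0 {
            continue
        }
        if c.PredictedCost/c.PredictedSales > maxCpSales {
            continue // violates the promo-spend constraint
        }
        if !found || c.PredictedSales > best.PredictedSales {
            best, found = c, true
        }
    }
    return best, found
}

func main() {
    cands := []OfferCandidate{
        {PromoID: "20%-off", PredictedSales: 120, PredictedCost: 18},
        {PromoID: "27%-off", PredictedSales: 150, PredictedCost: 30},
        {PromoID: "$3-off", PredictedSales: 90, PredictedCost: 6},
    }
    if best, ok := pickOptimal(cands, 0.15); ok {
        fmt.Printf("assign %s (cpSales=%.2f)\n", best.PromoID, best.PredictedCost/best.PredictedSales)
    }
}

In the real system, the candidate set and the predicted impact come from Promo Designer and the Customer Response Model, and the constraint corresponds to the tuned success criteria described above.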

Impact

We started using Bullseye in 2021. From its use we found that:

  • Hyper-parameters tuning and auto promo design can increase sales and reduce promo spend for food campaigns.
  • Promo Designer optimises budget utilisation and increases the number of promo redemptions for food campaigns.
  • The Customer Response Model reduced promo spending for Mart promotional campaigns.

Conclusion

We have seen positive results with the implementation of Bullseye such as reduced promo spending and maximised budget spending returns. In our efforts to serve our merchants better and help them achieve their business goals, we will continue to improve Bullseye. In the next phase, we plan to implement a more intelligent service, enabling reinforcement learning, and online assignment. We also aim to scale AI adoption by onboarding regional promotional campaigns as much as possible.

Special thanks to William Wu, Rui Tan, Rahadyan Pramudita, Krishna Murthy, and Jiesin Chia for making this project a success.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Stepping up marketing for advertisers: Scalable lookalike audience

Post Syndicated from Grab Tech original https://engineering.grab.com/scalable-lookalike-audiences

The advertising industry is constantly evolving, driven by advancements in technology and changes in consumer behaviour. One of the key challenges in this industry is reaching the right audience: the people who are most likely to be interested in your product or service. This is where the concept of a lookalike audience comes into play. By identifying and targeting individuals who share similar characteristics with an existing customer base, businesses can significantly improve the effectiveness of their advertising campaigns.

However, as the scale of Grab advertisements grows, several challenges must be addressed to maintain the efficacy of lookalike audience creation, such as long service level agreements (SLAs), the high cost of audience creation, and unstable data ingestion.

The need for an even more efficient and scalable solution for creating lookalike audiences was the motivation behind the development of the scalable lookalike audience platform. By developing a high-performance in-memory lookalike audience retrieval service and embedding-based lookalike audience creation and updating pipelines, this improved platform builds on the existing system and provides an even more effective tool for advertisers to reach their target audience.

Constant optimisation for greater precision

In the dynamic world of digital advertising, the ability to quickly and efficiently reach the right audience is paramount, and targeted advertising is a key strategy. As such, we constantly look for ways to improve our approach to creating lookalike audiences, as it impacts both advertisers and users. Some of the gaps we identified included:

  • Long SLA for audience creation. Earlier, the platform stored results on Segmentation Platform (SegP) and it took two working days to generate a lookalike audience list. This is because inserting a single audience into SegP took three times longer than generating the audience. Extended creation times impacted the effectiveness of advertising campaigns, as it limited the ability of advertisers to respond quickly to changing market dynamics.

  • Low scalability. As the number of onboarded merchant-partners increased, the time and cost of generating lookalike audiences also increased proportionally. This limited the availability of lookalike audience generation for all advertisers, particularly those with large customer bases or rapidly changing audience profiles.

  • Low updating frequency of lookalike audiences. With automated updates only occurring on a weekly basis, this increased the likelihood that audiences may become outdated and ineffective. This meant there was scope to further improve to help advertisers more effectively reach their campaign goals, by targeting individuals who fit the desired audience profile.

  • High cost of creation. The cost of producing one segment can add up quickly for advertisers who need to generate multiple audiences. This could impact scalability for advertisers as they could hesitate to effectively use multiple lookalike audiences in their campaigns.

Solution

To efficiently identify the top N lookalike audiences for each Grab user from our pool of millions of users, we developed a solution that leverages user and audience representations in the form of embeddings. Embeddings are vector representations of data that utilise linear distances to capture structure from the original datasets. With embeddings, large sets of data are compressed and easily processed without affecting data integrity. This approach ensures high accuracy, low latency, and low cost in retrieving the most relevant audiences.

Our solution takes into account the fact that representation drift varies among entities as data is added. For instance, merchant-partner embeddings are more stable than passenger embeddings. By acknowledging this reality, we optimised our process to minimise cost while maintaining a desirable level of accuracy. Furthermore, we believe that having a strong representation learning strategy in the early stages reduced the need for complex models in the following stages.

Our solution comprises two main components:

  1. Real-time lookalike audience retrieval: We developed a high-performance in-memory retrieval service that stores passenger embeddings, audience embeddings, and audience score thresholds. To further reduce cost, we designed a passenger embedding compression algorithm that reduces the memory needed for passenger embeddings by around 90%.

  2. Embedding-based audience creation and updating: The output of this part of the project is an online retrieval model that includes passenger embeddings, audience embeddings, and thresholds. To minimise costs, we leverage the passenger embeddings that are also utilised by other projects within Grab, beyond advertising, thus sharing the cost. The audience embeddings and thresholds are produced with a low-cost, small neural network.

In summary, our approach to creating scalable lookalike audiences is designed to be cost-effective, accurate, and efficient, leveraging the power of embeddings and smart computational strategies to deliver the best possible audiences for our advertisers.

Solution architecture

  • The advertiser creates a campaign with a custom audience, which triggers the audience creation process. During this process, the audience service stores the audience metadata provided by advertisers in a message queue.
  • A scheduled Data Science (DS) job then retrieves the pending audience metadata, creates the audience, and updates the TensorFlow Serving (TFS) model.
  • During the serving period, the Backend (BE) service calls the DS service to retrieve all audiences that include the target user. Ads that are targeting these audiences are then selected by the Click-Through Rate (CTR) model to be displayed to the user.

Implementation

To ensure the efficiency of the lookalike audience retrieval model and minimise the costs associated with audience creation and serving, we’ve trained the user embedding model using billions of user actions. This extensive training allows us to employ straightforward methods for audience creation and serving, while still maintaining high levels of accuracy.

Creating lookalike audiences

The Audience Creation Job retrieves the audience metadata from the online audience service, pulls the passenger embeddings, and then averages these embeddings to generate the audience embedding.

We use the cosine score between a user embedding and the audience embedding to identify the audiences the user belongs to. Hence, it's sufficient to store only the audience embedding and its score threshold. Additionally, a global target-all-pax audience list is stored so that these audiences are returned for every online request.
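
For illustration, here is a minimal Go sketch of this cosine-score check. The structures and numbers are assumptions for the example, while the production service keeps these embeddings and thresholds in memory as described below.

package main

import (
    "fmt"
    "math"
)

// Audience is a hypothetical record holding an audience embedding and
// its pre-computed score threshold.
type Audience struct {
    ID        string
    Embedding []float64
    Threshold float64
}

// cosine returns the cosine similarity between two vectors of equal length.
func cosine(a, b []float64) float64 {
    var dot, na, nb float64
    for i := range a {
        dot += a[i] * b[i]
        na += a[i] * a[i]
        nb += b[i] * b[i]
    }
    if na == 0 || nb == 0 {
        return 0
    }
    return dot / (math.Sqrt(na) * math.Sqrt(nb))
}

// audiencesFor returns the IDs of all audiences whose cosine score with
// the user embedding meets or exceeds the audience's threshold.
func audiencesFor(user []float64, audiences []Audience) []string {
    var ids []string
    for _, aud := range audiences {
        if cosine(user, aud.Embedding) >= aud.Threshold {
            ids = append(ids, aud.ID)
        }
    }
    return ids
}

func main() {
    user := []float64{0.9, 0.1, 0.3}
    audiences := []Audience{
        {ID: "coffee-lovers", Embedding: []float64{0.8, 0.2, 0.4}, Threshold: 0.95},
        {ID: "late-night-orders", Embedding: []float64{0.1, 0.9, 0.2}, Threshold: 0.95},
    }
    fmt.Println(audiencesFor(user, audiences)) // [coffee-lovers]
}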

Serving lookalike audiences

The online audience service is also tasked with returning all the audiences to which the current user belongs. This is achieved by utilising the cosine score of the user embedding and audience embeddings, and filtering out all audiences that surpass the audience thresholds.

To adhere to latency requirements, we avoid querying external feature stores like Redis and instead store all the embeddings in memory. However, the embeddings of all users amount to approximately 20 GB, which could affect model loading. Therefore, we devised an embedding compression method based on hashing tricks, inspired by Bloom filters.

  • We utilise hash functions to obtain the hash64 value of the paxID, which is then segmented into four 16-bit values. Each 16-bit value corresponds to a 16-dimensional embedding block, and the compressed embedding is the concatenation of these four 16-dimensional embeddings (a minimal sketch of this scheme is shown after this list).
  • For each paxID, we have both the original user embedding and the compressed user embedding. The compressed user embeddings are learned by minimising the Mean Square Error loss.
  • We can balance the storage cost and the accuracy by altering the number of hash functions used.
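
Below is a minimal Go sketch of the lookup side of this compression scheme. The embedding-block tables are placeholders here; in the actual pipeline they are learned by minimising the Mean Square Error against the original embeddings, and the fnv hash stands in for whichever hash functions are used in production.

package main

import (
    "fmt"
    "hash/fnv"
)

const (
    numBlocks = 4  // four 16-bit segments of the 64-bit hash
    blockDim  = 16 // each segment indexes a 16-dimensional embedding block
)

// blockTables[i] maps a 16-bit index to a 16-dimensional embedding block.
// In practice these tables are learned; here they are zero-valued placeholders.
var blockTables [numBlocks][1 << 16][blockDim]float32

// compressedEmbedding hashes the paxID, splits the 64-bit hash into four
// 16-bit indices, and concatenates the four looked-up blocks into a
// 64-dimensional compressed embedding.
func compressedEmbedding(paxID string) []float32 {
    h := fnv.New64a()
    h.Write([]byte(paxID))
    hash := h.Sum64()

    out := make([]float32, 0, numBlocks*blockDim)
    for i := 0; i < numBlocks; i++ {
        idx := uint16(hash >> (16 * i)) // take the i-th 16-bit segment
        out = append(out, blockTables[i][idx][:]...)
    }
    return out
}

func main() {
    emb := compressedEmbedding("pax-12345")
    fmt.Println("compressed embedding dims:", len(emb)) // 64
}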

Impact

  • Users can see advertisements targeting a new audience within 15 minutes after the advertiser creates a campaign.
  • This new system doubled the impressions and clicks, while also improving the CTR, conversion rate, and return on investment.
  • Costs for generating lookalike audiences decreased by 98%.

Learnings/Conclusion

To evaluate the effectiveness of our new scalable system beyond addressing these issues, we conducted an A/B test comparing it with the earlier system. The results revealed that the new system effectively doubled the number of impressions and clicks, while also enhancing the CTR, conversion rate, and return on investment.

Over the years, we have amassed billions of user actions, which have been instrumental in training the model and creating a comprehensive representation of user interests in the form of embeddings.

What’s next?

While this scalable system has proved its effectiveness and demonstrated impressive results in CTR, conversion rate, and return on investment, there is always room for improvement.  

In the next phase, we plan to explore more advanced algorithms, refine our feature engineering process, and conduct more extensive hyperparameter tuning. Additionally, we will continue to monitor the system’s performance and make necessary adjustments to ensure it remains robust and effective in serving our advertisers’ needs.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Building hyperlocal GrabMaps

Post Syndicated from Grab Tech original https://engineering.grab.com/building-hyperlocal-grabmaps

Introduction

Southeast Asia (SEA) is a dynamic market, very different from other parts of the world. When travelling on the road, you may experience fast-changing road restrictions, new roads appearing overnight, and high traffic congestion. To address these challenges, GrabMaps has adapted to the SEA market by leveraging big data solutions. One of the solutions is the integration of hyperlocal data in GrabMaps.

Hyperlocal information is oriented around very small geographical communities and obtained from the local knowledge that our map team gathers. The map team is spread across SEA, enabling us to define clear specifications (e.g. legal speed limits), and validate that our solutions are viable.

Figure 1 – Map showing detections from images and probe data, and hyperlocal data.

Hyperlocal inputs make our mapping data even more robust, adding to the details collected from our image and probe detection pipelines. Figure 1 shows how data from our detection pipeline is overlaid with hyperlocal data, and then mapped across the SEA region. If you are curious and would like to check out the data yourself, you can download it here.

Processing hyperlocal data

Now let’s go through the process of detecting hyperlocal data.

Download data

GrabMaps is based on OpenStreetMap (OSM). The first step in the process is to download the .pbf file for Asia from geofabrik.de. This .pbf file contains all the data that is available on OSM, such as details of places, trees, and roads. Take a park, for example: the .pbf file would contain data on the park's name, wheelchair accessibility, and much more.

For this article, we will focus on hyperlocal data related to the road network. For each road, you can obtain data such as the type of road (residential or motorway), direction of traffic (one-way or more), and road name.

Convert data

To take advantage of big data computing, the next step in the process is to convert the .pbf file into Parquet format using a Parquetizer. This will convert the binary data in the .pbf file into a table format. Each road in SEA is now displayed as a row in a table as shown in Figure 2.

Figure 2 – Road data in Parquet format.

Identify hyperlocal data

After the data is prepared, GrabMaps identifies and inputs all of our hyperlocal data, and delivers a consolidated view to our downstream services. Our hyperlocal data is obtained from various sources, either by looking at geometry or at other attributes in OSM such as the direction of travel and speed limit. We also apply customised rules defined by our local map team, all in a fully automated manner. This enhances the map, together with data obtained from the GPS pings of our rides and deliveries, and from KartaView, Grab's product for imagery collection.

Figure 3 – Architecture diagram showing how hyperlocal data is integrated into GrabMaps.

Benefit of our hyperlocal GrabMaps

GrabNav, a turn-by-turn navigation tool available on the Grab driver app, is one of our products that benefits from having hyperlocal data. Here are some hyperlocal data that are made available through our approach:

  • Localisation of roads: The country, state/county, or city the road is in
  • Language spoken, driving side, and speed limit
  • Region-specific default speed regulations
  • Consistent name usage using language inference
  • Complex attributes like intersection links

To further explain the benefits of this hyperlocal feature, we will use intersection links as an example. In the next section, we will explain how intersection links data is used and how it impacts our driver-partners and passengers.

An intersection link is where two or more roads meet. Figures 4 and 5 illustrate what an intersection link looks like in a GrabMaps mock and in OSM.

Figure 4 – Mock of an intersection link.
Figure 5 – Intersection link illustration from a real road network in OSM.

Locating intersection links in a road network involves some computation. We combine big data processing (which we do using Spark) with graph algorithms. We use geohashes as the unit of processing, and for each geohash, a bi-directional graph is created.

From such resulting graphs, we can determine intersection links if:

  • Road segments are parallel
  • The roads have the same name
  • The roads are one-way roads
  • The angles and the shape of the roads are within the intervals or requirements we seek

Each intersection link we identify is tagged in the map as intersection_links. Our downstream service teams can then identify them by searching for the tag.
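
As a simplified illustration of the checks listed above, the Go sketch below evaluates a candidate pair of road segments. The attribute names and the angle tolerance are assumptions for the example; the production pipeline runs these checks per geohash over bi-directional graphs in Spark.

package main

import (
    "fmt"
    "math"
)

// Segment is a simplified road segment with the attributes used in the checks.
type Segment struct {
    Name    string
    OneWay  bool
    Bearing float64 // heading of the segment in degrees [0, 360)
}

// angleDiff returns the smallest absolute difference between two bearings.
func angleDiff(a, b float64) float64 {
    d := math.Abs(a - b)
    if d > 180 {
        d = 360 - d
    }
    return d
}

// isIntersectionLinkCandidate applies the heuristics described above:
// the two segments share a name, are one-way, and are roughly parallel
// (same or opposite direction) within an illustrative tolerance.
func isIntersectionLinkCandidate(a, b Segment) bool {
    const parallelTolerance = 15.0 // degrees; an assumed threshold
    if a.Name == "" || a.Name != b.Name {
        return false
    }
    if !a.OneWay || !b.OneWay {
        return false
    }
    d := angleDiff(a.Bearing, b.Bearing)
    return d <= parallelTolerance || math.Abs(d-180) <= parallelTolerance
}

func main() {
    a := Segment{Name: "Jalan Sudirman", OneWay: true, Bearing: 88}
    b := Segment{Name: "Jalan Sudirman", OneWay: true, Bearing: 266}
    fmt.Println(isIntersectionLinkCandidate(a, b)) // true
}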

Impact

The impact we create with our intersection link can be explained through the following example.

Figure 6 – Longer route, without GrabMaps intersection link feature. The arrow indicates where the route should have suggested a U-turn.
Figure 7 – Shorter route using GrabMaps by taking a closer link between two main roads.

Figure 6 and Figure 7 show two different routes for the same origin and destination. However, you can see that Figure 7 has a shorter route, made possible by taking an intersection link early on in the route. The highlighted road segment in Figure 7 is an intersection link, tagged by the process we described earlier. The route is now much shorter, making GrabNav more efficient in its route suggestions.

There are numerous factors that can impact a driver-partner’s trip, and intersection links are just one example. There are many more features that GrabMaps offers across Grab’s services that allow us to “outserve” our partners.

Conclusion

GrabMaps and GrabNav deliver enriched experiences to our driver-partners. By integrating certain hyperlocal data features, we are also able to provide more accurate pricing for both our driver-partners and passengers. In our mission towards sustainable growth, this is an area that we will keep on improving by leveraging scalable tech solutions.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Streamlining Grab’s Segmentation Platform with faster creation and lower latency

Post Syndicated from Grab Tech original https://engineering.grab.com/streamlining-grabs-segmentation-platform

Launched in 2019, Segmentation Platform has been Grab’s one-stop platform for user segmentation and audience creation across all business verticals. User segmentation is the process of dividing passengers, driver-partners, or merchant-partners (users) into sub-groups (segments) based on certain attributes. Segmentation Platform empowers Grab’s teams to create segments using attributes available within our data ecosystem and provides APIs for downstream teams to retrieve them.

Checking whether a user belongs to a segment (Membership Check) influences many critical flows on the Grab app:

  1. When a passenger launches the Grab app, our in-house experimentation platform will tailor the app experience based on the segments the passenger belongs to.
  2. When a driver-partner goes online on the Grab app, the Drivers service calls Segmentation Platform to ensure that the driver-partner is not blacklisted.
  3. When launching marketing campaigns, Grab’s communications platform relies on Segmentation Platform to determine which passengers, driver-partners, or merchant-partners to send communication to.

This article peeks into the current design of Segmentation Platform and how the team optimised the way segments are stored to reduce read latency thus unlocking new segmentation use cases.

Architecture

Segmentation Platform comprises two major subsystems:

  1. Segment creation
  2. Segment serving
Fig 1. Segmentation Platform architecture

Segment creation

Segment creation is powered by Spark jobs. When a Grab team creates a segment, a Spark job is started to retrieve data from our data lake. After the data is retrieved, cleaned, and validated, the Spark job calls the serving sub-system to populate the segment with users.

Segment serving

Segment serving is powered by a set of Go services. For persistence and serving, we use ScyllaDB as our primary storage layer. We chose to use ScyllaDB as our NoSQL store due to its ability to scale horizontally and meet our <80ms p99 SLA. Users in a segment are stored as rows indexed by the user ID. The table is partitioned by the user ID ensuring that segment data is evenly distributed across the ScyllaDB clusters.

User ID | Segment Name | Other metadata columns
1221    | Segment_A    |
3421    | Segment_A    |
5632    | Segment_B    |
7889    | Segment_B    |

With this design, Segmentation Platform handles up to 12K read and 36K write QPS, with a p99 latency of 40ms.

Problems

The existing system has supported Grab, empowering internal teams to create rich and personalised experiences. However, with the increased adoption and use, certain challenges began to emerge:

  1. As more and larger segments are being created, the write QPS became a bottleneck leading to longer wait times for segment creation.
  2. Grab services requested even lower latency for membership checks.

Long segment creation times

As more segments were created by different teams within Grab, the write QPS was no longer able to keep up with the teams’ demands. Teams would have to wait for hours for their segments to be created, reducing their operational velocity.

Read latency

Further, while the platform already offers sub-40ms p99 latency for reads, this was still too slow for certain services and their use cases. For example, Grab’s communications platform needed to check whether a user belongs to a set of segments before sending out communication and incurring increased latency for every communication request was not acceptable. Another use case was for Experimentation Platform, where checks must have low latency to not impact the user experience.

Thus, the team explored alternative ways of storing the segment data with the goals of:

  1. Reducing segment creation time
  2. Reducing segment read latency
  3. Maintaining or reducing cost

Solution

Segments as bitmaps

One of the main engineering challenges was scaling the write throughput of the system to keep pace with the number of segments being created. As a segment is stored across multiple rows in ScyllaDB, creating a large segment incurs a huge number of writes to the database. What we needed was a better way to store a large set of user IDs. Since user IDs are represented as integers in our system, a natural solution to storing a set of integers was a bitmap.

For example, a segment containing the following user IDs: 1, 6, 25, 26, 89 could be represented with a bitmap as follows:

Fig 2. Bitmap representation of a segment

To perform a membership check, a bitwise operation can be used to check if the bit at the user ID’s index is 0 or 1. As a bitmap, the segment can also be stored as a single Blob in object storage instead of inside ScyllaDB.
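
As an illustration, a naive uncompressed bitmap with a bitwise membership check can be written in a few lines of Go using only the standard library:

package main

import "fmt"

// Bitset is a naive, uncompressed bitmap over user IDs.
type Bitset struct {
    words []uint64
}

// Add sets the bit at the user ID's index, growing the bitmap as needed.
func (b *Bitset) Add(userID uint32) {
    word := int(userID / 64)
    for len(b.words) <= word {
        b.words = append(b.words, 0)
    }
    b.words[word] |= 1 << (userID % 64)
}

// Contains reports whether the user ID's bit is set (membership check).
func (b *Bitset) Contains(userID uint32) bool {
    word := int(userID / 64)
    if word >= len(b.words) {
        return false
    }
    return b.words[word]&(1<<(userID%64)) != 0
}

func main() {
    var segment Bitset
    for _, id := range []uint32{1, 6, 25, 26, 89} {
        segment.Add(id)
    }
    fmt.Println(segment.Contains(25), segment.Contains(42)) // true false
}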

However, as the number of user IDs in the system is large, a small and sparse segment would lead to prohibitively large bitmaps. For example, if a segment contains 2 user IDs 100 and 200,000,000, it will require a bitmap containing 200 million bits (25MB) where all but 2 of the bits are just 0. Thus, the team needed an encoding to handle sparse segments more efficiently.

Roaring Bitmaps

After some research, we landed on Roaring Bitmaps, which are compressed uint32 bitmaps. With roaring bitmaps, we are able to store a segment with 1 million members in a Blob smaller than 1 megabyte, compared to 4 megabytes required by a naive encoding.

Roaring Bitmaps achieve good compression ratios by splitting the set into fixed-size (2^16) integer chunks and using three different data structures (containers) based on the data distribution within the chunk. The most significant 16 bits of the integer are used as the index of the chunk, and the least significant 16 bits are stored in the containers.

Array containers

Array containers are used when data is sparse (<= 4096 values). An array container is a sorted array of 16-bit integers. It is memory-efficient for sparse data and provides logarithmic-time access.

Bitmap containers

Bitmap containers are used when data is dense. A bitmap container is a 2^16-bit container where each bit represents the presence or absence of a value. It is memory-efficient for dense data and provides constant-time access.

Run containers

Finally, run containers are used when a chunk has long consecutive values. Run containers use run-length encoding (RLE) to reduce the storage required for dense bitmaps. Run containers store a pair of values representing the start and the length of the run. It provides good memory efficiency and fast lookups.

The diagram below shows how a dense bitmap container that would have required 91 bits can be compressed into a run container by storing only the start (0) and the length (90). It should be noted that run containers are used only if it reduces the number of bytes required compared to a bitmap.

Fig 3. A dense bitmap container compressed into a run container

By using different containers, Roaring Bitmaps are able to achieve good compression across various data distributions, while maintaining excellent lookup performance. Additionally, as segments are represented as Roaring Bitmaps, service teams are able to perform set operations (union, intersection, difference, etc.) on the segments on the fly, which previously required re-materialising the combined segment into the database.
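
The following Go sketch shows what this looks like in practice, assuming the open-source github.com/RoaringBitmap/roaring package; it illustrates the data structure rather than the exact library or wiring used by Segmentation Platform.

package main

import (
    "fmt"

    "github.com/RoaringBitmap/roaring"
)

func main() {
    // Two segments represented as Roaring Bitmaps of user IDs.
    segmentA := roaring.BitmapOf(1, 6, 25, 26, 89)
    segmentB := roaring.BitmapOf(6, 25, 200000000)

    // Membership check: a single lookup inside the compressed structure.
    fmt.Println(segmentA.Contains(25)) // true

    // Set operations on the fly, without re-materialising a combined
    // segment into the database.
    both := roaring.And(segmentA, segmentB) // users in both A and B
    fmt.Println(both.ToArray())             // [6 25]
    fmt.Println(both.GetCardinality())      // 2
}

Membership checks and set operations stay fast even for multi-million-member segments, while the serialised bitmap can be stored as a single Blob in object storage.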

Caching with an SDK

Even though the segments are now compressed, retrieving a segment from the Blob store for each membership check would incur an unacceptable latency penalty. To mitigate the overhead of retrieving a segment, we developed an SDK that handles the retrieval and caching of segments.

Fig 4. How the SDK caches segments

The SDK takes care of the retrieval, decoding, caching, and watching of segments. Users of the SDK are only required to specify the maximum size of the cache to prevent exhausting the service’s memory. The SDK provides a cache with a least-recently-used eviction policy to ensure that hot segments are kept in the cache. They are also able to watch for updates on a segment and the SDK will automatically refresh the cached segment when it is updated.
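
A rough sketch of the cache-aside behaviour the SDK provides is shown below, using the hashicorp/golang-lru package for the bounded LRU cache and the roaring package for the decoded segments. The type names, the fetch callback, and the overall shape are illustrative assumptions rather than the SDK's actual API, and the watch-and-refresh logic is omitted.

package segments

import (
    "fmt"

    "github.com/RoaringBitmap/roaring"
    lru "github.com/hashicorp/golang-lru"
)

// Client caches decoded Roaring Bitmap segments with LRU eviction so that
// hot segments stay in memory while the total footprint stays bounded.
type Client struct {
    cache *lru.Cache
    fetch func(segmentName string) (*roaring.Bitmap, error) // pulls and decodes the Blob
}

func NewClient(maxSegments int, fetch func(string) (*roaring.Bitmap, error)) (*Client, error) {
    c, err := lru.New(maxSegments)
    if err != nil {
        return nil, err
    }
    return &Client{cache: c, fetch: fetch}, nil
}

// IsMember checks whether a user belongs to a segment, fetching and caching
// the segment on a cache miss.
func (c *Client) IsMember(segmentName string, userID uint32) (bool, error) {
    if v, ok := c.cache.Get(segmentName); ok {
        return v.(*roaring.Bitmap).Contains(userID), nil
    }
    bm, err := c.fetch(segmentName)
    if err != nil {
        return false, fmt.Errorf("fetch segment %s: %w", segmentName, err)
    }
    c.cache.Add(segmentName, bm)
    return bm.Contains(userID), nil
}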

Hero teams

Communications Platform

Communications Platform has adopted the SDK to implement a new feature that controls communication frequency based on the segments a user belongs to. Using the SDK, the team is able to perform membership checks on multiple multi-million-member segments, achieving a peak of 15K QPS with a p99 latency of under 1ms. With the new feature, they have been able to increase communication engagement and reduce the communication unsubscribe rate.

Experimentation Platform

Experimentation Platform powers experimentation across all Grab services. Segments are used heavily in experiments to determine a user’s experience. Prior to using the SDK, Experimentation Platform limited the maximum size of the segments that could be used to prevent exhausting a service’s memory.

After migrating to the new SDK, they were able to lift this restriction due to the compression efficiency of Roaring Bitmaps. Users are now able to use any segments as part of their experiment without worrying that it would require too much memory.

Closing

This blog post discussed the challenges that Segmentation Platform faced when scaling and how the team explored alternative storage and encoding techniques to improve segment creation time, while also achieving low latency reads. The SDK allows our teams to easily make use of segments without having to handle the details of caching, eviction, and updating of segments.

Moving forward, there are still existing use cases that are not able to use the Roaring Bitmap segments and thus continue to rely on segments from ScyllaDB. Therefore, the team is also taking steps to optimise and improve the scalability of our service and database.

Special thanks to Axel, the wider Segmentation Platform team, and Data Technology team for reviewing the post.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Unsupervised graph anomaly detection – Catching new fraudulent behaviours

Post Syndicated from Grab Tech original https://engineering.grab.com/graph-anomaly-model

Earlier in this series, we covered the importance of graph networks, graph concepts, graph visualisation, and graph-based fraud detection methods. In this article, we will discuss how to automatically detect new types of fraudulent behaviour and swiftly take action on them.

One of the challenges in fraud detection is that fraudsters are incentivised to always adversarially innovate their way of conducting frauds, i.e., their modus operandi (MO in short). Machine learning models trained using historical data may not be able to pick up new MOs, as they are new patterns that are not available in existing training data. To enhance Grab’s existing security defences and protect our users from these new MOs, we needed a machine learning model that is able to detect them quickly without the need for any label supervision, i.e., an unsupervised learning model rather than the regular supervised learning model.

To address this, we developed an in-house machine learning model for detecting anomalous patterns in graphs, which has led to the discovery of new fraud MOs. Our focus was initially on GrabFood and GrabMart verticals, where we monitored the interactions between consumers and merchants. We modelled these interactions as a bipartite graph (a type of graph for modelling interactions between two groups) and then performed anomaly detection on the graph. Our in-house anomaly detection model was also presented at the International Joint Conference on Neural Networks (IJCNN) 2023, a premier academic conference in the area of neural networks, machine learning, and artificial intelligence.

In this blog, we discuss the model and its application within Grab. For avid audiences that want to read the details of our model, you can access it here. Note that even though we implemented our model for anomaly detection in GrabFood and GrabMart, the model is designed for general purposes and is applicable to interaction graphs between any two groups.

Interaction-Focused Anomaly Detection on Bipartite Node-and-Edge-Attributed Graphs
By Rizal Fathony, Jenn Ng, Jia Chen
Presented at International Joint Conference on Neural Networks (IJCNN) 2023

Before we dive into how our model works, it is important to understand the process of graph construction in our application as the model assumes the availability of the graphs in a standardised format.

Graph construction 

We modelled the interactions between consumers and merchants on the GrabFood and GrabMart platforms as bipartite graphs (G), where the first group of nodes (U) represents the consumers, the second group of nodes (V) represents the merchants, and the edges (E) connecting them mean that the consumers have placed some food/mart orders with the merchants. The graph is also supplied with rich transactional information about the consumers and the merchants in the form of node features (X_u and X_v), as well as order information in the form of edge features (X_e).

Fig 1. Graph construction process

The goal of our anomaly model is to detect anomalous and suspicious behaviours from the consumers or merchants (node-level anomaly detection), as well as anomalous order interactions (edge-level anomaly detection). As mentioned, this detection needs to be done without any label supervision.

Model architecture

We designed our graph anomaly model as a type of autoencoder, with an encoder and two decoders – a feature decoder and a structure decoder. The key feature of our model is that it accepts a bipartite graph with both node and edge attributes as the input. This is important as both node and edge attributes encode essential information for determining if certain behaviours are suspicious. Many previous works on graph anomaly detection only support node attributes. In addition, our model can produce both node and edge level anomaly scores, unlike most of the previous works that produce node-level scores only. We named our model GraphBEAN, which is short for Bipartite Node-and-Edge-Attributed Networks.

From the input, the encoder processes the attributed bipartite graph through a series of graph convolution layers to produce latent representations for both node groups. Our graph convolution layers produce new representations for each node in both node groups (U and V), as well as for each edge in the graph. Note that the last convolution layer in the encoder only produces latent representations for nodes, without producing edge representations. The reason for this design is that we only keep latent representations for the active actors (the nodes representing consumers and merchants), not for their interactions.

Fig 2. GraphBEAN architecture

From the nodes' latent representations, the feature decoder is tasked with reconstructing the original graph, with both node and edge attributes, via a series of graph convolution layers. As the graph structure is provided to the feature decoder, we task the structure decoder with learning the graph structure by predicting whether an edge exists between two nodes. This edge prediction, as well as the graph reconstructed by the feature decoder, is then compared to the original input graph via a reconstruction loss function.

The model is then trained using the bipartite graph constructed from GrabFood and GrabMart transactions. We use a reconstruction-based loss function as the training objective of the model. After the training is completed, we compute the anomaly score of each node and edge in the graph using the trained model.

Anomaly score computation

Our anomaly scores are reconstruction-based. The score design assumes that normal behaviours are common in the dataset and thus, can be easily reconstructed by the model. On the other hand, anomalous behaviours are rare. Therefore the model will have a hard time reconstructing them, hence producing high errors.

Fig 3. Edge-level and node-level anomaly scores computation

The model produces two types of anomaly scores. First, the edge-level anomaly scores, which are calculated from the edge reconstruction error. Second, the node-level anomaly scores, which are calculated from node reconstruction error plus an aggregate over the edge scores from the edges connected to the node. This aggregate could be a mean or max aggregate.
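
In code, this aggregation is straightforward. The Go sketch below shows the mean-aggregate variant for a node-level score, assuming the reconstruction errors have already been computed by the trained model.

package main

import "fmt"

// nodeAnomalyScore combines a node's own reconstruction error with the
// mean of the anomaly scores of its incident edges (a max aggregate is
// the other option mentioned above).
func nodeAnomalyScore(nodeReconErr float64, edgeScores []float64) float64 {
    if len(edgeScores) == 0 {
        return nodeReconErr
    }
    var sum float64
    for _, s := range edgeScores {
        sum += s
    }
    return nodeReconErr + sum/float64(len(edgeScores))
}

func main() {
    // Reconstruction errors for the edges (orders) connected to one merchant node.
    edgeScores := []float64{0.02, 0.03, 0.91} // one order reconstructs poorly
    fmt.Printf("node anomaly score: %.3f\n", nodeAnomalyScore(0.10, edgeScores))
}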

Actioning system

In our implementation of GraphBEAN within Grab, we designed a full pipeline of anomaly detection and actioning systems. It is a fully-automated system for constructing a bipartite graph from GrabFood and GrabMart transactions, training a GraphBEAN model using the graph, and computing anomaly scores. After computing anomaly scores for all consumers and merchants (node-level), as well as all of their interactions (edge-level), it automatically passes the scores to our actioning system. Before that, however, it also passes them through a system we call the fraud type tagger. This is also a fully-automated, heuristic-based system that tags some of the detected anomalies with fraud tags. The purpose of this tagging is to provide general context, such as the type of anomaly detected. Some examples of these tags are promo abuse or possible collusion.

Fig 4. Pipeline in our actioning system

Both the anomaly scores and the fraud type tags are then forwarded to our actioning system. The system consists of two subsystems:

  • Human expert actioning system: Our fraud experts analyse the detected anomalies and perform certain actioning on them, like suspending certain transaction features from suspicious merchants.
  • Automatic actioning system: Combines the anomaly scores and fraud type tags with other external signals to automatically do actioning on the detected anomalies, like preventing promos from being used by fraudsters or preventing fraudulent transactions from occurring. These actions vary depending on the type of fraud and the scores.

What’s next?

The GraphBEAN model enables the detection of suspicious behaviour on graph data without the need for label supervision. By implementing the model on GrabFood and GrabMart platforms, we learnt that having such a system enables us to quickly identify new types of fraudulent behaviours and then swiftly perform action on them. This also allows us to enhance Grab’s defence against fraudulent activity and actively protect our users.

We are currently working on extending the model into more generic heterogeneous (multi-entity) graphs. In addition, we are also working on implementing it to more use cases within Grab.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Zero traffic cost for Kafka consumers

Post Syndicated from Grab Tech original https://engineering.grab.com/zero-traffic-cost

Introduction

Coban, Grab’s real-time data streaming platform team, has been building an ecosystem around Kafka, serving all Grab verticals. Along with stability and performance, one of our priorities is also cost efficiency.

In this article, we explain how the Coban team has substantially reduced Grab’s annual cost for data streaming by enabling Kafka consumers to fetch from the closest replica.

Problem statement

The Grab platform is primarily hosted on AWS cloud, located in one region, spanning over three Availability Zones (AZs). When it comes to data streaming, both the Kafka brokers and Kafka clients run across these three AZs.

Figure 1 – Initial design, consumers fetching from the partition leader

Figure 1 shows the initial design of our data streaming platform. To ensure high availability and resilience, we configured each Kafka partition to have three replicas. We have also set up our Kafka clusters to be rack-aware (i.e. 1 “rack” = 1 AZ) so that all three replicas reside in three different AZs.

The problem with this design is that it generates staggering cross-AZ network traffic. This is because, by default, Kafka clients communicate only with the partition leader, which has a 67% probability of residing in a different AZ.

This is a concern as we are charged for cross-AZ traffic as per AWS’s network traffic pricing model. With this design, our cross-AZ traffic amounted to half of the total cost of our Kafka platform.

The Kafka cross-AZ traffic for this design can be broken down into three components as shown in Figure 1:

  • Producing (step 1): Typically, a single service produces data to a given Kafka topic. Cross-AZ traffic occurs when the producer does not reside in the same AZ as the partition leader it is producing data to. This cross-AZ traffic cost is minimal, because the data is transferred to a different AZ at most once (excluding retries).
  • Replicating (step 2): The ingested data is replicated from the partition leader to the two partition followers, which reside in two other AZs. The cost of this is also relatively small, because the data is only transferred to a different AZ twice.
  • Consuming (step 3): Most of the cross-AZ traffic occurs here because there are many consumers for a single Kafka topic. Similar to the producers, the consumers incur cross-AZ traffic when they do not reside in the same AZ as the partition leader. However, on the consuming side, cross-AZ traffic can occur as many times as there are consumers (on average, two-thirds of the number of consumers). The solution described in this article addresses this particular component of the cross-AZ traffic in the initial design.

Solution

Kafka 2.3 introduced the ability for consumers to fetch from partition replicas. This opens the door to a more cost-efficient design.

Figure 2 – Target design, consumers fetching from the closest replica

Step 3 of Figure 2 shows how consumers can now consume data from the replica that resides in their own AZ. Implementing this feature requires rack-awareness and extra configurations for both the Kafka brokers and consumers. We will describe this in the following sections.

The Coban journey

Kafka upgrade

Our journey started with the upgrade of our legacy Kafka clusters. We decided to upgrade them directly to version 3.1 to capture the bug fixes and optimisations introduced since version 2.3. This was a safe move as version 3.1 had been deemed stable for almost a year, and we projected no additional operational cost for this upgrade.

To perform an online upgrade with no disruptions for our users, we broke down the process into three stages.

  • Stage 1: Upgrading Zookeeper. All versions of Kafka are tested by the community with a specific version of Zookeeper. To ensure stability, we followed this same process. The upgraded Zookeeper would be backward compatible with the pre-upgrade version of Kafka which was still in use at this early stage of the operation.
  • Stage 2: Rolling out the upgrade of Kafka to version 3.1 with an explicit backward-compatible inter-broker protocol version (inter.broker.protocol.version). During this progressive rollout, the Kafka cluster is temporarily composed of brokers with heterogeneous Kafka versions, but they can communicate with one another because they are explicitly set up to use the same inter-broker protocol version. At this stage, we also upgraded Cruise Control to a compatible version, and we configured Kafka to import the updated cruise-control-metrics-reporter JAR file on startup.
  • Stage 3: Upgrading the inter-broker protocol version. This last stage makes all brokers use the most recent version of the inter-broker protocol. During the progressive rollout of this change, brokers with the new protocol version can still communicate with brokers on the old protocol version.

Configuration

Enabling Kafka consumers to fetch from the closest replica requires a configuration change on both Kafka brokers and Kafka consumers. They also need to be aware of their AZ, which is done by leveraging Kafka rack-awareness (1 “rack” = 1 AZ).

Brokers

In our Kafka brokers’ configuration, we already had broker.rack set up to distribute the replicas across different AZs for resiliency. Our Ansible role for Kafka automatically sets it with the AZ ID that is dynamically retrieved from the EC2 instance’s metadata at deployment time.

- name: Get availability zone ID
  uri:
    url: http://169.254.169.254/latest/meta-data/placement/availability-zone-id
    method: GET
    return_content: yes
  register: ec2_instance_az_id

Note that we use AWS AZ IDs (suffixed az1, az2, az3) instead of the typical AWS AZ names (suffixed 1a, 1b, 1c) because the latter’s mapping is not consistent across AWS accounts.

Also, we added the new replica.selector.class parameter, set with value org.apache.kafka.common.replica.RackAwareReplicaSelector, to enable the new feature on the server side.

Consumers

On the Kafka consumer side, we mostly rely on Coban’s internal Kafka SDK in Golang, which streamlines how service teams across all Grab verticals utilise Coban Kafka clusters. We have updated the SDK to support fetching from the closest replica.

Our users only have to export an environment variable to enable this new feature. The SDK then dynamically retrieves the underlying host’s AZ ID from the host’s metadata on startup, and sets a new client.rack parameter with that information. This is similar to what the Kafka brokers do at deployment time.
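
For illustration, the sketch below shows roughly what such a consumer setup looks like in Go, using the confluent-kafka-go client. The metadata endpoint matches the one used for the brokers above, while the environment variable name and the client library are assumptions for the example rather than details of Coban's internal SDK.

package main

import (
    "fmt"
    "io"
    "net/http"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

// azID fetches the host's Availability Zone ID from the EC2 instance
// metadata service (the same endpoint used in the Ansible role above).
func azID() (string, error) {
    resp, err := http.Get("http://169.254.169.254/latest/meta-data/placement/availability-zone-id")
    if err != nil {
        return "", err
    }
    defer resp.Body.Close()
    b, err := io.ReadAll(resp.Body)
    return string(b), err
}

func main() {
    rack := ""
    // Hypothetical opt-in flag; the real SDK reads its own environment variable.
    if os.Getenv("KAFKA_FETCH_FROM_CLOSEST_REPLICA") == "true" {
        if az, err := azID(); err == nil {
            rack = az
        }
    }

    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "kafka:9092",
        "group.id":          "my-consumer-group",
        // With client.rack set, the broker's RackAwareReplicaSelector lets
        // this consumer fetch from the replica in the same AZ.
        "client.rack": rack,
    })
    if err != nil {
        panic(err)
    }
    defer consumer.Close()
    fmt.Println("consumer created with client.rack =", rack)
}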

We have also implemented the same logic for our non-SDK consumers, namely Flink pipelines and Kafka Connect connectors.

Impact

We rolled out fetching from the closest replica at the turn of the year and the feature has been progressively rolled out on more and more Kafka consumers since then.

Figure 3 – Variation of our cross-AZ traffic before and after enabling fetching from the closest replica

Figure 3 shows the relative impact of this change on our cross-AZ traffic, as reported by AWS Cost Explorer. AWS charges cross-AZ traffic on both ends of the data transfer, thus the two data series. On the Kafka brokers’ side, less cross-AZ traffic is sent out, thereby causing the steep drop in the dark green line. On the Kafka consumers’ side, less cross-AZ traffic is received, causing the steep drop in the light green line. Hence, both ends benefit by fetching from the closest replica.

Throughout the observation period, we maintained a relatively stable volume of data consumption. However, after three months, we observed a substantial 25% drop in our cross-AZ traffic compared to December's average. This reduction translated directly into cost savings, as our cross-AZ costs scale linearly with cross-AZ traffic volume.

Caveats

Increased end-to-end latency

After enabling fetching from the closest replica, we observed an increase of up to 500ms in end-to-end latency, measured from the producer to the consumers. Although this is expected by design, it makes the new feature unsuitable for Grab's most latency-sensitive use cases. For these use cases, we retained the traditional design, whereby consumers fetch directly from the partition leaders, even when they reside in different AZs.

Figure 4 – End-to-end latency (99th percentile) of one of our streams, before and after enabling fetching from the closest replica

Inability to gracefully isolate a broker

We have also verified the behaviour of Kafka clients during a broker rotation; a common maintenance operation for Kafka. One of the early steps of our corresponding runbook is to demote the broker that is to be rotated, so that all of its partition leaders are drained and moved to other brokers.

In the traditional architecture design, Kafka clients only communicate with the partition leaders, so demoting a broker gracefully isolates it from all of the Kafka clients. This ensures that the maintenance is seamless for them. However, by fetching from the closest replica, Kafka consumers still consume from the demoted broker, as it keeps serving partition followers. When the broker effectively goes down for maintenance, those consumers are suddenly disconnected. To work around this, they must handle connection errors properly and implement a retry mechanism.
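
A minimal Go sketch of such a retry mechanism with exponential backoff is shown below; the attempt count and delays are illustrative.

package main

import (
    "errors"
    "fmt"
    "time"
)

// withRetry retries fn with exponential backoff, which lets a consumer
// survive the sudden disconnection that occurs when the follower it was
// fetching from goes down for maintenance.
func withRetry(attempts int, initialDelay time.Duration, fn func() error) error {
    delay := initialDelay
    var err error
    for i := 0; i < attempts; i++ {
        if err = fn(); err == nil {
            return nil
        }
        fmt.Printf("attempt %d failed: %v; retrying in %s\n", i+1, err, delay)
        time.Sleep(delay)
        delay *= 2
    }
    return fmt.Errorf("all %d attempts failed: %w", attempts, err)
}

func main() {
    calls := 0
    err := withRetry(5, 100*time.Millisecond, func() error {
        calls++
        if calls < 3 {
            return errors.New("broker connection refused") // simulated outage
        }
        return nil // reconnected to another replica
    })
    fmt.Println("result:", err)
}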

Potentially skewed load

Another caveat we have observed is that the load on the brokers is directly determined by the location of the consumers. If they are not well balanced across all three AZs, then the load on the brokers is similarly skewed. At times, new brokers may be added to support an increasing load in one AZ. However, it is undesirable to remove brokers from the less loaded AZs, as more consumers can suddenly relocate there at any time. Having these additional brokers, together with the underutilisation of existing brokers in other AZs, can also hurt cost efficiency.

Figure 5 – Average CPU utilisation by AZ of one of our critical Kafka clusters

Figure 5 shows the CPU utilisation by AZ for one of our critical Kafka clusters. The skew is visible after 01/03/2023. To better manage this load skew across AZs, we have updated our SDK to expose the AZ as a new metric. This allows us to monitor the skewness of the consumers and take measures proactively, for example, moving some of them to different AZs.

What’s next?

We have implemented the feature to fetch from the closest replica on all our Kafka clusters and all Kafka consumers that we control. This includes internal Coban pipelines as well as the managed pipelines that our users can self-serve as part of our data streaming offering.

We are now evangelising and advocating for more of our users to adopt this feature.

Beyond Coban, other teams at Grab are also working to reduce their cross-AZ traffic, notably, Sentry, the team that is in charge of Grab’s service mesh.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Go module proxy at Grab

Post Syndicated from Grab Tech original https://engineering.grab.com/go-module-proxy

At Grab, we rely heavily on a large Go monorepo for backend development, which offers benefits like code reusability and discoverability. However, as we continue to grow, managing a large monorepo brings about its own set of unique challenges.

As an example, using Go commands such as go get and go list can be incredibly slow when fetching Go modules residing in a large multi-module repository. This sluggishness takes a toll on developer productivity, burdens our Continuous Integration (CI) systems, and strains our Version Control System host (VCS), GitLab.

In this blog post, we look at how Athens, a Go module proxy, helps to improve the overall developer experience of engineers working with a large Go monorepo at Grab.

Key highlights

  • We reduced the time of executing the go get command from ~18 minutes to ~12 seconds when fetching monorepo Go modules.
  • We scaled in and scaled down our entire Athens cluster by 70% by utilising the fallback network mode in Athens along with Golang’s GOVCS mode, resulting in cost savings and enhanced efficiency.

Problem statements and solutions

1. Painfully slow performance of Go commands

Problem summary: Running the go get command in our monorepo takes a considerable amount of time and can lead to performance degradation in our VCS.

When working with the Go programming language, go get is one of the most common commands that you’ll use every day. Besides developers, this command is also used by CI systems.

What does go get do?

The go get command is used to download and install packages and their dependencies in Go. Note that it operates differently depending on whether it is run in legacy GOPATH mode or module-aware mode. In Grab, we’re using the module-aware mode in a multi-module repository setup.

Every time go get is run, it uses Git commands, such as git ls-remote, git tag, and git fetch, to search and download the entire worktree. The excessive use of these Git commands on our monorepo contributes to the long processing time and can put a strain on our VCS.

How big is our monorepo?

To fully grasp the challenges faced by our engineering teams, it’s crucial to understand the vast scale of the monorepo that we work with daily. For this, we use git-sizer to analyse our monorepo.

Here’s what we found:

  • Overall repository size: The monorepo has a total uncompressed size of 69.3 GiB, a fairly substantial figure. To put things into perspective, the Linux kernel repository, known for its vastness, currently stands at 55.8 GiB.
  • Trees: The total number of trees is 3.21M and tree entries are 99.8M, which consume 3.65 GiB. This may cause performance issues during some Git operations.
  • References: Totalling 10.7k references.
  • Biggest checkouts: There are 64.7k directories in our monorepo. This affects operations like git status and git checkout. Moreover, our monorepo has a maximum path depth of 20. This contributes to a slow processing time on Git and negatively impacts developer experience. The number of files (354k) and the total size of files (5.08 GiB) are also concerns due to their potential impact on the repository’s performance.

To draw a comparison, refer to the git-sizer output of the Linux repository.

How slow is “slow”?

To illustrate the issue further, we will compare the time taken for various Go commands to fetch a single module in our monorepo at a 10 MBps download speed.

This is an example of how a module is structured in our monorepo:

gitlab.company.com/monorepo/go
  |-- go.mod
  |-- commons/util/gk
        |-- go.mod
Go commands | GOPROXY | Previously cached? | Description | Result (time taken)
go get -x gitlab.company.com/monorepo/go/commons/util/gk | proxy.golang.org,direct | Yes | Download and install the latest version of the module. This is a common scenario that developers often encounter. | 18:50.71 minutes
go get -x gitlab.company.com/monorepo/go/commons/util/gk | proxy.golang.org,direct | No | Download and install the latest version of the module without any module cache. | 1:11:54.56 hours
go list -x -m -json -versions gitlab.company.com/monorepo/go/util/gk | proxy.golang.org,direct | Yes | List information about the module. | 3.873 seconds
go list -x -m -json -versions gitlab.company.com/monorepo/go/util/gk | proxy.golang.org,direct | No | List information about the module without any module cache. | 3:18.58 minutes

In this example, using go get to fetch a single module took over 18 minutes to complete. Retrieving several modules from the monorepo quickly becomes prohibitively time-consuming.

Why is it slow in a monorepo?

In a large Go monorepo, go get commands can be slow due to several factors:

  1. Large number of files and directories: When running go get, the command needs to search and download the entire worktree. In a large multi-module monorepo, the vast number of files and directories make this search process very expensive and time-consuming.
  2. Number of refs: A large number of refs (branches or tags) in our monorepo can affect performance. Ref advertisements (git ls-remote), which contain every ref in our monorepo, are the first phase in any remote Git operation, such as git clone or git fetch. With a large number of refs, performance takes a hit when performing these operations.
  3. Commit history traversal: Operations that need to traverse a repository’s commit history and consider each ref will be slow in a monorepo. The larger the monorepo, the more time-consuming these operations become.

The consequences: Stifled productivity and strained systems

Developers and CI

When Go command operations like go get are slow, they contribute to significant delays and inefficiencies in software development workflows. This leads to reduced productivity and demotivated developers.

Optimising Go command operations’ speed is crucial to ensure efficient software development workflows and high-quality software products.

Version Control System

It’s also worth noting that heavy go get usage can create performance issues for the VCS itself. When Go packages were frequently downloaded using go get, we observed bottlenecks in our VCS cluster, leading to performance degradation and even rate-limiting queue issues.

This negatively impacts the performance of our VCS infrastructure, causing delays or sometimes unavailability for some users and CI.

Solution: Athens + fallback network mode + GOVCS + custom cache refresh

Problem summary: Speed up the go get command by avoiding fetches from our VCS.

We addressed the speed issue by using Athens, a proxy server for Go modules (read more about the GOPROXY protocol).

How does Athens work?

The following sequence diagram describes the default flow of go get command with Athens.

Athens uses a storage backend for Go module packages, which can be configured to use various systems such as Amazon S3 and Google Cloud Storage, among others.
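As an illustration, a minimal Athens storage configuration using the S3 backend might look like the snippet below. The bucket name and region are placeholders, and the key names follow Athens’ sample config file; treat them as an assumption rather than our production setup.

# Athens storage sketch (placeholder values): persist modules in an S3 bucket.
StorageType = "s3"

[Storage]
    [Storage.S3]
        Region = "ap-southeast-1"
        Bucket = "athens-go-modules"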

By caching these module packages in storage, Athens can serve the packages directly from storage rather than requesting them from an upstream VCS while serving Go commands such as go mod download and certain go build modes. However, just using a Go module proxy didn’t fully resolve our issue since the go get and go list commands still hit our VCS through the proxy.

With this in mind, we thought “what if we could just serve the Go modules directly from Athens’ storage for go get?” This question led us to discover Athens network mode.

What is Athens network mode?

Athens’ NetworkMode setting configures how Athens assembles the results it returns for Go commands, drawing from its own storage, the upstream VCS, or both. As of Athens v0.12.1, it supports these 3 modes:

  1. strict: merge VCS versions with storage versions, but fail if either of them fails.
  2. offline: only get storage versions, never reach out to VCS.
  3. fallback: only return storage versions if VCS fails. Fallback mode makes a best effort to give you whatever is available at the time the versions are requested.

Our Athens clusters were initially set to use strict network mode, but this was not ideal for us. So we explored the other network modes.

Exploring offline mode

We first explored putting Athens in offline network mode, which would allow Athens to serve Go requests purely from its storage. This aligned with our aim of reducing VCS hits while also promising a significant performance improvement in Go workflows.

However, in practice, this is not an ideal approach. The default Athens setup (strict mode) automatically updates the module version when a user requests a new one, whereas offline mode would disable these automatic updates because Athens would never reach out to the VCS.

Custom cache refresh solution

To solve this, we implemented a CI pipeline that refreshes Athens’ module cache whenever a new module version is released in our monorepo. Combined with offline mode, this made Athens effective for the monorepo, but it meant losing automatic updates for all other repositories.
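The internals of that pipeline are not covered here, but conceptually a refresh job only needs to ask a proxy instance that is still permitted to reach the VCS for the newly tagged version, so that the module ends up in the shared storage. A rough sketch of the idea, assuming a hypothetical ingestion endpoint (goproxy-ingest.company.com) and relying only on the standard GOPROXY protocol paths:

// Hypothetical cache-warming step, run by CI when a new monorepo module
// version is tagged. The ingestion endpoint and module/version values are
// placeholders, not Grab's actual setup.
package main

import (
    "fmt"
    "log"
    "net/http"
)

// warm requests the .info and .zip endpoints defined by the GOPROXY protocol,
// which forces the proxy to fetch the module and persist it in its storage.
func warm(proxy, module, version string) error {
    for _, suffix := range []string{".info", ".zip"} {
        url := fmt.Sprintf("%s/%s/@v/%s%s", proxy, module, version, suffix)
        resp, err := http.Get(url)
        if err != nil {
            return err
        }
        resp.Body.Close()
        if resp.StatusCode != http.StatusOK {
            return fmt.Errorf("warming %s: unexpected status %s", url, resp.Status)
        }
    }
    return nil
}

func main() {
    // In the real pipeline, the module path and version would come from the
    // release job that tagged the new version.
    err := warm("https://goproxy-ingest.company.com",
        "gitlab.company.com/monorepo/go/commons/util/gk", "v1.2.3")
    if err != nil {
        log.Fatal(err)
    }
}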

Restoring this feature would require applying our custom cache refresh solution to every other Go repository. However, implementing this workaround would be cumbersome and would demand significant additional time and effort, so we decided to look for another solution that would be easier to maintain in the long run.

A balanced approach: fallback mode and GOVCS

This approach builds upon our aforementioned custom cache refresh which is specifically designed for the monorepo.

We came across the GOVCS environment variable, which we use in combination with the fallback network mode to effectively put only the monorepo in “offline” mode.

When GOVCS is set to gitlab.company.com/monorepo/go:off, Athens encounters an error whenever it tries to fetch monorepo modules from the VCS:

gitlab.company.com/monorepo/go/commons/util/[email protected]: unrecognized import path "gitlab.company.com/monorepo/go/commons/util/gk": GOVCS disallows using git for private gitlab.company.com/monorepo/go; see 'go help vcs'

If Athens network mode is set to strict, Athens returns 404 errors to the user. By switching to fallback mode, Athens tries to retrieve the module from its storage if a GOVCS failure occurs.

Here’s the updated Athens configuration (example default config):

GoBinaryEnvVars = ["GOPROXY=direct", 
"GOPRIVATE=gitlab.company.com", 
"GOVCS=gitlab.company.com/monorepo/go:off"]

NetworkMode = "fallback"

With the custom cache refresh solution coupled with this approach, we not only accelerate the retrieval of Go modules within the monorepo but also allow for automatic updates for non-monorepo Go modules.

Final results

This solution resulted in a significant improvement in the performance of Go commands for our developers. With Athens, the same command is completed in just ~12 seconds (down from ~18 minutes), which is impressively fast.

Go commands | GOPROXY | Previously cached? | Description | Result (time taken)
go get -x gitlab.company.com/monorepo/go/commons/util/gk | goproxy.company.com | Yes | Download and install the latest version of the module. This is a common scenario that developers often encounter. | 11.556 seconds
go get -x gitlab.company.com/monorepo/go/commons/util/gk | goproxy.company.com | No | Download and install the latest version of the module without any module cache. | 1:05.60 minutes
go list -x -m -json -versions gitlab.company.com/monorepo/go/util/gk | goproxy.company.com | Yes | List information about the monorepo module. | 0.592 seconds
go list -x -m -json -versions gitlab.company.com/monorepo/go/util/gk | goproxy.company.com | No | List information about the monorepo module without any module cache. | 1.023 seconds
Average cluster CPU utilisation
Average cluster memory utilisation

In addition, this change led to a substantial reduction in average cluster CPU and memory utilisation, which enabled us to scale in and scale down our entire Athens cluster by 70%, resulting in cost savings and enhanced efficiency. On top of that, we effectively eliminated the VCS’s rate-limiting issues while making the monorepo’s Go command operations considerably faster.

2. Go modules in GitLab subgroups

Problem summary: Go modules are unable to work natively with private or internal repositories under GitLab subgroups.

When it comes to managing code repositories and packages, GitLab subgroups and Go modules have become an integral part of the development process at Grab. Go modules help to organise and manage dependencies, and GitLab subgroups provide an additional layer of structure to group related repositories together.

However, a common issue when using Go modules is that they do not work natively with private or internal repositories under a GitLab subgroup (see this GitHub issue).

For example, using go get to retrieve a module from gitlab.company.com/gitlab-org/subgroup/repo will result in a failure. This problem is not specific to Go modules; all repositories under the subgroup face the same issue.

A cumbersome workaround

To overcome this issue, we had to use workarounds. One workaround is to authenticate the HTTPS calls to GitLab by adding authentication details to the .netrc file on your machine.

The following lines can be added to the .netrc file:

machine gitlab.company.com
    login [email protected]
    password <personal-access-token>

In our case, we are using a Personal Access Token (PAT) since we have 2FA enabled. If 2FA is not enabled, the GitLab password can be used instead. However, this approach would mean configuring the .netrc file in the CI environments as well as on the machine of every Go developer.

Solution: Athens + .netrc

A feasible solution is to set up the .netrc file on the Go proxy server. This removes the need for every individual developer to configure their own .netrc file; instead, the responsibility is delegated to the Go proxy server.
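Athens exposes a configuration setting for exactly this purpose; a minimal sketch is shown below. The key name follows Athens’ sample config and the path is a placeholder, so treat both as assumptions rather than our exact setup.

# Path to a .netrc file that Athens uses when authenticating to GitLab
# on behalf of all developers and CI jobs (placeholder path).
NETRCPath = "/etc/athens/.netrc"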

3. Sharing common libraries

Problem summary: Distributing internal common libraries within a monorepo without granting direct repository access can be challenging.

At Grab, we work with various cross-functional teams, and some could have distinct network access like different VPNs. This adds complexity to sharing our monorepo’s internal common libraries with them. To maintain the security and integrity of our monorepo, we use a Go proxy for controlled access to necessary libraries.

The key difference between granting direct access to the monorepo via VCS and using a Go proxy is that the former allows users to read everything in the repository, while the latter enables us to grant access only to the specific libraries users need within the monorepo. This approach ensures secure and efficient collaboration across diverse network configurations.

Without Go module proxy

Without Athens, we would need to create a separate repository to store the code we want to share and then use a build system to automatically mirror the code from the monorepo to the public repository.

This process can be cumbersome and lead to inconsistencies in code versions between the two repositories, ultimately making it challenging to maintain the shared libraries.

Furthermore, copying code can lead to errors and increase the risk of security breaches by exposing confidential or sensitive information.

Solution: Athens + download mode file

To tackle this problem statement, we utilise Athens’ download mode file feature using an allowlist approach to specify which repositories can be downloaded by users.

Here’s an example of the Athens download mode config file:

downloadURL = "https://proxy.golang.org"

mode = "sync"

download "gitlab.company.com/repo/a" {
    mode = "sync"
}

download "gitlab.company.com/repo/b" {
    mode = "sync"
}

download "gitlab.company.com/*" {
    mode = "none"
}

In the configuration file, we specify allowlist entries for each desired repo, including their respective download modes. For example, in the snippet above, repo/a and repo/b are allowed (mode = "sync"), while everything else is blocked using mode = "none".

Final results

By using Athens’ download mode feature in this case, the benefits are clear. Athens provides a secure, centralised place to store Go modules. This approach not only provides consistency but also improves maintainability, as all code versions are managed in one single location.

Additional benefits of Go proxy

As we’ve touched upon the impressive results achieved by implementing Athens Go proxy at Grab, it’s crucial to explore the supplementary advantages that accompany this powerful solution.

These unsung benefits, though possibly overlooked, play a vital role in enriching the overall developer experience at Grab and promoting more robust software development practices:

  1. Module immutability: As the software world continues to face issues around changing or disappearing libraries, Athens serves as a useful tool in mitigating build disruptions by providing immutable storage for copied VCS code. The use of a Go proxy also ensures that builds remain deterministic, improving consistency across our software.
  2. Uninterrupted development: Athens allows users to fetch dependencies even when VCS is down, ensuring continuous and seamless development workflows.
  3. Enhanced security: Athens offers access control by enabling the blocking of specific packages within Grab. This added layer of security protects our work against potential risks from malicious third-party packages.
  4. Vendor directory removal: Athens prepares us for the eventual removal of the vendor directory, fostering faster workflows in the future.

What’s next?

Since adopting Athens as a Go module proxy, we have observed considerable benefits, such as:

  1. Accelerated Go command operations
  2. Reduced infrastructure costs
  3. Mitigated VCS load issues

Moreover, its lesser-known advantages like module immutability, uninterrupted development, enhanced security, and vendor directory transition have also contributed to improved development practices and an enriched developer experience for Grab engineers.

Today, exporting just three environment variables is all it takes for our developers at Grab to benefit from this setup.

export GOPROXY="goproxy.company.com|proxy.golang.org,direct"

export GONOSUMDB="gitlab.company.com"

export GONOPROXY="none"
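In this setup, GOPROXY tells the Go toolchain to try our internal Athens instance first and fall back to the public proxy, GONOSUMDB skips checksum-database verification for our private GitLab modules, and GONOPROXY=none ensures that no module path bypasses the proxy.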

At Grab, we are always looking for ways to improve and optimise the way we work, so we contribute to open-source projects like Athens, where we help with bug fixes. If you are interested in setting up a Go module proxy, do give Athens (github.com/gomods/athens) a try!

Special thanks to Swaminathan Venkatraman, En Wei Soh, Anuj More, Darius Tan, and Fernando Christyanto for contributing to this project and this article.


PII masking for privacy-grade machine learning

Post Syndicated from Grab Tech original https://engineering.grab.com/pii-masking

At Grab, data engineers work with large sets of data on a daily basis. They design and build advanced machine learning models that provide strategic insights using all of the data that flows through the Grab Platform. This enables us to provide a better experience to our users, for example, by increasing the supply of drivers in a timely fashion in areas where our predictive models indicate a surge in demand.

Grab has a mature privacy programme that complies with applicable privacy laws and regulations and we use tools to help identify, assess, and appropriately manage our privacy risks. To ensure that our users’ data are well-protected and avoid any human-related errors, we always take extra measures to secure this data.

However, data engineers will still require access to actual production data in order to tune effective machine learning models and ensure the models work as intended in production.

In this article, we will describe how Grab’s data streaming team (Coban), along with the data platform and user teams, enforced Personally Identifiable Information (PII) masking on machine learning data streaming pipelines. This ensures that we uphold a high standard and embody a privacy-by-design culture, while enabling data engineers to refine their models with sanitised production data.

PII tagging

Data streaming at Grab leverages the Protocol Buffers (protobuf) data format to structure in-transit data. When creating a new stream, developers must describe its fields in a protobuf schema that is then used for serialising the data wherever it is sent over the wire, and deserialising it wherever it is consumed.

A fictional example schema looks like this (the indexes are arbitrary, but commonly created in sequence):

message Booking {
  string bookingID = 1;
  int64 creationTime = 2;
  int64 passengerID = 3;
  string passengerName = 4;
  ... truncated output ...
}

Here, the fourth field, passengerName, contains PII, and the data in that field should never be accessible to any data engineer. Therefore, developers owning the stream must tag that field with a PII label like this:

import "streams/coban/options/v1/pii.proto";

message Booking {
  string bookingID = 1;
  int64 creationTime = 2;
  int64 passengerID = 3;
  string passengerName = 4 [(streams.coban.options.v1.pii_type) = PII_TYPE_NAME];
  ... truncated output ...
}

The imported pii.proto library defines the tags for all possible types of PII. In the example above, the passengerName field has not only been flagged as PII, but is also marked as PII_TYPE_NAME – a specific type of PII that conveys the names of individuals. This high-level typing enables more flexible PII masking methods, which we will explain later.
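For readers unfamiliar with custom protobuf options, the snippet below shows a plausible shape for such a library: an enum of PII types plus an extension of google.protobuf.FieldOptions. The exact type names, values, and field number are assumptions for illustration, not Coban’s actual definitions.

// Hypothetical sketch of pii.proto; names and numbers are illustrative only.
syntax = "proto3";

package streams.coban.options.v1;

import "google/protobuf/descriptor.proto";

enum PiiType {
  PII_TYPE_UNSPECIFIED = 0;
  PII_TYPE_NAME = 1;
  PII_TYPE_PHONE_NUMBER = 2;
  PII_TYPE_EMAIL = 3;
}

extend google.protobuf.FieldOptions {
  // Custom field option used as [(streams.coban.options.v1.pii_type) = ...].
  PiiType pii_type = 50001;
}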

Once the PII fields have been properly identified and tagged, developers need to publish the schema of their new stream into Coban’s Git repository. A Continuous Integration (CI) pipeline described below ensures that all fields describing PII are correctly tagged.

The following diagram shows this CI pipeline in action.

Fig. 1 CI pipeline failure due to untagged PII fields

When a developer creates a Merge Request (MR) or pushes a new commit to create or update a schema (step 1), the CI pipeline is triggered. It runs an in-house Python script that scans each variable name in the committed schema and tests it against an extensive, regularly updated list of PII keywords such as name, address, email, and phone (step 2). If there is a match and the variable is not tagged with the expected PII label, the pipeline fails (step 3) with an explicit error message in the CI pipeline’s output, similar to this:

Field name [Booking.passengerName] should have been marked with type streams.coban.options.v1.pii_type = PII_TYPE_NAME

There are cases where a variable name in the schema partially matches a PII keyword but legitimately does not carry PII – for example, carModelName is a partial match against name but does not contain PII data. In this case, the developer can choose to add it to a whitelist to pass the CI.

However, modifying the whitelist requires approval from the Coban team for verification purposes. Apart from this particular case, the requesting team can autonomously approve their MR in a self-service fashion.
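To make the check concrete, here is a simplified sketch of its logic. The real tool is an in-house Python script; this standalone Go version, with an illustrative keyword list, whitelist, and naive schema parsing, only mirrors the idea.

// Illustrative sketch of the CI check; keyword list, whitelist, and parsing
// are simplified assumptions, not the actual Coban tooling.
package main

import (
    "fmt"
    "os"
    "regexp"
    "strings"
)

var (
    piiKeywords = []string{"name", "address", "email", "phone"}
    whitelist   = map[string]bool{"carModelName": true}
    // Matches `<type> <fieldName> = <number> [<options>];` lines in a schema.
    fieldRe = regexp.MustCompile(`(?m)^\s*\w+\s+(\w+)\s*=\s*\d+\s*(\[[^\]]*\])?;`)
)

// untaggedPIIFields returns field names that look like PII but do not carry
// the pii_type option in their field options.
func untaggedPIIFields(schema string) []string {
    var offending []string
    for _, m := range fieldRe.FindAllStringSubmatch(schema, -1) {
        field, options := m[1], m[2]
        if whitelist[field] {
            continue
        }
        lower := strings.ToLower(field)
        for _, kw := range piiKeywords {
            if strings.Contains(lower, kw) && !strings.Contains(options, "pii_type") {
                offending = append(offending, field)
                break
            }
        }
    }
    return offending
}

func main() {
    schema, err := os.ReadFile(os.Args[1])
    if err != nil {
        fmt.Fprintln(os.Stderr, err)
        os.Exit(1)
    }
    fields := untaggedPIIFields(string(schema))
    for _, f := range fields {
        fmt.Printf("Field [%s] should be tagged with streams.coban.options.v1.pii_type\n", f)
    }
    if len(fields) > 0 {
        os.Exit(1) // fail the CI job if any untagged PII field is found
    }
}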

Now let us look at an example of a successful CI pipeline execution.

Fig. 2 CI pipeline success and schema publishing

In Fig. 2, the committed schema (step 1) is properly tagged so our in-house Python script is unable to find any untagged PII fields (step 2). The MR is approved by a code owner (step 3), then merged to the master branch of the repository (step 4).

Upon merging, another CI pipeline is triggered to package the protobuf schema in a Java Archive (JAR) of Scala classes (step 5), which in turn is stored into a package registry (step 6). We will explain the reason for this in a later section.

Production environment

With the schemas published and all of their PII fields properly tagged, we can now take a look at the data streaming pipelines.

Fig. 3 PII flow in the production environment

In this example, the user generates data by interacting with the Grab superapp and making a booking (step 1). The booking service, compiled with the stream’s schema definition, generates and produces Kafka records for other services to consume (step 2). Among those consuming services are the production machine learning pipelines that are of interest to this article (step 3).

PII is not masked in this process because it is actually required by the consuming services. For example, the driver app needs to display the passenger’s actual name, so the driver can confirm their identity easily.

At this part of the process, this is not much of a concern because access to the sacrosanct production environment is highly restricted and monitored by Grab.

PII masking

To ensure the security, stability, and privacy of our users, data engineers who need to tune their new machine learning models based on production data are not granted access to the production environment. Instead, they have access to the staging environment, where production data is mirrored and PII is masked.

Fig. 4 PII masking pipeline from the production environment to the staging environment

The actual PII masking is performed by an in-house Flink application that resides in the production environment. Flink is a reference framework for data streaming that we use extensively. It is also fault tolerant, with the ability to restart from a checkpoint.

The Flink application is compiled along with the JAR containing the schema as Scala classes previously mentioned. Therefore, it is able to consume the original data as a regular Kafka consumer (step 1). It then dynamically masks the PII of the consumed data stream, based on the PII tags of the schema (step 2). Ultimately, it produces the sanitised data to the Kafka cluster in the staging environment as a normal Kafka producer (step 3).

Depending on the kind of PII, there are several methods of masking such as:

  • Names and strings of characters: They are replaced by a consistent HMAC (Hash-based Message Authentication Code). An HMAC is a digest produced by a one-way cryptographic hash function that takes a secret key as a parameter. Leveraging a secret key here is a defence against chosen plaintext attacks, i.e. computing the digest of a particular plaintext, like a targeted individual’s name.
  • Numbers and dates: Similarly, they are transformed in a consistent manner, by leveraging a random generator that takes the unmasked value as a seed, so that the same PII input consistently produces the same masked output.

Note that consistency is a recurring pattern here because it is a key requirement for certain machine learning models.
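The production masking job is a Flink application compiled with the Scala schema classes; purely to illustrate the consistent keyed-hash idea for names and strings, here is a small sketch in Go. The key handling is a placeholder: in practice the secret would come from a secret store, never be hard-coded.

// Illustration only: deterministic, keyed masking of a PII string.
package main

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/hex"
    "fmt"
)

// maskString returns a keyed digest of a PII string: the same input always
// yields the same output, but the output cannot be reversed or recomputed
// without the secret key.
func maskString(secretKey []byte, value string) string {
    mac := hmac.New(sha256.New, secretKey)
    mac.Write([]byte(value))
    return hex.EncodeToString(mac.Sum(nil))
}

func main() {
    key := []byte("example-secret-key") // placeholder only
    fmt.Println(maskString(key, "Jane Doe"))
    fmt.Println(maskString(key, "Jane Doe")) // identical digest: consistency preserved
}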

This sanitised data produced to the Kafka cluster in the staging environment is then consumed by the staging machine learning pipelines (step 4). There, it is used by data engineers to tune their models effectively with near real-time production data (step 5).

The Kafka cluster in the staging environment is secured with authorisation and authentication (see Zero Trust with Kafka). This is an extra layer of security in case some PII data inadvertently fall through the cracks of PII tagging, following the defence in depth principle.

Finally, whenever a new PII-tagged field is added to a schema, the PII masking Flink application needs to be compiled and deployed again. If the schema is not updated, the Flink pipeline is unable to decode this new field when deserialising the stream. Thus, the added field is just dropped and the new PII data does not make it to the staging environment.

What’s next?

For the immediate next steps, we are going to enhance this design with an in-house product based on AWS Macie to automatically detect the PII that would have fallen through the cracks. Caspian, Grab’s data lake team and one of Coban’s sister teams, has built a service that is already able to detect PII data in relational databases and data lake tables. It is currently being adapted for data streaming.

In the longer run, we are committed to taking our privacy by design posture to the next level. Indeed, the PII masking described in this article does not prevent a bad actor from retrieving the consistent hash of a particular individual based on their non-PII data. For example, the target might be identifiable by a signature in the masked data set, such as unique food or transportation habits.

A possible counter-measure could be one or a combination of the following techniques, ordered by difficulty of implementation:

  • Data minimisation: Non-essential fields in the data stream should not be mirrored at all. E.g. fields of the data stream that are not required by the data engineers to tune their models. We can introduce a dedicated tag in the schema to flag those fields and instruct the mirroring pipeline to drop them. This is the most straightforward approach.
  • Differential privacy: The mirroring pipeline could introduce some noise in the mirrored data, in a way that would obfuscate the signatures of particular individuals while still preserving the essential statistical properties of the dataset required for machine learning. It happens that Flink is a suitable framework to do so, as it can split a stream into multiple windows and apply computation over those windows. Designing and generalising a logic that meets the objective is challenging though.
  • PII encryption at source: PII could be encrypted by the producing services (like the booking service), and dynamically decrypted where plaintext values are required. However, key management and performance are two tremendous challenges of this approach.

We will explore these techniques further to find the solution that works best for Grab and ensures the highest level of privacy for our users.
