
Abacus – Issuing points for multiple sources

Post Syndicated from Grab Tech original https://engineering.grab.com/abacus-issuing-points-for-multiple-sources

Introduction

Earlier in 2021 we published an article on Trident, Grab's in-house real-time if this, then that (IFTTT) engine which manages campaigns for the Grab Loyalty Programme. The Grab Loyalty Programme encourages consumers to make Grab transactions by rewarding points when transactions are made. Grab rewards two types of points, namely OVOPoints and GrabRewards Points (GRP). OVOPoints are issued for transactions made in Indonesia, while GRP are issued for transactions made in all other markets. In this article, the term GRP will be used to refer to both OVOPoints and GrabRewards Points.

Rewarding GRP is one of the main components of the Grab Loyalty Programme. By rewarding GRP, our consumers are incentivised to transact within the Grab ecosystem. Consumers can then redeem their GRP for a range of exciting items on the GrabRewards catalogue or to offset the cost of their spendings.

As we continue to grow our consumer base and our product offerings, a more robust platform is needed to ensure successful points transactions. In this post, we will share the challenges in rewarding GRP and how Abacus, our Point Issuance platform helps to overcome these challenges while managing various use cases.

Challenges

Growing number of products

The number of Grab’s product offerings has grown as part of Grab’s goal in becoming a superapp. The demand for rewarding GRP increased as each product team looked for ways to retain consumer loyalty. For this, we needed a platform which could support the different requirements from each product team.

External partnerships

Grab’s external partnerships consist of both one- and two-way point exchanges. With selected partners, Grab users are able to convert their GRP for the partner’s loyalty programme points, and the other way around.

Use cases

Besides the need to cater for the growing number of products and external partnerships, Grab needed a centralised points management system which could cater to various use cases of points rewarding. Let’s take a look at the use cases.

Any product, any points

There are many products in Grab and each product should be able to reward different GRP for different scenarios. Each product rewards GRP based on the goal they are trying to achieve.

The following examples illustrate the different scenarios:

GrabCar: Reward 100 GRP as a form of compensation when a driver cancels a booking, or reward GRP for every ride a consumer makes.

GrabFood: Reward consumers for each meal order.

GrabPay: Reward consumers three times the number of GRP for using GrabPay instead of cash as the mode of payment.

More points for loyal consumers

Another use case is to reward loyal consumers with more points. This incentivises consumers to transact within the Grab ecosystem. One example is membership tiers granted based on the number of GRP a consumer has accumulated. There are four membership tiers: Member, Silver, Gold and Platinum.

Point multiplier

There are different points multipliers for different membership tiers. For example, a Gold member would earn 2.25 GRP for every dollar spent while a Silver member earns only 1.5 GRP for the same amount spent. A consumer can view their membership tier and GRP information from the account page on the Grab app.

GrabRewards Points and membership tier information

Growing number of transactions

Teams within Grab and external partners use GRP in their business. This calls for a platform that can process millions of transactions every day with high availability. Errors in points issuance can easily erode our consumers' trust.

Our solution – Abacus

To overcome the challenges and cater for various use cases, we developed a Points Management System known as Abacus. It offers an interface for external partners with the capability to handle millions of daily transactions without significant downtime.

Points rewarding

There are seven main components of Abacus as shown in the following architectural diagram. Details of each component are explained in this section.

Abacus architecture

Transaction input source

The points rewarding process begins when a transaction is complete. Abacus listens to streams for completed transactions on the Grab platform. Each transaction that Abacus receives in the stream carries the data required to calculate the GRP to be rewarded, such as country ID, product ID, and payment ID.

Apart from computing the number of GRP to be rewarded for a transaction and then rewarding the points, Abacus also allows clients from both within and outside of the Grab platform to make an API call to reward GRP to consumers. A client who wants to reward their consumers with GRP calls Abacus with either a specific point value (for example, 100 points) or with the necessary details, such as the transaction amount and the relevant multipliers, for Abacus to compute the points and then reward them.

Point Calculation module

The Point Calculation module calculates the GRP using the data and multipliers that are unique to each transaction.

Point Calculation dependencies for internal services

Point Calculation dependencies are the multipliers needed to calculate the number of points. The Point Calculation module fetches the correct point multipliers for each transaction. The multipliers are configured by specific country teams when the product is launched. They may vary by country to allow country teams the flexibility to achieve their growth and retention targets. There are different types of multipliers.

Vertical multiplier: The multiplier for each vertical. A vertical is a service or product offered by Grab. Examples of verticals are GrabCar and GrabFood. The multiplier can be different for each vertical.

EPPF multiplier: The effective price per fare multiplier. EPPF is the reference conversion rate per point. For example:

  • EPPF = 1.0; if you are issuing X points per SGD1

  • EPPF = 0.1; if you are issuing X points per THB10

  • EPPF = 0.0001; if you are issuing X points per IDR10,000

Payment Type multiplier: The multiplier for different modes of payment (referred to as the Cashless multiplier in the formula below).

Tier multiplier: The multiplier for each tier.

Point Calculation formula for internal clients

The Point Calculation module uses a formula to calculate GRP. The formula is the product of all the multipliers and the transaction amount.

GRP = Amount * Vertical multiplier * EPPF multiplier * Cashless multiplier * Tier multiplier

The following are examples for calculating GRP:

Example 1:

Bob is a platinum member of Grab. He orders lunch in Singapore for SGD15 using GrabPay as the payment method. Let’s assume the following:

Vertical multiplier = 2

EPPF multiplier = 1

Cashless multiplier = 2

Tier multiplier = 3

GRP = Amount * Vertical multiplier * EPPF multiplier * Cashless multiplier * Tier multiplier

= 15 * 2 * 1 * 2 * 3

= 180

From this transaction, Bob earns 180 GRP.

Example 2:

Jane is a Gold member of Grab. She orders lunch in Indonesia for IDR150,000 using GrabPay as the payment method. Let’s assume the following:

Vertical multiplier = 2

EPPF multiplier = 0.00005

Cashless multiplier = 2

Tier multiplier = 2

GRP = Amount * Vertical multiplier * EPPF multiplier * Cashless multiplier * Tier multiplier

= 150000 * 2 * 0.00005 * 2 * 2

= 60

From this transaction, Jane earns 60 GRP.
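
To make the formula concrete, here is a minimal Go sketch of the calculation that reproduces the two examples above. The type and function names are ours for illustration; they are not Abacus's actual code.

package points

import "math"

// Multipliers holds the Point Calculation dependencies fetched for a transaction.
// The field names are illustrative, not Abacus's actual types.
type Multipliers struct {
    Vertical float64 // multiplier for the vertical (GrabCar, GrabFood, ...)
    EPPF     float64 // effective price per fare: reference conversion rate per point
    Cashless float64 // payment type multiplier
    Tier     float64 // membership tier multiplier
}

// CalculateGRP applies GRP = Amount * Vertical * EPPF * Cashless * Tier,
// rounded to the nearest whole point for this sketch.
func CalculateGRP(amount float64, m Multipliers) int64 {
    return int64(math.Round(amount * m.Vertical * m.EPPF * m.Cashless * m.Tier))
}

// Example 1: CalculateGRP(15, Multipliers{Vertical: 2, EPPF: 1, Cashless: 2, Tier: 3}) == 180
// Example 2: CalculateGRP(150000, Multipliers{Vertical: 2, EPPF: 0.00005, Cashless: 2, Tier: 2}) == 60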

Example of multipliers for payment options and tiers

Point Calculation dependencies for external clients

External partners supply the Point Calculation dependencies which are then configured in our backend at the time of integration. These external partners can set their own multipliers instead of using the above mentioned multipliers which are specific to Grab. This document details the APIs which are used to award points for external clients.

Simple Queue Service

Abacus uses Amazon Simple Queue Service (SQS) to ensure that the points system process is robust and fault tolerant.

Point Awarding SQS

If there are no errors during the Point Calculation process, the Point Calculation module will send a message containing the points to be awarded to the Point Awarding SQS.

Retry SQS

The Point Calculation module may not receive the required data when there is downtime in the Point Calculation dependencies. If this occurs, an error is triggered and the Point Calculation module sends a message to the Retry SQS. Messages sent to the Retry SQS are re-processed by the Point Calculation module. This ensures that points are properly calculated despite outages in the dependencies. Every message that we push to either the Point Awarding SQS or the Retry SQS has a field called the idempotency key, which is used to ensure that we reward points only once for a particular transaction.

Point Awarding module

The successful calculation of GRP triggers a message to the Point Awarding module via the Point Awarding SQS. The Point Awarding module tries to reward GRP to the consumer's account. Upon successful completion, an ACK is sent back to the Point Awarding SQS, signalling that the message was successfully processed and triggering deletion of the message. If the Point Awarding SQS does not receive an ACK, the message is redelivered after an interval. This process ensures that the points system is robust and fault tolerant.
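
A minimal sketch of this consume, award, and acknowledge loop is shown below. The queue and ledger interfaces are hypothetical stand-ins for SQS and Abacus's internals; the points being illustrated are the idempotency key and the delete-on-success (ACK) behaviour.

package pointawarding

import (
    "context"
    "log"
)

// PointMessage is an illustrative shape of a message on the Point Awarding SQS.
type PointMessage struct {
    IdempotencyKey string // ensures a transaction is rewarded at most once
    ConsumerID     string
    Points         int64
    ReceiptHandle  string // needed to delete (ACK) the message
}

// Queue and Ledger are hypothetical interfaces standing in for SQS and the Ledger.
type Queue interface {
    Receive(ctx context.Context) ([]PointMessage, error)
    Delete(ctx context.Context, receiptHandle string) error // the "ACK"
}

type Ledger interface {
    // Award must be idempotent on key: awarding the same key twice is a no-op.
    Award(ctx context.Context, key, consumerID string, points int64) error
}

func consume(ctx context.Context, q Queue, l Ledger) {
    for ctx.Err() == nil {
        msgs, err := q.Receive(ctx)
        if err != nil {
            log.Printf("receive failed: %v", err)
            continue
        }
        for _, m := range msgs {
            // If awarding fails, we do NOT delete the message; the queue
            // redelivers it after an interval and we retry.
            if err := l.Award(ctx, m.IdempotencyKey, m.ConsumerID, m.Points); err != nil {
                log.Printf("award failed for %s: %v", m.IdempotencyKey, err)
                continue
            }
            if err := q.Delete(ctx, m.ReceiptHandle); err != nil {
                log.Printf("ack failed for %s: %v", m.IdempotencyKey, err)
            }
        }
    }
}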

Ledger

GRP are rewarded to the consumer once the Ledger is updated. The Ledger tracks how many GRP a consumer has accumulated, what they were earned for, and the running total of GRP.

Notification service

Once the Ledger is updated, the Notification service sends the consumer a message about the GRP they receive.

Point Kafka stream

For all successful GRP transactions, Abacus sends a message to the Point Kafka stream. Downstream services listen to this stream to identify the consumer’s behaviour and take the appropriate actions. Services of this stream can listen to events they are interested in and execute their business logic accordingly. For example, a service can use the information from the Point Kafka stream to determine a consumer’s membership tier.

Points expiry

A further addition to Abacus is the handling of points expiry. The Expiry Extension module enables activity-based points expiry: GRP do not expire as long as the consumer makes at least one Grab transaction within three or six months of their last transaction.

The Expiry Extension module updates the point expiry date in the database after successfully rewarding GRP to the consumer. At the end of each month, a process loads all consumers whose points will expire in that particular month and sends them to the Point Expiry SQS. The Point Expiry Consumer then expires all the points for these consumers and this data is updated in the Ledger. This process repeats on a monthly basis.

Expiry Extension module

The points expiry date is always the last day of the third or sixth month after the month of the last transaction. For example, Adam makes a transaction on 10 January. His points expiry date is 31 July, which is six months from the month of his last transaction. Adam then makes a transaction on 28 February. His points expiry date is shifted by one month to 31 August.
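
The expiry date itself is easy to compute: take the month of the last transaction, move forward three or six months, and pick the last day of that month. A small Go sketch (the function name is ours):

package expiry

import "time"

// ExpiryDate returns the last day of the month that is `months` (3 or 6)
// after the month of the last transaction.
func ExpiryDate(lastTxn time.Time, months int) time.Time {
    y, m, _ := lastTxn.Date()
    // Day 0 of month (m + months + 1) normalises to the last day of month (m + months).
    return time.Date(y, m+time.Month(months)+1, 0, 0, 0, 0, 0, lastTxn.Location())
}

// A transaction on 10 January with a six-month window expires on 31 July;
// a follow-up transaction on 28 February shifts the expiry to 31 August.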

Points expiry

Conclusion

The Abacus platform enables us to perform millions of GRP transactions on a daily basis. Being able to curate rewards for consumers increases the value proposition of our products and consumer retention. If you have any comments or questions about Abacus, feel free to leave a comment below.


Special thanks to Arianto Wibowo and Vaughn Friesen.


Join us

Grab is a leading superapp in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across over 400 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Trident – Real-time event processing at scale

Post Syndicated from Grab Tech original https://engineering.grab.com/trident-real-time-event-processing-at-scale

Ever wondered what goes behind the scenes when you receive advisory messages on a confirmed booking? Or perhaps how you are awarded with rewards or points after completing a GrabPay payment transaction? At Grab, thousands of such campaigns targeting millions of users are operated daily by a backbone service called Trident. In this post, we share how Trident supports Grab’s daily business, the engineering challenges behind it, and how we solved them.

60-minute GrabMart delivery guarantee campaign operated via Trident

What is Trident?

Trident is essentially Grab’s in-house real-time if this, then that (IFTTT) engine, which automates various types of business workflows. The nature of these workflows could either be to create awareness or to incentivize users to use other Grab services.

If you are an active Grab user, you might have noticed new rewards or messages that appear in your Grab account. Most likely, these originate from a Trident campaign. Here are a few examples of types of campaigns that Trident could support:

  • After a user makes a GrabExpress booking, Trident sends the user a message that says something like “Try out GrabMart too”.
  • After a user makes multiple ride bookings in a week, Trident sends the user a food reward as a GrabFood incentive.
  • After a user is dropped off at his office in the morning, Trident awards the user a ride reward to use on the way back home on the same evening.
  • If a GrabMart order delivery takes over an hour, Trident awards the user a free-delivery reward as compensation.
  • If the driver cancels the booking, Trident awards points to the user as compensation.
  • With the current COVID pandemic, when a user makes a ride booking, Trident sends a message to both the passenger and the driver reminding them about COVID protocols.

Trident processes events based on campaigns, which are basically a logic configuration on what event should trigger what actions under what conditions. To illustrate this better, let’s take a sample campaign as shown in the image below. This mock campaign setup is taken from the Trident Internal Management portal.

Trident process flow

This sample setup basically translates to: for each user, count his/her number of completed GrabMart orders. Once he/she reaches 2 orders, send him/her a message saying “Make one more order to earn a reward”. And if the user reaches 3 orders, award him/her the reward and send a congratulatory message. 😁

Other than the basic event, condition, and action, Trident also allows more fine-grained configurations, such as setting an overall budget for a campaign, adding limits to avoid over-awarding, running A/B experiments, delaying actions, and so on.

An IFTTT engine is nothing new or fancy, but building a high-throughput real-time IFTTT system poses a challenge due to the scale that Grab operates at. We need to handle billions of events and run thousands of campaigns on an average day. The amount of actions triggered by Trident is also massive.

In the month of October 2020, more than 2,000 events were processed every single second during peak hours. Across the entire month, we awarded nearly half a billion rewards, and sent over 2.5 billion communications to our end-users.

Now that we covered the importance of Trident to the business, let’s drill down on how we designed the Trident system to handle events at a massive scale and overcame the performance hurdles with optimization.

Architecture design

We designed the Trident architecture with the following goals in mind:

  • Independence: It must run independently of other services, and must not bring performance impacts to other services.
  • Robustness: All events must be processed exactly once (i.e. no event missed, no event gets double processed).
  • Scalability: It must be able to scale up processing power when the event volume surges and withstand when popular campaigns run.

The following diagram depicts the overall system architecture.

Trident architecture

Trident consumes events from multiple Kafka streams published by various backend services across Grab (e.g. GrabFood orders, Transport rides, GrabPay payment processing, GrabAds events). Given the nature of Kafka streams, Trident is completely decoupled from all other upstream services.

Each processed event is given a unique event key and stored in Redis for 24 hours. For any event that triggers an action, its key is persisted in MySQL as well. Before storing records in both Redis and MySQL, we make sure any duplicate event is filtered out. Together with the at-least-once delivery guaranteed by Kafka, we achieve exactly-once event processing.
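
As an illustration of the Redis part of this deduplication, the sketch below uses go-redis's SETNX with a 24-hour TTL: the first delivery of an event key wins and gets processed, while later deliveries of the same key are dropped. This is our own sketch of the idea (the MySQL persistence of action-triggering keys is omitted), not Trident's actual code.

package dedupe

import (
    "context"
    "time"

    "github.com/redis/go-redis/v9"
)

// FirstSeen reports whether this is the first time eventKey has been observed
// within the 24-hour window, i.e. whether the event should be processed.
func FirstSeen(ctx context.Context, rdb *redis.Client, eventKey string) (bool, error) {
    // SETNX sets the key only if it does not already exist; the boolean result
    // tells us whether we won the race against a duplicate delivery.
    return rdb.SetNX(ctx, "event:"+eventKey, 1, 24*time.Hour).Result()
}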

Scalability is a key challenge for Trident. To achieve high performance under massive event volume, we needed to scale on both the server level and data store level. The following mind map shows an outline of our strategies.

Outline of Trident’s scale strategy

Scale servers

Our sources of events are Kafka streams. There are three main factors that could affect the load on our system:

  1. Number of events produced in the streams (more rides, food orders, etc. results in more events for us to process).
  2. Number of campaigns running.
  3. Nature of campaigns running. The campaigns that trigger actions for more users cause higher load on our system.

There are naturally two types of approaches to scale up server capacity:

  • Distribute workload among server instances.
  • Reduce load (i.e. reduce the amount of work required to process each event).

Distribute load

Distributing workload seems trivial with the load balancing and auto-horizontal scaling based on CPU usage that cloud providers offer. However, an additional server instance sits idle unless it can be assigned a Kafka partition to consume from.

Each Kafka partition can only be consumed by one consumer within the same consumer group (our auto-scaling server group in this case). Therefore, any scaling in or out requires matching the Kafka partition configuration with the server auto-scaling configuration.

Here’s an example of a bad case of load distribution:

Kafka partitions config mismatches server auto-scaling config

And here’s an example of a good load distribution where the configurations for the Kafka partitions and the server auto-scaling match:

Kafka partitions config matches server auto-scaling config

Within each server instance, we also tried to increase processing throughput while keeping the resource utilization rate in check. Each Kafka partition consumer has multiple goroutines processing events, and the number of active goroutines is dynamically adjusted according to the event volume from the partition and time of the day (peak/off-peak).
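
A simplified sketch of such a per-partition worker pool is shown below. The dynamic peak/off-peak adjustment is reduced here to a single worker-count parameter, and the names are ours for illustration.

package consumer

import "sync"

// Event is a placeholder for a decoded Kafka message.
type Event struct{ Payload []byte }

// ProcessPartition fans events from one Kafka partition out to `workers`
// goroutines. In practice the worker count would come from configuration
// that varies with partition volume and time of day.
func ProcessPartition(events <-chan Event, workers int, handle func(Event)) {
    var wg sync.WaitGroup
    for i := 0; i < workers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for ev := range events {
                handle(ev)
            }
        }()
    }
    wg.Wait() // returns once the events channel is closed and drained
}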

Reduce load

You may ask how we reduced the amount of processing work for each event. First, we needed to see where we spent most of the processing time. After performing some profiling, we identified that the rule evaluation logic was the major time consumer.

What is rule evaluation?

Recall that Trident needs to operate thousands of campaigns daily. Each campaign has a set of rules defined. When Trident receives an event, it needs to check through the rules for all the campaigns to see whether there is any match. This checking process is called rule evaluation.

More specifically, a rule consists of one or more conditions combined by AND/OR Boolean operators. A condition consists of an operator with a left-hand side (LHS) and a right-hand side (RHS). The left-hand side is the name of a variable, and the right-hand side a value. A sample rule in JSON:

Country is Singapore and taxi type is either JustGrab or GrabCar.
  {
    "operator": "and",
    "conditions": [
      {
        "operator": "eq",
        "lhs": "var.country",
        "rhs": "sg"
      },
      {
        "operator": "or",
        "conditions": [
          {
            "operator": "eq",
            "lhs": "var.taxi",
            "rhs": <taxi-type-id-for-justgrab>
          },
          {
            "operator": "eq",
            "lhs": "var.taxi",
            "rhs": <taxi-type-id-for-grabcar>
          }
        ]
      }
    ]
  }

When evaluating the rule, our system loads the value of the LHS variable, evaluates it against the RHS value, and returns a result (true/false) indicating whether the rule evaluation passed.

To reduce the resources spent on rule evaluation, there are two types of strategies:

  • Avoid unnecessary rule evaluation
  • Evaluate “cheap” rules first

We implemented these two strategies with event prefiltering and weighted rule evaluation.

Event prefiltering

Just like a DB index helps speed up data look-ups, having a pre-built map also helped us narrow down the range of campaigns to evaluate. We loaded active campaigns from the DB every few minutes and organized them into an in-memory hash map, with event type as the key and the list of corresponding campaigns as the value. The reason we picked event type as the key is that it is very fast to determine (most of the time just a type assertion), and it distributes events in a reasonably even way.

When processing events, we just looked up the map, and only ran rule evaluation on the campaigns in the matching hash bucket. This saved us at least 90% of the processing time.
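
A minimal sketch of this prefilter, assuming a Campaign type with an EventType field (both are our own illustrative names):

package prefilter

import "sync"

type Campaign struct {
    ID        int64
    EventType string
    // rules, actions, budgets, ... omitted
}

// Prefilter maps event type -> campaigns interested in that event type.
type Prefilter struct {
    mu    sync.RWMutex
    index map[string][]Campaign
}

// Reload rebuilds the in-memory index; in Trident this would run every few
// minutes from the campaigns stored in the DB.
func (p *Prefilter) Reload(active []Campaign) {
    idx := make(map[string][]Campaign, len(active))
    for _, c := range active {
        idx[c.EventType] = append(idx[c.EventType], c)
    }
    p.mu.Lock()
    p.index = idx
    p.mu.Unlock()
}

// CandidatesFor returns only the campaigns whose rules need to be evaluated
// for this event type, instead of every active campaign.
func (p *Prefilter) CandidatesFor(eventType string) []Campaign {
    p.mu.RLock()
    defer p.mu.RUnlock()
    return p.index[eventType]
}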

Event prefiltering

Weighted rule evaluation

Evaluating different rules comes with different costs. This is because different variables (i.e. LHS) in the rule can have different sources of values:

  1. The value is already available in memory (already consumed from the event stream).
  2. The value is the result of a database query.
  3. The value is the result of a call to an external service.

These three sources are ranked by cost:

In-memory < database < external service

We aimed to maximally avoid evaluating expensive rules (i.e. those that require calling external service, or querying a DB) while ensuring the correctness of evaluation results.

First optimization – Lazy loading

Lazy loading is a common performance optimization technique, which literally means “don’t do it until it’s necessary”.

Take the following rule as an example:

A & B

If we load the variable values for both A and B before passing them to evaluation, then we unnecessarily load B whenever A is false. Since most of the time the rule evaluation fails early (for example, the transaction amount is less than the given minimum amount), there is no point in loading all the data beforehand. So we do lazy loading, i.e. we load data only when evaluating that part of the rule.

Second optimization – Add weight

Let’s take the same example as above, but in a different order.

B & A
The source of data for A is memory and for B is an external service.

Now even with lazy loading, we always load the external data for B first, even though the evaluation may fail at the very next condition, A, whose data is already in memory.

Since most of our campaigns are targeted, a popular condition is to check if a user is in a certain segment, which is usually the first condition that a campaign creator sets. This data resides in another service. So it becomes quite expensive to evaluate this condition first even though the next condition’s data can be already in memory (e.g. if the taxi type is JustGrab).

So, we did the next phase of optimization here by sorting the conditions based on the weight of the source of data (low weight if the data is in memory, higher if it is in our database, and highest if it is in an external system). If AND were the only logical operator we supported, this would have been quite simple, but the presence of OR made it complex. We came up with an algorithm that orders the evaluation by weight while respecting the AND/OR structure. Here's what the flowchart looks like:

Event flowchart

An example:

Conditions: A & ( B | C ) & ( D | E )

Actual result: true & ( false | false ) & ( true | true ) --> false

Weight: B < D < E < C < A

Expected check order: B, D, C

Firstly, we validate B, which is false. We cannot skip its sibling condition here, since B and C are connected by |. Next, we check D. D is true and its only sibling E is connected by |, so we can mark E as “skip”. When we reach E, it has been marked “skip”, so we just skip it. We still cannot determine the final result, so we continue by validating C, which is false. Now we know (B | C) is false, so the whole condition is false too, and we can stop without ever evaluating A, the most expensive condition.
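
To make the idea concrete, here is a simplified Go sketch. It is not Trident's exact algorithm: rather than ordering all conditions globally and marking siblings to skip as in the walkthrough above, it sorts the children within each AND/OR node by weight and short-circuits, which captures the same cost-avoidance intent together with lazy loading.

package rules

import "sort"

// Node is either a leaf condition or an AND/OR combination of child nodes.
type Node struct {
    Op       string      // "and", "or", or "" for a leaf
    Weight   int         // leaf cost: 0 in-memory, 1 database, 2 external service
    Eval     func() bool // lazy loader plus comparison for a leaf
    Children []*Node
}

// weight of a node: for AND/OR nodes, the cheapest child, since that is what we try first.
func (n *Node) weight() int {
    if len(n.Children) == 0 {
        return n.Weight
    }
    w := n.Children[0].weight()
    for _, c := range n.Children[1:] {
        if cw := c.weight(); cw < w {
            w = cw
        }
    }
    return w
}

// Evaluate visits children cheapest-first and short-circuits:
// an AND stops at the first false child, an OR at the first true child.
func (n *Node) Evaluate() bool {
    if len(n.Children) == 0 {
        return n.Eval() // the data for this condition is loaded only now (lazy loading)
    }
    kids := append([]*Node(nil), n.Children...)
    sort.Slice(kids, func(i, j int) bool { return kids[i].weight() < kids[j].weight() })
    for _, c := range kids {
        v := c.Evaluate()
        if n.Op == "and" && !v {
            return false
        }
        if n.Op == "or" && v {
            return true
        }
    }
    return n.Op == "and" // all children evaluated without short-circuiting
}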

Sub-streams

After investigation, we learned that we consumed a particular stream that produced terabytes of data per hour. It caused our CPU usage to shoot up by 30%. We found out that we process only a handful of event types from that stream. So we introduced a sub-stream in between, which contains the event types we want to support. This stream is populated from the main stream by another server, thereby reducing the load on Trident.

Protect downstream

While we scaled up our servers wildly, we needed to keep in mind that there were many downstream services that received more traffic. For example, we call the GrabRewards service for awarding rewards or the LocaleService for checking the user’s locale. It is crucial for us to have control over our outbound traffic to avoid causing any stability issues in Grab.

Therefore, we implemented rate limiting. There is a total rate limit configured for calling each downstream service, and the limit varies in different time ranges (e.g. tighter limit for calling critical service during peak hour).
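
A sketch of such per-downstream rate limiting using Go's golang.org/x/time/rate package; the service names and numbers below are made up for illustration, and the time-of-day tightening described above would be done by updating the limits from configuration.

package outbound

import (
    "context"

    "golang.org/x/time/rate"
)

// One token-bucket limiter per downstream service. The limits here are
// illustrative, not Grab's real configuration.
var limiters = map[string]*rate.Limiter{
    "grabrewards":    rate.NewLimiter(rate.Limit(500), 50), // 500 calls/sec, burst 50
    "locale-service": rate.NewLimiter(rate.Limit(200), 20),
}

// Call blocks until the limiter for the given downstream grants a token, then
// performs the call. Peak-hour tightening can be applied via limiter.SetLimit.
func Call(ctx context.Context, downstream string, do func(context.Context) error) error {
    if lim, ok := limiters[downstream]; ok {
        if err := lim.Wait(ctx); err != nil {
            return err
        }
    }
    return do(ctx)
}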

Scale data store

We have two types of storage in Trident: cache storage (Redis) and persistent storage (MySQL and others).

Scaling cache storage is straightforward, since Redis Cluster already offers everything we need:

  • High performance: Known to be fast and efficient.
  • Scaling capability: New shards can be added at any time to spread out the load.
  • Fault tolerance: Data replication makes sure that data does not get lost when any single Redis instance fails, and auto election mechanism makes sure the cluster can always auto restore itself in case of any single instance failure.

All we needed to ensure was that our cache keys could be hashed evenly across the different shards.

As for scaling persistent data storage, we tackled it in two ways just like we did for servers:

  • Distribute load
  • Reduce load (both overall and per query)

Distribute load

There are two levels of load distribution for persistent storage: infra level and DB level. On the infra level, we split data with different access patterns into different types of storage. Then on the DB level, we further distributed read/write load onto different DB instances.

Infra level

Just like any typical online service, Trident has two types of data in terms of access pattern:

  • Online data: Frequent access. Requires quick access. Medium size.
  • Offline data: Infrequent access. Tolerates slow access. Large size.

For online data, we need to use a high-performance database, while for offline data, we can just use cheap storage. The following table shows Trident’s online/offline data and the corresponding storage.

Trident’s online/offline data and storage

Writing of offline data is done asynchronously to minimize performance impact as shown below.

Online/offline data split

For APIs that retrieve this data for users, we configure a higher timeout.

DB level

We further distributed load on the MySQL DB level, mainly by introducing replicas, and redirecting all read queries that can tolerate slightly outdated data to the replicas. This relieved more than 30% of the load from the master instance.

Going forward, we plan to segregate the single MySQL database into multiple databases, based on table usage, to further distribute load if necessary.

Reduce load

To reduce the load on the DB, we reduced the overall number of queries and removed unnecessary queries. We also optimized the schema and queries so that each query completes faster.

Query reduction

We needed to track the usage of a campaign. The tracking is just incrementing a value against a unique key in the MySQL database. For a popular campaign, it is possible that multiple increment queries (write queries) are made to the database for the same key. If this happens, it can cause an IOPS burst, so we came up with the following algorithm (sketched in code after the list) to reduce the number of queries.

  • Have a fixed number of threads per instance that can make such a query to the DB.
  • The increment queries are queued to these threads.
  • If a thread is idle (not busy querying the database), it writes to the database immediately.
  • If the thread is busy, the increment is accumulated in memory.
  • When the thread becomes free, it increments the value in the database by the accumulated sum.
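
A minimal sketch of this batching idea is shown below, assuming a MySQL table keyed by a campaign usage key (the schema, table, and names are ours for illustration). It approximates the algorithm above: a worker that is busy writing lets increments accumulate, then flushes the summed values.

package usage

import (
    "database/sql"
    "log"
)

// Tracker batches usage increments so that a burst of increments for the same
// key becomes a single write query instead of many.
type Tracker struct {
    db  *sql.DB
    inc chan string // queue of usage keys, one entry per +1
}

func NewTracker(db *sql.DB) *Tracker {
    t := &Tracker{db: db, inc: make(chan string, 10000)}
    go t.worker() // a fixed number of such workers per instance
    return t
}

// Increment queues one usage increment for the key without blocking the
// event-processing path on a database write.
func (t *Tracker) Increment(key string) { t.inc <- key }

func (t *Tracker) worker() {
    for key := range t.inc {
        pending := map[string]int64{key: 1}
        // Sum in memory whatever else has queued up while we were busy.
    drain:
        for {
            select {
            case k := <-t.inc:
                pending[k]++
            default:
                break drain
            }
        }
        // One write per key with the accumulated sum.
        for k, n := range pending {
            if _, err := t.db.Exec(
                "INSERT INTO campaign_usage (usage_key, value) VALUES (?, ?) "+
                    "ON DUPLICATE KEY UPDATE value = value + ?", k, n, n); err != nil {
                log.Printf("usage increment failed for %s: %v", k, err)
            }
        }
    }
}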

To prevent accidental over-awarding of benefits (rewards, points, etc.), we require campaign creators to set limits. However, some campaigns don't need a limit, so the campaign creators just specify a large number. Such popular campaigns can cause very high QPS to our database. We had a brilliant trick to address this issue: we just don't track usage if the limit is very high. Do you think people really want to limit usage when they set the per-user limit to 100,000? 😉

Query optimization

One of our requirements was to track the usage of a campaign – overall as well as per user (and more like daily overall, daily per user, etc). We used the following query for this purpose:

INSERT INTO … ON DUPLICATE KEY UPDATE value = value + inc

The table had a unique key index (combining multiple columns) along with a usual auto-increment integer primary key. We encountered performance issues arising from MySQL gap locks when high write QPS hit this table (i.e. when popular campaigns ran). After testing out a few approaches, we ended up making the following changes to solve the problem:

  1. Removed the auto-increment integer primary key.
  2. Converted the secondary unique key to the primary key.

Conclusion

Trident is Grab’s in-house real-time IFTTT engine, which processes events and operates business mechanisms on a massive scale. In this article, we discussed the strategies we implemented to achieve large-scale high-performance event processing. The overall ideas of distributing and reducing load may be straightforward, but there were lots of thoughts and learnings shared in detail. If you have any comments or questions about Trident, feel free to leave a comment below.

All the examples of campaigns given in this article are for demonstration purposes only; they are not real live campaigns.

Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.