Mahesh Saini

Summary

Grab employs a sophisticated system design to efficiently execute large-scale marketing campaigns, leveraging message queues like Kafka for work distribution and batch processing for optimized resource utilization.

Abstract

Grab runs large marketing campaigns every day that involve actions such as sending rewards, awarding points, and messaging a vast user base. To achieve quick execution, a high turnover rate, and balanced server load, Grab built a system that distributes tasks across multiple servers, using Kafka as a message queue. They optimized the approach by processing user actions in batches, creating batch endpoints for services such as rewards and messaging, and optimizing database queries. Further enhancements include sharding Kafka streams by action type and country to prevent blocking and enable parallel processing, and removing unnecessary waiting for asynchronous actions such as sending messages.

Opinions

  • The naive approach to campaign execution, which is single-threaded and sequential, is deemed inefficient due to slow execution and low resource utilization.
  • The use of Kafka for distributing work among servers is considered a well-suited solution to address the scalability needs of Grab's marketing campaigns.
  • Batch processing and the creation of batch endpoints are seen as critical optimizations to reduce API latency and minimize the number of database queries, thus improving overall system performance.
  • Sharding Kafka streams by action type and country is presented as a way to avoid bottlenecks, so that campaigns of different types or in different regions do not interfere with each other.
  • Asynchronous processing of certain actions, like sending messages, is viewed as an effective method to improve efficiency by eliminating unnecessary waiting times in the campaign workflow.

How Grab Supports Large Campaigns at Scale: System Design

Grab runs large marketing campaigns every day. A typical campaign may require executing multiple actions for millions of users at once, such as sending rewards, awarding points, and sending messages.

Here is what a campaign may look like: On 1st Jan 2022, send two ride rewards to all the users in the “heavy users” segment. Then, send them a congratulatory message informing them about the reward.

What we will cover here…

  1. System Requirements: functional and non-functional requirements
  2. Naive Approach
  3. Grab Team Approach
  4. Further Optimisations

System Requirements

1. Functional requirements

  • Apply a sequence of actions targeting a large segment of users at a scheduled time, display progress to the campaign manager, and provide a final report.
  • For each user, the actions must be executed in sequence; an action can only be executed if the preceding action succeeded.

2. Non-functional requirements

  • Quick execution and a high turnover rate, where turnover rate is the number of scheduled jobs completed per unit time.
  • Maximise resource utilization and balance server load.

Naive approach

  • Let’s start thinking from the most naive solution, and improve from there to reach an optimized solution.
  • Here is the pseudocode of a naive action executor.
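```python
def executeActionOnSegment(segment, actions):
    # Naive executor: a single thread on a single server, one user at a time.
    for user in fetchUsersInSegment(segment):
        for action in actions:
            success = doAction(user, action)
            if not success:
                # Actions are chained, so skip the rest for this user.
                break
            recordActionResult(user, action)


def doAction(user, action):
    # One network round trip per user per action; each service call is
    # assumed to return True on success.
    if action.type == "awardReward":
        return rewardService.awardReward(user, action.meta)
    elif action.type == "sendMessage":
        return messagingService.sendMessage(user, action.meta)
    else:
        # other action types ...
        raise NotImplementedError(action.type)
```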

It is easy to see that the naive solution does not satisfy the non-functional requirements, for the following reasons:

Execution is slow:

  1. The program is single-threaded.
  2. Actions are executed for users one by one, in sequence.
  3. Each call to the rewards and messaging services incurs a network round trip, which adds to the total execution time.

Resource utilization is low: The actions will only be executed on one server. When we have a cluster of servers, the other servers will sit idle.
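To make these two problems concrete, here is a rough back-of-envelope estimate. Every number in it (segment size, per-call latency, cluster size) is a hypothetical assumption, not a figure from Grab.

```python
# Back-of-envelope cost of the naive executor. All numbers are hypothetical.
USERS = 1_000_000        # users in the segment
ACTIONS_PER_USER = 2     # e.g. award a reward, then send a message
CALL_LATENCY_MS = 50     # assumed round trip per downstream call
SERVERS = 20             # assumed cluster size

# Naive: one thread on one server, one call per user per action.
naive_ms = USERS * ACTIONS_PER_USER * CALL_LATENCY_MS
naive_hours = naive_ms / 3_600_000

# The same calls spread evenly over every server (still one thread each).
spread_hours = naive_hours / SERVERS

print(f"naive: {naive_hours:.1f} h, spread over {SERVERS} servers: {spread_hours:.1f} h")
# naive: 27.8 h, spread over 20 servers: 1.4 h
```

Even before any batching, simply using the idle servers cuts the wall-clock time by the size of the cluster.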

To fix these issues:

  • Actions for different users should be executed in parallel.
  • API calls to other services should be minimized.
  • Distribute the work of executing actions evenly among different servers.
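A minimal single-server sketch of the first point, using a thread pool: chains for different users run concurrently while each user's actions stay in order. This is only an illustration of parallelism across users, not what Grab describes; `do_action` and the action names are hypothetical placeholders for the rewards and messaging calls.

```python
from concurrent.futures import ThreadPoolExecutor


def execute_for_user(user_id, actions, do_action):
    """Run the action chain for one user, stopping at the first failure."""
    completed = []
    for action in actions:
        if not do_action(user_id, action):
            break
        completed.append(action)
    return user_id, completed


def execute_in_parallel(user_ids, actions, do_action, max_workers=16):
    # Users are independent, so their chains can run concurrently;
    # the order of actions within one user's chain is preserved.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        results = pool.map(lambda u: execute_for_user(u, actions, do_action), user_ids)
        return dict(results)


# Hypothetical stand-in for the downstream calls: user 3's reward fails.
def fake_do_action(user_id, action):
    return not (user_id == 3 and action == "awardReward")


report = execute_in_parallel(range(5), ["awardReward", "sendMessage"], fake_do_action)
print(report[3], report[0])  # [] ['awardReward', 'sendMessage']
```

A thread pool still runs on one machine and still makes one call per user per action, which is why the remaining two points matter.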

Grab Team Approach

  • A message queue is a well-suited way to distribute work among multiple servers. Among the many available message queues, they selected Kafka.
  • When a scheduled campaign is triggered, they retrieve the users from the segment in batches; each batch comprises around 100 users. They write the batches into a Kafka stream, and all their servers consume from the stream to execute the actions for the batches. The following diagram illustrates the overall flow.
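The batching step can be sketched as follows. The payload shape, the `campaign_id` field, and the use of a random UUID as the key are assumptions for illustration; in a real deployment each `(key, payload)` pair would be handed to a Kafka producer client for the campaign topic.

```python
import uuid
from itertools import islice
from typing import Iterable, Iterator, List, Tuple

BATCH_SIZE = 100  # the article cites "around 100 users" per batch


def chunk(user_ids: Iterable[str], size: int = BATCH_SIZE) -> Iterator[List[str]]:
    """Yield consecutive lists of at most `size` user IDs."""
    it = iter(user_ids)
    while batch := list(islice(it, size)):
        yield batch


def campaign_messages(campaign_id: str, user_ids: Iterable[str]) -> Iterator[Tuple[str, dict]]:
    """Turn a segment into (partition key, payload) pairs ready for a producer."""
    for batch in chunk(user_ids):
        key = uuid.uuid4().hex  # random key so batches spread across partitions
        yield key, {"campaign_id": campaign_id, "user_ids": batch}


msgs = list(campaign_messages("new-year-2022", (f"user-{i}" for i in range(250))))
print([len(payload["user_ids"]) for _, payload in msgs])  # [100, 100, 50]
```

Because users are read lazily from an iterator, the segment never has to be held in memory all at once.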

Data in Kafka is stored in partitions. The partition configuration is important to ensure that the batches are evenly distributed among servers:

  1. Number of partitions: Ensure that the number of stream partitions is greater than or equal to the maximum number of servers in the cluster. Within a consumer group, each Kafka partition is consumed by at most one consumer, so if there are more consumers than partitions, some consumers will not receive any data.
  2. Partition key: Assign each batch a random hash value as its partition key, so that batches are spread evenly across partitions.
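Both rules can be checked with a small simulation. Kafka's default partitioner hashes keyed records with murmur2; `md5` is used here only as a stand-in, and the round-robin assignment is a simplified model of a consumer-group assignor. The partition and server counts are hypothetical.

```python
import hashlib
from collections import Counter


def partition_for(key: str, num_partitions: int) -> int:
    """Map a key to a partition by hashing it (md5 stands in for murmur2)."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


def assign_round_robin(num_partitions: int, consumers: list) -> dict:
    """Give each partition to exactly one consumer in the group."""
    assignment = {c: [] for c in consumers}
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


NUM_PARTITIONS = 12  # hypothetical

# Rule 2: distinct keys spread batches across the partitions.
load = Counter(partition_for(f"batch-{i}", NUM_PARTITIONS) for i in range(1200))

# Rule 1: with more consumers than partitions, the extras sit idle.
servers = [f"server-{i}" for i in range(16)]
idle = [s for s, parts in assign_round_robin(NUM_PARTITIONS, servers).items() if not parts]
print(len(idle))  # 4
```

Sizing the partition count for the largest expected cluster means scaling out servers never leaves new consumers without work.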

Now that work is distributed among servers in batches, we can consider how to process each batch faster. If we follow the naive logic, for each user in the batch, we need to call the rewards or messaging service to execute the actions. This will create very high QPS to those services, and incur significant network round trip time.

  • To solve this issue, they decided to build batch endpoints in rewards and messaging services. Each batch endpoint takes in a list of user IDs and action metadata as input parameters, and returns the action result for each user, regardless of success or failure. With that, their batch processing logic looks like the following:
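```python
def processBatch(userBatch, actions):
    users = userBatch
    for action in actions:
        # One batch call per action instead of one call per user.
        successUsers, failedUsers = doAction(users, action)
        recordFailures(failedUsers, action)
        # Only users whose previous action succeeded move on to the next one.
        users = successUsers
        if not users:
            break


def doAction(users, action):
    if action.type == "awardReward":
        resp = rewardService.batchAwardReward(users, action.meta)
    elif action.type == "sendMessage":
        resp = messagingService.batchSendMessage(users, action.meta)
    else:
        # other action types ...
        raise NotImplementedError(action.type)
    # The batch endpoint reports a result for every user, success or failure.
    return getSuccessUsers(resp), getFailedUsers(resp)
```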
  • In the batch endpoints themselves, they also optimized for latency. For example, awarding a reward requires writing records to multiple database tables. Making a separate DB query for each user in the batch would cause high QPS on the database and a significant network time cost, so they instead group all users in the batch into a single query per table.
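A minimal sketch of that grouping, using Python's built-in `sqlite3` module and a multi-row `INSERT`: one statement per table for the whole batch, inside one transaction. The table names `user_rewards` and `reward_ledger`, their columns, and the reward ID are hypothetical; the article does not say which database Grab uses.

```python
import sqlite3


def award_batch(conn: sqlite3.Connection, user_ids, reward_id: str) -> None:
    """Record one reward per user with a single multi-row INSERT per table."""
    rows = [(uid, reward_id) for uid in user_ids]
    if not rows:
        return
    placeholders = ", ".join(["(?, ?)"] * len(rows))
    params = [value for row in rows for value in row]
    with conn:  # one transaction for the whole batch
        conn.execute(
            f"INSERT INTO user_rewards (user_id, reward_id) VALUES {placeholders}", params
        )
        conn.execute(
            f"INSERT INTO reward_ledger (user_id, reward_id) VALUES {placeholders}", params
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE user_rewards (user_id TEXT, reward_id TEXT)")
conn.execute("CREATE TABLE reward_ledger (user_id TEXT, reward_id TEXT)")
award_batch(conn, [f"user-{i}" for i in range(100)], "free-ride")
print(conn.execute("SELECT COUNT(*) FROM user_rewards").fetchone()[0])  # 100
```

Only the `?` placeholders are interpolated into the SQL string; the values themselves are bound as parameters. A batch of 100 users needs 200 bind variables, well within SQLite's limit.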

Further Optimisations

1. Shard Stream by Action Type

  • Two widely used actions are awarding rewards and sending messages. They came across situations where sending messages was blocked because a separate reward-awarding campaign had already started.
  • They found that the API latency of awarding rewards is significantly higher than that of sending messages. To ensure that messages are not blocked by long-running reward jobs, they created a dedicated Kafka topic for messages. With separate Kafka topics per action type, they can run different types of campaigns in parallel.
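The blocking effect can be shown with a toy model that assumes one sequential consumer per topic. The per-batch latencies are hypothetical, chosen only to reflect that reward calls are slower than message calls, and the topic names are illustrative.

```python
from collections import defaultdict

# Hypothetical per-batch latencies in milliseconds.
LATENCY_MS = {"awardReward": 500, "sendMessage": 50}


def completion_times(jobs, route):
    """Simulate one sequential consumer per topic.

    jobs: list of (job_id, action_type) in arrival order.
    route: function mapping an action type to a topic name.
    Returns {job_id: completion time in ms}.
    """
    clock = defaultdict(int)  # elapsed time on each topic's consumer
    done = {}
    for job_id, action_type in jobs:
        topic = route(action_type)
        clock[topic] += LATENCY_MS[action_type]
        done[job_id] = clock[topic]
    return done


# Ten reward batches from one campaign arrive just before a message batch.
jobs = [(f"reward-{i}", "awardReward") for i in range(10)] + [("msg-0", "sendMessage")]

shared = completion_times(jobs, lambda a: "campaign-actions")
sharded = completion_times(jobs, lambda a: f"campaign-actions-{a}")
print(shared["msg-0"], sharded["msg-0"])  # 5050 50
```

With a shared topic, the message batch waits behind every reward batch; with a topic per action type, it finishes after its own latency.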

2. Shard Stream by Country

  • Grab operates in multiple countries. They came across situations where a campaign of awarding rewards to a small segment of users in one country was delayed by another campaign that targeted a huge segment of users in another country.
  • Similar to the above solution, they added different Kafka topics for each country to enable the parallel processing of campaigns in different countries.
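A sketch of routing batches to a topic per (action type, country) pair. The `Batch` record, the `campaign.<action>.<country>` naming scheme, and the example campaigns are hypothetical; country codes follow ISO 3166 (e.g. `ID` for Indonesia, `SG` for Singapore).

```python
from collections import defaultdict
from typing import List, NamedTuple


class Batch(NamedTuple):
    campaign_id: str
    action_type: str  # e.g. "awardReward" or "sendMessage"
    country: str      # ISO country code, e.g. "SG"
    user_ids: List[str]


def topic_for(batch: Batch) -> str:
    # Hypothetical naming scheme: one topic per (action type, country) pair.
    return f"campaign.{batch.action_type}.{batch.country}".lower()


def route(batches: List[Batch]) -> dict:
    """Group batch campaign IDs by destination topic, in arrival order."""
    topics = defaultdict(list)
    for batch in batches:
        topics[topic_for(batch)].append(batch.campaign_id)
    return dict(topics)


# A large Indonesian campaign and a small Singapore one no longer share a topic.
batches = [Batch("big-id", "awardReward", "ID", []) for _ in range(3)]
batches.append(Batch("small-sg", "awardReward", "SG", []))
print(route(batches))
# {'campaign.awardreward.id': ['big-id', 'big-id', 'big-id'], 'campaign.awardreward.sg': ['small-sg']}
```

Each topic can then have its own consumers, so the small campaign is processed regardless of how long the large one takes.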

3. Remove Unnecessary Waiting

  • They observed that in chained actions, messaging is generally the last action in the list. For example, a congratulatory message is sent to the user after a reward is awarded.
  • They realized it was not necessary to wait for a send-message action to complete before processing the next batch of users. Moreover, the sending-messages API has lower latency than awarding rewards. Hence, they made the sending-messages API asynchronous, so that awarding rewards to the next batch of users can start while messages are still being sent to the previous batch.
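The effect can be sketched with `asyncio`. Note the substitution: Grab made the messaging API itself asynchronous, whereas this sketch models that on the caller's side by starting the send as a background task. The latencies are hypothetical and scaled down, and the sketch assumes messaging is always the last action in the chain.

```python
import asyncio

# Hypothetical latencies in seconds, scaled down so the sketch runs quickly.
REWARD_LATENCY_S = 0.02
MESSAGE_LATENCY_S = 0.05


async def award_rewards(batch, log):
    log.append(("reward-start", batch))
    await asyncio.sleep(REWARD_LATENCY_S)  # stand-in for the batch reward call
    log.append(("reward-done", batch))


async def send_messages(batch, log):
    log.append(("message-start", batch))
    await asyncio.sleep(MESSAGE_LATENCY_S)  # stand-in for the batch message call
    log.append(("message-done", batch))


async def run_campaign(batches):
    log, in_flight = [], []
    for batch in batches:
        # The message depends on the reward, so this step is awaited.
        await award_rewards(batch, log)
        # Messaging is the last action: start it and move on to the next batch.
        in_flight.append(asyncio.create_task(send_messages(batch, log)))
    # Wait for outstanding messages once, before producing the final report.
    await asyncio.gather(*in_flight)
    return log


log = asyncio.run(run_campaign([1, 2]))
# Batch 2's rewards start before batch 1's messages finish.
print(log.index(("reward-start", 2)) < log.index(("message-done", 1)))  # True
```

Waiting only once at the end still lets the campaign report cover every message, without holding up the reward work for later batches.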
