Tag Archives: Platform

How we store and process millions of orders daily

Post Syndicated from Grab Tech original https://engineering.grab.com/how-we-store-millions-orders

Introduction

In the real world, after a passenger places a GrabFood order from the Grab App, the merchant-partner will prepare the order. A driver-partner will then collect the food and deliver it to the passenger. Have you ever wondered what happens in the backend system? The Grab Order Platform is a distributed system that processes millions of GrabFood or GrabMart orders every day. This post aims to share the journey of how we designed the database solution that powers the order platform.

Background

What are the design goals when building the database solution? We collected the requirements by analysing query patterns and traffic patterns.

Query patterns

Here are some important query examples that the Order Platform supports:

  1. Write queries:

    a. Create an order.

    b. Update an order.

  2. Read queries:

    a. Get order by id.

    b. Get ongoing orders by passenger id.

    c. Get historical orders by various conditions.

    d. Get order statistics (for example, get the number of orders)

We can break down these queries into two categories: transactional queries and analytical queries. Transactional queries are critical to online order creation and completion; they include the write queries and read queries such as 2a and 2b. Analytical queries like 2c and 2d retrieve historical orders or order statistics on demand, and are not essential to online order processing.

Traffic patterns

Grab’s Order Platform processes a significant amount of transaction data every month.

During peak hours, write Queries per Second (QPS) are three times the primary key read QPS, while range query QPS is four times the primary key read QPS.

Design goals

From the query and traffic patterns, we arrived at the following three design goals:

  1. Stability – the database solution must be able to handle high read and write QPS. Online order processing queries must have high availability. Even when some part of the system is down, we must be able to provide a degraded experience so that end users can still create and complete an order.
  2. Scalability and cost – the database solution must be able to support the fast evolution of business requirements, given that we now handle up to a million orders per month. The solution must also be cost effective at a large scale.
  3. Consistency – strong consistency for transactional queries, and eventual consistency for analytical queries.

Solution

The first design principle towards a stable and scalable database solution is to use different databases to serve transactional and analytical queries, also known as OLTP and OLAP queries. An OLTP database serves queries critical to online order processing and keeps data for only a short period of time. Meanwhile, an OLAP database holds the same set of data, but serves our historical and statistical queries and keeps data for a longer time.

What are the benefits from this design principle? From a stability point of view, we can choose different databases which can better fulfil our different query patterns and QPS requirements. An OLTP database is the single source of truth for online order processing; any failure in the OLAP database will not affect online transactions. From a scalability and cost point of view, we can choose a flexible database for OLAP to support our fast evolution of business requirements. We can maintain less data in our OLTP database while keeping some older data in our OLAP database.

To ensure that the data in both databases are consistent, we introduced the second design principle – data ingestion pipeline. In Figure 1, Order Platform writes data to the OLTP database to process online orders and asynchronously pushes the data into the data ingestion pipeline. The data ingestion pipeline ensures that the OLAP database data is eventually consistent.

Figure 1: Order Platform database solution overview

Architecture details

OLTP database

There are two categories of OLTP queries, the key-value queries (for example, load by order id) and the batch queries (for example, Get ongoing orders by passenger id). We use DynamoDB as the database to support these OLTP queries.

Why DynamoDB?

  1. Scalable and highly available: the tables of DynamoDB are partitioned and each partition is three-way replicated.
  2. Support for strongly consistent reads by primary key.
  3. DynamoDB has a mechanism called adaptive capacity to handle hotkey traffic. Internally, DynamoDB will distribute higher capacity to high-traffic partitions, and isolate frequently accessed items to a dedicated partition. This way, the hotkey can utilise the full capacity of an entire partition, which is up to 3000 read capacity units and 1000 write capacity units.
Figure 2: DynamoDB table structure overview. Source: Amazon Web Services (2019, 28 April)

Each DynamoDB table holds many items, and each item has attributes, including a partition key and a sort key. The partition key serves key-value queries, and the sort key serves range queries. In our case, the table contains order items and the partition key is the order ID, so key-value queries by order ID are served directly by the partition key.

order_id (PK) | state     | pax_id | created_at | pax_id_gsi
order1        | Ongoing   | Alice  | 9:00am     | Alice
order2        | Ongoing   | Alice  | 9:30am     | Alice
order3        | Completed | Alice  | 8:30am     | (empty)
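
To make the key-value access concrete, here is a minimal sketch of query 2a (get order by ID) using boto3. The table name `orders` is an assumption for illustration, not our actual schema.

```python
import boto3

orders = boto3.resource("dynamodb").Table("orders")  # hypothetical table name

# Get order by id (query 2a): a strongly consistent read on the partition key.
resp = orders.get_item(Key={"order_id": "order1"}, ConsistentRead=True)
order = resp.get("Item")
```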

Batch queries like ‘Get ongoing orders by passenger id’ are supported by DynamoDB Global Secondary Index (GSI). A GSI is like a normal DynamoDB table, which also has keys and attributes.

In our case, we have a GSI where the partition key is pax_id_gsi and the sort key is created_at. The pax_id_gsi attribute on the main table links each order item to the GSI, and DynamoDB keeps the GSI eventually consistent with the main table. If the Order Platform queries ongoing orders for Alice, two items are returned from the GSI.

pax_id_gsi (PK) | created_at (SK) | order_id
Alice           | 9:00am          | order1
Alice           | 9:30am          | order2
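
The batch query then becomes a Query against the GSI. Here is a minimal boto3 sketch, again assuming hypothetical table and index names:

```python
import boto3
from boto3.dynamodb.conditions import Key

orders = boto3.resource("dynamodb").Table("orders")  # hypothetical table name

# Get ongoing orders by passenger id (query 2b) via the GSI,
# sorted by the created_at sort key.
resp = orders.query(
    IndexName="pax_id_gsi-index",  # hypothetical GSI name
    KeyConditionExpression=Key("pax_id_gsi").eq("Alice"),
    ScanIndexForward=True,
)
ongoing_orders = resp["Items"]  # order1 and order2 in the example above
```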

We also make use of an advanced feature of GSI named sparse index to support ongoing order queries. When we update order status from ongoing to completed, at the same time, we set the pax_id_gsi to empty, so that the linked item in the GSI will be automatically deleted by DynamoDB. At any time, the GSI table only stores the ongoing orders. We use a sparse index mechanism to control our table size for better performance and to be more cost effective.
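
A minimal sketch of what order completion could look like with this sparse index, reusing the hypothetical table name from above: a single update sets the state and removes pax_id_gsi, so DynamoDB drops the item from the GSI.

```python
import boto3

orders = boto3.resource("dynamodb").Table("orders")  # hypothetical table name

# Completing an order: update the state and remove pax_id_gsi in one call,
# which deletes the corresponding item from the ongoing-orders GSI.
orders.update_item(
    Key={"order_id": "order3"},
    UpdateExpression="SET #s = :completed REMOVE pax_id_gsi",
    ExpressionAttributeNames={"#s": "state"},  # "state" is a DynamoDB reserved word
    ExpressionAttributeValues={":completed": "Completed"},
)
```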

The next problem is data retention, which we achieve with the DynamoDB Time To Live (TTL) feature: DynamoDB automatically scans for expired items and deletes them. The challenge is that enabling TTL on a big existing table puts a heavy load on the background scanner and might result in an outage. Our solution was to add a TTL attribute only to new items in the table, manually delete the items without a TTL attribute, and run a script to delete items whose TTL attribute is too old. After this process, the table is quite small, so we can enable the TTL feature on the previously added attribute without any concern. The retention period of our DynamoDB data is three months.
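
As a rough sketch of the first step, assuming a hypothetical expires_at attribute used as the TTL attribute: every new write carries an expiry timestamp three months ahead, and the TTL feature is later enabled on this attribute.

```python
import time

import boto3

orders = boto3.resource("dynamodb").Table("orders")  # hypothetical table name
RETENTION_SECONDS = 90 * 24 * 3600  # three-month retention

# New items carry the TTL attribute (epoch seconds). Once TTL is enabled on
# "expires_at", DynamoDB deletes expired items in the background.
orders.put_item(
    Item={
        "order_id": "order4",
        "state": "Ongoing",
        "pax_id": "Alice",
        "pax_id_gsi": "Alice",
        "created_at": "10:00am",
        "expires_at": int(time.time()) + RETENTION_SECONDS,  # hypothetical TTL attribute
    }
)
```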

Cost-wise, DynamoDB charges by storage size and by provisioned read/write capacity. The provisioned capacity auto-scales, so the cost follows demand, which generally makes it cheaper than RDS for this workload.

OLAP database

We use MySQL RDS as the database to support historical and statistical OLAP queries.

Why not Aurora? We chose RDS mainly because it is a mature database solution. Even though Aurora can provide better high availability, RDS is enough to support our less critical use cases. Cost-wise, Aurora charges by data storage and by the number of requested Input/Output Operations per Second (IOPS), whereas RDS charges only by data storage. As we are using General Purpose (SSD) storage, IOPS is included and supports up to 16k IOPS.

We use MySQL partitioning for data retention. The order table is partitioned by creation time monthly. Since the data access pattern is mostly by month, the partition key can reduce cross-partition queries. Partitions older than six months are dropped at the beginning of each month.
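
Here is a sketch of what the monthly partitioning and retention could look like, with a deliberately simplified schema and hypothetical connection details; the real table and automation differ.

```python
import pymysql

# Hypothetical connection parameters for the OLAP RDS instance.
conn = pymysql.connect(host="olap-rds-host", user="app", password="***", database="orders_olap")

CREATE_ORDERS_SQL = """
CREATE TABLE IF NOT EXISTS orders (
    order_id   VARCHAR(64) NOT NULL,
    state      VARCHAR(16) NOT NULL,
    pax_id     VARCHAR(64) NOT NULL,
    created_at DATETIME    NOT NULL,
    updated_at DATETIME(6) NOT NULL,
    PRIMARY KEY (order_id, created_at)
)
PARTITION BY RANGE (TO_DAYS(created_at)) (
    PARTITION p202101 VALUES LESS THAN (TO_DAYS('2021-02-01')),
    PARTITION p202102 VALUES LESS THAN (TO_DAYS('2021-03-01'))
)
"""

def drop_expired_partition(partition_name: str) -> None:
    """Run at the beginning of each month to drop partitions older than six months."""
    with conn.cursor() as cur:
        cur.execute(f"ALTER TABLE orders DROP PARTITION {partition_name}")
    conn.commit()

with conn.cursor() as cur:
    cur.execute(CREATE_ORDERS_SQL)
conn.commit()
```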

Data ingestion pipeline

Figure 3: Data Ingestion Pipeline Architecture.

A Kafka stream is used to process data in the data ingestion pipeline. We chose Kafka because it offers a 99.95% SLA and is not tied to any particular OLTP or OLAP database type.

Even with a 99.95% SLA, there is still a chance of stream producer failures. When the producer fails, we store the message in an Amazon Simple Queue Service (SQS) queue and retry. If the retry also fails, the message is moved to the SQS dead letter queue (DLQ), to be consumed at a later time.
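
Below is a simplified sketch of this producer-side fallback, using kafka-python and boto3 with hypothetical topic, broker, and queue names:

```python
import json

import boto3
from kafka import KafkaProducer
from kafka.errors import KafkaError

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],  # hypothetical brokers
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)
sqs = boto3.client("sqs")
RETRY_QUEUE_URL = "https://sqs.ap-southeast-1.amazonaws.com/123456789012/order-events-retry"  # hypothetical

def publish_order_event(event: dict) -> None:
    """Push an order event into the ingestion pipeline, falling back to SQS on failure."""
    try:
        producer.send("order-events", value=event).get(timeout=5)  # hypothetical topic
    except KafkaError:
        # Producer failure: park the message in SQS for retry. Messages that keep
        # failing are moved by SQS to the dead letter queue for later consumption.
        sqs.send_message(QueueUrl=RETRY_QUEUE_URL, MessageBody=json.dumps(event))
```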

On the stream consumer side, we use back-off retry at both stream and database levels to ensure consistency. In a worst-case scenario, we can rewind the stream events from Kafka.

It is important for the data ingestion pipeline to handle duplicate messages and out-of-order messages.

Duplicate messages are handled by a database-level unique key (for example, order ID + creation time).

For out-of-order messages, we implemented the following two mechanisms (a consumer-side sketch follows the list):

  1. Version update: we only apply an event if its update time is newer than the data already stored. The precision of the update time is in microseconds, which is enough for most of our use cases.
  2. Upsert: if an update event arrives before its create event, we perform an upsert instead of a plain update.
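
Here is a minimal sketch of how a consumer could apply both rules in a single MySQL statement, assuming a hypothetical orders table with a unique key on (order_id, created_at) and an updated_at column:

```python
UPSERT_SQL = """
    INSERT INTO orders (order_id, created_at, state, pax_id, updated_at)
    VALUES (%(order_id)s, %(created_at)s, %(state)s, %(pax_id)s, %(updated_at)s)
    ON DUPLICATE KEY UPDATE
        state      = IF(VALUES(updated_at) > updated_at, VALUES(state), state),
        updated_at = IF(VALUES(updated_at) > updated_at, VALUES(updated_at), updated_at)
"""

def apply_order_event(cursor, event: dict) -> None:
    # Upsert: an update event that arrives before its create event still inserts a row.
    # Version update: the IF() guards change the row only if the incoming event is newer,
    # which also makes duplicate (replayed) events a harmless no-op.
    cursor.execute(UPSERT_SQL, event)
```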

Impact

After launching our solution this year, we have saved significantly on cloud costs. In the earlier solution, the Order Platform synchronously wrote to both DynamoDB and Aurora and kept the data forever.

Conclusion

In terms of stability, we use DynamoDB as the critical OLTP database to ensure high availability for online order processing. Scalability-wise, we use RDS as the OLAP database to support our quickly evolving business requirements through its rich set of indexes. Cost efficiency is achieved through data retention in both databases. For consistency, the OLTP database is the single source of truth, and the OLAP database is kept eventually consistent with the help of the data ingestion pipeline.

What’s next?

Currently, the database solution is running on the production environment. Even though the database solution is proven to be stable, scalable and consistent, we still see some potential areas of improvement.

We use MySQL RDS for OLAP data storage. Even though MySQL is stable and cost effective, it is difficult to serve more complicated queries like free text search. Hence, we plan to explore other NoSQL databases like ElasticSearch.

We hope this post helps you understand how we store Grab orders and fulfil the queries from the Grab Order Platform.

References

Amazon Web Services. (2019, 28 April). Build with DynamoDB: S1 E1 – Intro to Amazon DynamoDB [Video]. YouTube.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Optimally scaling Kafka consumer applications

Post Syndicated from Grab Tech original https://engineering.grab.com/optimally-scaling-kafka-consumer-applications

Earlier this year, we took you on a journey through how we built and deployed our event sourcing and stream processing framework at Grab. We’re happy to share that we’re able to reliably maintain our uptime and continue to service close to 400 billion events a week. We haven’t stopped there though. To ensure that we can scale our framework as the Grab business continues to grow, we have put significant effort into optimizing our infrastructure.

In this article, we will dive deeper into our Kubernetes infrastructure setup for our stream processing framework. We will cover why and how we focus on optimal scalability and availability of our infrastructure.

Quick Architecture Recap

Coban Platform Architecture

The Coban platform provides lightweight Golang plugin architecture-based data processing pipelines running in Kubernetes. These are essentially Kafka consumer pods that consume data, process it, and then materialize the results into various sinks (RDBMS, other Kafka topics).

Anatomy of a Processing Pod

Each stream processing pod (the smallest unit of a pipeline’s deployment) has three top level components:

  • Trigger: An interface that connects directly to the source of the data and converts it into an event channel.
  • Runtime: This is the app’s entry point and the orchestrator of the pod. It manages the worker pools, triggers, event channels, and lifecycle events.
  • Pipeline plugin: This is provided by the user, and conforms to a contract that the platform team publishes. It contains the domain logic for the pipeline and houses the pipeline orchestration defined by a user based on our Stream Processing Framework.

Optimal Scaling

We initially architected our Kubernetes setup around horizontal pod autoscaling (HPA), which scales the number of pods per deployment based on CPU and memory usage. With HPA, the CPU and memory per pod stay fixed at the values specified in the deployment manifest, and the number of pods scales horizontally as the load changes.

These were the areas of application wastage we observed on our platform:

  • As Grab’s traffic is uneven, we’d always have to provision for peak traffic. As users would not (or could not) always account for ramps, they would be fairly liberal with setting limit values (CPU and memory), leading to resource wastage.
  • Pods often had uneven traffic distribution despite fairly even partition load distribution in Kafka. The Stream Processing Framework (SPF) is essentially a set of Kafka consumers consuming from Kafka topics, so pods scaling in and out resulted in unequal partition load per pod.

Vertically Scaling with Fixed Number of Pods

We initially kept the number of pods for a pipeline equal to the number of partitions in the topic the pipeline consumes from. This ensured even distribution of partitions to each pod providing balanced consumption. In order to abstract this from the end user, we automated the application deployment process to directly call the Kafka API to fetch the number of partitions during runtime.
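
Below is a hedged sketch of that lookup with kafka-python; the topic and broker names are placeholders, and the real deployment automation wraps this differently.

```python
from kafka import KafkaConsumer

def desired_pod_count(topic: str, bootstrap_servers: list) -> int:
    """Return the topic's partition count so the pipeline runs one pod per partition."""
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        partitions = consumer.partitions_for_topic(topic) or set()
    finally:
        consumer.close()
    return len(partitions)

# The deployment automation would then set the Kubernetes Deployment replicas to this value.
print(desired_pod_count("booking-events", ["kafka:9092"]))  # hypothetical topic and brokers
```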

After achieving a fixed number of pods for the pipeline, we wanted to move away from HPA. We wanted our pods to scale up and down as the load increases or decreases without any manual intervention. Vertical pod autoscaling (VPA) solves this problem as it relieves us from any manual operation for setting up resources for our deployment.

We just deploy the application and let VPA handle the resources required for its operation. VPA is known not to react quickly to sudden load changes, as it trains its model on the deployment’s load trend over a period of time before recommending optimal resources. This ensures optimal resource allocation for our pipelines based on their historical throughput.

We saw a ~45% reduction in our total resource usage vs resource requested after moving to VPA with a fixed number of pods from HPA.

Managing Availability

We broadly classify our workloads as latency sensitive (critical) and latency tolerant (non-critical). As a result, we could optimize scheduling and cost efficiency using priority classes and overprovisioning on heterogeneous node types on AWS.

Kubernetes Priority Classes

The main cost of running EKS on AWS comes from the EC2 machines that form the worker nodes of the Kubernetes cluster. Running On-Demand instances brings all the guarantees of instance availability, but it is very expensive. Hence, our first action to drive cost optimisation was to include Spot instances in our worker node group.

With the uncertainty of losing a spot instance, we started assigning priority to our various applications. We then let the user choose the priority of their pipeline depending on their use case. Different priorities would result in different node affinity to different kinds of instance groups (On-Demand/Spot). For example, Critical pipelines (latency sensitive) run on On-Demand worker node groups and Non-critical pipelines (latency tolerant) on Spot instance worker node groups.

We use priority classes both as a method of preemption and, together with node affinity, to place pipelines of a given priority onto the matching node group.

Overprovisioning

With Spot instances running, we realised we needed the cluster to respond quickly to failures. We wanted evicted pods to be rescheduled quickly, so we added overprovisioning to our cluster. This means we keep some no-op pods running in our worker node groups, occupying free space, for quick scheduling of evicted or newly deployed pods.

The overprovisioned pods have the lowest priority, so they can be preempted by any pod waiting in the queue for scheduling. We use the cluster proportional autoscaler to decide the right number of these overprovisioned pods; it scales them up and down in proportion to the cluster size (i.e. the number of nodes and CPUs in the worker node groups). This relieves us from tuning the number of no-op pods as the cluster scales, keeping the free space proportional to the current cluster capacity.
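
For illustration only, the placeholder pods could run under a very low priority class like the sketch below, created here with the Kubernetes Python client; the class name and value are assumptions, not our exact configuration.

```python
from kubernetes import client, config

config.load_kube_config()

# A negative-priority class for the no-op placeholder pods, so any real pipeline pod
# waiting for capacity can preempt them immediately.
overprovisioning = client.V1PriorityClass(
    metadata=client.V1ObjectMeta(name="overprovisioning"),  # hypothetical name
    value=-10,
    global_default=False,
    description="Placeholder pods reserving headroom; preemptible by any workload.",
)
client.SchedulingV1Api().create_priority_class(overprovisioning)
```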

Lastly, overprovisioning also improved deployment time, because we no longer depend on the time required for the Auto Scaling Group (ASG) to add a new node to the cluster every time we deploy a new application.

Future Improvements

Evolution is an ongoing process. In the next few months, we plan to work on custom resources that combine VPA with a fixed deployment size. Our current setup works fine for now, but we would like to create a more tuneable in-house CRD (Custom Resource Definition) for VPA that also incorporates rightsizing our Kubernetes deployments horizontally.


Authored By Shubham Badkur on behalf of the Coban team at Grab – Ryan Ooi, Karan Kamath, Hui Yang, Yuguang Xiao, Jump Char, Jason Cusick, Shrinand Thakkar, Dean Barlan, Shivam Dixit, Andy Nguyen, and Ravi Tandon.


Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.

The journey of deploying Apache Airflow at Grab

Post Syndicated from Grab Tech original https://engineering.grab.com/the-journey-of-deploying-apache-airflow-at-Grab

At Grab, we use Apache Airflow to schedule and orchestrate the ingestion and transformation of data, train machine learning models, and copy data between clouds. There are many engineering teams at Grab that use Airflow, each of which originally had their own Airflow instance.

The proliferation of independently managed Airflow instances resulted in inefficient use of resources, where each team ended up solving the same problems of logging, scaling, monitoring, and more. From this morass came the idea of having a single dedicated team to manage all the Airflow instances for anyone in Grab that wants to use Airflow as a scheduling tool.

We designed and implemented an Apache Airflow-based scheduling and orchestration platform that currently runs close to 20 Airflow instances for different teams at Grab. Sounds interesting? What follows is a brief history.

Early days

Circa 2018, we were running a few hundred Directed Acyclic Graphs (DAGs) on one Airflow instance in the Data Engineering team. There was no dedicated team to maintain it, and no Airflow expert in our team. We were struggling to maintain our Airflow instance, which was causing many jobs to fail every day. We were facing issues with library management, scaling, managing and syncing artefacts across all Airflow components, upgrading Airflow versions, deployment, rollbacks, etc.

After a few postmortem reports, we realized that we needed a dedicated team to maintain our Airflow. This was how our Airflow team was born.

In the initial months, we dedicated ourselves to stabilizing our Airflow environment. During this process, we realized that Airflow has a steep learning curve and requires time and effort to understand and maintain properly. We also found that tweaking Airflow configurations required a thorough understanding of Airflow internals.

We felt that for the benefit of everyone at Grab, we should leverage what we learned about Airflow to help other teams at Grab; there was no need for anyone else to go through the same struggles we did. That’s when we started thinking about managing Airflow for other teams.

We talked to the Data Science and Engineering teams who were also running Airflow to schedule their jobs. Almost all the teams were struggling to maintain their Airflow instance. A few teams didn’t have enough technical expertise to maintain their instance. The Data Scientists and Analysts that we spoke to were more than happy to outsource the overhead of Airflow maintenance and wanted to focus more on their Data Science use cases instead.

We started working with one of the Data Science teams and initiated the discussion to create a dockerized Airflow instance and run it on our Kubernetes cluster.

We created the Airflow instance and maintained it for them. Later, we were approached by two more teams to help with their Airflow instances. This was the trigger for us to design and create a platform on which we can efficiently manage Airflow instances for different teams.

Current state

As mentioned, we are currently serving close to 20 Airflow instances for various teams on this platform and leverage Apache Airflow to schedule thousands of daily jobs. Each Airflow instance currently schedules 1k to 60k daily jobs. Also, new teams can quickly try out Airflow without worrying about infrastructure and maintenance overhead. Let’s go through the important aspects of this platform such as design considerations, architecture, deployment, scalability, dependency management, monitoring and alerting, and more.

Design considerations

The initial step we took towards building our scheduling platform was to define a set of expectations and guidelines around ownership, infrastructure, authentication, common artifacts and CI/CD, to name a few.

These were the considerations we had in mind:

  • Deploy containerized Airflow instances on Kubernetes cluster to isolate Airflow instances at the team level. It should scale up and scale out according to usage.
  • Each team can have different sets of jobs that require specific dependencies on the Airflow server.
  • Provide common CI/CD templates to build, test, and deploy Airflow instances. These CI/CD templates should be flexible enough to be extended by users and modified according to their use case.
  • Common plugins, operators, hooks, sensors will be shipped to all Airflow instances. Moreover, each team can have its own plugins, operators, hooks, and sensors.
  • Support LDAP based authentication as it is natively supported by Apache Airflow. Each team can authenticate Airflow UI by their LDAP credentials.
  • Use the Hashicorp Vault to store Airflow specific secrets. Inject these secrets via sidecar in Airflow servers.
  • Use ELK stack to access all application logs and infrastructure logs.
  • Datadog and PagerDuty will be used for monitoring and alerting.
  • Ingest job statistics, such as the total number of jobs scheduled, the number of failed jobs, the number of successful jobs, and active DAGs, into the data lake and make them accessible via Presto.

Architecture diagram

Infrastructure management

Initially, we started deploying Airflow instances on Kubernetes clusters managed via Kubernetes Operations (KOPS). Later, we migrated to Amazon EKS to reduce the overhead of managing the Kubernetes control plane. Each Kubernetes namespace hosts one Airflow instance.

We chose Terraform to manage infrastructure as code. We deployed each Airflow instance using Terraform modules, which include a helm_release Terraform resource on top of our customized Airflow Helm Chart.

Each Airflow instance connects to its own Redis and RDS. RDS stores the Airflow metadata, and Redis acts as the Celery broker between the Airflow scheduler and the Airflow workers.

The Hashicorp Vault is used to store secrets required by Airflow instances and injected via sidecar by each Airflow component. The ELK stack stores all logs related to Airflow instances and is used for troubleshooting any instance. Datadog, Slack, and PagerDuty are used to send alerts.

Presto is used to access job statistics, such as numbers on scheduled jobs, failed jobs, successful jobs, and active DAGs, to help each team to analyze their usage and stability of their jobs.

Doing things at scale

There are two kinds of scaling we need to talk about:

  • scaling of Airflow instances on a resource level handling different loads
  • scaling in terms of teams served on the platform

To scale Airflow instances, we set the request and the limit of each Airflow component allowing any of the components to scale up easily. To scale out Airflow workers, we decided to enable the horizontal pod autoscaler (HPA) using Memory and CPU parameters. The cluster autoscaler on EKS helps in scaling the platform to accommodate more teams.

Moreover, we categorized all our Airflow instances into three sizes (small, medium, and large) to use resources efficiently, based on how many hourly/daily jobs each instance schedules. Each instance size has a specific RDS instance type and storage, a Redis instance type, CPU and memory, and request/limit values for the scheduler, worker, web server, and flower. Each instance size also has its own Airflow configurations to make the best use of the resources given to the instance.

Airflow image and version management

The Airflow team builds and releases one common base Docker image for each Airflow version. The base image has Airflow installed with specific versions, as well as common Python packages, plugins, helpers, tests, patches, and so on.

Each team has their customized Docker image on top of the base image. In their customized Docker image, they can update the Python packages and can download other artifacts that they require. Each Airflow instance will be deployed using the team’s customized image.

There are common CI/CD templates provided by the Airflow team to build the customized image, run unit tests, and deploy Airflow instances from their GitLab pipeline.

To upgrade the Airflow version, the Airflow team reviews and studies the changelog of the released Airflow version, and notes down the important features and their impact, open issues, bugs, and workable solutions. Later, we build and release the base Docker image using the new Airflow version.

We support only one Airflow version for all Airflow instances to keep the maintenance overhead low. In the case of a minor or major version change, we support both the old and the new version until the retirement period.

How do we deploy

There is a deployment ownership guideline that explains the schedule of deployments and the corresponding PICs. All teams have agreed on this guideline and share the responsibility with the Airflow Team.

There are two kinds of deployment:

  • DAG deployment: This is part of the common GitLab CI/CD template. The Airflow team doesn’t trigger the DAG deployment, it’s fully owned by the teams.
  • Airflow instance deployment: The Airflow instance deployment is required in these scenarios:
    1. update in base Docker image
    2. add/update in Python packages by any team
    3. customization in the base image by any team
    4. change in Airflow configurations
    5. change in the resource of scheduler, worker, web server or flower

Base Docker image update

The Airflow team maintains the base Docker image on the AWS Elastic Container Registry. The GitLab CI/CD builds the updated base image whenever the Airflow team changes the base image. The base image is validated by automated deployment on the test environment and automated smoke test. The Airflow instance owner of each team needs to trigger their build and deployment pipeline to apply the base image changes on their Airflow instance.

Python package additions or updates

Each team can add or update their Python dependencies. The GitLab CI/CD pipeline builds a new image with the updated changes. The Airflow instance owner manually triggers the deployment from their CI/CD pipeline; a flag is also available to make the deployment automatic.

Base image customization

Each team can add customizations on top of the base image. Similar to the scenario above, the GitLab CI/CD pipeline builds a new image with the updated changes, and the Airflow instance owner manually triggers the deployment from their CI/CD pipeline. To automate the deployment, a flag is available here as well.

Airflow configuration and component resource changes

To optimize the Airflow instances, the Airflow Team makes changes to the Airflow configurations and resources of any of the Airflow components. The Airflow configurations and resources are also part of the Terraform code. Atlantis (https://www.runatlantis.io/) deploys the Airflow instances with Terraform changes.

None of these forms of deployment cause downtime, and they do not impact running tasks or the Airflow UI.

Testing

While making our first Airflow instance stable, we started exploring testing in Airflow. We wanted to validate the correctness of DAGs, catch duplicate DAG IDs, and check for typos and cycles in DAGs, among other things. We later wrote the tests ourselves and published a detailed blog in two parts: usejournal (part 1) and Medium (part 2).

These tests are available in the base image and run in the GitLab pipeline from the user’s repository to validate their DAGs. The unit tests run using the common GitLab CI/CD template provided by the Airflow team.
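
As a minimal example of the kind of DAG validation that runs in the pipeline (a sketch, not our full test suite), a DagBag-based test catches import errors and broken DAG files:

```python
import unittest

from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):
    def setUp(self):
        # Load every DAG file from the user's repository; the folder path is illustrative.
        self.dag_bag = DagBag(dag_folder="dags/", include_examples=False)

    def test_no_import_errors(self):
        # Syntax errors, bad imports, and similar typos in DAG files show up here.
        self.assertEqual({}, self.dag_bag.import_errors,
                         f"DAG import failures: {self.dag_bag.import_errors}")

    def test_dags_loaded(self):
        self.assertGreater(len(self.dag_bag.dags), 0, "No DAGs were loaded")

if __name__ == "__main__":
    unittest.main()
```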

Monitoring & alerting

Our scheduling platform runs the Airflow instance for many critical jobs scheduled by each team. It’s important for us to monitor all Airflow instances and alert respective stakeholders in case of any failure.

We use Datadog for monitoring and alerting. Creating a common Datadog dashboard requires passing tags along with the metrics emitted by Airflow, but up to Airflow 1.10.x, tagging of Datadog metrics is not supported.

We contributed a change to the community to enable Datadog tagging support, which will be released in Airflow 2.0.0 (https://github.com/apache/Airflow/pull/7376). We patched this pull request internally and created the common Datadog dashboard.

There are three categories of metrics that we are interested in:

  • EKS cluster metrics: It includes total In-Service Nodes, allocated CPU cores, allocated Memory, Node status, CPU/Memory request vs limit, Node disk and Memory pressure, Rx-Tx packets dropped/errors, etc.
  • Host Metrics: These metrics are for each host participating in the EKS cluster. It includes Host CPU/Memory utilization, Host free memory, System disk, and EBS IOPS, etc.
  • Airflow instance metrics: These metrics are for each Airflow instance. It includes scheduler heartbeats, DagBag size, DAG processing import errors, DAG processing time, open/used slots in a pool, each pod’s Memory/CPU usage, CPU and Memory utilization of metadata DB, database connections as well as the number of workers, active/paused DAGs, successful/failed/queued/running tasks, etc.

Sample Datadog dashboard

We alert respective stakeholders and oncalls using Slack and PagerDuty.

Benefits

These are the benefits of having our own Scheduling Platform:

  • Scaling: HPA on Airflow workers running on EKS with autoscaler helps Airflow workers to scale automatically to theoretically infinite scale. This enables teams to run thousands of DAGs.
  • Logging: Centralized logging using Kibana.
  • Better Isolation: Separate Docker images for each team provide better isolation.
  • Better Customization: All teams are provided with a mechanism to customize their Airflow worker environment according to their requirements.
  • Zero Downtime: Rolling upgrade and termination period on Airflow workers helps in zero downtime during the deployment.
  • Efficient usage of infrastructure: Each team doesn’t need to allocate infrastructure for Airflow instances. All Airflow instances are deployed on one shared EKS cluster.
  • Less maintenance overhead for users: users can focus on their core work and don’t need to spend time maintaining Airflow instances and their resources.
  • Common plugins and helpers: all common plugins and helpers are available on every Airflow instance, so each team doesn’t need to add them individually.

Conclusion

Designing and implementing our own scheduling platform started with many challenges and unknowns. We were not sure about the scale we were aiming for, the heterogeneous workloads from each team, or the level of triviality or complexity we were going to face. After two years, we have successfully built and productionized a scalable scheduling platform that helps teams at Grab schedule their workloads.

We have many failure stories, odd things we ran into, and hacks and workarounds we patched. But we got through it and now provide a cost-effective and scalable scheduling platform with low maintenance overhead to all teams at Grab.

What’s Next

Moving ahead, we will explore adding the following capabilities:

  • REST APIs to enable teams to access their Airflow instance programmatically and have better integration with other tools and frameworks.
  • Support for dynamic DAGs at scale to help decrease the DAG maintenance overhead.
  • A template-based engine to act as a middle layer between the scheduling platform and external systems. It will have a set of templates to generate DAGs, which helps with better integration with external systems.

We suggest that anyone running multiple Airflow instances across different teams look at this approach and build a centralized scheduling platform. Before you begin, review the feasibility of building such a platform, as it requires vision, a lot of effort, and cross-communication with many teams.


Authored by Chandulal Kavar on behalf of the Airflow team at Grab – Charles Martinot, Vinson Lee, Akash Sihag, Piyush Gupta, Pramiti Goel, Dewin Goh, QuiHieu Nguyen, James Anh-Tu Nguyen, and the Data Engineering Team.


Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.