Tag Archives: Platform

How Cloudflare is powering the next generation of platforms with Workers

Post Syndicated from Nathan Disidore original http://blog.cloudflare.com/powering-platforms-on-workers/

How Cloudflare is powering the next generation of platforms with Workers

How Cloudflare is powering the next generation of platforms with Workers

We launched Workers for Platforms, our Workers offering for SaaS businesses, almost exactly one year ago to the date! We’ve seen a wide array of customers using Workers for Platforms – from e-commerce to CMS, low-code/no-code platforms and also a new wave of AI businesses running tailored inference models for their end customers!

Let’s take a look back and recap why we built Workers for Platforms, show you some of the most interesting problems our customers have been solving and share new features that are now available!

What is Workers for Platforms?

SaaS businesses are all too familiar with the never ending need to keep up with their users' feature requests. Thinking back, the introduction of Workers at Cloudflare was to solve this very pain point. Workers gave our customers the power to program our network to meet their specific requirements!

Need to implement complex load balancing across many origins? Write a Worker. Want a custom set of WAF rules for each region your business operates in? Go crazy, write a Worker.

We heard the same themes coming up with our customers – which is why we partnered with early customers to build Workers for Platforms. We worked with the Shopify Oxygen team early on in their journey to create a built-in hosting platform for Hydrogen, their Remix-based eCommerce framework. Shopify’s Hydrogen/Oxygen combination gives their merchants the flexibility to build out personalized shopping for buyers. It’s an experience that storefront developers can make their own, and it’s powered by Cloudflare Workers behind the scenes. For more details, check out Shopify’s “How we Built Oxygen” blog post.

Oxygen is Shopify's built-in hosting platform for Hydrogen storefronts, designed to provide users with a seamless experience in deploying and managing their ecommerce sites. Our integration with Workers for Platforms has been instrumental to our success in providing fast, globally-available, and secure storefronts for our merchants. The flexibility of Cloudflare's platform has allowed us to build delightful merchant experiences that integrate effortlessly with the best that the Shopify ecosystem has to offer.
Lance Lafontaine, Senior Developer Shopify Oxygen

Another customer that we’ve been working very closely with is Grafbase. Grafbase started out on the Cloudflare for Startups program, building their company from the ground up on Workers. Grafbase gives their customers the ability to deploy serverless GraphQL backends instantly. On top of that, their developers can build custom GraphQL resolvers to program their own business logic right at the edge. Using Workers and Workers for Platforms means that Grafbase can focus their team on building Grafbase, rather than having to focus on building and architecting at the infrastructure layer.

Our mission at Grafbase is to enable developers to deploy globally fast GraphQL APIs without worrying about complex infrastructure. We provide a unified data layer at the edge that accelerates development by providing a single endpoint for all your data sources. We needed a way to deploy serverless GraphQL gateways for our customers with fast performance globally without cold starts. We experimented with container-based workloads and FaaS solutions, but turned our attention to WebAssembly (Wasm) in order to achieve our performance targets. We chose Rust to build the Grafbase platform for its performance, type system, and its Wasm tooling. Cloudflare Workers was a natural fit for us given our decision to go with Wasm. On top of using Workers to build our platform, we also wanted to give customers the control and flexibility to deploy their own logic. Workers for Platforms gave us the ability to deploy customer code written in JavaScript/TypeScript or Wasm straight to the edge.
Fredrik Björk, Founder & CEO at Grafbase

Over the past year, it’s been incredible seeing the velocity that building on Workers allows companies both big and small to move at.

New building blocks

Workers for Platforms uses Dynamic Dispatch to give our customers, like Shopify and Grafbase, the ability to run their own Worker before user code that’s written by Shopify and Grafbase’s developers is executed. With Dynamic Dispatch, Workers for Platforms customers (referred to as platform customers) can authenticate requests, add context to a request or run any custom code before their developer’s Workers (referred to as user Workers) are called.

This is a key building block for Workers for Platforms, but we’ve also heard requests for even more levels of visibility and control from our platform customers. Delivering on this theme, we’re releasing three new highly requested features:

Outbound Workers

Dynamic Dispatch gives platforms visibility into all incoming requests to their user’s Workers, but customers have also asked for visibility into all outgoing requests from their user’s Workers in order to do things like:

  • Log all subrequests in order to identify malicious hosts or usage patterns
  • Create allow or block lists for hostnames requested by user Workers
  • Configure authentication to your APIs behind the scenes (without end developers needing to set credentials)

Outbound Workers sit between user Workers and fetch() requests out to the Internet. User Workers will trigger a FetchEvent on the Outbound Worker and from there platform customers have full visibility over the request before it’s sent out.

How Cloudflare is powering the next generation of platforms with Workers

It’s also important to have context in the Outbound Worker to answer questions like “which user Worker is this request coming from?”. You can declare variables to pass through to the Outbound Worker in the dispatch namespaces binding:

[[dispatch_namespaces]]
binding = "dispatcher"
namespace = "<NAMESPACE_NAME>"
outbound = {service = "<SERVICE_NAME>", parameters = [customer_name,url]}

From there, the variables declared in the binding can be accessed in the Outbound Worker through env. <VAR_NAME>.

Custom Limits

Workers are really powerful, but, as a platform, you may want guardrails around their capabilities to shape your pricing and packaging model. For example, if you run a freemium model on your platform, you may want to set a lower CPU time limit for customers on your free tier.

Custom Limits let you set usage caps for CPU time and number of subrequests on your customer’s Workers. Custom limits are set from within your dynamic dispatch Worker allowing them to be dynamically scripted. They can also be combined to set limits based on script tags.

Here’s an example of a Dynamic Dispatch Worker that puts both Outbound Workers and Custom Limits together:

export default {
async fetch(request, env) {
  try {
    let workerName = new URL(request.url).host.split('.')[0];
    let userWorker = env.dispatcher.get(
      workerName,
      {},
      {// outbound arguments
       outbound: {
           customer_name: workerName,
           url: request.url},
        // set limits
       limits: {cpuMs: 10, subRequests: 5}
      }
    );
    return await userWorker.fetch(request);
  } catch (e) {
    if (e.message.startsWith('Worker not found')) {
      return new Response('', { status: 404 });
    }
    return new Response(e.message, { status: 500 });
  }
}
};

They’re both incredibly simple to configure, and the best part – the configuration is completely programmatic. You have the flexibility to build on both of these features with your own custom logic!

Tail Workers

Live logging is an essential piece of the developer experience. It allows developers to monitor for errors and troubleshoot in real time. On Workers, giving users real time logs though wrangler tail is a feature that developers love! Now with Tail Workers, platform customers can give their users the same level of visibility to provide a faster debugging experience.

Tail Worker logs contain metadata about the original trigger event (like the incoming URL and status code for fetches), console.log() messages and capture any unhandled exceptions. Tail Workers can be added to the Dynamic Dispatch Worker in order to capture logs from both the Dynamic Dispatch Worker and any User Workers that are called.

A Tail Worker can be configured by adding the following to the wrangler.toml file of the producing script

tail_consumers = [{service = "<TAIL_WORKER_NAME>", environment = "<ENVIRONMENT_NAME>"}]

From there, events are captured in the Tail Worker using a new tail handler:

export default {
  async tail(events) => {
    fetch("https://example.com/endpoint", {
      method: "POST",
      body: JSON.stringify(events),
    })
  }
}

Tail Workers are full-fledged Workers empowered by the usual Worker ecosystem. You can send events to any HTTP endpoint, like for example a logging service that parses the events and passes on real-time logs to customers.

Try it out!

All three of these features are now in open beta for users with access to Workers for Platforms. For more details and try them out for yourself, check out our developer documentation:

Workers for Platforms is an enterprise only product (for now) but we’ve heard a lot of interest from developers. In the later half of the year, we’ll be bringing Workers for Platforms down to our pay as you go plan! In the meantime, if you’re itching to get started, reach out to us through the Cloudflare Developer Discord (channel name: workers-for-platforms).

How we store and process millions of orders daily

Post Syndicated from Grab Tech original https://engineering.grab.com/how-we-store-millions-orders

Introduction

In the real world, after a passenger places a GrabFood order from the Grab App, the merchant-partner will prepare the order. A driver-partner will then collect the food and deliver it to the passenger. Have you ever wondered what happens in the backend system? The Grab Order Platform is a distributed system that processes millions of GrabFood or GrabMart orders every day. This post aims to share the journey of how we designed the database solution that powers the order platform.

Background

What are the design goals when building the database solution? We collected the requirements by analysing query patterns and traffic patterns.

Query patterns

Here are some important query examples that the Order Platform supports:

  1. Write queries:

    a. Create an order.

    b. Update an order.

  2. Read queries:

    a. Get order by id.

    b. Get ongoing orders by passenger id.

    c. Get historical orders by various conditions.

    d. Get order statistics (for example, get the number of orders)

We can break down queries into two categories: transactional queries and analytical queries. Transactional queries are critical to online order creation and completion, including the write queries and read queries such as 2a or 2b. Analytical queries like 2c and 2d retrieves historical orders or order statistics on demand. Analytical queries are not essential to the oncall order processing.

Traffic patterns

Grab’s Order Platform processes a significant amount of transaction data every month.

During peak hours, the write Queries per Second (QPS) is three times of primary key reads; whilst the range Queries per Second are four times of the primary key reads.

Design goals

From the query and traffic patterns, we arrived at the following three design goals:

  1. Stability – the database solution must be able to handle high read and write QPS. Online order processing queries must have high availability. Even when some part of the system is down, we must be able to provide a degraded experience to the end users allowing them to still be able to create and complete an order.
  2. Scalability and cost – the database solution must be able to support fast evolution of business requirements, given now we handle up to a million orders per month. The solution must also be cost effective at a large scale.
  3. Consistency – strong consistency for transactional queries, and eventually consistency for analytical queries.

Solution

The first design principle towards a stable and scalable database solution is to use different databases to serve transactional and analytical queries, also known as OLTP and OLAP queries. An OLTP database serves queries critical to online order processing. This table keeps data for only a short period of time. Meanwhile, an OLAP database has the same set of data, but serves our historical and statistical queries. This database keeps data for a longer time.

What are the benefits from this design principle? From a stability point of view, we can choose different databases which can better fulfil our different query patterns and QPS requirements. An OLTP database is the single source of truth for online order processing; any failure in the OLAP database will not affect online transactions. From a scalability and cost point of view, we can choose a flexible database for OLAP to support our fast evolution of business requirements. We can maintain less data in our OLTP database while keeping some older data in our OLAP database.

To ensure that the data in both databases are consistent, we introduced the second design principle – data ingestion pipeline. In Figure 1, Order Platform writes data to the OLTP database to process online orders and asynchronously pushes the data into the data ingestion pipeline. The data ingestion pipeline ensures that the OLAP database data is eventually consistent.

Figure 1: Order Platform database solution overview

Architecture details

OLTP database

There are two categories of OLTP queries, the key-value queries (for example, load by order id) and the batch queries (for example, Get ongoing orders by passenger id). We use DynamoDB as the database to support these OLTP queries.

Why DynamoDB?

  1. Scalable and highly available: the tables of DynamoDB are partitioned and each partition is three-way replicated.
  2. Support for strong consistent reads by primary key.
  3. DynamoDB has a mechanism called adaptive capacity to handle hotkey traffic. Internally, DynamoDB will distribute higher capacity to high-traffic partitions, and isolate frequently accessed items to a dedicated partition. This way, the hotkey can utilise the full capacity of an entire partition, which is up to 3000 read capacity units and 1000 write capacity units.
Figure 2: DynamoDB table structure overview. Source: Amazon Web Services (2019, 28 April)

In each DynamoDB table, it has many items with attributes. In each item, it has a partition key and sort key. The partition key is used for key-value queries, and the sort key is used for range queries. In our case, the table contains multiple order items. The partition key is order ID. We can easily support key-value queries by the partition key.

order_id (PK) state pax_id created_at pax_id_gsi
order1 Ongoing Alice 9:00am
order2 Ongoing Alice 9:30am
order3 Completed Alice 8:30am

Batch queries like ‘Get ongoing orders by passenger id’ are supported by DynamoDB Global Secondary Index (GSI). A GSI is like a normal DynamoDB table, which also has keys and attributes.

In our case, we have a GSI table where the partition key is the pax_id_gsi. The attribute pax_id_gsi is linked to the main table. It is eventually consistent with the main table that is maintained by DynamoDB. If the Order Platform queries ongoing orders for Alice, two items will be returned from the GSI table.

pax_id_gsi (PK) created_at (SK) order_id
Alice 9:00am order1
Alice 9:30am order2

We also make use of an advanced feature of GSI named sparse index to support ongoing order queries. When we update order status from ongoing to completed, at the same time, we set the pax_id_gsi to empty, so that the linked item in the GSI will be automatically deleted by DynamoDB. At any time, the GSI table only stores the ongoing orders. We use a sparse index mechanism to control our table size for better performance and to be more cost effective.

The next problem is data retention. This is achieved with the DynamoDB Time To Live (TTL) feature. DynamoDB will auto-scan expired items and delete them. But the challenge is when we add TTL to big tables, it will bring a heavy load to the background scanner and might result in an outage. Our solution is to only add a TTL attribute to the new items in the table. Then, we manually delete the items without TTL attributes, and run a script to delete items with TTL attributes that are too old. After this process, the table size will be quite small, so we can enable the TTL feature on the TTL attribute that we previously added without any concern. The retention period of our DynamoDB data is three months.

Costwise, DynamoDB is charged by storage size and the provision of the read write capability. The provision capability is actually auto scalable. The cost is on-demand. So it’s generally cheaper than RDS.

OLAP database

We use MySQL RDS as the database to support historical and statistical OLAP queries.

Why not Aurora? We choose RDS mainly because it is a mature database solution. Even if Aurora can provide better high-availability, RDS is enough to support our less critical use cases. Costwise, Aurora charges by data storage and the number of requested Input/Output Operations per Second (IOPS). RDS charges only by data storage. As we are using General Purpose (SSD) storage, IOPS is free and supports up to 16k IOPS.

We use MySQL partitioning for data retention. The order table is partitioned by creation time monthly. Since the data access pattern is mostly by month, the partition key can reduce cross-partition queries. Partitions older than six months are dropped at the beginning of each month.

Data ingestion pipeline

Figure 3: Data Ingestion Pipeline Architecture.

A Kafka stream is used to process data in the data ingestion pipeline. We choose the Kafka stream, because it has 99.95% SLA. It is not restricted by the OLTP and OLAP database types.

Even if Kafka can provide 99.95% SLA, there is still the chance of stream producer failures. When the producer fails, we will store the message in an Amazon Simple Queue Service (SQS) and retry. If the retry also fails, it will be moved to the SQS dead letter queue (DLQ), to be consumed at a later time.

On the stream consumer side, we use back-off retry at both stream and database levels to ensure consistency. In a worst-case scenario, we can rewind the stream events from Kafka.

It is important for the data ingestion pipeline to handle duplicate messages and out-of-order messages.

Duplicate messages are handled by the database level unique key (for example, order ID + creation time).

For the out-of-order messages, we implemented the following two mechanisms:

  1. Version update: we only update the most recently updated data. The precision of the update time is in microseconds, which is enough for most of the use cases.
  2. Upsert: if the update events occur before the create events, we simulate an upsert operation.

Impact

After launching our solution this year, we have saved significantly on cloud costs. In the earlier solution, Order Platform synchronously writes to DynamoDB and Aurora and the data is kept forever.

Conclusion

In terms of stability, we use DynamoDB as the critical OLTP database to ensure high availability for online order processing. Scalability wise, we use RDS as the OLAP database to support our quickly evolving business requirements by using a rich, multiple index. Cost efficiency is achieved by data retention in both databases. For consistency, we built a single source of truth OLTP database and an OLAP database that is eventually consistent with the help of the data ingestion pipeline.

What’s next?

Currently, the database solution is running on the production environment. Even though the database solution is proven to be stable, scalable and consistent, we still see some potential areas of improvement.

We use MySQL RDS for OLAP data storage. Even though MySQL is stable and cost effective, it is difficult to serve more complicated queries like free text search. Hence, we plan to explore other NoSQL databases like ElasticSearch.

We hope this post helps you understand how we store Grab orders and fulfil the queries from the Grab Order Platform.

References

Amazon Web Services. (2019, 28 April) Build with DynamoDB: S1 E1 – Intro to Amazon DynamoDB [Video]. YouTube.

Join us

Grab is the leading superapp platform in Southeast Asia, providing everyday services that matter to consumers. More than just a ride-hailing and food delivery app, Grab offers a wide range of on-demand services in the region, including mobility, food, package and grocery delivery services, mobile payments, and financial services across 428 cities in eight countries.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Optimally scaling Kafka consumer applications

Post Syndicated from Grab Tech original https://engineering.grab.com/optimally-scaling-kafka-consumer-applications

Earlier this year, we took you on a journey on how we built and deployed our event sourcing and stream processing framework at Grab. We’re happy to share that we’re able to reliably maintain our uptime and continue to service close to 400 billion events a week. We haven’t stopped there though. To ensure that we can scale our framework as the Grab business continuously grows, we have spent efforts optimizing our infrastructure.

In this article, we will dive deeper into our Kubernetes infrastructure setup for our stream processing framework. We will cover why and how we focus on optimal scalability and availability of our infrastructure.

Quick Architecture Recap

Coban Platform Architecture

The Coban platform provides lightweight Golang plugin architecture-based data processing pipelines running in Kubernetes. These are essentially Kafka consumer pods that consume data, process it, and then materialize the results into various sinks (RDMS, other Kafka topics).

Anatomy of a Processing Pod

Anatomy of a Processing Pod

Each stream processing pod (the smallest unit of a pipeline’s deployment) has three top level components:

  • Trigger: An interface that connects directly to the source of the data and converts it into an event channel.
  • Runtime: This is the app’s entry point and the orchestrator of the pod. It manages the worker pools, triggers, event channels, and lifecycle events.
  • Pipeline plugin: This is provided by the user, and conforms to a contract that the platform team publishes. It contains the domain logic for the pipeline and houses the pipeline orchestration defined by a user based on our Stream Processing Framework.

Optimal Scaling

We initially architected our Kubernetes setup around horizontal pod autoscaling (HPA), which scales the number of pods per deployment based on CPU and memory usage. HPA keeps CPU and memory per pod specified in the deployment manifest and scales horizontally as the load changes.

These were the areas of application wastage we observed on our platform:

  • As Grab’s traffic is uneven, we’d always have to provision for peak traffic. As users would not (or could not) always account for ramps, they would be fairly liberal with setting limit values (CPU and memory), leading to resource wastage.
  • Pods often had uneven traffic distribution despite fairly even partition load distribution in Kafka. The Stream Processing Framework(SPF) is essentially Kafka consumers consuming from Kafka topics, hence the number of pods scaling in and out resulted in unequal partition load per pod.

Vertically Scaling with Fixed Number of Pods

We initially kept the number of pods for a pipeline equal to the number of partitions in the topic the pipeline consumes from. This ensured even distribution of partitions to each pod providing balanced consumption. In order to abstract this from the end user, we automated the application deployment process to directly call the Kafka API to fetch the number of partitions during runtime.

After achieving a fixed number of pods for the pipeline, we wanted to move away from HPA. We wanted our pods to scale up and down as the load increases or decreases without any manual intervention. Vertical pod autoscaling (VPA) solves this problem as it relieves us from any manual operation for setting up resources for our deployment.

We just deploy the application and let VPA handle the resources required for its operation. It’s known to not be very susceptible to quick load changes as it trains its model to monitor the deployment’s load trend over a period of time before recommending an optimal resource. This process ensures the optimal resource allocation for our pipelines considering the historic trends on throughput.

We saw a ~45% reduction in our total resource usage vs resource requested after moving to VPA with a fixed number of pods from HPA.

Anatomy of a Processing Pod

Managing Availability

We broadly classify our workloads as latency sensitive (critical) and latency tolerant (non-critical). As a result, we could optimize scheduling and cost efficiency using priority classes and overprovisioning on heterogeneous node types on AWS.

Kubernetes Priority Classes

The main cost of running EKS in AWS is attributed to the EC2 machines that form the worker nodes for the Kubernetes cluster. Running On-Demand brings all the guarantees of instance availability but it is definitely very expensive. Hence, our first action to drive cost optimisation was to include Spot instances in our worker node group.

With the uncertainty of losing a spot instance, we started assigning priority to our various applications. We then let the user choose the priority of their pipeline depending on their use case. Different priorities would result in different node affinity to different kinds of instance groups (On-Demand/Spot). For example, Critical pipelines (latency sensitive) run on On-Demand worker node groups and Non-critical pipelines (latency tolerant) on Spot instance worker node groups.

We use priority class as a method of preemption, as well as a node affinity that chooses a certain priority pipeline for the node group to deploy to.

Overprovisioning

With spot instances running we realised a need to make our cluster quickly respond to failures. We wanted to achieve quick rescheduling of evicted pods, hence we added overprovisioning to our cluster. This means we keep some noop pods occupying free space running in our worker node groups for the quick scheduling of evicted or deploying pods.

The overprovisioned pods are the lowest priority pods, thus can be preempted by any pod waiting in the queue for scheduling. We used cluster proportional autoscaler to decide the right number of these overprovisioned pods, which scales up and down proportionally to cluster size (i.e number of nodes and CPU in worker node group). This relieves us from tuning the number of these noop pods as the cluster scales up or down over the period keeping the free space proportional to current cluster capacity.

Lastly, overprovisioning also helped improve the deployment time because there is no  dependency on the time required for Auto Scaling Groups (ASG) to add a new node to the cluster every time we want to deploy a new application.

Future Improvements

Evolution is an ongoing process. In the next few months, we plan to work on custom resources for combining VPA and fixed deployment size. Our current architecture setup works fine for now, but we would like to create a more tuneable in-house CRD(Custom Resource Definition) for VPA that incorporates rightsizing our Kubernetes deployment horizontally.


Authored By Shubham Badkur on behalf of the Coban team at Grab – Ryan Ooi, Karan Kamath, Hui Yang, Yuguang Xiao, Jump Char, Jason Cusick, Shrinand Thakkar, Dean Barlan, Shivam Dixit, Andy Nguyen, and Ravi Tandon.


Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.