Tag Archives: Backend

One small step closer to containerising service binaries

Post Syndicated from Grab Tech original https://engineering.grab.com/reducing-your-go-binary-size

Grab’s engineering teams currently own and manage more than 250 microservices. Depending on the business problems that each team tackles, our development ecosystem ranges from Golang to Java and everything in between.

Although there are centralised systems that help automate most of the build and deployment tasks, there are still some teams working on different technologies that manage their own build, test and deployment systems at different maturity levels. Managing such a varied build and deploy ecosystem brings its own challenges.

Build challenges

  • Broken external dependencies.
  • Non-reproducible builds due to changes in AMI, configuration keys and other build parameters.
  • Missing security permissions between different repositories.

Deployment challenges

  • Varied deployment environments necessitating a bigger learning curve.
  • Managing the underlying infrastructure as code.
  • Higher downtime when bringing the systems up after a scale down event.

Grab’s appetite for customer obsession and quality drives the engineering teams to innovate and deliver value rapidly. The time that the team spends in fixing build issues or deployment-related tasks has a direct impact on the time they spend on delivering business value.

Introduction to containerisation

Using a container architecture helps teams deploy and run multiple applications, isolated from each other, on the same virtual machine or server and with much less overhead.

At Grab, both the platform and the core engineering teams wanted to move to the containerisation architecture to achieve the following goals:

  • Support to build and push container images during the CI process.
  • Create a standard virtual machine image capable of running container workloads. The AMI is maintained by a central team and comes with Grab infrastructure components such as DataDog, Filebeat, and Vault.
  • A deployment experience which allows existing services to migrate to container workloads safely by initially running both types of workloads concurrently.

The core engineering teams wanted to adopt container workloads to achieve the following benefits:

  • Provide a containerised version of the service that can be run locally and on different cloud providers without any dependency on Grab’s internal (runtime) tooling.
  • Allow reuse of common Grab tools in different projects by running the zero dependency version of the tools on demand whenever needed.
  • Allow a more flexible staging/dev/shadow deployment of new features.

Adoption of containerisation

Engineering teams at Grab use the containerisation model to build and deploy services at scale. Our containerisation effort helps the development teams move faster by:

  • Providing a consistent environment across development, testing and production
  • Deploying software efficiently
  • Reducing infrastructure cost
  • Abstracting OS dependency
  • Increasing scalability across cloud vendors

When we started using containers, we realised that building smaller containers had some benefits over bigger ones. For example, smaller containers:

  • Include only the needed libraries and therefore are more secure.
  • Build and deploy faster as they can be pulled to the running container cluster quickly.
  • Utilise disk space and memory efficiently.

During the course of containerising our applications, we noted that some service binaries appeared to be bigger (~110 MB) than they should be. For a statically-linked Golang binary, that’s pretty big! So how do we figure out what’s bloating the size of our binary?

Go binary size visualisation tool

While poking around for tools that would help us analyse the symbols in a Golang binary, we found go-binsize-viz, which is based on this article. We particularly liked this tool because it utilises the existing Golang toolchain (specifically, go tool nm) to analyse imports, and provides a straightforward mechanism for traversing the symbols present via a treemap. We briefly outline the steps we took to analyse a Golang binary here.

  1. First, build your service using the following command (important for consistency between builds):

    $ go build -a -o service_name ./path/to/main.go
    
  2. Next, copy the binary over to the cloned directory of go-binsize-viz repository.
  3. Run the following script that covers the steps in the go-binsize-viz README.

    #!/usr/bin/env bash
    #
    # This script needs more input parsing, but it serves the needs for now.
    #
    mkdir -p dist
    # step 1: dump the symbol table with sizes and demangle C++ symbols
    go tool nm -size "$1" | c++filt > "dist/$1.symtab"
    # step 2: convert the symbol table into a python dictionary
    python3 tab2pydic.py "dist/$1.symtab" > "dist/$1-map.py"
    # step 3: simplify into the data file consumed by treemap.html
    python3 simplify.py "dist/$1-map.py" > "dist/$1-data.js"
    # the treemap page expects the file to be linked as data.js
    rm -f data.js
    ln -s "dist/$1-data.js" data.js
    

    Running this script creates a dist folder where each intermediate step is deposited, and a data.js symlink in the top-level directory which points to the .js file consumed by treemap.html.

    # top-level directory
    $ ll
    -rw-r--r--   1 stan.halka  staff   1.1K Aug 20 09:57 README.md
    -rw-r--r--   1 stan.halka  staff   6.7K Aug 20 09:57 app3.js
    -rw-r--r--   1 stan.halka  staff   1.6K Aug 20 09:57 cockroach_sizes.html
    lrwxr-xr-x   1 stan.halka  staff    65B Aug 25 16:49 data.js -> dist/v2.0.709356.segments-paxgroups-macos-master-go1.13-data.js
    drwxr-xr-x   8 stan.halka  staff   256B Aug 25 16:49 dist
    ...
    # dist folder
    $ ll dist
    total 71728
    drwxr-xr-x   8 stan.halka  staff   256B Aug 25 16:49 .
    drwxr-xr-x  21 stan.halka  staff   672B Aug 25 16:49 ..
    -rw-r--r--   1 stan.halka  staff   4.2M Aug 25 16:37 v2.0.709356.segments-paxgroups-macos-master-go1.13-data.js
    -rw-r--r--   1 stan.halka  staff   3.4M Aug 25 16:37 v2.0.709356.segments-paxgroups-macos-master-go1.13-map.py
    -rw-r--r--   1 stan.halka  staff    11M Aug 25 16:37 v2.0.709356.segments-paxgroups-macos-master-go1.13.symtab
    

    As you can probably tell from the file names, these steps were explored on the segments-paxgroups service, which is a microservice used for segment information at Grab. You can ignore the versioning metadata, branch name, and Golang information embedded in the name.

  4. Finally, run a local python3 server to visualise the binary components.

    $ python3 -m http.server
    Serving HTTP on 0.0.0.0 port 8000 (http://0.0.0.0:8000/) ...
    

    So now that we have a methodology to consistently generate a service binary, and a way to explore the symbols present, let’s dive in!

  5. Open your browser and visit http://localhost:8000/treemap_v3.html:

    Of the 103 MB binary produced, 81 MB is recognisable, with 66 MB recognised as Golang symbols. (An UNKNOWN category is also present, and parsing produced a fair number of warnings; we haven’t spent enough time with the tool to understand why we aren’t able to recognise and index all the symbols present.)

    Treemap

    The next step is to figure out where the symbols are coming from. There’s a bunch of Grab-internal stuff that, for the sake of this blog, isn’t necessary to go into, and the intuitiveness of the go-binsize-viz tool made it reasonably easy to arrive at the right answer.

    This visualisation shows where 11 MB of symbols are sneaking into the segments-paxgroups binary.

    Visualisation

    Every message format for any service that reads from, or writes to, streams at Grab is included in every service binary! Not cloud native!

How did this happen?

The short answer is that Golang doesn’t import only the symbols that it requires, but rather all the symbols defined within an imported directory (package), along with their transitive symbols. So, if our code structure doesn’t follow principles of encapsulation or isolation, importing what we think is just one package can pull in 11 MB of symbols that we don’t need! In our case, this occurred because a generic Message interface was included in the same package as all the auto-generated code you see in the picture above.
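
To make this concrete, here is a minimal sketch of the kind of restructuring that avoids the problem; the package layout and the Message interface shown are illustrative, not Grab’s actual code:

    // Before: the Message interface lives in the same package (directory) as
    // all the generated message types, so importing the interface links in
    // every generated type as well:
    //
    //   stream/
    //     message.go          // type Message interface { ... }
    //     booking_events.go   // generated
    //     payment_events.go   // generated
    //     ...                 // hundreds more generated files
    //
    // After: the interface is isolated in its own small package. Services
    // that only need the contract import this package; only services that
    // actually use a given message type import its generated package.
    package streamiface

    // Message is the minimal contract that consumers depend on.
    type Message interface {
        Key() []byte
        Payload() []byte
    }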

The Streams team did an awesome job of restructuring the code, which when built again, led to this outcome:

$ ll | grep paxgroups
-rwxr-xr-x   1 stan.halka  staff   110M Aug 21 14:53 v2.0.709356.segments-paxgroups-macos-master-go1.12
-rwxr-xr-x   1 stan.halka  staff   103M Aug 25 16:34 v2.0.709356.segments-paxgroups-macos-master-go1.13
-rwxr-xr-x   1 stan.halka  staff    80M Aug 21 14:53 v2.0.709356.segments-paxgroups-macos-tinkered-go1.12
-rwxr-xr-x   1 stan.halka  staff    78M Aug 25 16:34 v2.0.709356.segments-paxgroups-macos-tinkered-go1.13

Not a bad reduction in service binary size!

Lessons learnt

The go-binsize-viz utility offers a treemap representation of imported symbols, and is very useful in determining which symbols contribute to the overall binary size.

Code architecture matters: Keep binaries as small as possible!

To reduce your binary size, follow these best practices:

  • Structure your code so that the interfaces and common classes/utilities are imported from different locations than auto-generated classes.
  • Avoid huge, flat directory structures.
  • If it’s a platform offering and has too many interwoven dependencies, try to decouple the actual platform offering from the company specific instantiations. This fosters creating isolated, minimalistic code.

Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving Southeast Asia forward, apply to join our team today.

Optimally scaling Kafka consumer applications

Post Syndicated from Grab Tech original https://engineering.grab.com/optimally-scaling-kafka-consumer-applications

Earlier this year, we took you on a journey through how we built and deployed our event sourcing and stream processing framework at Grab. We’re happy to share that we’re able to reliably maintain our uptime and continue to service close to 400 billion events a week. We haven’t stopped there though. To ensure that we can scale our framework as the Grab business continues to grow, we have invested effort in optimising our infrastructure.

In this article, we will dive deeper into our Kubernetes infrastructure setup for our stream processing framework. We will cover why and how we focus on optimal scalability and availability of our infrastructure.

Quick Architecture Recap

Coban Platform Architecture

The Coban platform provides lightweight Golang plugin architecture-based data processing pipelines running in Kubernetes. These are essentially Kafka consumer pods that consume data, process it, and then materialise the results into various sinks (RDBMS, other Kafka topics).

Anatomy of a Processing Pod

Anatomy of a Processing Pod

Each stream processing pod (the smallest unit of a pipeline’s deployment) has three top level components:

  • Trigger: An interface that connects directly to the source of the data and converts it into an event channel.
  • Runtime: This is the app’s entry point and the orchestrator of the pod. It manages the worker pools, triggers, event channels, and lifecycle events.
  • Pipeline plugin: This is provided by the user, and conforms to a contract that the platform team publishes. It contains the domain logic for the pipeline and houses the pipeline orchestration defined by a user based on our Stream Processing Framework.
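
A hypothetical sketch, in Go, of how these three components might fit together; the interface and type names are illustrative and not the platform’s actual contract:

    package spf

    import "context"

    // Event is a single record pulled from a source such as a Kafka topic.
    type Event struct {
        Key, Value []byte
    }

    // Trigger connects to a data source and converts it into an event channel.
    type Trigger interface {
        Events(ctx context.Context) (<-chan Event, error)
    }

    // Pipeline is the user-provided plugin that holds the domain logic.
    type Pipeline interface {
        Process(ctx context.Context, e Event) error
    }

    // Runtime is the pod's entry point: it wires triggers to the pipeline
    // plugin and manages worker pools, event channels, and lifecycle events.
    type Runtime struct {
        Trigger  Trigger
        Pipeline Pipeline
        Workers  int
    }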

Optimal Scaling

We initially architected our Kubernetes setup around horizontal pod autoscaling (HPA), which scales the number of pods per deployment based on CPU and memory usage. With HPA, the CPU and memory requests per pod are fixed in the deployment manifest, and the number of pods scales horizontally as the load changes.

These were the areas of application wastage we observed on our platform:

  • As Grab’s traffic is uneven, we’d always have to provision for peak traffic. As users would not (or could not) always account for ramps, they would be fairly liberal with setting limit values (CPU and memory), leading to resource wastage.
  • Pods often had uneven traffic distribution despite a fairly even partition load distribution in Kafka. The Stream Processing Framework (SPF) is essentially a set of Kafka consumers consuming from Kafka topics, so as the number of pods scaled in and out, partitions were assigned unevenly, resulting in unequal partition load per pod.

Vertically Scaling with Fixed Number of Pods

We initially kept the number of pods for a pipeline equal to the number of partitions in the topic the pipeline consumes from. This ensured an even distribution of partitions to each pod, providing balanced consumption. In order to abstract this from the end user, we automated the application deployment process to directly call the Kafka API to fetch the number of partitions during runtime.
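
For illustration, here is a minimal sketch of fetching a topic’s partition count at deploy time. It uses the open-source segmentio/kafka-go client, which is an assumption for this example; our internal tooling may use a different client, and the broker address and topic name are placeholders:

    package main

    import (
        "fmt"
        "log"

        "github.com/segmentio/kafka-go"
    )

    // partitionCount returns the number of partitions for a topic, which we
    // then use as the fixed number of pods for the pipeline consuming it.
    func partitionCount(broker, topic string) (int, error) {
        conn, err := kafka.Dial("tcp", broker)
        if err != nil {
            return 0, err
        }
        defer conn.Close()

        partitions, err := conn.ReadPartitions(topic)
        if err != nil {
            return 0, err
        }
        return len(partitions), nil
    }

    func main() {
        n, err := partitionCount("localhost:9092", "bookings")
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("deploy %d pods for this pipeline\n", n)
    }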

After achieving a fixed number of pods for the pipeline, we wanted to move away from HPA. We wanted our pods to scale up and down as the load increases or decreases without any manual intervention. Vertical pod autoscaling (VPA) solves this problem as it relieves us from any manual operation for setting up resources for our deployment.

We just deploy the application and let VPA handle the resources required for its operation. VPA is known not to be very responsive to quick load changes, as it trains its model by monitoring the deployment’s load trend over a period of time before recommending optimal resources. This process ensures optimal resource allocation for our pipelines, taking into account the historic trends in throughput.

We saw a ~45% reduction in our total resource usage vs resource requested after moving from HPA to VPA with a fixed number of pods.

Anatomy of a Processing Pod

Managing Availability

We broadly classify our workloads as latency sensitive (critical) and latency tolerant (non-critical). As a result, we could optimize scheduling and cost efficiency using priority classes and overprovisioning on heterogeneous node types on AWS.

Kubernetes Priority Classes

The main cost of running EKS on AWS is attributed to the EC2 machines that form the worker nodes of the Kubernetes cluster. Running On-Demand instances brings all the guarantees of instance availability, but it is expensive. Hence, our first action to drive cost optimisation was to include Spot instances in our worker node groups.

With the uncertainty of losing a spot instance, we started assigning priority to our various applications. We then let the user choose the priority of their pipeline depending on their use case. Different priorities would result in different node affinity to different kinds of instance groups (On-Demand/Spot). For example, Critical pipelines (latency sensitive) run on On-Demand worker node groups and Non-critical pipelines (latency tolerant) on Spot instance worker node groups.

We use priority classes as a method of preemption, and also, together with node affinity, to steer pipelines of a given priority onto the appropriate node group.

Overprovisioning

With Spot instances running, we realised a need to make our cluster respond quickly to failures. We wanted to achieve quick rescheduling of evicted pods, hence we added overprovisioning to our cluster. This means we keep some noop pods running in our worker node groups, occupying free space, for the quick scheduling of evicted or newly deployed pods.

The overprovisioned pods are the lowest priority pods, and thus can be preempted by any pod waiting in the queue for scheduling. We use the cluster proportional autoscaler to decide the right number of these overprovisioned pods, scaling them up and down in proportion to the cluster size (i.e., the number of nodes and CPUs in the worker node group). This relieves us from tuning the number of these noop pods as the cluster scales, keeping the free space proportional to the current cluster capacity.

Lastly, overprovisioning also helped improve deployment time because there is no dependency on the time required for Auto Scaling Groups (ASG) to add a new node to the cluster every time we want to deploy a new application.

Future Improvements

Evolution is an ongoing process. In the next few months, we plan to work on custom resources for combining VPA and a fixed deployment size. Our current architecture works fine for now, but we would like to create a more tuneable in-house CRD (Custom Resource Definition) for VPA that also incorporates rightsizing our Kubernetes deployments horizontally.


Authored By Shubham Badkur on behalf of the Coban team at Grab – Ryan Ooi, Karan Kamath, Hui Yang, Yuguang Xiao, Jump Char, Jason Cusick, Shrinand Thakkar, Dean Barlan, Shivam Dixit, Andy Nguyen, and Ravi Tandon.


Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving Southeast Asia forward, apply to join our team today.

Seamlessly Swapping the API backend of the Netflix Android app

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/seamlessly-swapping-the-api-backend-of-the-netflix-android-app-3d4317155187

How we migrated our Android endpoints out of a monolith into a new microservice

by Rohan Dhruva, Ed Ballot

As Android developers, we usually have the luxury of treating our backends as magic boxes running in the cloud, faithfully returning us JSON. At Netflix, we have adopted the Backend for Frontend (BFF) pattern: instead of having one general purpose “backend API”, we have one backend per client (Android/iOS/TV/web). On the Android team, while most of our time is spent working on the app, we are also responsible for maintaining this backend that our app communicates with, and its orchestration code.

Recently, we completed a year-long project rearchitecting and decoupling our backend from the centralized model used previously. We did this migration without slowing down the usual cadence of our releases, and with particular care to avoid any negative effects to the user experience. We went from an essentially serverless model in a monolithic service, to deploying and maintaining a new microservice that hosted our app backend endpoints. This allowed Android engineers to have much more control and observability over how we get our data. Over the course of this post, we will talk about our approach to this migration, the strategies that we employed, and the tools we built to support this.

Background

The Netflix Android app uses the falcor data model and query protocol. This allows the app to query a list of “paths” in each HTTP request, and get specially formatted JSON (jsonGraph) that we use to cache the data and hydrate the UI. As mentioned earlier, each client team owns their respective endpoints: which effectively means that we’re writing the resolvers for each of the paths that are in a query.

Screenshot from the Netflix Android app

As an example, to render the screen shown here, the app sends a query that looks like this:

paths: ["videos", 80154610, "detail"]

A path starts from a root object, and is followed by a sequence of keys that we want to retrieve the data for. In the snippet above, we’re accessing the detail key for the video object with id 80154610.

For that query, the response is:

Response for the query [“videos”, 80154610, “detail”]

In the Monolith

In the example you see above, the data that the app needs is served by different backend microservices. For example, the artwork service is separate from the video metadata service, but we need the data from both in the detail key.

We do this orchestration on our endpoint code using a library provided by our API team, which exposes an RxJava API to handle the downstream calls to the various backend microservices. Our endpoint route handlers are effectively fetching the data using this API, usually across multiple different calls, and massaging it into data models that the UI expects. These handlers we wrote were deployed into a service run by the API team, shown in the diagram below.

Diagram of Netflix API monolith
Image taken from a previously published blog post

As you can see, our code was just a part (#2 in the diagram) of this monolithic service. In addition to hosting our route handlers, this service also handled the business logic necessary to make the downstream calls in a fault tolerant manner. While this gave client teams a very convenient “serverless” model, over time we ran into multiple operational and devex challenges with this service. You can read more about this in our previous posts here: part 1, part 2.

The Microservice

It was clear that we needed to isolate the endpoint code (owned by each client team), from the complex logic of fault tolerant downstream calls. Essentially, we wanted to break out the client-specific code from this monolith into its own service. We tried a few iterations of what this new service should look like, and eventually settled on a modern architecture that aimed to give more control of the API experience to the client teams. It was a Node.js service with a composable JavaScript API that made downstream microservice calls, replacing the old Java API.

Java…Script?

As Android developers, we’ve come to rely on the safety of a strongly typed language like Kotlin, maybe with a side of Java. Since this new microservice uses Node.js, we had to write our endpoints in JavaScript, a language that many people on our team were not familiar with. The context around why the Node.js ecosystem was chosen for this new service deserves an article in and of itself. For us, it means that we now need to have ~15 MDN tabs open when writing routes 🙂

Let’s briefly discuss the architecture of this microservice. It looks like a very typical backend service in the Node.js world: a combination of Restify, a stack of HTTP middleware, and the Falcor-based API. We’ll gloss over the details of this stack: the general idea is that we’re still writing resolvers for paths like [videos, <id>, detail], but we’re now writing them in JavaScript.

The big difference from the monolith, though, is that this is now a standalone service deployed as a separate “application” (service) in our cloud infrastructure. More importantly, we’re no longer just getting and returning requests from the context of an endpoint script running in a service: we’re now getting a chance to handle the HTTP request in its entirety. Starting from “terminating” the request from our public gateway, we then make downstream calls to the api application (using the previously mentioned JS API), and build up various parts of the response. Finally, we return the required JSON response from our service.

The Migration

Before we look at what this change meant for us, we want to talk about how we did it. Our app had ~170 query paths (think: route handlers), so we had to figure out an iterative approach to this migration. Let’s take a look at what we built in the app to support this migration. Going back to the screenshot above, if you scroll a bit further down on that page, you will see the section titled “more like this”:

Screenshot from the Netflix app showing “more like this”

As you can imagine, this does not belong in the video details data for this title. Instead, it is part of a different path: [videos, <id>, similars]. The general idea here is that each UI screen (Activity/Fragment) needs data from multiple query paths to render the UI.

To prepare ourselves for a big change in the tech stack of our endpoint, we decided to track metrics around the time taken to respond to queries. After some consultation with our backend teams, we determined that the most effective way to group these metrics was by UI screen. Our app uses a version of the repository pattern, where each screen can fetch data using a list of query paths. These paths, along with some other configuration, build a Task. These Tasks already carry a uiLabel that uniquely identifies each screen: this label became our starting point, which we passed in a header to our endpoint. We then used this to log the time taken to respond to each query, grouped by the uiLabel. This meant that we could track any possible regressions to user experience by screen, which corresponds to how users navigate through the app. We will talk more about how we used these metrics in the sections to follow.

Fast forward a year: the 170 number we started with slowly but surely whittled down to 0, and we had all our “routes” (query paths) migrated to the new microservice. So, how did it go…?

The Good

Today, a big part of this migration is done: most of our app gets its data from this new microservice, and hopefully our users never noticed. As with any migration of this scale, we hit a few bumps along the way: but first, let’s look at good parts.

Migration Testing Infrastructure

Our monolith had been around for many years and hadn’t been created with functional and unit testing in mind, so those were independently bolted on by each UI team. For the migration, testing was a first-class citizen. While there was no technical reason stopping us from adding full automation coverage earlier, it was just much easier to add this while migrating each query path.

For each route we migrated, we wanted to make sure we were not introducing any regressions: either in the form of missing (or worse, wrong) data, or by increasing the latency of each endpoint. If we pare down the problem to absolute basics, we essentially have two services returning JSON. We want to make sure that for a given set of paths as input, the returned JSON is always exactly the same. With lots of guidance from other platform and backend teams, we took a 3-pronged approach to ensure correctness for each route migrated.

Functional Testing
Functional testing was the most straightforward of them all: a set of tests alongside each path exercised it against the old and new endpoints. We then used the excellent Jest testing framework with a set of custom matchers that sanitized a few things like timestamps and uuids. It gave us really high confidence during development, and helped us cover all the code paths that we had to migrate. The test suite automated a few things like setting up a test user, and matching the query parameters/headers sent by a real device: but that’s as far as it goes. The scope of functional testing was limited to the already setup test scenarios, but we would never be able to replicate the variety of device, language and locale combinations used by millions of our users across the globe.

Replay Testing
Enter replay testing. This was a custom built, 3-step pipeline:

  • Capture the production traffic for the desired path(s)
  • Replay the traffic against the two services in the TEST environment
  • Compare and assert for differences

It was a self-contained flow that, by design, captured entire requests, and not just the one path we requested. This test was the closest to production: it replayed real requests sent by the device, thus exercising the part of our service that fetches responses from the old endpoint and stitches them together with data from the new endpoint. The thoroughness and flexibility of this replay pipeline is best described in its own post. For us, the replay test tooling gave the confidence that our new code was nearly bug free.
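
To give a rough idea of the compare-and-assert step, here is a minimal sketch, written in Go purely for brevity; the actual Netflix tooling is described in its own post and differs from this. The idea is to strip volatile fields such as timestamps and uuids before deep-comparing the two JSON responses:

    package replaytest

    import (
        "encoding/json"
        "reflect"
        "regexp"
    )

    // volatile matches keys whose values are expected to differ between runs.
    var volatile = regexp.MustCompile(`(?i)(timestamp|uuid|trace)`)

    // sanitize recursively drops volatile fields from a decoded JSON value.
    func sanitize(v interface{}) interface{} {
        switch t := v.(type) {
        case map[string]interface{}:
            out := map[string]interface{}{}
            for k, val := range t {
                if volatile.MatchString(k) {
                    continue
                }
                out[k] = sanitize(val)
            }
            return out
        case []interface{}:
            for i := range t {
                t[i] = sanitize(t[i])
            }
            return t
        default:
            return v
        }
    }

    // Equivalent reports whether two JSON payloads match after sanitisation.
    func Equivalent(oldResp, newResp []byte) (bool, error) {
        var a, b interface{}
        if err := json.Unmarshal(oldResp, &a); err != nil {
            return false, err
        }
        if err := json.Unmarshal(newResp, &b); err != nil {
            return false, err
        }
        return reflect.DeepEqual(sanitize(a), sanitize(b)), nil
    }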

Canaries
Canaries were the last step involved in “vetting” our new route handler implementation. In this step, a pipeline picks our candidate change, deploys the service, makes it publicly discoverable, and redirects a small percentage of production traffic to this new service. You can find a lot more details about how this works in the Spinnaker canaries documentation.

This is where our previously mentioned uiLabel metrics become relevant: for the duration of the canary, Kayenta was configured to capture and compare these metrics for all requests (in addition to the system level metrics already being tracked, like server CPU and memory). At the end of the canary period, we got a report that aggregated and compared the percentiles of each request made by a particular UI screen. Looking at our high traffic UI screens (like the homepage) allowed us to identify any regressions caused by the endpoint before we enabled it for all our users. Here’s one such report to get an idea of what it looks like:

Graph showing a 4–5% regression in the homepage latency.

Each identified regression (like this one) was subject to a lot of analysis: chasing down a few of these led to previously unidentified performance gains! Being able to canary a new route let us verify latency and error rates were within acceptable limits. This type of tooling required time and effort to create, but in the end, the feedback it provided was well worth the cost.

Observability

Many Android engineers will be familiar with systrace or one of the excellent profilers in Android Studio. Imagine getting a similar tracing for your endpoint code, traversing along many different microservices: that is effectively what distributed tracing provides. Our microservice and router were already integrated into the Netflix request tracing infrastructure. We used Zipkin to consume the traces, which allowed us to search for a trace by path. Here’s what a typical trace looks like:

Zipkin trace for a call
A typical zipkin trace (truncated)

Request tracing has been critical to the success of Netflix infrastructure, but when we operated in the monolith, we did not have the ability to get this detailed look into how our app interacted with the various microservices. To demonstrate how this helped us, let us zoom into this part of the picture:

Serialized calls to this service add a few ms of latency

It’s pretty clear here that the calls are being serialized: however, at this point we’re already ~10 hops disconnected from our microservice. It’s hard to conclude this, and uncover such problems, from looking at raw numbers: either on our service or the testservice above, and even harder to attribute them back to the exact UI platform or screen. With the rich end-to-end tracing instrumented in the Netflix microservice ecosystem and made easily accessible via Zipkin, we were able to pretty quickly triage this problem to the responsible team.

End-to-end Ownership

As we mentioned earlier, our new service now had the “ownership” for the lifetime of the request. Where previously we only returned a Java object back to the api middleware, now the final step in the service was to flush the JSON down the request buffer. This increased ownership gave us the opportunity to easily test new optimisations at this layer. For example, with about a day’s worth of work, we had a prototype of the app using the binary msgpack response format instead of plain JSON. In addition to the flexible service architecture, this can also be attributed to the Node.js ecosystem and the rich selection of npm packages available.

Local Development

Before the migration, developing and debugging on the endpoint was painful due to slow deployment and lack of local debugging (this post covers that in more detail). One of the Android team’s biggest motivations for doing this migration project was to improve this experience. The new microservice gave us fast deployment and debug support by running the service in a local Docker instance, which has led to significant productivity improvements.

The Not-so-good

In the arduous process of breaking a monolith, you might get a sharp shard or two flung at you. A lot of what follows is not specific to Android, but we want to briefly mention these issues because they did end up affecting our app.

Latencies

The old api service was running on the same “machine” that also cached a lot of video metadata (by design). This meant that data that was static (e.g. video titles, descriptions) could be aggressively cached and reused across multiple requests. However, with the new microservice, even fetching this cached data needed to incur a network round trip, which added some latency.

This might sound like a classic example of “monoliths vs microservices”, but the reality is somewhat more complex. The monolith was also essentially still talking to a lot of downstream microservices: it just happened to have a custom-designed cache that helped a lot. Some of this increased latency was mitigated by better observability and more efficient batching of requests. But, for a small fraction of requests, after a lot of attempts at optimization, we just had to take the latency hit: sometimes, there are no silver bullets.

Increased Partial Query Errors

As each call to our endpoint might need to make multiple requests to the api service, some of these calls can fail, leaving us with partial data. Handling such partial query errors isn’t a new problem: it is baked into the nature of composite protocols like Falcor or GraphQL. However, as we moved our route handlers into a new microservice, we now introduced a network boundary for fetching any data, as mentioned earlier.

This meant that we now ran into partial states that weren’t possible before because of the custom caching. We were not completely aware of this problem in the beginning of our migration: we only saw it when some of our deserialized data objects had null fields. Since a lot of our code uses Kotlin, these partial data objects led to immediate crashes, which helped us notice the problem early: before it ever hit production.

As a result of increased partial errors, we’ve had to improve our overall error handling approach and explore ways to minimize the impact of network errors. In some cases, we also added custom retry logic on either the endpoint or the client code.

Final Thoughts

This has been a long (you can tell!) and fulfilling journey for us on the Android team: as we mentioned earlier, we typically work on the app and, until now, we did not have a chance to examine our endpoint with this level of scrutiny. Not only did we learn more about the intriguing world of microservices, but working on this project gave us the perfect opportunity to add observability to our app-endpoint interaction. At the same time, we ran into some unexpected issues like partial errors and made our app more resilient to them in the process.

As we continue to evolve and improve our app, we hope to share more insights like these with you.

The planning and successful migration to this new service was the combined effort of multiple backend and front end teams.

On the Android team, we ship the Netflix app on Android to millions of members around the world. Our responsibilities include extensive A/B testing on a wide variety of devices by building highly performant and often custom UI experiences. We work on data driven optimizations at scale in a diverse and sometimes unforgiving device and network ecosystem. If you find these challenges interesting, and want to work with us, we have an open position.



Plumbing At Scale

Post Syndicated from Grab Tech original https://engineering.grab.com/plumbing-at-scale

When you open the Grab app and hit book, a series of events is generated that defines your personalised experience with us: booking state machines kick into motion, driver partners are notified, reward points are computed, your feed is generated, etc. While it is important for you to know that a request has been received, a lot happens asynchronously in our back-end services.

As custodians and builders of the streaming platform at Grab operating at massive scale (think terabytes of data ingress each hour), the Coban team’s mission is to provide a NoOps, managed platform for seamless, secure access to event streams in real-time, for every team at Grab.

Coban Sewu Waterfall In Indonesia
Coban Sewu Waterfall In Indonesia. (Streams, get it?)

Streaming systems are often at the heart of event-driven architectures, and what starts as a need for a simple message bus for asynchronous processing of events quickly evolves into one that requires more sophisticated stream processing paradigms.
Earlier this year, we saw common patterns of event processing emerge across our Go backend ecosystem, including:

  • Filtering and mapping stream events of one type to another
  • Aggregating events into time windows and materializing them back to the event log or to various types of transactional and analytics databases

Generally, a class of problems surfaced which could be elegantly solved through an event sourcing[1] platform with a stream processing framework built over it, similar to the Keystone platform at Netflix[2].

This article details our journey building and deploying an event sourcing platform in Go, building a stream processing framework over it, and then scaling it (reliably and efficiently) to service over 300 billion events a week.

Event Sourcing

Event sourcing is an architectural pattern where changes to an application state are stored as a sequence of events, which can be replayed, recomputed, and queried for state at any time. An implementation of the event sourcing pattern typically has three parts to it:

  • An event log
  • Processor selection logic: The logic that selects which chunk of domain logic to run based on an incoming event
  • Processor domain logic: The domain logic that mutates an application’s state

Event Sourcing
Event Sourcing
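
As a minimal, illustrative sketch (not the Coban implementation), the three parts map naturally onto Go:

    package eventsourcing

    // Event is an entry in the event log.
    type Event struct {
        Type    string
        Payload []byte
    }

    // State is the application state rebuilt by replaying the log.
    type State struct {
        Bookings int
    }

    // Processor is a chunk of domain logic that mutates state for one event type.
    type Processor func(s *State, e Event)

    // processors is the selection logic: it picks domain logic by event type.
    var processors = map[string]Processor{
        "booking.created":   func(s *State, e Event) { s.Bookings++ },
        "booking.cancelled": func(s *State, e Event) { s.Bookings-- },
    }

    // Replay rebuilds state by running the selected processor over each
    // event, from the beginning of the event log.
    func Replay(log []Event) State {
        var s State
        for _, e := range log {
            if p, ok := processors[e.Type]; ok {
                p(&s, e)
            }
        }
        return s
    }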

Event sourcing is a building block on which architectural patterns such as Command Query Responsibility Segregation[3], serverless systems, and stream processing pipelines are built.

The Case For Stream Processing

Below are some use cases serviced by stream processing, built on event sourcing.

Asynchronous State Management

A pub-sub system allows change events from one service to be fanned out to multiple interested subscribers without letting any one subscriber block the progress of others. Abstracting the event log and centralising it democratises access to this log for all back-end services. It enables the back-end services to apply changes from this centralised log to their own state, independent of downstream services, and/or publish their state changes to it.

Time Windowed Aggregations

Time-windowed aggregates are a common requirement for machine learning models (as features) as well as analytics. For example, personalising the Grab app landing page requires counting your interaction with various widget elements in recent history, not any one event in particular. Similarly, an analyst may not be interested in the details of a singular booking in real-time, but in building demand heatmaps segmented by geohashes. For latency-sensitive lookups, especially for the personalisation example, pre-aggregations are preferred instead of post-aggregations.

Stream Joins, Filtering, Mapping

Event logs are typically sharded by some notion of topics to logically divide events of interest around a theme (booking events, profile updates, etc.). Building bigger topics out of smaller ones, as well as smaller ones from bigger ones, is a common way to compose “substreams” of the log of interest directed towards specific services. For example, a promo service may only be interested in listening to booking events for promotional bookings.

Realtime Business Intelligence

Outputs of stream processing workloads are also plugged into realtime Business Intelligence (BI) and stream analytics solutions upstream, as raw data for visualizations on operations dashboards.

Archival

For offline analytics, as well as reconciliation and disaster recovery, having an archive in a cold store helps for certain mission critical streams.

Platform Requirements

Any processing platform for event sourcing and stream processing has certain expectations around its functionality.

Scaling and Elasticity

Stream/Event Processing pipelines need to be elastic and responsive to changes in traffic patterns, especially considering that user activity (rides, food, deliveries, payments) varies dramatically during the course of a day or week. A spike in food orders on rainy days shouldn’t cause indefinite order processing latencies.

NoOps

For a platform team, it’s important that users can easily onboard and manage their pipeline lifecycles, at their preferred cadence. To scale effectively, the process of scaffolding, configuring, and deploying pipelines needs to be standardised, and infrastructure managed. Both the platform and users are able to leverage common standards of telemetry, configuration, and deployment strategies, and users benefit from a lack of infrastructure management overhead.

Multi-Tenancy

Our platform has quickly scaled to support hundreds of pipelines. Workload isolation, independent processing uptime guarantees, and resource allocation and cost audit are important requirements necessitating multi-tenancy, which help amortize platform overhead costs.

Resiliency

Whether latency sensitive or latency tolerant, all workloads have certain expectations on processing uptime. From a user’s perspective, there must be guarantees on pipeline uptimes and data completeness, upper bounds on processing delays, instrumentation for alerting, and self-healing properties of the platform for remediation.

Tunable Tradeoffs

Some pipelines are latency sensitive, and rely on processing completeness seconds after event ingress. Other pipelines are latency tolerant, and can tolerate disruption to processing lasting in tens of minutes. A one size fits all solution is likely to be either cost inefficient or unreliable. Having a way for users to make these tradeoffs consciously becomes important for ensuring efficient processing guarantees at a reasonable cost. Similarly, in the case of upstream failures or unavailability, being able to tune failure modes (like wait, continue, or retry) comes in handy.

Stream Processing Framework

While basic event sourcing covers simple use cases like archival, more complicated ones benefit from a common framework that shifts the mental model from per-event processing to stream pipeline orchestration.
Given that Go is a “paved road” for back-end development at Grab, and we have service code and bindings for streaming data in a mono-repository, we built a Go framework with a subset of the capabilities provided by other streaming frameworks like Flink[4].

Logic Blocks In A Stream Processing Pipeline
Logic Blocks In A Stream Processing Pipeline

Capabilities

Some capabilities built into the framework include:

  • Deduplication: Enables pipelines to idempotently reprocess data in case of rewinds/replays, and provides some processing guarantees within a time window for certain use cases including sinking to datastores.
  • Filtering and Mapping: An ability to filter a source stream data and map them onto target streams.
  • Aggregation: An ability to generate and execute aggregation logic such as sum, avg, max, and min in a window.
  • Windowing: An ability to window processing into tumbling, sliding, and session windows.
  • Join: An ability to combine two streams together with certain join keys in a window.
  • Processor Chaining: Various functionalities can be chained to build more complicated pipelines from simpler ones. For example: filter a large stream into a smaller one, aggregate it over a time window, and then map it to a new stream.
  • Rewind: The ability to rewind the processing logic by a few hours through configuration.
  • Replay: The ability to replay archived data into the same or a separate pipeline via configuration.
  • Sinks: A number of connectors to standard Grab stores are provided, with concerns of auth, telemetry, etc. managed in the runtime.
  • Error Handling: Providing an easy way to indicate whether to wait, skip, and/or retry in case of upstream failures is an important tuning parameter that users need for making sensible tradeoffs in dimensions of backpressure, latency, correctness, etc.
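
To illustrate processor chaining, here is a toy, self-contained Go sketch of a filter → window → aggregate chain over channels. The function names and channel-based approach are invented for this example; the actual framework API, delivery guarantees, and windowing semantics are more involved:

    package example

    import "time"

    // BookingEvent is an illustrative event type.
    type BookingEvent struct {
        IsPromo bool
    }

    // Filter passes through only the events matching pred, as a chainable stage.
    func Filter(in <-chan BookingEvent, pred func(BookingEvent) bool) <-chan BookingEvent {
        out := make(chan BookingEvent)
        go func() {
            defer close(out)
            for e := range in {
                if pred(e) {
                    out <- e
                }
            }
        }()
        return out
    }

    // CountTumbling aggregates the stream, emitting the number of events
    // observed in each fixed (tumbling) window.
    func CountTumbling(in <-chan BookingEvent, window time.Duration) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            ticker := time.NewTicker(window)
            defer ticker.Stop()
            count := 0
            for {
                select {
                case _, ok := <-in:
                    if !ok {
                        out <- count
                        return
                    }
                    count++
                case <-ticker.C:
                    out <- count
                    count = 0
                }
            }
        }()
        return out
    }

    // Chain the stages: filter promo bookings, then count them per one-minute window.
    func promoCountsPerMinute(source <-chan BookingEvent) <-chan int {
        promos := Filter(source, func(e BookingEvent) bool { return e.IsPromo })
        return CountTumbling(promos, time.Minute)
    }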

Architecture

Coban Platform
Coban Platform

Our event log is primarily a set of critical Kafka clusters, which are polled for incoming events by the various pipelines that service teams deploy on the platform. Each pipeline is an isolated deployment, has an identity, and has the ability to connect to various upstream sinks to materialise results into, including the event log itself.
There is also a metastore available as an intermediate store for processing pipelines, so the pipelines themselves are stateless, with their lifecycle completely beholden to the whims of their owners.

Anatomy of a Processing Pipeline

Anatomy Of A Stream Processing Pod
Anatomy Of A Stream Processing Pod

Anatomy of a Stream Processing Pod

Each stream processing pod (the smallest unit of a pipeline’s deployment) has three top level components:

  • Triggers: An interface that connects directly to the source of the data and converts it into an event channel.
  • Runtime: This is the app’s entrypoint and the orchestrator of the pod. It manages the worker pools, triggers, event channels, and lifecycle events.
  • The Pipeline plugin: The plugin is provided by the user, and conforms to a contract that the platform team publishes. It contains the domain logic for the pipeline and houses the pipeline orchestration defined by a user based on our Stream Processing Framework.

Deployment Infrastructure

Our deployment infrastructure heavily leverages Kubernetes on AWS. After a (pretty high) initial cost for infrastructure set up, we’ve found scaling to hundreds of pipelines a breeze with the Kubernetes provided controls. We package our stateless pipeline workloads into Kubernetes deployments, with each pod containing a unit of a stream pipeline, with sidecars that integrate them with our monitoring systems. Other cluster-wide tooling deployed (usually as DaemonSets) deals with metric collection, log ingestion, and autoscaling. We currently use the Horizontal Pod Autoscaler[5] to manage traffic elasticity, and the Cluster Autoscaler[6] to manage worker node scaling.

Kubernetes
A Typical Kubernetes Set Up On AWS

Metastore

Some pipelines require storage for use cases ranging from deduplication to stores for materialised results of time windowed aggregations. All our pipelines have access to clusters of ScyllaDB instances (which we use as our internal store), made available to pipeline authors via interfaces in the Stream Processing Framework. Results of these aggregations are then made available to backend services via our GrabStats service, which is a thin query layer over the latest pipeline results.

Compute Isolation

A nice property of packaging pipelines as Kubernetes deployments is a good degree of compute workload isolation for pipelines. While node resources of pipeline pods are still shared (and there are potential noisy neighbour issues on matters like logging throughput), the pods of various pipelines can be scheduled and rescheduled across a wide range of nodes safely and swiftly, with minimal impact to pods of other pipelines.

Redundancy

Stateless processing pods mean we can set up backup or redundant Kubernetes clusters in hot-hot, hot-warm, or hot-cold modes. We use this to ensure high processing availability despite limited control plane guarantees from any single cluster. (EKS SLAs for the Kubernetes control plane guarantee only 99.9% uptime today[7].) Transparent to our users, we make the deployment systems aware of multiple available targets for scheduling.

Availability vs Cost

As alluded to in the “Platform Requirements” section, having a way of trading off availability for cost becomes important where the requirements and criticality of each processing pipeline are very different. Given that AWS Spot instances are a lot cheaper[8] than on-demand ones, we use user-annotated Kubernetes priority classes to determine deployment targets for pipelines. Latency tolerant pipelines are scheduled on Spot instances, which are routinely 40-90% cheaper than the on-demand instances on which latency sensitive pipelines run. The caveat is that Spot instances occasionally disappear, and these workloads are disrupted until a replacement node can be found for their scheduling.

What’s Next?

  • Expand the ecosystem of triggers to support custom sources of data i.e. the “event log”, as well as push based (RPC driven) versus just pull based triggers
  • Build a control plane for API integration with pipeline lifecycle management
  • Move some workloads to use the Vertical Pod Autoscaler[9] in Kubernetes instead of horizontal scaling, as most of our workloads have a limit on parallelism (which is their partition count in Kafka topics)
  • Move from Go plugins for pipelines to plugins over RPC, like what HashiCorp does[10], to enable processing logic in non-Go languages.
  • Use either pod gossip or a service mesh with a control plane to set up quotas for shared infrastructure usage per pipeline. This is to protect upstream dependencies and the metastore from surges in event backlogs.
  • Improve availability guarantees for pipeline pods by occasionally redistributing/rescheduling pods across nodes in our Kubernetes cluster to prevent entire workloads being served out of a few large nodes.

Authored By Karan Kamath on behalf of the Coban team at Grab-
Zezhou Yu, Ryan Ooi, Hui Yang, Yuguang Xiao, Ling Zhang, Roy Kim, Matt Hino, Jump Char, Lincoln Lee, Jason Cusick, Shrinand Thakkar, Dean Barlan, Shivam Dixit, Shubham Badkur, Fahad Pervaiz, Andy Nguyen, Ravi Tandon, Ken Fishkin, and Jim Caputo.


Footnotes

Coban Sewu Waterfall Photo by Dwinanda Nurhanif Mujito on Unsplash

Cover Photo by tian kuan on Unsplash

  1. https://martinfowler.com/eaaDev/EventSourcing.html 

  2. https://medium.com/netflix-techblog/keystone-real-time-stream-processing-platform-a3ee651812a 

  3. https://martinfowler.com/bliki/CQRS.html 

  4. https://flink.apache.org 

  5. https://kubernetes.io/docs/tasks/run-application/horizontal-pod-autoscale/ 

  6. https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler 

  7. https://aws.amazon.com/eks/sla/ 

  8. https://aws.amazon.com/ec2/pricing/ 

  9. https://github.com/kubernetes/autoscaler/tree/master/vertical-pod-autoscaler 

  10. https://github.com/hashicorp/go-plugin 

Marionette – Enabling E2E user-scenario simulation

Post Syndicated from Grab Tech original https://engineering.grab.com/marionette-enabling-e2e-user-scenario-simulation

Introduction

A plethora of interconnected microservices is what powers the Grab app. These microservices work behind the scenes to delight millions of our customers in Southeast Asia. It is a no-brainer that we emphasise strong testing tools, so our app performs flawlessly and continuously meets our customers’ needs.

Background

We have a microservices-based architecture, in which microservices are interconnected with numerous other microservices. Each passing day sees teams within Grab updating their microservices, which in turn enhances the overall app. If any of the microservices fail after changes are rolled out, the whole app may end up in an unstable state, or worse. This is a major risk, and that’s why we insist on conducting “end-to-end (E2E) testing” as an integral part of our software test life cycle.

E2E tests are done for all crucial workflows in the app, but not for every detail. For that we have conventional tests such as unit tests, component tests, functional tests, etc. Consider E2E testing as the final approval in the quality assurance of the app.

Writing E2E tests in the microservices world is not a trivial task. We are not testing just a single monolithic application. To test a workflow on the app from a user’s perspective, we need to traverse multiple microservices, which communicate through different protocols such as HTTP/HTTPS and TCP. E2E testing gets even more challenging with the continuous addition of microservices. Over the years, we have grown tremendously with hundreds of microservices working in the background to power our app.

Some major challenges in writing E2E tests for the microservices-based apps are:

  • Availability

    Getting all microservices together for E2E testing is tough. Each development team works independently and is responsible only for its microservices. Teams use different programming languages, data stores, etc for each microservice. It’s hard to construct all pieces in a common test environment as a complete app for E2E testing each time.

  • Data or resource set up

    E2E testing requires comprehensive data set up. Otherwise, testing results are affected by data constraints rather than by any recent changes to the underlying microservices. For example, we need to create real-life equivalent driver accounts, passenger accounts, etc., and to have those, there are a few dependencies on other internal systems which manage user accounts. Further, data and booking generation should be robust enough to replicate real-world scenarios as far as possible.

  • Access and authentication

    Usually, the test cases require sequential execution in E2E testing. In a microservices architecture, it is difficult to test a workflow which requires access and permissions to several resources or data that should remain available throughout the test execution.

  • Resource and time intensive

    It is expensive and time consuming to run E2E tests; significant time is involved in deploying new changes, configuring all the necessary test data, etc.

Though there are several challenges, we had to find a way to overcome them and test workflows from the beginning to the end in our app.

Our approach to overcome challenges

We knew what our challenges were and what we wanted to achieve from E2E testing, so we started thinking about how to develop a platform for E2E tests. To begin with, we determined that the scope of E2E testing that we’re going to primarily focus on is Grab’s transport domain — the microservices powering the driver and passenger apps.

One approach is to “simulate” user scenarios through a single platform before any new versions of these microservices are released. Ideally, the platform should also have the capabilities to set up the data required for these simulations. For example, ride booking requires data set up such as driver accounts, passenger accounts, location coordinates, geofencing, etc.

We wanted to create a single platform that multiple teams could use to set up their test data and run E2E user-scenario simulations easily. We put ourselves to work on that idea, which resulted in the creation of an internal platform called “Marionette”. It simulates actions performed by Grab’s passenger and driver apps as they are expected to behave in the real world. The objective is to ensure that all standard user workflows are tested before deploying new app versions.

Introducing Marionette

Marionette enables Grabbers (developers and QAs) to run E2E user-scenario simulations without depending on the actual passenger and driver apps. Grabbers can set up and configure test data such as drivers, passengers, and taxi types to mimic real-world behaviour.

Let’s look at the overall architecture to understand Marionette better:

Overall Architecture

Grabbers can interact with Marionette through three channels: UI, SDK, and through RESTful API endpoints in their test scripts. All requests are routed through a load balancer to the Marionette platform. The Marionette platform in turn talks to the required microservices to create test data and to run the simulations.

The benefits

With Marionette, Grabbers now have the ability to:

  • Simulate the whole booking flow including customer and driver behavior as well as transition through the booking life cycle including pick-up, drop-off, cancellation, etc. For example, developers can make passenger booking from the UI and configure pick-up points, drop-off points, taxi types, and other parameters easily. They can define passenger behaviour such as “make bookings after a specified time interval”, “cancel each booking”, etc. They can also set driver locations, define driver behaviour such as “always accept booking manually”, “decline received bookings”, etc.
  • Simulate bookings in all cities where Grab operates. Further, developers can run simulations for multiple Grab taxi types such as JustGrab, GrabShare, etc.
  • Visualize passengers, drivers, and ride transitions on the UI, which lets them easily test their workflows.
  • Save effort and time otherwise spent on installing third-party Android or iOS emulators, troubleshooting or debugging .apk installation files, etc. before testing workflows.
  • Conduct E2E testing without real mobile devices and installed apps.
  • Run automatic simulations, in which a particular set of scenarios are run continuously, thus helping developers with exploratory testing.

How we isolated simulations among users

It is important to have independent simulations for each user. Otherwise, simulations don’t yield correct results. This was one of the challenges we faced when we first started running simulations on Marionette.

To resolve this issue, we came up with the idea of “cohorts”. A cohort is a logical group of passengers and drivers who are located in a particular city. Each simulation on Marionette is run using a “cohort” containing the number of drivers and passengers required for that simulation. When a passenger/driver needs to interact with other passengers/drivers (such as for ride bookings), Marionette ensures that the interaction is constrained to resources within the cohort. This ensures that drivers and passengers are not shared in different test cases/simulations, resulting in more consistent test runs.

How to interact with Marionette

Let’s take a look at how to interact with Marionette starting with its user interface first.

User Interface

The Marionette UI is designed to provide the same level of granularity as available on the real passenger and driver apps.

Generally, the UI is used in the following scenarios:

  • To test common user scenarios/workflows after deploying a change on staging.
  • To test the end-to-end booking flow right from the point where a passenger makes a booking till drop-off at the destination.
  • To simulate the functionality of other teams within Grab – the passenger app developers can simulate the driver app for their testing and vice versa. Teams usually work independently, and the ability to simulate the dependent app lets them keep working that way while still testing end-to-end flows.
  • To perform E2E testing (such as by QA teams) without writing any test scripts.

The Marionette UI also allows Grabbers to create and set up data. All that needs to be done is to specify the necessary resources such as the number of drivers, the number of passengers, the city to run the simulation in, etc. Running E2E simulations involves just the click of a button after data setup. Reports generated at the end of each simulation provide a graphical visualization of the results. Visual reports save developers’ time, which is otherwise spent browsing through logs to pinpoint errors.

SDK

Marionette also provides an SDK, written in the Go programming language.

It lets developers:

  • Create resources such as passengers, drivers, and cohorts for simulating booking flows.
  • Create booking simulations in both staging and production.
  • Set bookings to specific states as needed for simulation through customizable driver and passenger behaviour.
  • Make HTTP requests and receive responses that matter in tests.
  • Run load tests by scaling up booking requests to match the required workload (QPS).

Let’s look at a high-level booking test case example to understand the simulation workflow.

Assume we want to run an E2E booking test with this driver behaviour type — “accepts passenger bookings and transits between booking states according to defined behaviour parameters”. This is just one of the driver behaviour types in Marionette; other behaviour types are also supported. Similarly, passengers have their own behaviour types.

To write the E2E test for this example case, we first define the driver behavior in a function like this:

Defining the driver behaviour
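
As an illustration, here is a purely hypothetical sketch of what such a behaviour definition might look like; DriverBehaviour and its fields are invented names, not the actual Marionette SDK API.

// Hypothetical sketch only; these types and fields are illustrative, not the real SDK.
func acceptAndTransitBehaviour() DriverBehaviour {
  return DriverBehaviour{
    AcceptBookings:     true, // accept every incoming passenger booking
    BookingStates:      []string{"PICKING_UP", "IN_TRANSIT", "COMPLETED"},
    TransitionInterval: 5 * time.Second, // advance to the next booking state every 5s
  }
}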

Then, we handle the booking request for the driver like this:

Handling the booking request
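
Again, purely as an illustration, a hypothetical sketch of the booking handling step; BookingRequest, Location, and HandleBooking are invented names, not the actual SDK API.

// Hypothetical sketch only; names are illustrative, not the real SDK.
booking := BookingRequest{
  PassengerID: passenger.ID,
  PickUp:      Location{Lat: 1.2806, Lng: 103.8500}, // illustrative coordinates
  DropOff:     Location{Lat: 1.3521, Lng: 103.8198},
  TaxiType:    "JustGrab",
}
driver.HandleBooking(booking, acceptAndTransitBehaviour())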

The SDK client makes the handling of passengers, drivers, and bookings very easy as developers don’t need to worry about hitting multiple services and multiple APIs to set up their required driver and passenger actions. Instead, teams can focus on testing their use cases.

To ensure that passengers and drivers are isolated in our test, we need to group them together in a cohort before running the E2E test.

Grouping the driver and passenger into a cohort
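
As a stand-in illustration, a hypothetical sketch of the cohort setup; CreateCohort and CohortSpec are invented names, not the actual SDK API.

// Hypothetical sketch only; names are illustrative, not the real SDK.
cohort := sdkClient.CreateCohort(CohortSpec{City: "Singapore"})
cohort.AddDriver(driver)
cohort.AddPassenger(passenger)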

In summary, we have defined the driver’s behaviour, created the booking request, created the SDK client, and associated the driver and passenger with a cohort. Now, we just have to trigger the E2E test from our IDE. It’s just that simple and easy!

Previously, developers had to write boilerplate code to make HTTP requests and parse the returned HTTP responses. With the Marionette SDK in place, developers don’t have to write any boilerplate code, saving significant time and effort in E2E testing.

RESTful APIs in test scripts

Marionette provides several RESTful API endpoints that cover different simulation areas such as resource or data creation APIs, driver APIs, passenger APIs, etc. APIs are particularly suitable for scripted testing. Developers can directly call these APIs in their test scripts to facilitate their own tests such as load tests, integration tests, E2E tests, etc.

Developers use these APIs with their preferred programming languages to run simulations. They don’t need to worry about any underlying complexities when using the APIs. For example, developers in Grab have created custom libraries using Marionette APIs in Python, Java, and Bash to run simulations.
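
To give a feel for scripted usage, here is a hypothetical Go snippet using the standard net/http and strings packages; the endpoint path, payload fields, and marionetteBaseURL are illustrative assumptions, not the actual Marionette API.

// Hypothetical example only; the endpoint and payload are illustrative.
payload := strings.NewReader(`{"city": "Singapore", "num_drivers": 1, "num_passengers": 1}`)
req, err := http.NewRequest(http.MethodPost, marionetteBaseURL+"/v1/cohorts", payload)
if err != nil {
  // handle request construction error
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req) // inspect resp and err in the test script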

What’s next

Currently, we cover E2E tests for our transport domain (microservices for the passenger and driver apps) through Marionette. The next phase is to expand into a full-fledged platform that can test microservices in other Grab domains such as Food, Payments, and so on. Going forward, we are also looking to further simplify the writing of E2E tests and running them as a part of the CD pipeline for seamless testing before deployment.

In conclusion

We had an idea of creating a simulation platform that can run and facilitate E2E testing of microservices. With Marionette, we have achieved this objective. Marionette has helped us understand how end users use our apps, allowing us to make improvements to our services. Further, Marionette ensures there are no breaking changes and provides additional visibility into potential bugs that might be introduced as a result of any changes to microservices.

If you have any comments or questions about Marionette, please leave a comment below.

How we implemented domain-driven development in Golang

Post Syndicated from Grab Tech original https://engineering.grab.com/domain-driven-development-in-golang

Partnerships have always been core to Grab’s super app strategy. We believe in collaborating with partners who are the best in what they do – combining their expertise with what we’re good at so that we can bring high-quality new services to our customers, at the same time create new opportunities for the merchant and driver-partners in our ecosystem.

That’s why we launched GrabPlatform last year. To make it easier for partners to either integrate Grab into their services, or integrate their services into Grab.

In view of that, part of the GrabPlatform’s team mission is to make it easy for partners to integrate with Grab services. These partners are external companies that would like to offer Grab’s services such as ride-booking through their own websites or applications. To do that, we decided to build a website that will serve as a one-stop-shop that would allow them to self-service these integrations.

The challenges we faced with the conventional approach

In the process of building this website, our team noticed that the majority of the functions and responsibilities were added to files without proper segregation. A single file would contain more than 500 lines of code. Each of these files was imported across different collections of source code, resulting in an unstructured codebase. Any changes to the existing functions risked breaking existing functionality; we realized then that we needed to proactively plan for the future. Hence, we decided to use the principles of Domain-Driven Design (DDD) and idiomatic Go. This blog demonstrates how we leveraged those concepts to design a modern application.

How we implemented DDD in our codebase

Here’s how we went about solving our unstructured codebase using DDD principles.

Step 1: Gather domain (business) knowledge

We collaborated closely with our domain experts (in our case, this was our product team) to identify functionality and flow. From them, we discovered the following key points:

  • After creating a project, developers are added to the project.
  • The domain experts wanted an ability to add other products (e.g. Pricing service, ETA service, GrabPay service) to their projects.
  • They wanted the ability to create multiple authentication clients to access the above products.

Step 2: Break down domain knowledge into bounded context

Now that we had gathered the required domain knowledge (i.e. what our code needed to reflect to our partners), it was time to use the DDD strategic tool Bounded Context to break down problems into subcontexts. Here is a graphical representation of how we converted the problem into smaller units.

Bounded Context

We identified several dependencies between the units involved in the project. Take some of these examples:

  • The project domain overlapped with the product and developer domains.
  • Our RideBooking project can only exist if it has some products like Ridebooking APIs and not the other way around.

What this means is a product can exist independent of the project, but a project will have no significance without any product. In the same way, a project is dependent on the developers, but developers can exist whether or not they belong to a project.

Step 3: Identify value objects or entities (lowest layer)

Looking at the above bounded contexts, we figured out the building blocks (i.e. value objects or entities) to break down the above functionality and flow.

// ProjectDAO ...
type ProjectDAO struct {
  ID            int64
  UUID          string
  Status        ProjectStatus
  CreatedAt     time.Time
}

// DeveloperDAO ...
type DeveloperDAO struct {
  ID            int64
  UUID          string
  PhoneHash     *string
  Status        Status
  CreatedAt     time.Time
}

// ProductDAO ...
type ProductDAO struct {
  ID            int64
  UUID          string
  Name          string
  Description   *string
  Status        ProductStatus
  CreatedAt     time.Time
}

// DeveloperProjectDAO to map developers to a project
type DeveloperProjectDAO struct {
  ID            int64
  DeveloperID   int64
  ProjectID     int64
  Status        DeveloperProjectStatus
}

// ProductProjectDAO to map products to a project
type ProductProjectDAO struct {
  ID            int64
  ProjectID     int64
  ProductID     int64
  Status        ProjectProductStatus
}

All the objects shown above have ID as a field and are identifiable, hence they are entities and not value objects. But if we apply domain knowledge, DeveloperProjectDAO and ProductProjectDAO are actually not independent entities. The Project object is the aggregate root, since it must exist before its child mappings, DeveloperProjectDAO and ProductProjectDAO, can exist.
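
To make the aggregate concrete, here is one possible shape for the aggregate root based on the DAOs above; it is illustrative only, and our actual structs differ.

// Illustrative only: a Project aggregate owns its developer and product mappings,
// which cannot exist outside of a project.
type Project struct {
  ProjectDAO
  Developers []DeveloperProjectDAO
  Products   []ProductProjectDAO
}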

Step 4: Create the repositories

As stated above, we created an interface to abstract the working logic of a particular domain (i.e. Repository). Here is an example of how we designed the repositories:

// ProductRepositoryImpl responsible for product functionality
type ProductRepositoryImpl struct {
  productDao storage.IProductDao // private field
}

type ProductRepository interface {
  GetProductsByIDs(ctx context.Context, ids []int64) ([]IProduct, error)
}

// DeveloperRepositoryImpl
type DeveloperRepositoryImpl struct {
  developerDAO storage.IDeveloperDao // private field
}

type DeveloperRepository interface {
  FindActiveAllowedByDeveloperIDs(ctx context.Context, developerIDs []interface{}) ([]*Developer, error)
  GetDeveloperDetailByProfile(ctx context.Context, developerProfile *appdto.DeveloperProfile) (IDeveloper, error)
}

Here is a look at how we designed our repository for aggregate root project:

// Unexported Struct
type productProjectRepositoryImpl struct {
  productProjectDAO storage.IProjectProductDao // private field
}

type ProductProjectRepository interface {
  GetAllProjectProductByProjectID(ctx context.Context, projectID int64) ([]*ProjectProduct, error)
}

// Unexported Struct
type developerProjectRepositoryImpl struct {
  developerProjectDAO storage.IDeveloperProjectDao // private field
}

type DeveloperProjectRepository interface {
  GetDevelopersByProjectIDs(ctx context.Context, projectIDs []interface{}) ([]*DeveloperProject, error)
  UpdateMappingWithRole(ctx context.Context, developer IDeveloper, project IProject, role string) (*DeveloperProject, error)
}

// Unexported Struct
type projectRepositoryImpl struct {
  projectDao storage.IProjectDao // private field
}

type ProjectRepository interface {
  GetProjectsByIDs(ctx context.Context, projectIDs []interface{}) ([]*Project, error)
  GetActiveProjectByUUID(ctx context.Context, uuid string) (IProject, error)
  GetProjectByUUID(ctx context.Context, uuid string) (*Project, error)
}

type ProjectAggregatorImpl struct {
  projectRepositoryImpl           // private field
  developerProjectRepositoryImpl  // private field
  productProjectRepositoryImpl    // private field
}

type ProjectAggregator interface {
  GetProjects(ctx context.Context) ([]*dto.Project, error)
  AddDeveloper(ctx context.Context, request *appdto.AddDeveloperRequest) (*appdto.AddDeveloperResponse, error)
  GetProjectWithProducts(ctx context.Context, uuid string) (IProject, error)
}

Step 5: Identify Domain Events

The functions described in Step 4 only return the IDs of the developer and product, which convey no information to the users. In order to provide developer and product information, we use the domain-event technique to return the actual product and developer attributes.

A domain event is something that happened in one bounded context that you want another bounded context of the domain to be aware of. For example, if there are new updates in the developer domain, it’s important to convey these updates to the project domain. This propagation technique is termed a domain event. Domain events keep different classes independent of one another.

One way to implement it is seen here:

// file: project_aggregator.go
func (p *ProjectAggregatorImpl) GetProjects(ctx context.Context) ([]*dto.Project, error) {
  ....
  ....
  developers := p.EventHandler.Handle(DomainEvent.FindDeveloperByDeveloperIDs{developerIDs})
  ....
}

// file: event_type.go
type FindDeveloperByDeveloperIDs struct{ developerIDs []interface{} }

// file: event_handler.go
func (e *EventHandler) Handle(event interface{}) interface{} {
  switch op := event.(type) {
      case FindDeveloperByDeveloperIDs:
            developers, _ := e.developerRepository.FindDeveloperByDeveloperIDs(op.developerIDs)
            return developers
      case ....
      ....
    }
}

Domain Event

Some common mistakes to avoid when implementing DDD in your codebase:

  • Not engaging with domain experts. Not interacting with domain experts is a common mistake when using DDD. Talking to domain experts to get an understanding of the problem domain from their perspective is at the core of DDD. Starting with schemas or data modelling instead of talking to domain experts may create code based on a relational model instead of one built around a domain model.
  • Ignoring the language of the domain experts. Creating a ubiquitous language shared with domain experts is also a core DDD practice. This common language must be used in all discussions as well as in the code, e.g. in class and method names.
  • Not identifying bounded contexts. A common approach to solving a complex problem is breaking it down into smaller parts. Creating bounded contexts is breaking down a large domain into smaller ones, each handling one cohesive part of the domain.
  • Using an anaemic domain model. This is a common sign that a team is not doing DDD and often a symptom of a failure in the modelling process. At first, an anaemic domain model often looks like a real domain model with correct names, but the classes lack functionalities. They contain only the Get and Set methods.

How the DDD model improved our software development

Thanks to this clean-up, we achieved the following:

  • Core functionalities are evenly distributed to the overall codebase and not limited to just a few files.
  • The developers are aware of what each folder is responsible for by simply looking at the file naming and folder structure.
  • The risk of breaking major functionalities by merely making small changes is greatly reduced. Changing a feature is now more efficient.

The team now finds the code well structured and we require less hand-holding for onboarders, thanks to the simplicity of the structure.

Finally, and most importantly, we now have a system oriented towards our business needs. Everyone ends up using the same language and terms. Developers communicate better with the business team. The work is more efficient when it comes to establishing solutions for models that reflect how the business operates, instead of how the software operates.

Lessons Learnt

  • Use DDD to collaborate among all project disciplines (product, business, partner, and so on) and clearly understand the business requirements.
  • Establish a ubiquitous language to discuss domain-related concepts.
  • Use bounded contexts to break down complex domains into manageable parts.
  • Implement a layered architecture (i.e. DDD building blocks) to focus on particular aspects of the application.
  • To simplify dependencies, use domain events to communicate across bounded contexts.

Preventing Pipeline Calls from Crashing Redis Clusters

Post Syndicated from Grab Tech original https://engineering.grab.com/preventing-pipeline-calls-from-crashing-redis-clusters

Introduction

On Feb 15th, 2019, a slave node in Redis, an in-memory data structure store, failed and required a replacement. During this period, roughly only 1 in 21 calls to Apollo, a primary transport booking service, succeeded. This brought Grab rides down significantly for the one minute it took the Redis Cluster to self-recover. This behavior was totally unexpected and defeated the very purpose of having multiple replicas.

This blog post describes Grab’s outage post-mortem findings.

Understanding the infrastructure

With Grab’s continuous growth, our services must handle large amounts of data traffic involving high processing power for read and write operations. To address this significant growth, reduce handler latency, and improve overall performance, many of our services use Redis – a common in-memory data structure store – as a cache, database, or message broker. Furthermore, we use a Redis Cluster, a distributed implementation of Redis, for shorter latency and higher availability.

Apollo is our driver-side state machine. It is on almost all requests’ critical path and is a primary component for booking transport and providing great service for customer bookings. It stores individual driver availability in an AWS ElastiCache Redis Cluster, letting our booking service efficiently assign jobs to drivers. It’s critical to keep Apollo running and available 24/7.

Apollo's infrastructure

Because of Apollo’s significance, its Redis Cluster has 3 shards, each with 2 slaves. It hashes all keys and, according to the hash value, divides them into three partitions. Each partition has two replicas to increase reliability.

We use the Go-Redis client, a popular Redis library, to direct all written queries to the master nodes (which then write to their slaves) to ensure consistency with the database.

Master and slave nodes in the Redis Cluster

For reading related queries, engineers usually turn on the ReadOnly flag and turn off the RouteByLatency flag. These effectively turn on ReadOnlyFromSlaves in the Grab gredis3 library, so the client directs all reading queries to the slave nodes instead of the master nodes. This load distribution frees up master node CPU usage.

Client reading and writing from/to the Redis Cluster
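
For reference, here is a minimal sketch of the underlying go-redis cluster options that these flags map to; the addresses are placeholders, and the actual gredis3 wrapper configuration differs.

client := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs:          []string{"127.0.0.1:6001", "127.0.0.1:6002", "127.0.0.1:6003"}, // placeholder addresses
        ReadOnly:       true,  // allow read commands to run on slave nodes
        RouteByLatency: false, // keep latency-based routing off so reads go to slaves
})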

When designing a system, we consider potential hardware outages and network issues. We also think of ways to ensure our Redis Cluster is highly efficient and available; setting the above-mentioned flags help us achieve these goals.

Ideally, this Redis Cluster configuration would not cause issues even if a master or slave node breaks; Apollo should still function smoothly. So, why did that February Apollo outage happen? Why did a single downed slave node cause a 95+% call failure rate to the Redis Cluster during that one-minute window?

Let’s start by discussing how to construct a local Redis Cluster step by step, then try and replicate the outage. We’ll look at the reasons behind the outage and provide suggestions on how to use a Redis Cluster client in Go.

How to set up a local Redis Cluster

1. Download and install Redis from here.

2. Set up configuration files for each node. For example, in Apollo, we have 9 nodes, so we need to create 9 files like this with different port numbers (x).

// file_name: node_x.conf (do not include this line in file)
port 600x
cluster-enabled yes
cluster-config-file cluster-node-x.conf
cluster-node-timeout 5000
appendonly yes
appendfilename node-x.aof
dbfilename dump-x.rdb

3. Initiate each node in an individual terminal tab with:

$PATH/redis-4.0.9/src/redis-server node_1.conf

4. Use this Ruby script to create a Redis Cluster. (Each master has two slaves.)

$PATH/redis-4.0.9/src/redis-trib.rb create --replicas 2 127.0.0.1:6001 ..... 127.0.0.1:6009

>>> Performing Cluster Check (using node 127.0.0.1:6001)
M: 7b4a5d9a421d45714e533618e4a2b3becc5f8913 127.0.0.1:6001
   slots:0-5460 (5461 slots) master
   2 additional replica(s)
S: 07272db642467a07d515367c677e3e3428b7b998 127.0.0.1:6007
   slots: (0 slots) slave
   replicates 05363c0ad70a2993db893434b9f61983a6fc0bf8
S: 65a9b839cd18dcae9b5c4f310b05af7627f2185b 127.0.0.1:6004
   slots: (0 slots) slave
   replicates 7b4a5d9a421d45714e533618e4a2b3becc5f8913
M: 05363c0ad70a2993db893434b9f61983a6fc0bf8 127.0.0.1:6003
   slots:10923-16383 (5461 slots) master
   2 additional replica(s)
S: a78586a7343be88393fe40498609734b787d3b01 127.0.0.1:6006
   slots: (0 slots) slave
   replicates 72306f44d3ffa773810c810cfdd53c856cfda893
S: e94c150d910997e90ea6f1100034af7e8b3e0cdf 127.0.0.1:6005
   slots: (0 slots) slave
   replicates 05363c0ad70a2993db893434b9f61983a6fc0bf8
M: 72306f44d3ffa773810c810cfdd53c856cfda893 127.0.0.1:6002
   slots:5461-10922 (5462 slots) master
   2 additional replica(s)
S: ac6ffbf25f48b1726fe8d5c4ac7597d07987bcd7 127.0.0.1:6009
   slots: (0 slots) slave
   replicates 7b4a5d9a421d45714e533618e4a2b3becc5f8913
S: bc56b2960018032d0707307725766ec81e7d43d9 127.0.0.1:6008
   slots: (0 slots) slave
   replicates 72306f44d3ffa773810c810cfdd53c856cfda893
[OK] All nodes agree about slots configuration.

5. Finally, we try to send queries to our Redis Cluster, e.g.

$PATH/redis-4.0.9/src/redis-cli -c -p 6001 hset driverID 100 state available updated_at 11111

What happens when nodes become unreachable?

Redis Cluster Server

As long as the majority of a Redis Cluster’s masters and at least one slave node for each unreachable master are reachable, the cluster is accessible. It can survive even if a few nodes fail.

Let’s say we have N masters, each with K slaves, and T random nodes become unreachable. This formula estimates the Redis Cluster’s availability:

if T <= K:
        availability = 100%
else:
        availability = 100% - (1/(N*K - T))
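
A direct Go transcription of the formula above, treating the penalty term exactly as written, with Apollo’s numbers as a worked example:

// Availability estimate per the formula above; the units of the else-branch term
// follow the post as written.
func estimateAvailability(n, k, t int) float64 {
        if t <= k {
                return 100.0
        }
        return 100.0 - 1.0/float64(n*k-t)
}

// Apollo: n = 3 masters, k = 2 slaves each. A single dead slave (t = 1 <= k)
// should therefore leave the cluster 100% available.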

If you successfully built your own Redis Cluster locally, try killing any node with a simple Ctrl-C. The Redis Cluster broadcasts to all nodes that the killed node is now unreachable, so other nodes no longer direct traffic to that port.

If you bring this node back up, all nodes know it’s reachable again. If you kill a master node, the Redis Cluster promotes a slave node to a temp master for writing queries.

$PATH/redis-4.0.9/src/redis-server node_x.conf

With this information, we can’t answer the big question of why a single slave node failure caused an over 95% failure rate in the Apollo outage. Per the above theory, the Redis Cluster should still be 100% available. So, the Redis Cluster server could properly handle an outage, and we concluded it wasn’t the failure rate’s cause. So we looked at the client side and Apollo’s queries.

Golang Redis Cluster Client & Apollo Queries

Apollo’s client side is based on the Go-Redis Library.

During the Apollo outage, we found some code returned many errors during certain pipeline GET calls. When Apollo tried to send a pipeline of HMGET calls to its Redis Cluster, the pipeline returned errors.

First, we looked at the pipeline implementation code in the Go-Redis library. In the function defaultProcessPipeline, the code assigns each command to a Redis node in this line: err := c.mapCmdsByNode(cmds, cmdsMap).

func (c *ClusterClient) mapCmdsByNode(cmds []Cmder, cmdsMap *cmdsMap) error {
        state, err := c.state.Get()
        if err != nil {
                setCmdsErr(cmds, err)
                return err
        }

        cmdsAreReadOnly := c.cmdsAreReadOnly(cmds)
        for _, cmd := range cmds {
                var node *clusterNode
                var err error
                if cmdsAreReadOnly {
                        _, node, err = c.cmdSlotAndNode(cmd)
                } else {
                        slot := c.cmdSlot(cmd)
                        node, err = state.slotMasterNode(slot)
                }
                if err != nil {
                        return err
                }
                cmdsMap.mu.Lock()
                cmdsMap.m[node] = append(cmdsMap.m[node], cmd)
                cmdsMap.mu.Unlock()
        }
        return nil
}

Next, since the readOnly flag is on, we look at the cmdSlotAndNode function. As mentioned earlier, you can get better performance by setting readOnlyFromSlaves to true, which sets RouteByLatency to false. By doing this, RouteByLatency will not take priority and the master does not receive the read commands.

func (c *ClusterClient) cmdSlotAndNode(cmd Cmder) (int, *clusterNode, error) {
        state, err := c.state.Get()
        if err != nil {
                return 0, nil, err
        }

        cmdInfo := c.cmdInfo(cmd.Name())
        slot := cmdSlot(cmd, cmdFirstKeyPos(cmd, cmdInfo))

        if c.opt.ReadOnly && cmdInfo != nil && cmdInfo.ReadOnly {
                if c.opt.RouteByLatency {
                        node, err := state.slotClosestNode(slot)
                        return slot, node, err
                }

                if c.opt.RouteRandomly {
                        node := state.slotRandomNode(slot)
                        return slot, node, nil
                }

                node, err := state.slotSlaveNode(slot)
                return slot, node, err
        }

        node, err := state.slotMasterNode(slot)
        return slot, node, err
}

Now, let’s try and better understand the outage.

  1. When a slave becomes unreachable, all commands assigned to that slave node fail.
  2. We found in Grab’s Redis library code that a single error among the cmds could cause the entire pipeline call to return an error.
  3. In addition, engineers return a failure in their code if err != nil. This explains the high failure rate during the outage.

func (w *goRedisWrapperImpl) getResultFromCommands(cmds []goredis.Cmder) ([]gredisapi.ReplyPair, error) {
        results := make([]gredisapi.ReplyPair, len(cmds))
        var err error
        for idx, cmd := range cmds {
                results[idx].Value, results[idx].Err = cmd.(*goredis.Cmd).Result()
                if results[idx].Err == goredis.Nil {
                        results[idx].Err = nil
                        continue
                }
                if err == nil && results[idx].Err != nil {
                        err = results[idx].Err
                }
        }

        return results, err
}

Our next question was, “Why did it take almost one minute for Apollo to recover?” The Redis Cluster broadcasts instantly to its other nodes when one node is unreachable. So we looked at how the client assigns jobs.

When the Redis Cluster client loads the node states, it only refreshes the state once a minute. So there’s a maximum one minute delay of state changes between the client and server. Within that minute, the Redis client kept sending queries to that unreachable slave node.

func (c *clusterStateHolder) Get() (*clusterState, error) {
        v := c.state.Load()
        if v != nil {
                state := v.(*clusterState)
                if time.Since(state.createdAt) > time.Minute {
                        c.LazyReload()
                }
                return state, nil
        }
        return c.Reload()
}

What happened to the write queries? Did we lose new data during that one-minute gap? That’s a very good question! The answer is no, since all write queries only went to the master nodes, and the Redis Cluster client keeps a watcher on the master nodes. So, whenever any master node becomes unreachable, the client is immediately aware of the change in state. See the Watcher code.

How to use Go Redis safely?

Redis Cluster Client

One way to avoid a potential outage like our Apollo outage is to create another Redis Cluster client for pipelining only and with a true RouteByLatency value. The Redis Cluster determines the latency according to ping calls to its server.

In this case, all pipelining queries would read through the master nodes if the latency is less than 1ms (code), and as long as the majority side of partitions is alive, the client will get the expected results. More load would go to the master nodes with this setting, so be careful about CPU usage on the master nodes when you make the change.
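
A minimal sketch of such a dedicated pipelining client, using the go-redis cluster options referenced in this post; the addresses are placeholders.

// pipelineOnlyClient is used solely for pipeline calls and routes reads by latency,
// which may send them to master nodes.
pipelineOnlyClient := redis.NewClusterClient(&redis.ClusterOptions{
        Addrs:          []string{"127.0.0.1:6001", "127.0.0.1:6002", "127.0.0.1:6003"}, // placeholder addresses
        ReadOnly:       true,
        RouteByLatency: true,
})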

Pipeline Usage

In some cases, the master nodes might not be able to handle so much traffic. Another way to mitigate the impact of an outage is to check for errors on individual queries when errors happen in a pipeline call.

In Grab’s Redis Cluster library, the function Pipeline(PipelineReadOnly) returns a response with an error for individual reply.

func (c *clientImpl) Pipeline(ctx context.Context, argsList [][]interface{}) ([]gredisapi.ReplyPair, error) {
        defer c.stats.Duration(statsPkgName, metricElapsed, time.Now(), c.getTags(tagFunctionPipeline)...)
        pipe := c.wrappedClient.Pipeline()
        cmds := make([]goredis.Cmder, len(argsList))
        for i, args := range argsList {
                cmd := goredis.NewCmd(args...)
                cmds[i] = cmd
                _ = pipe.Process(cmd)
        }
        _, _ = pipe.Exec()
        return c.wrappedClient.getResultFromCommands(cmds)
}

func (w *goRedisWrapperImpl) getResultFromCommands(cmds []goredis.Cmder) ([]gredisapi.ReplyPair, error) {
        results := make([]gredisapi.ReplyPair, len(cmds))
        var err error
        for idx, cmd := range cmds {
                results[idx].Value, results[idx].Err = cmd.(*goredis.Cmd).Result()
                if results[idx].Err == goredis.Nil {
                        results[idx].Err = nil
                        continue
                }
                if err == nil && results[idx].Err != nil {
                        err = results[idx].Err
                }
        }

        return results, err
}

type ReplyPair struct {
        Value interface{}
        Err   error
}

Instead of returning nil or an error message when err != nil, we could check for errors for each result so successful queries are not affected. This might have minimized the outage’s business impact.
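
A sketch of what that per-reply handling could look like on top of the Pipeline call shown above; handleValue is a hypothetical helper for successful replies.

replies, aggErr := client.Pipeline(ctx, argsList)
for _, reply := range replies {
        if reply.Err != nil {
                // handle or retry only this query; do not fail the whole batch
                continue
        }
        handleValue(reply.Value) // hypothetical helper for successful replies
}
_ = aggErr // the aggregate error no longer decides the fate of every query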

Go Redis Cluster Library

One way to fix the Redis Cluster library is to reload the nodes’ status when an error happens. In the go-redis library, defaultProcessor has this logic, which can be applied to defaultProcessPipeline.
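
A sketch of the idea only, not an actual patch to go-redis; isNodeFailureErr is a hypothetical helper for classifying node-level errors.

if err := c.defaultProcessPipeline(cmds); err != nil && isNodeFailureErr(err) {
        c.state.LazyReload() // refresh cluster state immediately instead of waiting up to a minute
}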

In Conclusion

We’ve shown how to build a local Redis Cluster server, explained how Redis Clusters work, and identified its potential risks and solutions. Redis Cluster is a great tool to optimize service performance, but there are potential risks when using it. Please carefully consider our points about how to best use it. If you have any questions, please ask them in the comments section.