Tag Archives: Redis

Let’s Architect! Leveraging in-memory databases

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-leveraging-in-memory-databases/

In-memory databases play a critical role in modern computing, particularly in reducing the strain on existing resources, scaling workloads efficiently, and minimizing the cost of infrastructure. The advanced performance capabilities of in-memory databases make them vital for demanding applications characterized by voluminous data, real-time analytics, and rapid response requirements.

In this edition of Let’s Architect!, we introduce caching strategies and examine case studies that use AWS services, such as Amazon ElastiCache and Amazon MemoryDB for Redis, in real workloads where customers share the reasoning behind their approaches. It is very important to understand the context for leveraging a specific solution or pattern, and many common questions can be answered with these resources.

Caching challenges and strategies

Many services built at Amazon rely on caching systems in the background to speed up performance, meet low-latency requirements, and avoid overloading source databases and other microservices. Adding caches to our systems and operating them can present complex challenges in terms of monitoring, data consistency, and load on the other components of the system. Indeed, a cache can bring big benefits, but it is also a new component to run and keep healthy. Furthermore, engineers may need to use empirical methods to choose the cache size, expiration policy, and eviction policy: we always have to perform tests and use the metrics to tune the setup.
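
For readers who want something concrete to experiment with, here is a minimal cache-aside sketch using the redis-py client. It is an illustration of the knobs mentioned above rather than anything prescribed by the article; the key names, TTL value, and eviction policy are assumptions you would tune with your own tests and metrics:

import redis

cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

# Eviction policy: evict the least recently used keys when memory fills up.
# (Illustrative choice; pick and tune the policy based on your own metrics.)
cache.config_set("maxmemory-policy", "allkeys-lru")

def get_product(product_id, db_lookup, ttl_seconds=300):
    """Cache-aside read: try the cache first, fall back to the source database."""
    key = f"product:{product_id}"
    cached = cache.get(key)
    if cached is not None:
        return cached                     # cache hit
    value = db_lookup(product_id)         # cache miss: read from the source database
    cache.setex(key, ttl_seconds, value)  # expiration policy: per-entry TTL
    return value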

With this Amazon Builder’s Library resource, you can learn strategies for using caching in your architecture and best practices directly from Amazon’s engineers.

Take me to this Amazon Builder’s Library article!

Strategies applied in Amazon applications at scale, explained and contextualized by Amazon engineers

How Yahoo cost optimizes their in-memory workloads with AWS

Discover how Yahoo effectively leverages the power of Amazon ElastiCache and data tiering to process an astounding 1.3 million advertising data events per second, all while generating savings of up to 50% on their overall bill.

Data tiering is an ingenious method to scale up to hundreds of terabytes of capacity by intelligently managing data. It achieves this by automatically shifting the least-recently accessed data between RAM and high-performance SSDs.

In this video, you will gain insights into how data tiering operates and how you can unlock ultra-fast speeds and seamless scalability for your workloads in a cost-efficient manner. You can also learn how it’s implemented under the hood.

Take me to this re:Invent 2022 video!

A snapshot of how Yahoo architecture leverages Amazon ElastiCache

Use MemoryDB to build real-time applications for performance and durability

MemoryDB is a robust, durable database marked by microsecond reads, low single-digit millisecond writes, scalability, and fortified enterprise security. It guarantees an impressive 99.99% availability, coupled with instantaneous recovery without any data loss.

In this session, we explore multiple use cases across sectors, such as Financial Services, Retail, and Media & Entertainment, like payment processing, message brokering, and durable session store applications. Moreover, through a practical demonstration, you can learn how to utilize MemoryDB to establish a microservices message broker for a Media & Entertainment application.

Take me to this AWS Online Tech Talks video!

A sample use case for retail application

Samsung SmartThings powers home automation with Amazon MemoryDB

MemoryDB offers the kind of ultra-fast performance that only an in-memory database can deliver, curtailing latency to microseconds and processing 160+ million requests per second, without data loss. In this re:Invent 2022 session, you will understand why Samsung SmartThings selected MemoryDB as the engine to power the next generation of their IoT device connectivity platform, one that processes millions of events every day.

You can also discover the intricate design of MemoryDB and how it ensures data durability without compromising the performance of in-memory operations, thanks to the utilization of a multi-AZ transactional log. This session is an enlightening deep-dive into durable, in-memory data operations.

Take me to this re:Invent 2022 video!

The architecture leveraged by Samsung SmartThings using Amazon MemoryDB for Redis

Amazon ElastiCache: In-memory datastore fundamentals, use cases and examples

In this edition of AWS Online Tech Talks, explore Amazon ElastiCache, a managed service that facilitates the seamless setup, operation, and scaling of widely used, open-source–compatible, in-memory datastores in the cloud environment. This service positions you to develop data-intensive applications or enhance the performance of your existing databases through high-throughput, low-latency, in-memory datastores. Learn how it is leveraged for caching, session stores, gaming, geospatial services, real-time analytics, and queuing functionalities.

This course can help cultivate a deeper understanding of Amazon ElastiCache, and how it can be used to accelerate your data processing while maintaining robustness and reliability.

Take me to this AWS Online Tech Talks course!

A free training course to increase your skills and leverage better in-memory databases

See you next time!

Thanks for joining us to discuss in-memory databases! In 2 weeks, we’ll talk about SQL databases.

To find all the blogs from this series, visit the Let’s Architect! list of content on the AWS Architecture Blog.

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support for Non-Parallelizable Workloads

by Kostas Christidis

Introduction

Timestone is a high-throughput, low-latency priority queueing system we built in-house to support the needs of Cosmos, our media encoding platform. Over the past 2.5 years, its usage has increased, and Timestone is now also the priority queueing engine backing Conductor, our general-purpose workflow orchestration engine, and BDP Scheduler, the scheduler for large-scale data pipelines. All in all, millions of critical workflows within Netflix now flow through Timestone on a daily basis.

Timestone clients can create queues, enqueue messages with user-defined deadlines and metadata, then dequeue these messages in an earliest-deadline-first (EDF) fashion. Filtering for EDF messages with criteria (e.g. “messages that belong to queue X and have metadata Y”) is also supported.

One of the things that make Timestone different from other priority queues is its support for a construct we call exclusive queues — this is a means to mark chunks of work as non-parallelizable, without requiring any locking or coordination on the consumer side; everything is taken care of by the exclusive queue in the background. We explain the concept in detail in the sections that follow.

Why Timestone

When designing the successor to Reloaded — our media encoding system — back in 2018 (see “Background” section in The Netflix Cosmos Platform), we needed a priority queueing system that would provide queues between the three components in Cosmos (Figure 1):

  1. the API framework (Optimus),
  2. the forward chaining rule engine (Plato), and
  3. the serverless computing layer (Stratum)
Figure 1. A video encoding application built on top of Cosmos. Notice the three Cosmos subsystems: Optimus, an API layer mapping external requests to internal business models, Plato, a workflow layer for business rule modeling, and Stratum, the serverless layer for running stateless and computational-intensive functions. Source: The Netflix Cosmos Platform

Some of the key requirements this priority queueing system would need to satisfy:

1. A message can only be assigned to one worker at any given time. The work that tends to happen in Cosmos is resource-intensive, and can fan out to thousands of actions. Assume then, that there is replication lag between the replicas in our data store, and we present as dequeueable to worker B the message that was just dequeued by worker A via a different node. When we do that, we waste significant compute cycles. This requirement then throws eventually consistent solutions out of the window, and means we want linearizable consistency at the queue level.

2. Allow for non-parallelizable work.

Given that Plato is continuously polling all workflow queues for more work to execute —

While Plato is executing a workflow for a given project (a request for work on a given service) —

Then Plato should not be able to dequeue additional requests for work for that project on that workflow. Otherwise Plato’s inference engine will evaluate the workflow prematurely, and may move the workflow to an incorrect state.

There exists then, a certain type of work in Cosmos that should not be parallelizable, and the ask is for the queueing system to support this type of access pattern natively. This requirement gave birth to the exclusive queue concept. We explain how exclusive queues work in Timestone in the “Key Concepts” section.

3. Allow for dequeueing and queue depth querying using filters (metadata key-value pairs)

4. Allow for the automatic creation of a queue upon message ingestion

5. Render a message dequeueable within a second of ingestion

We built Timestone because we could not find an off-the-shelf solution that meets these requirements.

System Architecture

Timestone is a gRPC-based service. We use protocol buffers to define the interface of our service and the structure of our request and response messages. The system diagram for the application is shown in Figure 2.

Figure 2. Timestone system diagram. Arrows link all the components touched during a typical Timestone client-server interaction. Numbers in red indicate sequence steps. Identical numbers identify concurrent steps.

System of record

The system of record is a durable Redis cluster. Every write request (see Step 1 — note that this includes dequeue requests since they alter the state of the queue) that reaches the cluster (Step 2) is persisted to a transaction log before a response is sent back to the server (Step 3).

Inside the database, we represent each queue with a sorted set where we rank message IDs (see “Message” section) according to priority. We persist messages and queue configurations (see “Queues” section) in Redis as hashes. All data structures related to a queue — from the messages it contains to the in-memory secondary indexes needed to support dequeue-by-filter — are placed in the same Redis shard. We achieve this by having them share a common prefix, specific to the queue in question. We then codify this prefix as a Redis hash tag. Each message carries a payload (see “Message” section) that can weigh up to 32 KiB.
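
The layout described above maps onto a handful of Redis commands. The sketch below (redis-py) is only an illustration of that mapping, not Timestone’s actual key schema: the queue is a sorted set of message IDs scored by priority, each message body lives in a hash, and a shared hash tag keeps all of a queue’s keys on the same shard:

import json
import redis

r = redis.Redis(decode_responses=True)

QUEUE_TAG = "{video-encoding}"  # hash tag: keys sharing this prefix map to the same shard

def enqueue(msg_id, priority_ms, payload, metadata):
    # The message body and its metadata are stored in a hash...
    r.hset(f"{QUEUE_TAG}:msg:{msg_id}", mapping={
        "payload": json.dumps(payload),      # payloads can weigh up to 32 KiB
        "metadata": json.dumps(metadata),
    })
    # ...while the queue itself is a sorted set of message IDs ranked by priority.
    r.zadd(f"{QUEUE_TAG}:pending", {msg_id: priority_ms})

def peek_earliest_deadline_first():
    # Lowest score = earliest deadline = dequeued first.
    entries = r.zrange(f"{QUEUE_TAG}:pending", 0, 0, withscores=True)
    return entries[0] if entries else None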

Almost all of the interactions between Timestone and Redis (see “Message States” section) are codified as Lua scripts. In most of these Lua scripts, we tend to update a number of data structures. Since Redis guarantees that each script is executed atomically, a successful script execution is guaranteed to leave the system in a consistent (in the ACID sense) state.
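
To illustrate why the Lua layer matters, the sketch below (key names follow the previous sketch and are assumptions, not Timestone’s real scripts) moves a message from a pending sorted set to a running one and stamps its lease expiry in one script, so other clients can never observe a half-applied transition:

import redis

r = redis.Redis(decode_responses=True)

# KEYS[1] = pending zset, KEYS[2] = running zset, KEYS[3] = message hash
# ARGV[1] = message id, ARGV[2] = lease expiry (epoch ms)
LEASE_SCRIPT = """
local removed = redis.call('ZREM', KEYS[1], ARGV[1])
if removed == 0 then
    return 0                                -- someone else already took it
end
redis.call('ZADD', KEYS[2], ARGV[2], ARGV[1])
redis.call('HSET', KEYS[3], 'lease_expiry', ARGV[2])
return 1
"""

lease = r.register_script(LEASE_SCRIPT)

def try_lease(queue, msg_id, lease_expiry_ms):
    # Redis runs the whole script atomically: either all three updates happen or none do.
    return lease(keys=[f"{queue}:pending", f"{queue}:running", f"{queue}:msg:{msg_id}"],
                 args=[msg_id, lease_expiry_ms]) == 1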

All API operations are queue-scoped. All API operations that modify state are idempotent.

Secondary indexes

For observability purposes, we capture information about incoming messages and their transition between states in two secondary indexes maintained on Elasticsearch.

When we get a write response from Redis, we concurrently (a) return the response to the client, and (b) convert this response into an event that we post to a Kafka cluster, as shown in Step 4. Two Flink jobs — one for each type of index we maintain — consume the events from the corresponding Kafka topics, and update the indexes in Elasticsearch.

One index (“current”) gives users a best-effort view into the current state of the system, while the other index (“historic”) gives users a best-effort longitudinal view for messages, allowing them to trace messages as they flow through Timestone and answer questions such as time spent in a state and number of processing errors. We maintain a version counter for each message; every write operation increments that counter. We rely on that version counter to order the events in the historic index.
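
As a rough illustration of the version counter (key and field names are hypothetical), a write path could bump the counter atomically with HINCRBY and attach the result to the event it emits:

import redis

r = redis.Redis(decode_responses=True)

def build_event(queue, msg_id, event_type):
    # Every write operation bumps the message's version counter atomically;
    # consumers of the historic index can order events by this value.
    version = r.hincrby(f"{queue}:msg:{msg_id}", "version", 1)
    return {"queue": queue, "msg_id": msg_id, "type": event_type, "version": version}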

Events are stored in the Elasticsearch cluster for a finite number of days.

Current Usage at Netflix

The system is dequeue heavy. We see 30K dequeue requests per second (RPS) with a P99 latency of 45ms. In comparison, we see 1.2K enqueue RPS at 25ms P99 latency. We regularly see 5K RPS enqueue bursts at 85ms P99 latency.

15B messages have been enqueued to Timestone since the beginning of the year; these messages have been dequeued 400B times. Pending messages regularly reach 10M.

Usage is expected to double next year, as we migrate the rest of Reloaded, our legacy media encoding system, to Cosmos.

Key Concepts

Message

A message carries an opaque payload, a user-defined priority (see “Priority” section), an optional (mandatory for exclusive queues) set of metadata key-value pairs that can be used for filter-based dequeueing, and an optional invisibility duration.

Any message that is placed into a queue can be dequeued a finite number of times. We call these attempts; each dequeue invocation on a message decreases the attempts left on it.

Priority

The priority of a message is expressed as an integer value; the lower the value, the higher the priority. While an application is free to use whatever range they see fit, the norm is to use Unix timestamps in milliseconds (e.g. 1661990400000 for 9/1/2022 midnight UTC).

Figure 3. A snippet from the PriorityClass enum used by a streaming encoding pipeline in Cosmos. The values in parentheses indicate the offset in days.

It is also entirely up to the application to define its own priority levels. For instance a streaming encoding pipeline within Cosmos uses mail priority classes, as shown in Figure 3. Messages belonging to the standard class use the time of enqueue as their priority, while all other classes have their priority values adjusted in multiples of ∼10 years. The priority is set at the workflow rule level, but can be overridden if the request carries a studio tag, such as DAY_OF_BROADCAST.
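
Figure 3 itself is not reproduced here, so the snippet below is only a hedged reconstruction of the idea, with made-up class names and offsets: each class carries an offset in days, in multiples of roughly ten years, which is subtracted from the enqueue timestamp so that higher classes sort earlier in an earliest-deadline-first queue:

import time
from enum import Enum

TEN_YEARS_IN_DAYS = 3650
MS_PER_DAY = 24 * 60 * 60 * 1000

class PriorityClass(Enum):
    # Offsets in days (illustrative values, not the actual ones from Figure 3).
    STANDARD = 0
    HIGH = TEN_YEARS_IN_DAYS
    URGENT = 2 * TEN_YEARS_IN_DAYS

def priority_for(priority_class, enqueue_ms=None):
    # Lower value means higher priority, so a larger offset is subtracted for higher classes.
    if enqueue_ms is None:
        enqueue_ms = int(time.time() * 1000)  # e.g. 1661990400000 for 9/1/2022 midnight UTC
    return enqueue_ms - priority_class.value * MS_PER_DAY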

Message States

Within a queue, a Timestone message belongs to one of six states (Figure 4):

  1. invisible
  2. pending
  3. running
  4. completed
  5. canceled
  6. errored

In general, a message can be enqueued with or without invisibility, which makes the message invisible or pending respectively. Invisible messages become pending when their invisibility window elapses.

A worker can dequeue a pending earliest-deadline-first message from a queue by specifying the amount of time (lease duration) they will be processing it for. Dequeueing messages in batch is also supported. This moves the message to the running state.

The same worker can then issue a complete call to Timestone within the allotted lease window to move the message to the completed state, or issue a lease extension call if they want to maintain control of the message. (A worker can also move a message, typically a running one, to the canceled state to signal that it is no longer needed for processing.)

If none of these calls are issued on time, the message becomes dequeueable again, and this attempt on the message is spent. If there are no attempts left on the message, it is moved automatically to the errored state.
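
Timestone’s client API is not public, so the worker loop below is only an illustrative sketch of the contract just described; the client object and its dequeue, extend_lease, and complete methods are hypothetical names:

import time

def process(msg):
    # Placeholder for the application's own resource-intensive work,
    # yielding after each unit so the loop can check the lease deadline.
    yield msg

def run_worker(client, queue, lease_seconds=60):
    while True:
        msg = client.dequeue(queue, lease_duration=lease_seconds)   # pending -> running
        if msg is None:
            time.sleep(1)
            continue
        deadline = time.time() + lease_seconds
        for _ in process(msg):
            if time.time() > deadline - 10:
                client.extend_lease(queue, msg.id, lease_seconds)   # keep control of the message
                deadline = time.time() + lease_seconds
        client.complete(queue, msg.id)                              # running -> completed
        # If neither call is issued before the lease lapses, that attempt is spent and the
        # message becomes dequeueable again (or moves to errored when no attempts are left).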

The terminal states (completed, errored, and canceled) are garbage-collected periodically by a background process.

Messages can move states either when a worker invokes an API operation, or when Timestone runs its background processes (Figure 4, marked in red — these run periodically). Figure 4 shows the complete state transition diagram.

Figure 4. State transition diagram for Timestone messages.

Queues

All incoming messages are stored in queues. Within a queue, messages are sorted by their priority date. Timestone can host an arbitrary number of user-created queues, and offers a set of API operations for queue management, all revolving around a queue configuration object. Data we store in this object includes the queue’s type (see rest of section), the lease duration that applies to dequeued messages, the invisibility duration that applies to enqueued messages, the number of times a message can be dequeued, and whether enqueueing or dequeueing is temporarily blocked. Note that a message producer can override the default lease or invisibility duration by setting it at the message level during enqueue.
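
As a rough sketch of what such a configuration object might hold (field names and defaults are guesses for illustration, not Timestone’s actual schema):

from dataclasses import dataclass
from enum import Enum
from typing import Optional

class QueueType(Enum):
    SIMPLE = "simple"
    EXCLUSIVE = "exclusive"

@dataclass
class QueueConfig:
    name: str
    queue_type: QueueType = QueueType.SIMPLE
    lease_duration_seconds: int = 60         # applies to dequeued messages unless overridden per message
    invisibility_duration_seconds: int = 0   # applies to enqueued messages unless overridden per message
    max_attempts: int = 3                    # how many times a message can be dequeued
    enqueue_blocked: bool = False            # temporarily block producers
    dequeue_blocked: bool = False            # temporarily block consumers
    exclusivity_key: Optional[str] = None    # required for exclusive queues, e.g. "project"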

All queues in Timestone fall into two types: simple and exclusive.

When an exclusive queue is created, it is associated with a user-defined exclusivity key — for example project. All messages posted to that queue must carry this key in their metadata. For instance, a message with project=foo will be accepted into the queue; a message without the project key will not be. In this example, we call foo, the value that corresponds to the exclusivity key, the message’s exclusivity value.

The contract for exclusive queues is that at any point in time, there can be only up to one consumer per exclusivity value. Therefore, if the project-based exclusive queue in our example has two messages with the key-value pair project=foo in it, and one of them is already leased out to a worker, the other one is not dequeueable. This is depicted in Figure 5.

Figure 5. When worker_2 issues a dequeue call, they lease msg_2 instead of msg_1, even though msg_1 has a higher priority. That happens because the queue is exclusive, and the exclusivity value foo is already leased out.

In a simple queue no such contract applies, and there is no tight coupling with message metadata keys. A simple queue works as your typical priority queue, simply ordering messages in an earliest-deadline-first fashion.
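
The article does not describe how exclusivity is enforced internally, so the sketch below is just one way to picture the contract (key names and layout are made up, metadata pairs are assumed to be stored as individual hash fields, and a real implementation would perform the scan atomically, for example in a Lua script): track the currently leased exclusivity values per queue and skip any pending message whose value is already taken:

import redis

r = redis.Redis(decode_responses=True)

def dequeue_exclusive(queue, exclusivity_key):
    # Illustration only: not atomic, and not how Timestone actually does it.
    leased = r.smembers(f"{queue}:leased-values")
    for msg_id in r.zrange(f"{queue}:pending", 0, -1):            # earliest deadline first
        value = r.hget(f"{queue}:msg:{msg_id}", exclusivity_key)  # e.g. project=foo
        if value is None or value in leased:
            continue                  # foo is already leased out, so skip (msg_1 in Figure 5)
        r.sadd(f"{queue}:leased-values", value)                   # at most one consumer per value
        r.zrem(f"{queue}:pending", msg_id)
        return msg_id
    return None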

What We Are Working On

Some of the things we’re working on:

  1. As the usage of Timestone within Cosmos grows, so does the need to support a range of queue depth queries. To solve this, we are building a dedicated query service that uses a distinct query model.
  2. As noted above (see “System of record” section), a queue and its contents can currently only occupy one Redis shard. Hot queues, however, can grow big, especially when compute capacity is scarce. We want to support arbitrarily large queues, which has us building support for queue sharding.
  3. Messages can carry up to 4 key-value pairs. We currently use all of these key-value pairs to populate the secondary indexes used during dequeue-by-filter. This operation is exponentially complex in both time and space (O(2^n)). We are switching to lexicographical ordering on sorted sets to drop the number of indexes by half and handle metadata in a more cost-efficient manner; see the sketch after this list.
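
A hedged sketch of the lexicographical-ordering idea from item 3 (the member encoding is an assumption, not Timestone’s actual scheme): give every member of a per-queue index a score of 0 and a member string of the form key=value:msg_id, so a single ZRANGEBYLEX prefix scan answers a filter query without maintaining one index per metadata combination:

import redis

r = redis.Redis(decode_responses=True)

def index_metadata(queue, msg_id, metadata):
    # With equal scores, Redis orders the members lexicographically.
    r.zadd(f"{queue}:meta-index", {f"{k}={v}:{msg_id}": 0 for k, v in metadata.items()})

def find_by_filter(queue, key, value):
    # Prefix scan over the index, e.g. every member starting with "project=foo:".
    # (Assumes ASCII metadata values; "\xff" acts as an upper bound for the prefix.)
    prefix = f"{key}={value}:"
    members = r.zrangebylex(f"{queue}:meta-index", f"[{prefix}", f"[{prefix}\xff")
    return [m.split(":", 1)[1] for m in members]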

We may be covering our work on the above in follow-up posts. If these kinds of problems sound interesting to you, and if you like the challenges of building distributed systems for the Netflix Content and Studio ecosystem at scale in general, you should consider joining us.

Acknowledgements

Poorna Reddy, Kostas Christidis, Aravindan Ramkumar, Surafel Korse, Jiaofen Xu, Anoop Panicker, and Kishore Banala have contributed to this project. We thank Charles Zhao, Olof Johansson, Frank San Miguel, Dmitry Vasilyev, Prudhvi Chaganti, and the rest of the Cosmos team for their constructive feedback while developing and operating Timestone.



Exploring Data @ Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/exploring-data-netflix-9d87e20072e3

By Gim Mahasintunan on behalf of Data Platform Engineering.

Supporting a rapidly growing base of engineers of varied backgrounds using different data stores can be challenging in any organization. Netflix’s internal teams strive to provide leverage by investing in easy-to-use tooling that streamlines the user experience and incorporates best practices.

In this blog post, we are thrilled to share that we are open-sourcing one such tool: the Netflix Data Explorer. The Data Explorer gives our engineers fast, safe access to their data stored in Cassandra and Dynomite/Redis data stores.

Netflix Data Explorer on GitHub

History

We began this project several years ago when we were onboarding many new Dynomite customers. Dynomite is a high-speed in-memory database, providing highly available cross-datacenter replication while preserving Redis-like semantics. We wanted to lower the barrier for adoption so that users didn’t need to know datastore-specific CLI commands, could avoid mistakenly running commands that might negatively impact performance, and could access the clusters they frequented every day.

As the project took off, we saw a similar need for our other datastores. Cassandra, our most significant footprint in the fleet, seemed like a great candidate. Users frequently had questions on how they should set up replication, create tables using an appropriate compaction strategy, and craft CQL queries. We knew we could give our users an elevated experience, and at the same time, eliminate many of the common questions on our support channels.

We’ll explore some of the Data Explorer features, and along the way, we’ll highlight some of the ways we enabled the OSS community while still handling some of the unique Netflix-specific use cases.

Multi-Cluster Access

By simply directing users to a single web portal for all of their data stores, we can gain a considerable increase in user productivity. Furthermore, in production environments with hundreds of clusters, we can reduce the available data stores to those authorized for access; this can be supported in OSS environments by implementing a Cluster Access Control Provider responsible for fetching ownership information.

Browsing your accessible clusters in different environments and regions

Schema Designer

Writing CREATE TABLE statements can be an intimidating experience for new Cassandra users. So to help lower the intimidation factor, we built a schema designer that lets users drag and drop their way to a new table.

The schema designer allows you to create a new table using any primitive or collection data type, then designate your partition key and clustering columns. It also provides tools to view the storage layout on disk, browse the supported sample queries (to help design efficient point queries), and guide you through choosing a compaction strategy and many other advanced settings.

Dragging and dropping your way to a new Cassandra table

Explore Your Data

You can quickly execute point queries against your cluster in Explore mode. The Explore mode supports full CRUD of records and allows you to export result sets to CSV or download them as CQL insert statements. The exported CQL can be a handy tool for quickly replicating data from a PROD environment to your TEST environment.

Explore mode gives you quick access to table data

Support for Binary Data

Binary data is another popular feature used by many of our engineers. The Data Explorer won’t fetch binary value data by default (as the persisted data might be sizable). Users can opt-in to retrieve these fields with their choice of encoding.

Choosing how you want to decode blob data

Query IDE

Efficient point queries are available in the Explore mode, but you may have users that still require the flexibility of CQL. Enter the Query mode, which includes a powerful CQL IDE with features like autocomplete and helpful snippets.

Example of free-form Cassandra queries with autocomplete assistance

There are also guardrails in place to help prevent users from making mistakes. For instance, we’ll redirect the user to a bespoke workflow for deleting a table if they try to perform a “DROP TABLE…” command, ensuring the operation is done safely with additional validation. (See our integration with Metrics later in this article.)

As you submit queries, they will be saved in the Recent Queries view as well — handy when you are trying to remember that WHERE clause you had crafted before the long weekend.

Dynomite and Redis Features

While C* is feature-rich and might have a more extensive install base, we have plenty of good stuff for Dynomite and Redis users too. Note, the terms Dynomite and Redis are used interchangeably unless explicitly distinguished.

Key Scanning

Since Redis is an in-memory data store, we need to avoid operations that inadvertently load all the keys into memory. We perform SCAN operations across all nodes in the cluster, ensuring we don’t strain the cluster.

Scanning for keys on a Dynomite cluster

Dynomite Collection Support

In addition to simple String keys, Dynomite supports a rich collection of data types, including Lists, Hashes, and sorted and unsorted Sets. The UI supports creating and manipulating these collection types as well.

Editing a Redis hash value

Supporting OSS

As we were building the Data Explorer, we started getting some strong signals that the ease-of-use and productivity gains that we’d seen internally would benefit folks outside of Netflix as well. We tried to balance codifying some hard-learned best practices that would be generally applicable while maintaining the flexibility to support various OSS environments. To that end, we’ve built several adapter layers into the product where you can provide custom implementations as needed.

The application was architected to enable OSS by introducing seams where users could provide their implementations for discovery, access control, and data store-specific connection settings. Users can choose one of the built-in service providers or supply a custom provider.

The diagram below shows the server-side architecture. The server is a Node.js Express application written in TypeScript, and the client is a Single Page App written in Vue.js.

Data Explorer architecture and service adapter layers

Demo Environment

Deploying a new tool in any real-world environment is a time commitment. We get it, and to help you with that initial setup, we have included a dockerized demo environment. It can build the app, pull down images for Cassandra and Redis, and run everything in Docker containers so you can dive right in. Note, the demo environment is not intended for production use.

Overridable Configuration

The Data Explorer ships with many default behaviors, but since no two production environments are alike, we provide a mechanism to override the defaults and specify your custom values for various settings. These can range from which port numbers to use to which features should be disabled in a production environment. (For example, the ability to drop a Cassandra table.)

CLI Setup Tool

To further improve the experience of creating your configuration file, we have built a CLI tool that provides a series of prompts for you to follow. The CLI tool is the recommended approach for building your configuration file, and you can re-run the tool at any point to create a new configuration.

The CLI allows you to create a custom configuration

You can also generate multiple configuration files and easily switch between them when working with different environments. We have instructions on GitHub on working with more than one configuration file.

Service Adapters

It’s no secret that Netflix is a big proponent of microservices: we have discovery services for identifying Cassandra and Dynomite clusters in the environment; access-control services that identify who owns a data store and who can access it; and LDAP services to find out information about the logged-in user. There’s a good chance you have similar services in your environment too.

To help enable such environments, we have several pre-canned configurations with overridable values and adapter layers in place.

Discovery

The first example of this adapter layer in action is how the application finds Discovery information — these are the names and IP addresses of the clusters you want to access. The CLI allows you to choose from a few simple options. For instance, if you have a process that can update a JSON file on disk, you can select “file system.” If instead, you have a REST-based microservice that provides this information, then you can choose “custom” and write a few lines of code necessary to fetch it.

Choosing to discover our data store clusters by reading a local file

Metrics

Another example of this service adapter layer is integration with an external metrics service. By implementing a metrics service adapter, we progressively enhance the UI to display keyspace and table metrics. These metrics provide insight into which tables are being used at a glance and help our customers make an informed decision when dropping a table.

Without metrics support
With optional metrics support

OSS users can enable the optional Metrics support via the CLI. You then just need to write the custom code to fetch the metrics.

CLI enabling customization of advanced features

i18n Support

While internationalization wasn’t an explicit goal, we discovered that providing Netflix-specific messages in some instances yielded additional value to our internal users. Fundamentally, this is similar to how resource bundles handle different locales.

We are making en-NFLX.ts available internally and en-US.ts available externally. Enterprise customers can enhance their user’s experience by creating custom resource bundles (en-ACME.ts) that link to other tools or enhance default messages. Only a small percentage of the UI and server-side exceptions use these message bundles currently — most commonly to augment messages somehow (e.g., provide links to internal slack channels).

Final Thoughts

We invite you to check out the project and let us know how it works for you. By sharing the Netflix Data Explorer with the OSS community, we hope to help you explore your data and inspire some new ideas.



Uncovering the truth behind Lua and Redis data consistency

Post Syndicated from Grab Tech original https://engineering.grab.com/uncovering-the-truth-behind-lua-and-redis-data-consistency


Our team at Grab uses Redis as one of our message queues. The Redis server is deployed in a master/replica setup. Quite recently, we have been noticing a spike in the CPU usage of the Redis replicas every time we deploy our service, even when the replicas are not in use and there is no read traffic to them. However, the issue is resolved once we reboot the replica.

Because a reboot of the replica fixes the issue every time, we thought that it might be due to some ElastiCache replication issue and didn’t pursue it further. However, a recent Redis failover brought this to our attention again. After the failover, the problematic replica becomes the new master and its CPU immediately goes to 100% under read traffic, which essentially means the cluster is not functional after the failover. This time we investigated the issue with new vigour. What we found in our investigation led us to deep dive into the details of Redis replication and its implementation of Hash.

Did you know that Redis master/replica can become inconsistent in certain scenarios?

Did you know the encoding of Hash objects on the master and the replica are different even if the writing operations are exactly the same and in the same order? Read on to find out why.

The problem

The following graph shows the CPU utilization of the master vs. the replica immediately after our service is deployed.

Architecture diagram
CPU Utilization

From the graph, you can see the following CPU usage trends. Replica’s CPU usage:

  • Increases immediately after our service is deployed.
  • Spikes higher than the master after a certain time.
  • Gets back to normal after a reboot.

Cursory investigation

Because the spike occurs only when we deploy our service, we scrutinised all the scripts that were triggered immediately after the deployment. The Lua monitor script was identified as a possible suspect. The script redistributes inactive service instances’ messages in the queue to active service instances so that messages can be processed by other healthy instances.

We ran a few experiments related to the Lua monitor script using the Redis MONITOR command to compare the script’s behaviour on the master and the replica. As a side note: because this command causes performance degradation, use it with discretion. Coming back to the script, we were surprised to note that the monitor script behaves differently between the master and the replica:

  • Redis executes the script separately on the master and the replica. We expected the script to execute only on the master and the resulting changes to be replicated to the secondary.
  • The Redis command HGETALL used in the script returns the hash keys in a different order on the master compared to the replica.

Due to the above reasons, the script causes data inconsistencies between the master and its replica. From that point on, the data between the master and the replica keeps diverging until they become completely distinct. Due to the inconsistency, the data on the secondary does not get deleted correctly, thereby growing into an extremely large dataset. Any further operation on the large dataset requires higher CPU usage, which explains why the replica’s CPU usage is higher than the master’s.

During replica reboots, the data gets synced and consistent again, which is why the CPU usage gets to normal values after rebooting.

Diving deeper on HGETALL

We knew that the keys of a hash are not ordered and we should not rely on the order. But it still puzzled us that the order is different even when the writing sequence is the same between the master and the replica. Plus the fact that the orders are always the same in our local environment with a similar setup made us even more curious.

So to better understand the underlying magic of Redis and to avoid similar bugs in the future, we decided to hammer on and read the Redis source code to get more details.

HGETALL command handling code

The HGETALL command is handled by the function genericHgetallCommand and it further calls hashTypeNext to iterate through the Hash object. A snippet of the code is shown as follows:

/* Move to the next entry in the hash. Return C_OK when the next entry
 * could be found and C_ERR when the iterator reaches the end. */
int hashTypeNext(hashTypeIterator *hi) {
    if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
        // call zipListNext
    } else if (hi->encoding == OBJ_ENCODING_HT) {
        // call dictNext
    } else {
        serverPanic("Unknown hash encoding");
    }
    return C_OK;
}

From the code snippet, you can see that the Redis Hash object actually has two underlying representations:

  • ZIPLIST
  • HASHTABLE (dict)

A bit of research online helped us understand that, to save memory, Redis chooses between the two hash representations based on the following limits:

  • By default, Redis stores the Hash object as a zipped list when the hash has less than 512 entries and when each element’s size is smaller than 64 bytes.
  • If either limit is exceeded, Redis converts the list to a hashtable, and this is irreversible. That is, Redis won’t convert the hashtable back to a list again, even if the entries/size falls below the limit.
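
A quick way to observe these limits yourself is sketched below with redis-py. The thresholds shown are the long-standing defaults; note that newer Redis versions rename the compact encoding from ziplist to listpack, so the exact string returned by OBJECT ENCODING may differ:

import redis

r = redis.Redis(decode_responses=True)

def encoding_of(key):
    return r.execute_command("OBJECT", "ENCODING", key)

r.delete("demo")
r.hset("demo", mapping={f"field{i}": "x" for i in range(10)})
print(encoding_of("demo"))      # "ziplist" (or "listpack" on newer Redis)

r.hset("demo", mapping={f"field{i}": "x" for i in range(1000)})
print(encoding_of("demo"))      # "hashtable": the hash now has more than 512 entries

for i in range(10, 1000):
    r.hdel("demo", f"field{i}")
print(encoding_of("demo"))      # still "hashtable": the conversion is irreversible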

Eureka moment

Based on this understanding, we checked the encoding of the problematic hash in staging.

stg-bookings-qu-002.pcxebj.0001.apse1.cache.amazonaws.com:6379> object encoding queue_stats
"hashtable"

stg-bookings-qu-001.pcxebj.0001.apse1.cache.amazonaws.com:6379> object encoding queue_stats
"ziplist"

To our surprise, the encodings of the Hash object on the master and its replica were different. This means that if we add or delete elements in the hash, the sequence of the keys won’t be the same, because hashtable operations behave differently from list operations!

Now that we had identified the root cause, we were still curious about the difference in encoding between the master and the replica.

How could the underlying representations be different?

We reasoned, “If the master and its replica’s writing operations are exactly the same and in the same order, why are the underlying representations still different?”

To answer this, we further looked through the Redis source to find all the possible places that a Hash object’s representation could be changed and soon found the following code snippet:

/* Load a Redis object of the specified type from the specified file.
 * On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb) {
  //...
  if (rdbtype == RDB_TYPE_HASH) {
    //...
    o = createHashObject();  // ziplist

    /* Too many entries? Use a hash table. */
    if (len > server.hash_max_ziplist_entries)
        hashTypeConvert(o, OBJ_ENCODING_HT);

    //...
  }
}

Reading through the code we understand the following behaviour:

  • When restoring from an RDB file, Redis creates a ziplist first for Hash objects.
  • Only when the size of the Hash object is greater than the hash_max_ziplist_entries, the ziplist is converted to a hashtable.

So, if the master has a Redis Hash object encoded as a hashtable whose length is less than hash_max_ziplist_entries (512), then when you set up a replica, the same object is encoded as a ziplist on the replica.

We were able to verify this behaviour in our local setup as well.

How did we fix it?

We could use the following two approaches to address this issue:

  • Enable script effect replication mode. This tells Redis to replicate the commands generated by the script instead of running the whole script on the replica. One disadvantage to using this approach is that it adds network traffic between the master and the replica.
  • Ensure the behaviour of the Lua monitor script is deterministic. In our case, we can do this by sorting the outputs of HKEYS/HGETALL.

We chose the latter approach because:

  • The Hash object is pretty small (< 30 elements), so the sorting overhead is low: less than 1 ms for 100 elements based on our tests.
  • Replicating the script’s effects would end up replicating thousands of Redis write commands to the secondary, causing a much higher overhead compared to replicating just the script.
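
A minimal sketch of the deterministic-iteration fix is shown below (not Grab’s actual monitor script): fetch the hash keys, sort them inside the Lua script, and only then iterate, so the master and the replica process entries in the same order regardless of the underlying encoding. For the first approach, script effects replication can be enabled by calling redis.replicate_commands() at the top of the script on older Redis versions, and newer versions replicate script effects by default:

import redis

r = redis.Redis(decode_responses=True)

DETERMINISTIC_SCAN = """
local fields = redis.call('HKEYS', KEYS[1])
table.sort(fields)
for _, field in ipairs(fields) do
    local value = redis.call('HGET', KEYS[1], field)
    -- redistribute / process (field, value) deterministically here
end
return #fields
"""

scan = r.register_script(DETERMINISTIC_SCAN)
processed = scan(keys=["queue_stats"])   # same key as the problematic hash above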

After the fix, the CPU usage of the replica remained in range after each deployment. This also prevented the Redis cluster from being destroyed in the event of a master failover.

Key takeaways

In addition to writing clear and maintainable code, it’s equally important to understand the underlying storage layer that you are dealing with to produce efficient and bug-free code.

The following are some of the key learnings on Redis:

  • Redis does not guarantee consistency between the master and its replica nodes when Lua scripts are used. You have to ensure that the behaviour of the scripts is deterministic to avoid data inconsistency.
  • Redis replicates the whole Lua script instead of the resulting commands to the replica. However, this is the default behaviour and you can disable it.
  • To save memory, Redis uses different representations for Hash objects. Your Hash object could be stored in memory as a ziplist or as a hashtable. This is not guaranteed to be the same across the master and its replicas.

Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving South East Asia forward, apply to join our team today.