
Evaluating performance impact of removing Redis-cache from a Scylla-backed service

Post Syndicated from Grab Tech original https://engineering.grab.com/evalutate-performance-remove-redis-from-scylla-service

Introduction

At Grab, we operate a set of services that manage and provide counts of various items. While this may seem straightforward, the scale at which this feature operates—benefiting millions of Grab users daily—introduces complexity. This feature is divided into three microservices: one for “writing” counts, another for handling “read” requests, and a third serving as the backend for a portal used by data scientists and analysts to configure these counters.

This article focuses on the service responsible for handling “read” requests. This service is backed by Scylla storage and a Redis cache. It also connects to a MySQL RDS to retrieve “counter configurations” that are necessary for processing incoming requests. Written in Rust, the service serves tens of thousands of queries per second (QPS) during peak times, with each request typically being a “batch request” requiring multiple lookups (~10) on Scylla.

Recently, the service has run into performance challenges in the form of periodic spikes in Scylla QPS. These spikes occur throughout the day but are most evident during peak hours. To explain them, we’ll first walk you through how this service operates, particularly how it serves incoming requests. We will then explain our proposed solution and the outcomes of our experiment.

Anatomy of a request

Each counter configuration stored in MySQL has a template that dictates the format of incoming queries. For example, this sample counter configuration is used to count the raindrops for a specific city:

{
    "id": 34,
    "name": "count_rain_drops",
    "template": "rain_drops:city:{city_id}",
    ....
    ....
}

An incoming request using this counter might look like this:

{
    "key": "rain_drops:city:111222",
    "fromTime": 1727215430, // 24 September 2024 22:03:50 UTC
    "toTime": 1727400000    // 27 September 2024 01:20:00 UTC
}

This request asks for the number of raindrops in our imaginary city with city ID 111222 between 1727215430 (24 September 2024 22:03:50 UTC) and 1727400000 (27 September 2024 01:20:00 UTC).
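One way to picture how a template constrains incoming keys is a segment-by-segment match of the request key against the configuration’s template. The sketch below is illustrative only and is not the service’s actual code: the function name `match_template`, the `:` separator rule, and the whole-segment `{placeholder}` syntax are assumptions.

```rust
/// Hypothetical helper: checks that `key` follows `template` and returns the
/// placeholder values it binds. Assumes ':'-separated segments and that a
/// placeholder such as "{city_id}" always occupies a whole segment.
pub fn match_template(template: &str, key: &str) -> Option<Vec<(String, String)>> {
    let t_parts: Vec<&str> = template.split(':').collect();
    let k_parts: Vec<&str> = key.split(':').collect();
    if t_parts.len() != k_parts.len() {
        return None; // different number of segments
    }
    let mut captures = Vec::new();
    for (t, k) in t_parts.into_iter().zip(k_parts) {
        match t.strip_prefix('{').and_then(|s| s.strip_suffix('}')) {
            // Placeholder segment: bind its name to the (non-empty) key segment.
            Some(name) if !k.is_empty() => captures.push((name.to_string(), k.to_string())),
            Some(_) => return None,
            // Literal segment: must match exactly.
            None if t == k => {}
            None => return None,
        }
    }
    Some(captures)
}
```

For the example above, `match_template("rain_drops:city:{city_id}", "rain_drops:city:111222")` binds `city_id` to `"111222"`, while a key such as `rain_drops:country:111222` is rejected.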

Another service tracks raindrops by city and writes 15-minute counts (stored in the “minutely” table), hourly counts, and daily counts to three different Scylla tables:

  • minutely_count_table
  • hourly_count_table
  • daily_count_table

The service processing the request rounds both times down to the nearest 15-minute boundary, so the request is processed with the following time range:

  • Start time: 24 September 2024 22:00:00
  • End time: 27 September 2024 01:15:00
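Because 15 minutes divides evenly into hours and days, this rounding is a single modulo operation on the Unix timestamp. A minimal sketch, where the function name `floor_to_window` is hypothetical and windows are assumed to be aligned to the Unix epoch:

```rust
/// Width of the truncation window, in seconds (15 minutes).
const WINDOW_SECS: i64 = 15 * 60;

/// Hypothetical helper: rounds a Unix timestamp (in seconds) down to the start
/// of its 15-minute window. Windows aligned to the Unix epoch line up with
/// :00, :15, :30 and :45 of every hour in UTC.
/// `rem_euclid` keeps the result correct for negative timestamps too.
pub fn floor_to_window(ts: i64) -> i64 {
    ts - ts.rem_euclid(WINDOW_SECS)
}
```

Applied to the example request, it maps 1727215430 to 1727215200 (22:00:00) and 1727400000 to 1727399700 (01:15:00).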

Let’s assume we have the following data in these three tables for “rain_drops:city:111222”. The datapoints used by the example request are marked with an asterisk (*).

minutely_count_table:

key                      minutely_timestamp     count
rain_drops:city:111222   2024-09-24T22:00:00Z   3
rain_drops:city:111222   2024-09-24T22:15:00Z   2
rain_drops:city:111222   2024-09-24T22:30:00Z   4
rain_drops:city:111222   2024-09-24T22:45:00Z   1
rain_drops:city:111222   2024-09-27T01:00:00Z   2 *
rain_drops:city:111222   2024-09-27T01:15:00Z   3

hourly_count_table:

key                      hourly_timestamp       count
rain_drops:city:111222   2024-09-24T22:00:00Z   18 *
rain_drops:city:111222   2024-09-24T23:00:00Z   22 *
rain_drops:city:111222   2024-09-25T00:00:00Z   15
rain_drops:city:111222   2024-09-27T00:00:00Z   11 *
rain_drops:city:111222   2024-09-27T01:00:00Z   9

daily_count_table:

key                      daily_timestamp        count
rain_drops:city:111222   2024-09-24T00:00:00Z   214
rain_drops:city:111222   2024-09-25T00:00:00Z   189 *
rain_drops:city:111222   2024-09-26T00:00:00Z   245 *
rain_drops:city:111222   2024-09-27T00:00:00Z   78

Now, let’s see how the service calculates the total count for the incoming request with “rain_drops:city:111222” based on the provided data:

Time range:

  • From: 24 September 2024 22:03:50
  • To: 27 September 2024 01:20:00

For the full days within the range, specifically 25th and 26th September, we can use data from the daily_count_table. However, for the start (24th September) and end (27th September) date of the range, we cannot use data from the daily_count_table as the range only includes portions of these dates. Instead, we will use a combination of data from the hourly_count_table and minutely_count_table to accurately capture the counts for these days.

  1. Query the daily_count_table:

    Sum (full days: 25th and 26th September): 189 + 245 = 434

  2. Query the hourly_count_table:
    • For 24th September (from 22:00:00 to 23:59:59):

      Hourly count: 18 + 22 = 40

    • For 27th September (from 00:00:00 to 01:00:00):

      Hourly count: 11

  3. Query the minutely_count_table:

    For 27th September (from 01:00:00 to 01:15:00):

    Minutely count: 2

  4. Total count:

    Total = Daily count (25th and 26th) + Hourly count (24th) + Hourly count (27th) + Minutely count (27th)

    = 434 + 40 + 11 + 2

    = 487

Figure 1: The example request for “rain_drops:city:111222” is handled using data from three different Scylla tables.

As the calculation shows, when the service receives the request, it computes the total count of raindrops by querying up to three Scylla tables and summing the results within the service itself, according to the rules above.
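These rules can be sketched as a greedy walk over the floored range that always takes the largest aligned bucket fitting entirely inside it. This is a minimal sketch under our own assumptions, not the service’s code: in-memory maps keyed by bucket start time stand in for the Scylla tables, buckets are aligned to UTC days and hours, the end time is exclusive, and each bucket is a separate lookup (a real implementation would more likely issue one range query per table). All names are hypothetical.

```rust
use std::collections::HashMap;

// Bucket widths in seconds. Buckets are aligned to the Unix epoch (UTC days/hours).
const MINUTELY: i64 = 15 * 60; // the "minutely" table stores 15-minute buckets
const HOURLY: i64 = 60 * 60;
const DAILY: i64 = 24 * 60 * 60;

/// Hypothetical stand-in for the three Scylla tables of one key:
/// each map goes from bucket start (Unix seconds) to the stored count.
pub struct CountTables {
    pub minutely: HashMap<i64, u64>,
    pub hourly: HashMap<i64, u64>,
    pub daily: HashMap<i64, u64>,
}

/// Sums counts over [start, end), where both bounds are already floored to 15 minutes.
/// At each step, take a daily bucket if one is aligned here and fits, else an hourly
/// bucket, else a 15-minute bucket. Missing buckets count as zero.
pub fn total_count(t: &CountTables, start: i64, end: i64) -> u64 {
    assert!(start % MINUTELY == 0 && end % MINUTELY == 0, "bounds must be floored");
    let mut cursor = start;
    let mut total = 0;
    while cursor < end {
        let (table, width) = if cursor % DAILY == 0 && cursor + DAILY <= end {
            (&t.daily, DAILY)
        } else if cursor % HOURLY == 0 && cursor + HOURLY <= end {
            (&t.hourly, HOURLY)
        } else {
            (&t.minutely, MINUTELY)
        };
        total += table.get(&cursor).copied().unwrap_or(0);
        cursor += width;
    }
    total
}

/// The example rows for "rain_drops:city:111222" from the tables above.
pub fn example_tables() -> CountTables {
    let d24 = 1_727_136_000; // 2024-09-24T00:00:00Z
    let (h, m15, d) = (HOURLY, MINUTELY, DAILY);
    CountTables {
        minutely: HashMap::from([
            (d24 + 22 * h, 3),
            (d24 + 22 * h + m15, 2),
            (d24 + 22 * h + 2 * m15, 4),
            (d24 + 22 * h + 3 * m15, 1),
            (d24 + 3 * d + h, 2),
            (d24 + 3 * d + h + m15, 3),
        ]),
        hourly: HashMap::from([
            (d24 + 22 * h, 18),
            (d24 + 23 * h, 22),
            (d24 + d, 15),
            (d24 + 3 * d, 11),
            (d24 + 3 * d + h, 9),
        ]),
        daily: HashMap::from([(d24, 214), (d24 + d, 189), (d24 + 2 * d, 245), (d24 + 3 * d, 78)]),
    }
}
```

With the floored bounds 1727215200 (24 September 22:00:00) and 1727399700 (27 September 01:15:00), the walk visits two hourly buckets on 24 September, two daily buckets, then one hourly and one 15-minute bucket on 27 September, returning 487 as in the manual calculation.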

Querying the cache

In the previous section, we explained how the service answers a query from Scylla. If the response for the same request was cached earlier, retrieval from the cache is simpler. For the example request, the total count is stored under a key built from the floored start and end times (rounded down to the nearest 15-minute boundary) that were used for the Scylla query, rather than the original times in the request. The cache key-value pair would look like this:

  • key: id:34:rain_drops:city:111222:1727215200:1727399700
  • value: 487

Timestamps 1727215200 and 1727399700 are the adjusted start and end times, 24 September 2024 22:00:00 and 27 September 2024 01:15:00 (UTC), respectively. Each cache entry has a Time-To-Live (TTL) of 5 minutes. During this TTL window, any request for the key “rain_drops:city:111222” with the same start and end times (after rounding down to the nearest 15 minutes) is served from the cache instead of querying Scylla.

For example, the following three start times are all different, but each one floors to 24 September 2024 22:00:00, the same start time as in the cached key:

  • 24 September 2024 22:01:00
  • 24 September 2024 22:02:00
  • 24 September 2024 22:06:00
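The key construction and flooring above can be sketched as follows. The `id:{config_id}:{key}:{start}:{end}` layout is taken from the example key; the function names are hypothetical, and flooring assumes 15-minute windows aligned to the Unix epoch.

```rust
/// Width of the truncation window, in seconds (15 minutes).
const WINDOW_SECS: i64 = 15 * 60;

/// Rounds a Unix timestamp (seconds) down to the start of its 15-minute window.
fn floor_to_window(ts: i64) -> i64 {
    ts - ts.rem_euclid(WINDOW_SECS)
}

/// Hypothetical helper: builds a cache key with the layout of the example key,
/// id:{config_id}:{key}:{floored_from}:{floored_to}.
pub fn cache_key(config_id: u32, key: &str, from_time: i64, to_time: i64) -> String {
    format!(
        "id:{}:{}:{}:{}",
        config_id,
        key,
        floor_to_window(from_time),
        floor_to_window(to_time)
    )
}
```

Flooring also means a request’s key changes as soon as its end time crosses a 15-minute boundary: a `toTime` of 01:29:59 and one of 01:30:00 on 27 September produce different keys, even while the older entry is still within its TTL.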

In day-to-day operations, this caching setup allows roughly half of our total production requests to be served by the Redis cache.

Figure 2. The graph visualises the relative quantity of cache hits vs Scylla-bound requests.
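The cache read path described in this section is a cache-aside lookup with a 5-minute TTL. Below is a minimal sketch under stated assumptions: an in-process `HashMap` stands in for Redis, `now` (Unix seconds) is passed explicitly to keep the behaviour deterministic, and the `load` closure stands in for the Scylla queries and summation. `TtlCache` and `get_or_load` are hypothetical names. Against a real Redis server, the rough equivalent is a `GET` followed, on a miss, by `SET key value EX 300`.

```rust
use std::collections::HashMap;

/// 5-minute TTL, in seconds.
const TTL_SECS: i64 = 5 * 60;

/// Hypothetical in-process stand-in for the Redis cache: key -> (value, expires_at).
#[derive(Default)]
pub struct TtlCache {
    entries: HashMap<String, (u64, i64)>,
}

impl TtlCache {
    /// Cache-aside read: return a live cached value if there is one; otherwise compute
    /// it with `load`, store it with a fresh TTL, and return it.
    /// The bool in the result reports whether this was a cache hit.
    pub fn get_or_load(&mut self, key: &str, now: i64, load: impl FnOnce() -> u64) -> (u64, bool) {
        if let Some(&(value, expires_at)) = self.entries.get(key) {
            if now < expires_at {
                return (value, true);
            }
        }
        let value = load();
        self.entries.insert(key.to_string(), (value, now + TTL_SECS));
        (value, false)
    }
}
```

Because each entry lives for 300 seconds from whenever it was last (re)created, expiries for different keys are spread out over time rather than falling on a fixed grid.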

Problem statement

The setup consisting of Scylla and a Redis cache works well, particularly because Scylla-bound queries need to look up 1-3 tables (minutely, hourly, and daily, depending on the time range) and perform the summation explained earlier, whereas a single cache lookup returns the final value for the same query. However, because our cache keys follow the 15-minute truncation strategy and have a 5-minute TTL, an interesting phenomenon occurs: our cache hits plummet and Scylla QPS spikes at the end of every 15-minute block.

Figure 3. Graph showing 15-minute spikes in Scylla-bound requests accompanied by a decline in cache hit rates.

This happens primarily because almost all requests to our service are for recent data. A request for recent data has an end time close to the current time, so when the clock crosses a 15-minute boundary (minutes 00, 15, 30, and 45 of each hour), its floored end time, and therefore its cache key, changes. As a result, most requests at that moment require new cache keys for the latest 15-minute block. Many unexpired cache keys (i.e., keys that have not yet reached their 5-minute TTL) from the previous 15-minute block may still exist, but they are of little use because most requests ask for recent data.

The table in Figure 4 shows example data for the configurations “rain_drops:city:111222” and “bird_sighting:city:333444”. For both, new cache keys are created due to TTL expiry at arbitrary times. However, at the end of the 15-minute block (in this case, the 22:00-22:15 block), both configurations need new cache keys for the 15-minute block that has just started (22:15-22:30), even though some of their cache keys from the previous block are still valid. Because most requests need a new cache key at the end of every 15-minute block, Scylla QPS spikes and cache hits drop sharply at that point.

A natural question is: why don’t we see a spike every 5 minutes when cache keys reach their TTL? Within a 15-minute block, new cache keys are created continuously, whenever a key has expired and a new request for it arrives. Because this happens all the time, as shown in Figure 4, it does not produce a sharp spike. In other words, TTL expiry does send additional queries to Scylla, but it does not cause a spike in Scylla queries or a sharp drop in cache hits, because keys are created and expire at staggered times rather than on a fixed 5-minute grid like the 15-minute grid used by our truncation strategy.

Figure 4. This table visualises scenarios when new cache keys are required due to TTL expiry vs due to 15-minute truncation strategy.

These spikes at the end of every 15-minute block make Scylla QPS highly uneven. This often causes high latency in our service during 15-minute blocks that fall within peak traffic hours, which in turn causes more requests to time out and increases the number of failed requests.

Proposed solution

We propose mitigating this issue by completely removing the Redis-backed caching mechanism from the service. Our observations indicate that the Scylla spikes at the end of 15-minute blocks are caused by cache misses, so removing the cache should eliminate the spikes and produce a more balanced load.

We acknowledge that this may seem counterintuitive from an overall performance standpoint: removing the cache means all queries become Scylla-bound, and caching usually speeds things up. Caching also has a further advantage: on a cache hit, the service does not need to sum the Scylla results from the minutely, hourly, and daily tables. Despite these drawbacks, we hypothesise that removing the cache should not hurt overall performance, because Scylla has its own sophisticated caching mechanism. Our existing setup underutilises Scylla’s cache, as most repeated queries hit the Redis cache instead.

In summary, we propose eliminating the Redis caching component from our current architecture. We expect this change to resolve the Scylla query spikes observed at the end of every 15-minute block. By relying on Scylla’s native caching mechanism, we anticipate that the service can maintain its overall performance, with the loss of Redis offset by better utilisation of Scylla’s built-in cache.

Experiment

Procedure

The experiment was done on an important live service serving thousands of QPS, so to avoid disruptions we followed a gradual approach. We first turned off caching for a few configurations and, if we observed no adverse impact, incrementally disabled the cache for more configurations. We controlled each rollout increment with a mathematical operator on the configuration IDs. This approach is simple and deterministically disables the cache for specific configurations across all requests, as opposed to a percentage rollout, which randomly disables the cache for different configurations across different requests. It also works well because the number of configurations is small and relatively stable (fewer than a thousand). Since these configurations are already fully cached in service memory from RDS, evaluating a condition on them adds no meaningful performance overhead.
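The exact operator on configuration IDs is not specified. Assuming, for illustration only, a modulo on the ID, a deterministic rollout gate could look like this (the function name `redis_cache_disabled` and the modulo-100 rule are hypothetical):

```rust
/// Hypothetical rollout gate: a configuration bypasses the Redis cache when its
/// ID modulo 100 falls below the rollout percentage. The modulo rule is an
/// assumption; the same ID always gets the same answer for a given percentage.
pub fn redis_cache_disabled(config_id: u64, rollout_percent: u64) -> bool {
    config_id % 100 < rollout_percent
}
```

With a gate of this shape, raising the percentage only ever adds configurations to the disabled set, and a given configuration behaves the same on every request. Note that a share of configurations is not a share of traffic: as reported later, ~75% of the configurations accounted for about 85% of overall QPS.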

To make sense of the graphs and metrics reported in this section, it is important to understand the traffic pattern of this service. The service usually sees two peaks every day: one around noon and another around 6-7 PM. On a weekly basis, traffic is usually highest on Friday, with the busiest period between 6-8 PM.

In addition, the timeline of when and how we made various changes to our setup is important to accurately interpret our results.

Experiment timeline: Nov 5 – Nov 13, 2024:

  • Redis cache disabled for ~5% of the counter configurations: Nov 5, 2024, 10:26 AM (canary started: 10:00 AM)

  • Redis cache disabled for ~25% of the counter configurations: Nov 5, 2024, 12:44 PM (canary started: 12:20 PM)

  • Redis cache disabled for ~35% of the counter configurations: Nov 6, 2024, 10:50 AM (canary started: 10:21 AM)

  • Redis cache disabled for ~75% of the counter configurations: Nov 7, 2024, 10:53 AM (canary started: 10:26 AM)

  • Experimental daytime major compaction job on all nodes: Tue, Nov 12, 2024, 2-5 PM

  • Redis cache disabled for 100% of the counter configurations: Wed, Nov 13, 2024, 10:56 AM (canary started: 10:32 AM)

  • Daytime major compaction job scheduled daily on all nodes, starting Wed, Nov 13, 2024, 2-5 PM

Unless otherwise specified, the graphs and metrics in this article use a fixed time window: Thu, Oct 31, 12:00 AM to Fri, Nov 15, 11:59 PM SGT. This window covers the entire experiment period, with some buffer on each side to observe the experiment’s impact.

Observations

As we progressively disabled reads from the external Redis cache over 8 days (Nov 5 to Nov 13), we made several interesting observations and experimented with some Scylla configuration changes on our end. We describe them in the following sections.

Scylla hit vs. cache hit

As we progressively disabled Redis cache for most of the counters, one obvious impact was a gradual increase in Scylla-bound QPS and a corresponding decrease in Redis cache hits. With Redis cache enabled for 100% of the configurations, about 50% of the requests were bound for Scylla and the other 50% were served by Redis. At the end of the experiment, after fully disabling Redis cache, 100% of the requests were Scylla-bound.

Figure 5. Gradual increase in Scylla QPS and simultaneous decrease in Redis cache hit.

15-minute and hourly spikes

We noticed that the 15-minute spikes in Scylla QPS, and the associated latency spikes, gradually became less prominent and eventually disappeared from the graph once we completely disabled the Redis cache. The hourly spike, however, remained. We attribute it to higher QPS from the clients calling this service at the start of every hour, so there is limited optimisation we can do on this service’s end to reduce it.

Figure 6. The 15-minute spikes in Scylla QPS disappeared after the external Redis cache was fully disabled. This graph uses a smaller time window to show the earlier spikes. It also shows the persistence of hourly spikes after the experiment which is attributed to the clients of this service sending more requests at the start of every hour.
Figure 7. The graph shows that the 15-minute spikes in Scylla’s latency disappeared after the external Redis cache was fully disabled. This graph uses a smaller time window to show the earlier spikes. It also shows the persistence of hourly spikes in latency after the experiment which is attributed to the clients of this service sending more requests at the start of every hour.

Service latency and additional Scylla compaction job

When we disabled Redis cache for about 75% of the counter configurations on Nov 7 (accounting for about 85% of the overall QPS), the average service latency increased from 6-8 ms to 7-12 ms, and P99 latency went from ~30-50 ms to ~30-70 ms. This caused a spike in open circuit breaker (CB) events on Hystrix.

Before disabling the cache for more counters, we experimented on Nov 12 with running an additional major compaction job on all our Scylla nodes between 2-5 PM, progressively on each availability zone (AZ). Note that we already run a scheduled major compaction job around 3 AM every day. The outcome was quite positive: average and P99 latency returned almost to the levels we saw when Redis cache was enabled for 100% of the counters, and Hystrix CB open events improved in the same way. Based on this observation, we made the additional daytime major compaction a daily scheduled job. We disabled Redis cache for 100% of the counters the next day (Nov 13). As expected, this increased Scylla QPS, with no noticeable adverse effect on service latency or Hystrix CB open events.

Figure 8. This graph shows how the average latency changed as a result of the experiment. The higher spikes correspond to the period when Redis cache was being progressively disabled, before the daytime Scylla compaction job was introduced. The spikes lessened after the compaction job was introduced on Nov 12 (note: the Friday spike was due to higher traffic in general).
Figure 9. This graph shows how the P99 latency changed as a result of the experiment. The higher spikes correspond to the period when Redis cache was being progressively disabled, before the daytime Scylla compaction job was introduced. The spikes lessened after the compaction job was introduced on Nov 12 (note: the Friday spike was due to higher traffic in general).

Scylla’s own cache

One of our hypotheses was that our system’s design, together with the service-specific characteristics discussed earlier, left Scylla’s own cache underused. Our experimental results support this. We observed a significant increase in Scylla reads served from Scylla’s own cache (cache hits), while reads that missed Scylla’s cache stayed about the same, even though our Scylla cluster was receiving double the traffic. In percentage terms, Scylla served ~30% of its reads from its own cache before we disabled the external Redis cache, and about 70% after we had fully disabled it. We believe this is a major reason the service’s overall performance held up despite fully decommissioning the expensive Redis cache component from our system architecture.
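As a rough consistency check of these percentages, assume the number of reads that miss Scylla’s cache stays exactly constant while total Scylla reads double from R to 2R. Starting from a ~30% hit ratio:

```latex
\begin{aligned}
\text{misses} &= (1 - 0.30)\,R = 0.70\,R \\
\text{hit ratio after} &= 1 - \frac{0.70\,R}{2R} = 1 - 0.35 = 0.65
\end{aligned}
```

This simple model predicts roughly 65%, close to the observed ~70%; the small gap is consistent with misses having stayed about the same rather than exactly constant.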

Figure 10. Significant increase in Scylla reads after disabling the Redis cache.
Figure 11. No change in Scylla cache misses despite the doubling of Scylla traffic.

Scylla CPU and memory usage

Contrary to our expectations, although Scylla QPS doubled as a result of this experiment, Scylla CPU usage increased only marginally (from ~50% to ~52% at peak). Scylla log-structured allocator (LSA) memory usage remained consistent. For non-LSA memory, the maximum utilisation did not increase; however, we now see two daily spikes instead of the single spike seen before the experiment. The second spike comes from the newly added daily major compaction job. Notably, the overall non-LSA peak has slightly decreased since the new compaction job was introduced.

Figure 12. Relatively steady Scylla CPU utilisation.
Figure 13. Non-LSA memory usage spikes twice a day after the experiment. The new spike corresponds to the newly added daytime compaction job.

Conclusion

In summary, we maintained the same service performance while removing from our system architecture an expensive Redis cache component that accounted for about 25% of the overall service cost. This was made possible primarily by a significant increase in the utilisation of Scylla’s own cache and by adding a daily major compaction job on all our Scylla nodes.

In the future, we plan to further experiment with different Scylla configurations for potential performance gain, specifically to improve the latency.

Join us

Grab is a leading superapp in Southeast Asia, operating across the deliveries, mobility and digital financial services sectors. Serving over 800 cities in eight Southeast Asian countries, Grab enables millions of people every day to order food or groceries, send packages, hail a ride or taxi, pay for online purchases or access services such as lending and insurance, all through a single app. Grab was founded in 2012 with the mission to drive Southeast Asia forward by creating economic empowerment for everyone. Grab strives to serve a triple bottom line: we aim to simultaneously deliver financial performance for our shareholders and have a positive social impact, which includes economic empowerment for millions of people in the region, while mitigating our environmental footprint.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Evaluating performance impact of removing Redis-cache from a Scylla-backed service

Post Syndicated from Grab Tech original https://engineering.grab.com/evaluate-performance-remove-redis-from-scylla-service

Introduction

At Grab, we operate a set of services that manage and provide counts of various items. While this may seem straightforward, the scale at which this feature operates—benefiting millions of Grab users daily—introduces complexity. This feature is divided into three microservices: one for “writing” counts, another for handling “read” requests, and a third serving as the backend for a portal used by data scientists and analysts to configure these counters.

This article focuses on the service responsible for handling “read” requests. This service is backed by Scylla storage and a Redis cache. It also connects to a MySQL RDS to retrieve “counter configurations” that are necessary for processing incoming requests. Written in Rust, the service serves tens of thousands of queries per second (QPS) during peak times, with each request typically being a “batch request” requiring multiple lookups (~10) on Scylla.

Recently, the service has encountered performance challenges, causing periodic spikes in Scylla QPS. These spikes occur throughout the day but are particularly evident during peak hours. To understand this better, we’ll first walk you through how this service operates, particularly how it serves incoming requests. We will then explain our proposed solution and the outcomes of our experiment.

Anatomy of a request

Each counter configuration stored in MySQL has a template that dictates the format of incoming queries. For example, this sample counter configuration is used to count the raindrops for a specific city:

{
    "id": 34,
    "name": "count_rain_drops",
    "template": "rain_drops:city:{city_id}"
    ....
    ....
}

An incoming request using this counter might look like this:

{
    "key": "rain_drops:city:111222",
    "fromTime": 1727215430, // 24 September 2024 22:03:50
    "toTime": 1727400000, // 27 September 2024 01:20:00
}

This request seeks the number of raindrops in our imaginary city with city ID: 111222, between 1727215430 (24 September 2024 22:03:50) and 1727400000 (27 September 2024 01:20:00).

Another service keeps track of raindrops by city and writes the minutely (truncated at 15 minutes), hourly, and daily counts to three different Scylla tables:

  • minutely_count_table
  • hourly_count_table
  • daily_count_table

The service processing the request rounds down the time to the nearest 15 minutes. As a result, the request is processed with the following time range:

  • Start time: 24 September 2024 22:00:00
  • End time: 27 September 2024 01:15:00

Let’s assume we have the following data in these three tables for “rain_drops:city:111222”. The datapoints used in the above example request are highlighted in bold.

minutely_count_table:

key minutely_timestamp count
rain_drops:city:111222 2024-09-24T22:00:00Z 3
rain_drops:city:111222 2024-09-24T22:15:00Z 2
rain_drops:city:111222 2024-09-24T22:30:00Z 4
rain_drops:city:111222 2024-09-24T22:45:00Z 1
rain_drops:city:111222 2024-09-27T01:00:00Z 2
rain_drops:city:111222 2024-09-27T01:15:00Z 3

hourly_count_table:

key hourly_timestamp count
rain_drops:city:111222 2024-09-24T22:00:00Z 18
rain_drops:city:111222 2024-09-24T23:00:00Z 22
rain_drops:city:111222 2024-09-25T00:00:00Z 15
rain_drops:city:111222 2024-09-27T00:00:00Z 11
rain_drops:city:111222 2024-09-27T01:00:00Z 9

daily_count_table:

key daily_timestamp count
rain_drops:city:111222 2024-09-24T00:00:00Z 214
rain_drops:city:111222 2024-09-25T00:00:00Z 189
rain_drops:city:111222 2024-09-26T00:00:00Z 245
rain_drops:city:111222 2024-09-27T00:00:00Z 78

Now, let’s see how the service calculates the total count for the incoming request with “rain_drops:city:111222” based on the provided data:

Time range:

  • From: 24 September 2024 22:03:50
  • To: 27 September 2024 01:20:00

For the full days within the range, specifically 25th and 26th September, we can use data from the daily_count_table. However, for the start (24th September) and end (27th September) date of the range, we cannot use data from the daily_count_table as the range only includes portions of these dates. Instead, we will use a combination of data from the hourly_count_table and minutely_count_table to accurately capture the counts for these days.

  1. Query the daily_count_table:

    Sum (full day: 25 and 26th Sep): 189 + 245 = 434

  2. Query the hourly_count_table:
    • For 24th September (from 22:00:00 to 23:59:59):

      Hourly count: 18 + 22 = 40

    • For 27th September (from 00:00:00 to 01:00:00):

      Hourly count: 11

  3. Query the minutely_count_table:

    For 27th September (from 01:00:00 to 01:15:00):

    Minutely count: 2

  4. Total count:

    Total = Daily count (25th and 26th) + Hourly count (24th) + Hourly count (27th) + Minutely count (27th)

    = 434 + 40 + 11 + 2

    = 487

Figure 1: The example request for “rain_drops:city:111222” is handled using data from three different Scylla tables.

As shown in the calculation, when the service receives the request, it comes up with the total count of raindrops by querying three Scylla tables and summing them up using some specific rules within the service itself.

Querying the cache

In the previous section, we explained how Scylla handles a query. If we cached the response for the same request earlier, retrieval from the cache follows a simpler logic. For instance, for the example request, the total count is stored using the floored start and end times (rounded to the nearest 15-minute window within an hour), which was used for the Scylla query instead of the original time in the request. The cache key-value pair would look like this:

  • key: id:34:rain_drops:city:111222:1727215200:1727399700
  • value: 487

Timestamps 1727215200 and 1727399700 represent the adjusted start and end times of 24 September 2024 22:00:00 and 27 September 2024 01:15:00, respectively. It has a Time-To-Live (TTL) of 5 minutes. During this TTL window, any request for the key “rain_drops:city:111222” having the same start and end times (after rounding to the nearest 15 minutes) will be read from the cache instead of querying Scylla.

For example, for the following three start times, although they are different, after flooring the request to the nearest 15 minutes, the start time becomes 24 September 2024 22:00:00 for all of them, which is the same start time as the one in the cache.

  • 24 September 2024 22:01:00
  • 24 September 2024 22:02:00
  • 24 September 2024 22:06:00

In day-to-day operations, this caching setup allows roughly half of our total production requests to be served by the Redis cache.

Figure 2. The graph visualises the relative quantity of cache hits vs Scylla-bound requests.

Problem statement

The setup consisting of Scylla and Redis cache works well. Particularly because Scylla-bound queries need to look up 1-3 tables (minutely, hourly, daily, depending on the time range) and perform the summation as explained earlier, whereas a single cache lookup gets the final value for the same query. However, as our cache key pattern follows the 15-minute truncation strategy, along with a 5-minute cache TTL, it leads to an interesting phenomenon – our cache hits plummet and Scylla QPS spikes at the end of every 15 minutes.

Figure 3. Graph showing 15-minute spikes in Scylla-bound requests accompanied by a decline in cache hit rates.

This occurs primarily due to the fact that almost all requests to our service are for recent data. Due to this, at the end of every 15-minute block within an hour (i.e., 00, 15, 30, 45), most of the requests require creating new cache keys for the latest 15-minute block. At this point in time, there may be many unexpired (i.e., have not reached 5 min TTL) cache keys from the previous 15-minutes block, but they become less relevant as most requests are asking for recent data.

The table in Figure 4 shows example data for configurations “rain_drops:city:111222” and “bird_sighting:city:333444”. For these two configurations, new cache keys are created due to TTL expiry at random times. However, at the end of the 15-minute block, which, in this case is at the end of 22:00-22:15 block, both configurations need new cache keys for the new 15-minute time block that has just started (i.e., start of 22:15-22:30), even though some of their cache keys from the previous 15-minute block are still valid. This requirement of creating new cache keys for most of the requests at the end of a 15-minute block causes spikes in Scylla QPS and a sharp decline in cache hits.

One question that arises is – “Why don’t we see a spike every 5 minutes for cache key TTL expiry?” This is because, within the 15 minutes block, new cache keys are continuously created when a key reaches TTL and a new request for that is received. Since this happens all the time as shown in Figure 4, we do not see a sharp spike. In other words, although Scylla does receive more queries due to cache TTL expiry, it does not lead to a spike in Scylla queries or a sharp drop in cache hits. This is because the cache keys are always being created and invalidated due to TTL expiry instead of following a fixed 5-minute block similar to the 15-minute block we use for our truncation strategy.

Figure 4. This table visualises scenarios when new cache keys are required due to TTL expiry vs due to 15-minute truncation strategy.

These Scylla QPS spikes at the end of every 15-minute block lead to a highly imbalanced Scylla QPS. This often causes high latency in our service during the 15-minute blocks that fall within the peak traffic hours. This further causes more requests to time out, eventually increasing the number of failed requests.

Proposed solution

We propose mitigating this issue by completely removing the Redis-backed caching mechanism from the service. Our observations indicate that the Scylla spikes at the end of 15-minute blocks occur due to cache hit misses. Therefore, removing the caching should eliminate the spikes and provide for a more balanced load.

We acknowledge that this may seem counterintuitive from an overall performance standpoint as removing caching means all queries will be Scylla-bound, potentially impacting the overall performance since caching usually speeds up processes. In addition, caching also comes with an advantage where for cache hits, the service does not need to do the summation on Scylla results from minutely, hourly, and the daily table. Despite these shortcomings, we hypothesise that removing caching should not have an adverse impact on the overall performance. This is based on the fact the Scylla has its own sophisticated caching mechanism. However, our existing setup uses Redis for caching, underutilising Scylla’s cache as the most subsequent queries hit the Redis cache instead.

In summary, we propose eliminating the Redis caching component from our current architecture. This change is expected to resolve the Scylla query spikes observed at the end of every 15-minute block. By relying on Scylla’s native caching mechanism, we anticipate maintaining the service’s overall performance more effectively. The removal of Redis is counterbalanced by the optimised utilisation of Scylla’s built-in caching capabilities.

Experiment

Procedure

The experiment was done on an important live service serving thousands of QPS. To avoid disruptions, we took a gradual approach. We first turned off caching for a few configurations and, if we observed no adverse impact, incrementally disabled the cache for more configurations. We controlled each rollout increment by applying a mathematical operator to the configuration IDs. This approach is simple and deterministically disables the cache for specific configurations across all requests, whereas a percentage rollout would randomly disable the cache for different configurations on different requests. It also suits our setup because the number of configurations is small and relatively stable (fewer than a thousand). Since these configurations are already fully cached in service memory from RDS, evaluating a condition on them has no performance impact.
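The post does not say which mathematical operator was used. A minimal Python sketch, assuming a modulo on the configuration ID, shows why this kind of gate is deterministic: the same configuration gets the same answer on every request, and raising the percentage only ever adds configurations. `redis_cache_disabled` and `rollout_percent` are hypothetical names, and the real service is written in Rust.

```python
def redis_cache_disabled(config_id: int, rollout_percent: int) -> bool:
    """Hypothetical rollout gate: should this counter configuration skip Redis?

    Uses config_id % 100 as a stable bucket, so the decision depends only
    on the configuration, never on the individual request.
    """
    if not 0 <= rollout_percent <= 100:
        raise ValueError("rollout_percent must be between 0 and 100")
    return config_id % 100 < rollout_percent


if __name__ == "__main__":
    config_ids = range(1000)  # fewer than a thousand configurations
    for pct in (5, 25, 35, 75, 100):
        disabled = sum(redis_cache_disabled(c, pct) for c in config_ids)
        print(f"{pct:>3}% gate -> {disabled} configurations bypass Redis")
```

For IDs 0 to 999 this reports 50, 250, 350, 750 and 1000 configurations. With real, unevenly distributed IDs the disabled share is only approximately the target percentage.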

To make sense of the graphs and metrics reported in this section, it is important to understand the traffic pattern of this service. The service usually sees two peaks every day: one around noon and another around 6-7 PM. On a weekly basis, we usually see the highest traffic on Friday, with the busiest period being from 6-8 PM.

In addition, the timeline of when and how we made various changes to our setup is important to accurately interpret our results.

Experiment timeline: Nov 5 – Nov 13, 2024:

  • Redis cache disabled for ~5% of the counter configurations – Tue, Nov 5, 2024, 10:26 AM (canary started: 10:00 AM)

  • Redis cache disabled for ~25% of the counter configurations – Tue, Nov 5, 2024, 12:44 PM (canary started: 12:20 PM)

  • Redis cache disabled for ~35% of the counter configurations – Wed, Nov 6, 2024, 10:50 AM (canary started: 10:21 AM)

  • Redis cache disabled for ~75% of the counter configurations – Thu, Nov 7, 2024, 10:53 AM (canary started: 10:26 AM)

  • Experimental daytime major compaction job – Tue, Nov 12, 2024, 2-5 PM (on all nodes)

  • Redis cache disabled for 100% of the counter configurations – Wed, Nov 13, 2024, 10:56 AM (canary started: 10:32 AM)

  • Daily scheduled daytime major compaction job starts – Wed, Nov 13, 2024, 2-5 PM (on all nodes)

Unless otherwise specified, the graphs and metrics we report in this article use this fixed time window: Oct 31 (Thu), 12:00 AM – Nov 15 (Fri), 11:59 PM SGT. This time window covers the entire experiment period with some buffer to observe the experiment’s impact.

Observations

As we progressively disabled reads from the external Redis cache over the span of 8 days (Nov 5 – Nov 13), we made some interesting observations and experimented with a few Scylla configuration changes on our end. We describe them in the following sections.

Scylla hit vs. cache hit

As we progressively disabled the Redis cache for most of the counters, one obvious impact was a gradual increase in Scylla-bound QPS and a corresponding decrease in Redis cache hits. When the Redis cache was enabled for 100% of the configurations, 50% of the requests were bound for Scylla and the other 50% for Redis. At the end of the experiment, after the Redis cache was fully disabled, 100% of the requests were Scylla-bound.

Figure 5. Gradual increase in Scylla QPS and simultaneous decrease in Redis cache hit.

15-minute and hourly spikes

We noticed that the 15-minute spikes in Scylla QPS, as well as the associated latency spikes, gradually became less prominent and eventually disappeared from the graph after we completely disabled the Redis cache. The hourly spike, however, remained. This is attributed to higher QPS from the clients calling this service at the turn of every hour, so there is limited room to reduce the hourly spike on this service’s end.

Figure 6. The 15-minute spikes in Scylla QPS disappeared after the external Redis cache was fully disabled. This graph uses a smaller time window to show the earlier spikes. It also shows the persistence of hourly spikes after the experiment which is attributed to the clients of this service sending more requests at the start of every hour.
Figure 7. The graph shows that the 15-minute spikes in Scylla’s latency disappeared after the external Redis cache was fully disabled. This graph uses a smaller time window to show the earlier spikes. It also shows the persistence of hourly spikes in latency after the experiment which is attributed to the clients of this service sending more requests at the start of every hour.

Service latency and additional Scylla compaction job

When we disabled the Redis cache for about 75% of the counter configurations on Nov 7 (which account for about 85% of the overall QPS), the average service latency increased from 6-8 ms to 7-12 ms, and P99 latency went from ~30-50 ms to ~30-70 ms. This caused a spike in open circuit breaker (CB) events on Hystrix. Before disabling the cache for more counters, on Nov 12 we experimented with running an additional major compaction job on all our Scylla nodes between 2-5 PM, progressively in each availability zone (AZ). Note that we already had a scheduled major compaction job that runs around 3 AM every day. The outcome of this experiment was quite positive: average and P99 latency returned almost to the levels we saw when the Redis cache was enabled for 100% of the counters, and Hystrix CB open events improved similarly. Based on this observation, we made the additional daytime major compaction a daily scheduled job. The next day (Nov 13), we disabled the Redis cache for 100% of the counters. As expected, this increased Scylla QPS, with no noticeable adverse effect on service latency or Hystrix CB open events.

Figure 8. This graph shows how the average latency changed as a result of the experiment. The higher spikes correspond to the time when Redis cache was being progressively disabled before introducing the day time Scylla compaction job. The spikes lessened after the compaction job was introduced on Nov 12 (Note: Friday spike was due to higher traffic in general).
Figure 9. This graph shows how the P99 latency changed as a result of the experiment. The higher spikes correspond to the time when Redis cache was being progressively disabled before introducing the day time Scylla compaction job. The spikes lessened after the compaction job was introduced on Nov 12 (Note: Friday spike was due to higher traffic in general).

Scylla’s own cache

One of our hypotheses was that our system’s design, along with the service-specific characteristics discussed earlier, kept us from making real use of Scylla’s own cache. Our experimental results confirm this. We observed a significant increase in Scylla reads served from Scylla’s own cache, while reads that missed Scylla’s cache remained about the same even though our Scylla cluster received double the traffic. In percentage terms, before we disabled the external Redis cache, Scylla served ~30% of total reads from its own cache; after we completely disabled the external Redis cache, it served about 70%. We believe this is the main reason the service kept its overall performance even after we fully decommissioned the expensive Redis cache component from our system architecture.
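A back-of-the-envelope check, using only the approximate figures above, shows why the miss count can stay roughly flat while traffic doubles. Let R be Scylla reads per second before the change, and assume that reads exactly double and the cache hit ratios are exactly 30% and 70%:

```latex
\begin{aligned}
\text{misses}_{\text{before}} &= (1 - 0.30)\,R = 0.70\,R \\
\text{misses}_{\text{after}}  &= (1 - 0.70)\,(2R) = 0.60\,R \\
\text{hits}_{\text{before}}   &= 0.30\,R, \qquad \text{hits}_{\text{after}} = 0.70\,(2R) = 1.40\,R
\end{aligned}
```

Under these rounded numbers, reads that miss Scylla's cache stay at roughly the same level, while reads served from Scylla's cache grow by about 4.7 times.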

Figure 10. Significant increase in Scylla reads after disabling the Redis cache.
Figure 11. No change in Scylla cache miss despite the doubling of Scylla traffic.

Scylla CPU and memory usage

Contrary to our assumption, although Scylla QPS doubled due to the changes made as part of this experiment, Scylla CPU usage increased only marginally (from ~50% to ~52% at peak). In terms of memory, Scylla log-structured allocator (LSA) memory usage remained consistent. Maximum non-LSA memory utilisation did not increase either; however, we noticed two daily spikes instead of the one that existed before the experiment. The second spike results from the newly added daily major compaction job. Notably, the overall non-LSA peak decreased slightly after the new compaction job was introduced.

Figure 12. Relatively steady Scylla CPU utilisation.
Figure 13. Non-LSA memory usage spikes twice a day after the experiment. The new spike corresponds to the newly added day time compaction job.

Conclusion

In summary, we were able to maintain the same service performance while removing an expensive Redis cache component from our system architecture, which accounted for about 25% of the overall service cost. This was made possible primarily by a significant increase in the utilisation of Scylla’s own cache and by adding a daily major compaction job on all our Scylla nodes.

In the future, we plan to further experiment with different Scylla configurations for potential performance gain, specifically to improve the latency.

Join us

Grab is a leading superapp in Southeast Asia, operating across the deliveries, mobility and digital financial services sectors. Serving over 800 cities in eight Southeast Asian countries, Grab enables millions of people every day to order food or groceries, send packages, hail a ride or taxi, pay for online purchases or access services such as lending and insurance, all through a single app. Grab was founded in 2012 with the mission to drive Southeast Asia forward by creating economic empowerment for everyone. Grab strives to serve a triple bottom line – we aim to simultaneously deliver financial performance for our shareholders and have a positive social impact, which includes economic empowerment for millions of people in the region, while mitigating our environmental footprint.

Powered by technology and driven by heart, our mission is to drive Southeast Asia forward by creating economic empowerment for everyone. If this mission speaks to you, join our team today!

Let’s Architect! Leveraging in-memory databases

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-leveraging-in-memory-databases/

In-memory databases play a critical role in modern computing, particularly in reducing the strain on existing resources, scaling workloads efficiently, and minimizing the cost of infrastructure. The advanced performance capabilities of in-memory databases make them vital for demanding applications characterized by voluminous data, real-time analytics, and rapid response requirements.

In this edition of Let’s Architect!, we introduce caching strategies and then examine case studies that use Amazon Web Services (AWS), like Amazon ElastiCache or Amazon MemoryDB for Redis, in real workloads where customers share the reasoning behind their approaches. It is very important to understand the context for leveraging a specific solution or pattern, and many common questions can be answered with these resources.

Caching challenges and strategies

Many services built at Amazon rely on caching systems in the background to speed up performance, meet low-latency requirements, and avoid overloading source databases and other microservices. Adding caches to our systems and operating them may present complex challenges in terms of monitoring, data consistency, and load on the other components of the system. Indeed, a cache can give big benefits, but it’s also a new component to run and keep healthy. Furthermore, engineers may need to use empirical methods to choose the cache size, expiration policy, and eviction policy: we always have to perform tests and use the metrics to tune the setup.
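The expiration and eviction choices mentioned above are easiest to see in code. Below is a minimal, generic cache-aside sketch in Python, assuming an in-process cache with a fixed TTL and least-recently-used eviction. It is not taken from the Builder's Library article or from any AWS service, and `TTLCache` and `get_or_load` are hypothetical names. The injectable `clock` makes the TTL behaviour testable.

```python
import time
from collections import OrderedDict
from typing import Callable, Hashable


class TTLCache:
    """Minimal cache-aside helper with a TTL and LRU eviction (illustrative only)."""

    def __init__(self, max_entries: int, ttl_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.max_entries = max_entries
        self.ttl_seconds = ttl_seconds
        self.clock = clock
        # key -> (expires_at, value), ordered from least to most recently used
        self._entries: "OrderedDict[Hashable, tuple[float, object]]" = OrderedDict()
        self.hits = 0
        self.misses = 0

    def get_or_load(self, key: Hashable, load: Callable[[], object]) -> object:
        now = self.clock()
        entry = self._entries.get(key)
        if entry is not None and entry[0] > now:
            self._entries.move_to_end(key)  # mark as most recently used
            self.hits += 1
            return entry[1]
        # Miss or expired entry: fall through to the source of truth.
        self.misses += 1
        value = load()
        self._entries[key] = (now + self.ttl_seconds, value)
        self._entries.move_to_end(key)
        if len(self._entries) > self.max_entries:
            self._entries.popitem(last=False)  # evict the least recently used entry
        return value
```

The hit and miss counters are the kind of metric you would watch while tuning `max_entries` and `ttl_seconds` empirically.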

With this Amazon Builder’s Library resource, you can learn strategies for using caching in your architecture and best practices directly from Amazon’s engineers.

Take me to this Amazon Builder’s Library article!

Strategies applied in Amazon applications at scale, explained and contextualized by Amazon engineers

How Yahoo cost optimizes their in-memory workloads with AWS

Discover how Yahoo effectively leverages the power of Amazon ElastiCache and data tiering to process an astounding 1.3 million advertising data events per second, all while generating savings of up to 50% on their overall bill.

Data tiering is an ingenious method to scale up to hundreds of terabytes of capacity by intelligently managing data. It achieves this by automatically shifting the least-recently accessed data between RAM and high-performance SSDs.
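A toy Python model of the idea, assuming a fixed number of in-memory slots: when RAM is full, the least recently used item moves to the SSD tier, and reading an item from SSD promotes it back to RAM. This is an illustrative sketch of the concept only, not how ElastiCache implements data tiering, and `TieredStore` is a hypothetical name.

```python
from collections import OrderedDict
from typing import Optional


class TieredStore:
    """Toy two-tier store: hot items stay in 'RAM', least recently used items spill to 'SSD'."""

    def __init__(self, ram_capacity: int) -> None:
        self.ram_capacity = ram_capacity
        self.ram: "OrderedDict[str, bytes]" = OrderedDict()  # least -> most recently used
        self.ssd: dict[str, bytes] = {}

    def _spill_if_needed(self) -> None:
        while len(self.ram) > self.ram_capacity:
            key, value = self.ram.popitem(last=False)  # least recently used item
            self.ssd[key] = value

    def set(self, key: str, value: bytes) -> None:
        self.ssd.pop(key, None)  # a key lives in exactly one tier
        self.ram[key] = value
        self.ram.move_to_end(key)
        self._spill_if_needed()

    def get(self, key: str) -> Optional[bytes]:
        if key in self.ram:
            self.ram.move_to_end(key)
            return self.ram[key]
        if key in self.ssd:
            # An item read from SSD is promoted back to RAM.
            value = self.ssd.pop(key)
            self.set(key, value)
            return value
        return None
```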

In this video, you will gain insights into how data tiering operates and how you can unlock ultra-fast speeds and seamless scalability for your workloads in a cost-efficient manner. You can also learn how it’s implemented under the hood.

Take me to this re:Invent 2022 video!

A snapshot of how Yahoo architecture leverages Amazon ElastiCache

Use MemoryDB to build real-time applications for performance and durability

MemoryDB is a robust, durable database marked by microsecond reads, low single-digit millisecond writes, scalability, and fortified enterprise security. It guarantees an impressive 99.99% availability, coupled with instantaneous recovery without any data loss.

In this session, we explore multiple use cases across sectors, such as Financial Services, Retail, and Media & Entertainment, like payment processing, message brokering, and durable session store applications. Moreover, through a practical demonstration, you can learn how to utilize MemoryDB to establish a microservices message broker for a Media & Entertainment application.

Take me to this AWS Online Tech Talks video!

A sample use case for retail application

Samsung SmartThings powers home automation with Amazon MemoryDB

MemoryDB offers the kind of ultra-fast performance that only an in-memory database can deliver, curtailing latency to microseconds and processing 160+ million requests per second without data loss. In this re:Invent 2022 session, you will understand why Samsung SmartThings selected MemoryDB as the engine to power the next generation of their IoT device connectivity platform, one that processes millions of events every day.

You can also discover the intricate design of MemoryDB and how it ensures data durability without compromising the performance of in-memory operations, thanks to the utilization of a multi-AZ transactional log. This session is an enlightening deep-dive into durable, in-memory data operations.

Take me to this re:Invent 2022 video!

The architecture leveraged by Samsung SmartThings using Amazon MemoryDB for Redis

Amazon ElastiCache: In-memory datastore fundamentals, use cases and examples

In this edition of AWS Online Tech Talks, explore Amazon ElastiCache, a managed service that facilitates the seamless setup, operation, and scaling of widely used, open-source–compatible, in-memory datastores in the cloud environment. This service positions you to develop data-intensive applications or enhance the performance of your existing databases through high-throughput, low-latency, in-memory datastores. Learn how it is leveraged for caching, session stores, gaming, geospatial services, real-time analytics, and queuing functionalities.

This course can help cultivate a deeper understanding of Amazon ElastiCache, and how it can be used to accelerate your data processing while maintaining robustness and reliability.

Take me to this AWS Online Tech Talks course!

A free training course to build your skills and make better use of in-memory databases

See you next time!

Thanks for joining us to discuss in-memory databases! In 2 weeks, we’ll talk about SQL databases.

To find all the blogs from this series, visit the Let’s Architect! list of content on the AWS Architecture Blog.

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support…

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/timestone-netflixs-high-throughput-low-latency-priority-queueing-system-with-built-in-support-1abf249ba95f

Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support for Non-Parallelizable Workloads

by Kostas Christidis

Introduction

Timestone is a high-throughput, low-latency priority queueing system we built in-house to support the needs of Cosmos, our media encoding platform. Over the past 2.5 years, its usage has increased, and Timestone is now also the priority queueing engine backing Conductor, our general-purpose workflow orchestration engine, and BDP Scheduler, the scheduler for large-scale data pipelines. All in all, millions of critical workflows within Netflix now flow through Timestone on a daily basis.

Timestone clients can create queues, enqueue messages with user-defined deadlines and metadata, then dequeue these messages in an earliest-deadline-first (EDF) fashion. Filtering for EDF messages with criteria (e.g. “messages that belong to queue X and have metadata Y”) is also supported.
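To make the dequeue semantics concrete, here is a minimal in-memory Python sketch of an EDF queue that supports dequeue-by-filter. It only models the ordering and filtering contract described above; Timestone itself keeps this state in Redis, as the “System of record” section explains, and the class and method names here are hypothetical. The linear scan stands in for the secondary indexes Timestone maintains for filtered dequeues.

```python
import bisect
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Message:
    msg_id: str
    deadline_ms: int                                  # lower value = higher priority
    metadata: dict[str, str] = field(default_factory=dict)


class EdfQueue:
    """In-memory earliest-deadline-first queue with dequeue-by-filter (sketch)."""

    def __init__(self) -> None:
        self._order: list[tuple[int, str]] = []       # (deadline_ms, msg_id), kept sorted
        self._messages: dict[str, Message] = {}

    def enqueue(self, msg: Message) -> None:
        self._messages[msg.msg_id] = msg
        bisect.insort(self._order, (msg.deadline_ms, msg.msg_id))

    def dequeue(self, filters: Optional[dict[str, str]] = None) -> Optional[Message]:
        """Remove and return the earliest-deadline message matching every filter pair."""
        filters = filters or {}
        for i, (_, msg_id) in enumerate(self._order):
            msg = self._messages[msg_id]
            if all(msg.metadata.get(k) == v for k, v in filters.items()):
                del self._order[i]
                return self._messages.pop(msg_id)
        return None
```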

One of the things that make Timestone different from other priority queues is its support for a construct we call exclusive queues — this is a means to mark chunks of work as non-parallelizable, without requiring any locking or coordination on the consumer side; everything is taken care of by the exclusive queue in the background. We explain the concept in detail in the sections that follow.

Why Timestone

When designing the successor to Reloaded — our media encoding system — back in 2018 (see “Background” section in The Netflix Cosmos Platform), we needed a priority queueing system that would provide queues between the three components in Cosmos (Figure 1):

  1. the API framework (Optimus),
  2. the forward chaining rule engine (Plato), and
  3. the serverless computing layer (Stratum)
Figure 1. A video encoding application built on top of Cosmos. Notice the three Cosmos subsystems: Optimus, an API layer mapping external requests to internal business models; Plato, a workflow layer for business rule modeling; and Stratum, the serverless layer for running stateless and computationally intensive functions. Source: The Netflix Cosmos Platform

Some of the key requirements this priority queueing system would need to satisfy:

1. A message can only be assigned to one worker at any given time. The work that tends to happen in Cosmos is resource-intensive and can fan out to thousands of actions. Suppose there is replication lag between the replicas in our data store, and we present to worker B, as dequeueable, a message that worker A just dequeued via a different node. When that happens, we waste significant compute cycles. This requirement rules out eventually consistent solutions and means we want linearizable consistency at the queue level.

2. Allow for non-parallelizable work.

Given that Plato is continuously polling all workflow queues for more work to execute —

While Plato is executing a workflow for a given project (a request for work on a given service) —

Then Plato should not be able to dequeue additional requests for work for that project on that workflow. Otherwise Plato’s inference engine will evaluate the workflow prematurely, and may move the workflow to an incorrect state.

There is, then, a certain type of work in Cosmos that should not be parallelized, and the ask is for the queueing system to support this type of access pattern natively. This requirement gave birth to the exclusive queue concept. We explain how exclusive queues work in Timestone in the “Key Concepts” section.

3. Allow for dequeueing and queue depth querying using filters (metadata key-value pairs)

4. Allow for the automatic creation of a queue upon message ingestion

5. Render a message dequeueable within a second of ingestion

We built Timestone because we could not find an off-the-shelf solution that meets these requirements.

System Architecture

Timestone is a gRPC-based service. We use protocol buffers to define the interface of our service and the structure of our request and response messages. The system diagram for the application is shown in Figure 2.

Figure 2. Timestone system diagram. Arrows link all the components touched during a typical Timestone client-server interaction. Numbers in red indicate sequence steps. Identical numbers identify concurrent steps.

System of record

The system of record is a durable Redis cluster. Every write request (see Step 1 — note that this includes dequeue requests since they alter the state of the queue) that reaches the cluster (Step 2) is persisted to a transaction log before a response is sent back to the server (Step 3).

Inside the database, we represent each queue with a sorted set where we rank message IDs (see “Message” section) according to priority. We persist messages and queue configurations (see “Queues” section) in Redis as hashes. All data structures related to a queue — from the messages it contains to the in-memory secondary indexes needed to support dequeue-by-filter — are placed in the same Redis shard. We achieve this by having them share a common prefix, specific to the queue in question. We then codify this prefix as a Redis hash tag. Each message carries a payload (see “Message” section) that can weigh up to 32 KiB.
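Co-locating a queue's data structures works because Redis Cluster hashes only the hash tag (the non-empty part between the first `{` and the next `}`) when choosing a slot, and maps keys to one of 16384 slots using CRC16. The Python sketch below implements that slot calculation as described in the Redis Cluster specification. The key layout (`{queue:<name>}:...`) is a hypothetical example, since the post does not give Timestone's actual key names.

```python
REDIS_CLUSTER_SLOTS = 16384


def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM variant), the checksum Redis Cluster uses for key slots."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def hash_slot(key: str) -> int:
    """Slot for a key, honouring a non-empty {hash tag} if one is present."""
    raw = key.encode("utf-8")
    start = raw.find(b"{")
    if start != -1:
        end = raw.find(b"}", start + 1)
        if end > start + 1:                 # an empty tag such as "{}" is ignored
            raw = raw[start + 1:end]
    return crc16_xmodem(raw) % REDIS_CLUSTER_SLOTS


def queue_keys(queue: str, msg_id: str) -> list[str]:
    """Hypothetical key layout: every structure for a queue shares one hash tag."""
    tag = "{queue:" + queue + "}"
    return [f"{tag}:pending", f"{tag}:config", f"{tag}:msg:{msg_id}"]
```

Because all three keys hash only `queue:encode`, they land in the same slot and therefore on the same shard, which is what lets a single Lua script touch all of them.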

Almost all of the interactions between Timestone and Redis (see “Message States” section) are codified as Lua scripts. In most of these Lua scripts, we tend to update a number of data structures. Since Redis guarantees that each script is executed atomically, a successful script execution is guaranteed to leave the system in a consistent (in the ACID sense) state.

All API operations are queue-scoped. All API operations that modify state are idempotent.

Secondary indexes

For observability purposes, we capture information about incoming messages and their transition between states in two secondary indexes maintained on Elasticsearch.

When we get a write response from Redis, we concurrently (a) return the response to the client, and (b) convert this response into an event that we post to a Kafka cluster, as shown in Step 4. Two Flink jobs — one for each type of index we maintain — consume the events from the corresponding Kafka topics, and update the indexes in Elasticsearch.

One index (“current”) gives users a best-effort view into the current state of the system, while the other index (“historic”) gives users a best-effort longitudinal view of messages, allowing them to trace messages as they flow through Timestone and answer questions such as time spent in a state and number of processing errors. We maintain a version counter for each message; every write operation increments that counter. We rely on that version counter to order the events in the historic index.
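Because events are produced concurrently with the client response and flow through Kafka, a consumer may see them out of order. A small Python sketch, assuming each event carries the message ID, its version, and the new state, shows how a per-message version counter keeps both views correct: the “current” view keeps only the highest version seen, and the “historic” view sorts by version rather than arrival. The event shape and function names are hypothetical; the post does not describe the Flink jobs' code.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StateEvent:
    msg_id: str
    version: int   # incremented by every write to the message
    state: str


def apply_to_current(current: dict[str, StateEvent], event: StateEvent) -> None:
    """'Current' view: keep only the highest version seen; ignore late, stale events."""
    existing = current.get(event.msg_id)
    if existing is None or event.version > existing.version:
        current[event.msg_id] = event


def history(events: list[StateEvent], msg_id: str) -> list[str]:
    """'Historic' view: one message's states ordered by version, not by arrival."""
    return [e.state for e in sorted(events, key=lambda e: e.version) if e.msg_id == msg_id]
```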

Events are stored in the Elasticsearch cluster for a finite number of days.

Current Usage at Netflix

The system is dequeue heavy. We see 30K dequeue requests per second (RPS) with a P99 latency of 45ms. In comparison, we see 1.2K enqueue RPS at 25ms P99 latency. We regularly see 5K RPS enqueue bursts at 85ms P99 latency.

15B messages have been enqueued to Timestone since the beginning of the year; these messages have been dequeued 400B times. Pending messages regularly reach 10M.
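The figures above quantify how dequeue-heavy the system is:

```latex
\frac{400 \times 10^{9}\ \text{dequeues}}{15 \times 10^{9}\ \text{messages}} \approx 26.7\ \text{dequeues per message},
\qquad
\frac{30\,000\ \text{dequeue RPS}}{1\,200\ \text{enqueue RPS}} = 25
```

Both ratios point to roughly 25 dequeues for every enqueue, although they count slightly different things (messages versus requests).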

Usage is expected to double next year, as we migrate the rest of Reloaded, our legacy media encoding system, to Cosmos.

Key Concepts

Message

A message carries an opaque payload, a user-defined priority (see “Priority” section), an optional (mandatory for exclusive queues) set of metadata key-value pairs that can be used for filter-based dequeueing, and an optional invisibility duration.

Any message that is placed into a queue can be dequeued a finite number of times. We call these attempts; each dequeue invocation on a message decreases the attempts left on it.

Priority

The priority of a message is expressed as an integer value; the lower the value, the higher the priority. While an application is free to use whatever range it sees fit, the norm is to use Unix timestamps in milliseconds (e.g. 1661990400000 for 9/1/2022 midnight UTC).

Figure 3. A snippet from the PriorityClass enum used by a streaming encoding pipeline in Cosmos. The values in parentheses indicate the offset in days.

It is also entirely up to the application to define its own priority levels. For instance, a streaming encoding pipeline within Cosmos uses the priority classes shown in Figure 3. Messages belonging to the standard class use the time of enqueue as their priority, while all other classes have their priority values adjusted in multiples of ∼10 years. The priority is set at the workflow rule level, but can be overridden if the request carries a studio tag, such as DAY_OF_BROADCAST.
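A small Python sketch of this convention, assuming priorities are Unix epoch milliseconds and each class shifts the enqueue time by a fixed number of days. The class names and offsets below are hypothetical stand-ins for the enum in Figure 3 (~10 years taken as 3650 days), and the studio-tag override is not modelled.

```python
from datetime import datetime, timezone
from enum import Enum

MS_PER_DAY = 24 * 60 * 60 * 1000
TEN_YEARS_DAYS = 3650  # "~10 years", approximated in days


class PriorityClass(Enum):
    # Hypothetical classes; each value is an offset in days, and lower priorities dequeue first.
    URGENT = -2 * TEN_YEARS_DAYS
    HIGH = -TEN_YEARS_DAYS
    STANDARD = 0
    LOW = TEN_YEARS_DAYS


def priority_for(enqueued_at: datetime, cls: PriorityClass) -> int:
    """Timestone-style priority: enqueue time in epoch ms, shifted by the class offset."""
    base_ms = int(enqueued_at.timestamp() * 1000)
    return base_ms + cls.value * MS_PER_DAY


if __name__ == "__main__":
    t = datetime(2022, 9, 1, tzinfo=timezone.utc)
    print(priority_for(t, PriorityClass.STANDARD))  # 1661990400000
```

Because the offset is so large, a HIGH message enqueued years from now still sorts ahead of a STANDARD message enqueued today.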

Message States

Within a queue, a Timestone message belongs to one of six states (Figure 4):

  1. invisible
  2. pending
  3. running
  4. completed
  5. canceled
  6. errored

In general, a message can be enqueued with or without invisibility, which makes the message invisible or pending respectively. Invisible messages become pending when their invisibility window elapses.

A worker can dequeue the pending message with the earliest deadline from a queue by specifying the amount of time (lease duration) it will spend processing it. Dequeueing messages in batches is also supported. Dequeueing moves the message to the running state.

The same worker can then issue a complete call to Timestone within the allotted lease window to move the message to the completed state, or issue a lease extension call if it wants to keep control of the message. (A worker can also move a message, typically a running one, to the canceled state to signal that it no longer needs processing.)

If none of these calls are issued on time, the message becomes dequeueable again, and this attempt on the message is spent. If there are no attempts left on the message, it is moved automatically to the errored state.

The terminal states (completed, errored, and canceled) are garbage-collected periodically by a background process.

Messages can move states either when a worker invokes an API operation, or when Timestone runs its background processes (Figure 4, marked in red — these run periodically). Figure 4 shows the complete state transition diagram.

Figure 4. State transition diagram for Timestone messages.
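The transitions described above can be sketched as a small in-memory state machine in Python. In Timestone these updates run atomically inside Redis Lua scripts; here they are plain functions on a single message, with the current time passed in explicitly. The function names are hypothetical, and a few details are assumptions: attempts are decremented at dequeue time (as the “Message” section describes), and `cancel` is allowed from any non-terminal state. Garbage collection of terminal messages is omitted.

```python
from dataclasses import dataclass
from enum import Enum, auto
from typing import Optional


class State(Enum):
    INVISIBLE = auto()
    PENDING = auto()
    RUNNING = auto()
    COMPLETED = auto()
    CANCELED = auto()
    ERRORED = auto()


TERMINAL = {State.COMPLETED, State.CANCELED, State.ERRORED}


@dataclass
class Message:
    msg_id: str
    attempts_left: int
    state: State = State.PENDING
    visible_at: float = 0.0
    lease_expires_at: Optional[float] = None


def enqueue(msg_id: str, attempts: int, now: float, invisibility: float = 0.0) -> Message:
    state = State.INVISIBLE if invisibility > 0 else State.PENDING
    return Message(msg_id, attempts, state, visible_at=now + invisibility)


def dequeue(msg: Message, now: float, lease: float) -> None:
    """PENDING -> RUNNING; every dequeue spends one attempt."""
    if msg.state is not State.PENDING:
        raise ValueError(f"{msg.msg_id} is not dequeueable")
    msg.state = State.RUNNING
    msg.attempts_left -= 1
    msg.lease_expires_at = now + lease


def _require_live_lease(msg: Message, now: float) -> None:
    if msg.state is not State.RUNNING or msg.lease_expires_at is None or now >= msg.lease_expires_at:
        raise ValueError(f"{msg.msg_id} has no live lease")


def complete(msg: Message, now: float) -> None:
    _require_live_lease(msg, now)
    msg.state = State.COMPLETED


def extend_lease(msg: Message, now: float, extra: float) -> None:
    _require_live_lease(msg, now)
    msg.lease_expires_at += extra


def cancel(msg: Message) -> None:
    if msg.state not in TERMINAL:
        msg.state = State.CANCELED


def background_tick(msg: Message, now: float) -> None:
    """Periodic process: reveal invisible messages and reclaim expired leases."""
    if msg.state is State.INVISIBLE and now >= msg.visible_at:
        msg.state = State.PENDING
    elif msg.state is State.RUNNING and msg.lease_expires_at is not None and now >= msg.lease_expires_at:
        msg.lease_expires_at = None
        msg.state = State.PENDING if msg.attempts_left > 0 else State.ERRORED
```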

Queues

All incoming messages are stored in queues. Within a queue, messages are sorted by their priority date. Timestone can host an arbitrary number of user-created queues, and offers a set of API operations for queue management, all revolving around a queue configuration object. Data we store in this object includes the queue’s type (see rest of section), the lease duration that applies to dequeued messages, the invisibility duration that applies to enqueued messages, the number of times a message can be dequeued, and whether enqueueing or dequeueing is temporarily blocked. Note that a message producer can override the default lease or invisibility duration by setting it at the message level during enqueue.

All queues in Timestone fall into two types: simple and exclusive.

When an exclusive queue is created, it is associated with a user-defined exclusivity key, for example project. All messages posted to that queue must carry this key in their metadata. For instance, a message with project=foo will be accepted into the queue; a message without the project key will not be. In this example, foo, the value that corresponds to the exclusivity key, is the message’s exclusivity value.

The contract for exclusive queues is that at any point in time, there can be only up to one consumer per exclusivity value. Therefore, if the project-based exclusive queue in our example has two messages with the key-value pair project=foo in it, and one of them is already leased out to a worker, the other one is not dequeueable. This is depicted in Figure 5.

Figure 5. When worker_2 issues a dequeue call, they lease msg_2 instead of msg_1, even though msg_1 has a higher priority. That happens because the queue is exclusive, and the exclusivity value foo is already leased out.

In a simple queue no such contract applies, and there is no tight coupling with message metadata keys. A simple queue works as your typical priority queue, simply ordering messages in an earliest-deadline-first fashion.
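The exclusive-queue contract can be sketched in a few lines of Python. This in-memory model, with hypothetical names, rejects messages that lack the exclusivity key and skips any message whose exclusivity value is already leased; it ignores lease expiry, attempts, and the Redis data structures Timestone actually uses. Note that no locking happens on the consumer side: the queue alone decides what is dequeueable.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Msg:
    msg_id: str
    priority: int                                    # lower value = dequeued first
    metadata: dict[str, str] = field(default_factory=dict)


class ExclusiveQueue:
    """Sketch of the exclusive-queue contract: at most one lease per exclusivity value."""

    def __init__(self, exclusivity_key: str) -> None:
        self.exclusivity_key = exclusivity_key
        self.pending: list[Msg] = []
        self.leased_values: dict[str, str] = {}      # exclusivity value -> leased msg_id

    def enqueue(self, msg: Msg) -> None:
        if self.exclusivity_key not in msg.metadata:
            raise ValueError(f"message must carry '{self.exclusivity_key}' in its metadata")
        self.pending.append(msg)
        self.pending.sort(key=lambda m: m.priority)

    def dequeue(self) -> Optional[Msg]:
        """Return the highest-priority message whose exclusivity value is not leased."""
        for i, msg in enumerate(self.pending):
            value = msg.metadata[self.exclusivity_key]
            if value not in self.leased_values:
                self.leased_values[value] = msg.msg_id
                return self.pending.pop(i)
        return None

    def complete(self, msg: Msg) -> None:
        """Release the exclusivity value so the next message for it becomes dequeueable."""
        self.leased_values.pop(msg.metadata[self.exclusivity_key], None)
```

Replaying the scenario in Figure 5 with this sketch: once a first project=foo message is leased, the next dequeue returns msg_2 (project=bar) rather than the higher-priority msg_1 (project=foo), which only becomes dequeueable after the foo lease is released.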

What We Are Working On

Some of the things we’re working on:

  1. As the usage of Timestone within Cosmos grows, so does the need to support a range of queue depth queries. To solve this, we are building a dedicated query service that uses a distinct query model.
  2. As noted above (see “System of record” section), a queue and its contents can currently occupy only one Redis shard. Hot queues, however, can grow big, especially when compute capacity is scarce. We want to support arbitrarily large queues, which has us building support for queue sharding.
  3. Messages can carry up to 4 key-value pairs. We currently use all of these key-value pairs to populate the secondary indexes used during dequeue-by-filter. This operation is exponentially complex both in terms of time and space (O(2^n)). We are switching to lexicographical ordering on sorted sets to drop the number of indexes by half, and handle metadata in a more cost-efficient manner.
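To see where the O(2^n) comes from, here is a Python sketch, assuming one index entry per non-empty subset of a message's key-value pairs so that any filter maps to exactly one index. The key format is hypothetical; the post does not describe Timestone's index layout. With the maximum of 4 pairs, each enqueue touches 2^4 - 1 = 15 index entries.

```python
from itertools import combinations


def filter_index_keys(queue: str, metadata: dict[str, str]) -> list[str]:
    """One secondary-index key per non-empty subset of a message's metadata.

    Hypothetical key format. With n pairs this yields 2**n - 1 keys, which is
    why indexing every combination grows exponentially (O(2^n)).
    """
    pairs = sorted(metadata.items())  # canonical order, so each filter maps to one key
    keys = []
    for size in range(1, len(pairs) + 1):
        for combo in combinations(pairs, size):
            suffix = ",".join(f"{k}={v}" for k, v in combo)
            keys.append(f"{{queue:{queue}}}:idx:{suffix}")
    return keys
```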

We may be covering our work on the above in follow-up posts. If these kinds of problems sound interesting to you, and if you like the challenges of building distributed systems for the Netflix Content and Studio ecosystem at scale in general, you should consider joining us.

Acknowledgements

Poorna Reddy, Kostas Christidis, Aravindan Ramkumar, Surafel Korse, Jiaofen Xu, Anoop Panicker, and Kishore Banala have contributed to this project. We thank Charles Zhao, Olof Johansson, Frank San Miguel, Dmitry Vasilyev, Prudhvi Chaganti, and the rest of the Cosmos team for their constructive feedback while developing and operating Timestone.


Timestone: Netflix’s High-Throughput, Low-Latency Priority Queueing System with Built-in Support… was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Exploring Data @ Netflix

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/exploring-data-netflix-9d87e20072e3

By Gim Mahasintunan on behalf of Data Platform Engineering.

Supporting a rapidly growing base of engineers of varied backgrounds using different data stores can be challenging in any organization. Netflix’s internal teams strive to provide leverage by investing in easy-to-use tooling that streamlines the user experience and incorporates best practices.

In this blog post, we are thrilled to share that we are open-sourcing one such tool: the Netflix Data Explorer. The Data Explorer gives our engineers fast, safe access to their data stored in Cassandra and Dynomite/Redis data stores.

Netflix Data Explorer on GitHub

History

We began this project several years ago when we were onboarding many new Dynomite customers. Dynomite is a high-speed in-memory database, providing highly available cross-datacenter replication while preserving Redis-like semantics. We wanted to lower the barrier to adoption so that users didn’t need to know datastore-specific CLI commands, could avoid mistakenly running commands that might negatively impact performance, and could easily access the clusters they used every day.

As the project took off, we saw a similar need for our other datastores. Cassandra, which has the largest footprint in our fleet, seemed like a great candidate. Users frequently asked how to set up replication, create tables with an appropriate compaction strategy, and craft CQL queries. We knew we could give our users an elevated experience and, at the same time, eliminate many of the common questions on our support channels.

We’ll explore some of the Data Explorer features, and along the way, we’ll highlight some of the ways we enabled the OSS community while still handling some of the unique Netflix-specific use cases.

Multi-Cluster Access

Directing users to a single web portal for all of their data stores considerably increases their productivity. Furthermore, in production environments with hundreds of clusters, we can limit the listed data stores to those the user is authorized to access; OSS environments can support this by implementing a Cluster Access Control Provider that is responsible for fetching ownership information.

Browsing your accessible clusters in different environments and regions

Schema Designer

Writing CREATE TABLE statements can be an intimidating experience for new Cassandra users. So to help lower the intimidation factor, we built a schema designer that lets users drag and drop their way to a new table.

The schema designer allows you to create a new table using any primitive or collection data type, then designate your partition key and clustering columns. It also provides tools to view the storage layout on disk, browse the supported sample queries (to help design efficient point queries), and guide you through choosing a compaction strategy and many other advanced settings.

Dragging and dropping your way to a new Cassandra table

Explore Your Data

You can quickly execute point queries against your cluster in Explore mode. The Explore mode supports full CRUD of records and allows you to export result sets to CSV or download them as CQL insert statements. The exported CQL can be a handy tool for quickly replicating data from a PROD environment to your TEST environment.

Explore mode gives you quick access to table data
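To illustrate what exporting a row as a CQL insert statement involves, here is a minimal sketch; it is not the Data Explorer's own code. It assumes every column is a text value and that table and column names need no quoting. The one CQL-specific rule it relies on is that a single quote inside a string literal is escaped by doubling it. `format_insert`, `put_literal` and the `writer` type are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Bounded string builder; sets `overflow` instead of writing past `cap`. */
typedef struct {
    char *buf;
    size_t cap, len;
    int overflow;
} writer;

static void put_char(writer *w, char c) {
    if (w->len + 1 < w->cap) {
        w->buf[w->len++] = c;
        w->buf[w->len] = '\0';
    } else {
        w->overflow = 1;
    }
}

static void put_str(writer *w, const char *s) {
    for (size_t i = 0, n = strlen(s); i < n; i++)
        put_char(w, s[i]);
}

/* Emit `s` as a CQL string literal: wrap it in single quotes and double
 * any embedded single quote. */
static void put_literal(writer *w, const char *s) {
    put_char(w, '\'');
    for (; *s != '\0'; s++) {
        if (*s == '\'')
            put_char(w, '\'');
        put_char(w, *s);
    }
    put_char(w, '\'');
}

/* Render "INSERT INTO table (c1, ...) VALUES ('v1', ...);" into buf.
 * Returns 0 on success, -1 if buf is too small (output is truncated). */
static int format_insert(char *buf, size_t cap, const char *table,
                         const char **cols, const char **vals, size_t n) {
    writer w = {buf, cap, 0, 0};
    assert(buf != NULL && cap > 0);
    buf[0] = '\0';
    put_str(&w, "INSERT INTO ");
    put_str(&w, table);
    put_str(&w, " (");
    for (size_t i = 0; i < n; i++) {
        if (i > 0) put_str(&w, ", ");
        put_str(&w, cols[i]);
    }
    put_str(&w, ") VALUES (");
    for (size_t i = 0; i < n; i++) {
        if (i > 0) put_str(&w, ", ");
        put_literal(&w, vals[i]);
    }
    put_str(&w, ");");
    return w.overflow ? -1 : 0;
}
```

For a hypothetical row with `id` = `42` and `title` = `Ocean's Eleven`, this produces `INSERT INTO movies.titles (id, title) VALUES ('42', 'Ocean''s Eleven');`. A real exporter would also need to format non-text CQL types.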

Support for Binary Data

Support for binary data is another popular feature among our engineers. By default, the Data Explorer doesn't fetch binary values, since the persisted data might be sizable. Users can opt in to retrieve these fields with their choice of encoding.

Choosing how you want to decode blob data

Query IDE

Efficient point queries are available in the Explore mode, but you may have users that still require the flexibility of CQL. Enter the Query mode, which includes a powerful CQL IDE with features like autocomplete and helpful snippets.

Example of free-form Cassandra queries with autocomplete assistance

There are also guardrails in place to help prevent users from making mistakes. For instance, if a user tries to run a “DROP TABLE…” command, we redirect them to a bespoke workflow for deleting a table, which ensures the operation is done safely with additional validation. (See our integration with Metrics later in this article.)

As you submit queries, they will be saved in the Recent Queries view as well — handy when you are trying to remember that WHERE clause you had crafted before the long weekend.

Dynomite and Redis Features

While C* (Cassandra) is feature-rich and might have a more extensive install base, we have plenty of good stuff for Dynomite and Redis users too. Note that the terms Dynomite and Redis are used interchangeably unless explicitly distinguished.

Key Scanning

Since Redis is an in-memory data store, we need to avoid operations that fetch every key in a single call. Instead, we perform incremental SCAN operations across all nodes in the cluster, fetching keys in small batches so that we don't strain the cluster.

Scanning for keys on a Dynomite cluster
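The per-node scanning loop can be sketched as a toy model. This is not the Data Explorer's code: each node's keyspace is an in-memory array, the cursor is a plain array index (a real Redis SCAN cursor is opaque, and a real SCAN may return a key more than once), and prefix matching stands in for SCAN's MATCH pattern. `toy_node`, `scan_page` and `scan_cluster` are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-in for one node's keyspace. */
typedef struct {
    const char **keys;
    size_t nkeys;
} toy_node;

/* Copy up to `count` keys starting at `cursor` into `out` and return the
 * next cursor; 0 means the node is exhausted, as with SCAN. In this toy
 * the cursor is an array index; a real SCAN cursor is opaque. */
static size_t scan_page(const toy_node *node, size_t cursor, size_t count,
                        const char **out, size_t *nout) {
    *nout = 0;
    while (cursor < node->nkeys && *nout < count)
        out[(*nout)++] = node->keys[cursor++];
    return cursor < node->nkeys ? cursor : 0;
}

/* Scan every node page by page, keeping keys that start with `prefix`.
 * Returns the number of matches and stores up to `max_out` in `out`. */
static size_t scan_cluster(const toy_node *nodes, size_t nnodes,
                           const char *prefix, size_t page_size,
                           const char **out, size_t max_out) {
    const char *page[16];
    size_t found = 0, plen = strlen(prefix);
    assert(page_size > 0 && page_size <= 16);
    for (size_t i = 0; i < nnodes; i++) {
        size_t cursor = 0;
        do {
            size_t n;
            cursor = scan_page(&nodes[i], cursor, page_size, page, &n);
            for (size_t j = 0; j < n; j++) {
                if (strncmp(page[j], prefix, plen) == 0) {
                    if (found < max_out)
                        out[found] = page[j];
                    found++;
                }
            }
        } while (cursor != 0);
    }
    return found;
}
```

Each call touches only `page_size` keys on one node, which is what keeps the scan cheap compared with a single command that returns the whole keyspace at once.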

Dynomite Collection Support

In addition to simple String keys, Dynomite supports a rich collection of data types, including Lists, Hashes, and sorted and unsorted Sets. The UI supports creating and manipulating these collection types as well.

Editing a Redis hash value

Supporting OSS

As we were building the Data Explorer, we started getting some strong signals that the ease-of-use and productivity gains that we’d seen internally would benefit folks outside of Netflix as well. We tried to balance codifying some hard-learned best practices that would be generally applicable while maintaining the flexibility to support various OSS environments. To that end, we’ve built several adapter layers into the product where you can provide custom implementations as needed.

The application was architected to enable OSS by introducing seams where users could provide their implementations for discovery, access control, and data store-specific connection settings. Users can choose one of the built-in service providers or supply a custom provider.

The diagram below shows the server-side architecture. The server is a Node.js Express application written in TypeScript, and the client is a Single Page App written in Vue.js.

Data Explorer architecture and service adapter layers

Demo Environment

Deploying a new tool in any real-world environment is a time commitment. We get it, and to help you with that initial setup, we have included a dockerized demo environment. It can build the app, pull down images for Cassandra and Redis, and run everything in Docker containers so you can dive right in. Note, the demo environment is not intended for production use.

Overridable Configuration

The Data Explorer ships with many default behaviors, but since no two production environments are alike, we provide a mechanism to override the defaults and specify your custom values for various settings. These can range from which port numbers to use to which features should be disabled in a production environment. (For example, the ability to drop a Cassandra table.)

CLI Setup Tool

To further improve the experience of creating your configuration file, we have built a CLI tool that provides a series of prompts for you to follow. The CLI tool is the recommended approach for building your configuration file, and you can re-run the tool at any point to create a new configuration.

The CLI allows you to create a custom configuration

You can also generate multiple configuration files and easily switch between them when working with different environments. We have instructions on GitHub on working with more than one configuration file.

Service Adapters

It’s no secret that Netflix is a big proponent of microservices: we have discovery services for identifying Cassandra and Dynomite clusters in the environment; access-control services that identify who owns a data store and who can access it; and LDAP services to find out information about the logged-in user. There’s a good chance you have similar services in your environment too.

To help enable such environments, we have several pre-canned configurations with overridable values and adapter layers in place.

Discovery

The first example of this adapter layer in action is how the application finds Discovery information: the names and IP addresses of the clusters you want to access. The CLI allows you to choose from a few simple options. For instance, if you have a process that can update a JSON file on disk, you can select “file system.” If, instead, you have a REST-based microservice that provides this information, you can choose “custom” and write the few lines of code needed to fetch it.

Choosing to discover our data store clusters by reading a local file

Metrics

Another example of this service adapter layer is integration with an external metrics service. By implementing a metrics service adapter, you can progressively enhance the UI with keyspace and table metrics. These metrics show at a glance which tables are in use and help our customers make an informed decision when dropping a table.

Without metrics support
With optional metrics support

OSS users can enable the optional Metrics support via the CLI. You then just need to write the custom code to fetch the metrics.

CLI enabling customization of advanced features

i18n Support

While internationalization wasn’t an explicit goal, we discovered that providing Netflix-specific messages in some instances yielded additional value to our internal users. Fundamentally, this is similar to how resource bundles handle different locales.

We make en-NFLX.ts available internally and en-US.ts available externally. Enterprise customers can enhance their users' experience by creating custom resource bundles (e.g., en-ACME.ts) that link to other tools or enhance default messages. Currently, only a small percentage of the UI and server-side exceptions use these message bundles, most commonly to augment messages (e.g., with links to internal Slack channels).

Final Thoughts

We invite you to check out the project and let us know how it works for you. By sharing the Netflix Data Explorer with the OSS community, we hope to help you explore your data and inspire some new ideas.


Exploring Data @ Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Uncovering the truth behind Lua and Redis data consistency

Post Syndicated from Grab Tech original https://engineering.grab.com/uncovering-the-truth-behind-lua-and-redis-data-consistency


Our team at Grab uses Redis as one of our message queues. The Redis server is deployed in a master/replica setup. Quite recently, we have been noticing a spike in the CPU usage of the Redis replicas every time we deploy our service, even when the replicas are not in use and receive no read traffic. However, the issue is resolved once we reboot the replica.

Because rebooting the replica fixed the issue every time, we thought it might be due to an ElastiCache replication issue and didn't pursue it further. However, a recent Redis failover brought this to our attention again. After the failover, the problematic replica became the new master and its CPU immediately went to 100% under read traffic, which meant the cluster was effectively not functional after the failover. This time, we investigated the issue with new vigour. What we found led us to dive deep into the details of Redis replication and its implementation of Hash.

Did you know that Redis master/replica can become inconsistent in certain scenarios?

Did you know that the encodings of Hash objects on the master and the replica can differ, even if the write operations are exactly the same and in the same order? Read on to find out why.

The problem

The following graph shows the CPU utilization of the master vs. the replica immediately after our service is deployed.

CPU Utilization

From the graph, you can see the following trends in the replica's CPU usage. It:

  • Increases immediately after our service is deployed.
  • Spikes higher than the master after some time.
  • Gets back to normal after a reboot.

Cursory investigation

Because the spike occurs only when we deploy our service, we scrutinised all the scripts that were triggered immediately after deployment. The Lua monitor script was identified as a possible suspect. The script redistributes the queued messages of inactive service instances to active service instances so that the messages can be processed by other healthy instances.

We ran a few experiments with the Lua monitor script, using the Redis MONITOR command to compare the script's behaviour on the master and the replica. As a side note, MONITOR degrades server performance, so use it with discretion. Coming back to the script, we were surprised to note that it behaves differently on the master and the replica:

  • Redis executes the script separately on the master and the replica. We expected the script to execute only on the master and the resulting changes to be replicated to the replica.
  • The Redis command HGETALL used in the script returns the hash keys in a different order on the master than on the replica.

For these reasons, the script causes data inconsistencies between the master and its replica. From that point on, the data on the master and the replica keeps diverging until they become completely distinct. Because of the inconsistency, data on the replica is not deleted correctly, so the replica's dataset grows extremely large. Every further operation on this large dataset requires more CPU, which explains why the replica's CPU usage is higher than the master's.

When the replica reboots, it resyncs its data from the master and becomes consistent again, which is why its CPU usage returns to normal after a reboot.
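The divergence mechanism can be reduced to a toy model. Suppose, hypothetically, that a script step takes the first field returned by HGETALL and deletes it (the actual monitor script is not shown in this post). If the script is executed verbatim on two nodes whose iteration orders differ, each node deletes a different field. `pop_first` and the arrays below are illustrative names, not Redis APIs.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Remove and return the first field in `fields` (this node's iteration
 * order), shifting the remaining fields down. Models a hypothetical
 * script step: "take the first field from HGETALL and delete it". */
static const char *pop_first(const char **fields, size_t *n) {
    assert(*n > 0);
    const char *first = fields[0];
    memmove(fields, fields + 1, (*n - 1) * sizeof fields[0]);
    (*n)--;
    return first;
}
```

Given the same three fields in the orders `inst:1, inst:2, inst:3` (master) and `inst:2, inst:1, inst:3` (replica), one call on each node removes `inst:1` on the master but `inst:2` on the replica, so the two nodes now hold different data despite running the same script.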

Diving deeper on HGETALL

We knew that the keys of a hash are not ordered and that we should not rely on their order. But it still puzzled us that the order differs even when the write sequence is the same on the master and the replica. The fact that the order was always the same in our local environment, with a similar setup, made us even more curious.

So, to better understand the underlying behaviour of Redis and to avoid similar bugs in the future, we decided to dig into the Redis source code for more details.

HGETALL command handling code

The HGETALL command is handled by the function genericHgetallCommand and it further calls hashTypeNext to iterate through the Hash object. A snippet of the code is shown as follows:

/* Move to the next entry in the hash. Return C_OK when the next entry
 * could be found and C_ERR when the iterator reaches the end. */
int hashTypeNext(hashTypeIterator *hi) {
    if (hi->encoding == OBJ_ENCODING_ZIPLIST) {
        // advance the ziplist cursor with ziplistNext();
        // return C_ERR when no entries remain
    } else if (hi->encoding == OBJ_ENCODING_HT) {
        // advance the dict iterator with dictNext();
        // return C_ERR when it returns NULL
    } else {
        serverPanic("Unknown hash encoding");
    }
    return C_OK;
}

From the code snippet, you can see that the Redis Hash object actually has two underlying representations:

  • ZIPLIST
  • HASHTABLE (dict)

A bit of research online helped us understand that, to save memory, Redis chooses between the two hash representations based on the following limits:

  • By default, Redis stores a Hash object as a ziplist when the hash has no more than 512 entries (hash-max-ziplist-entries) and each field and value is no longer than 64 bytes (hash-max-ziplist-value).
  • If either limit is exceeded, Redis converts the ziplist to a hashtable, and this is irreversible. That is, Redis won't convert the hashtable back to a ziplist, even if the number of entries or the element sizes later fall below the limits.
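The conversion rule can be modelled as a one-way state machine. This is a hypothetical sketch, not Redis source: `toy_hash`, `toy_hash_add` and `toy_hash_del` are made-up names, the model tracks only the encoding and the entry count rather than storing data, and it assumes the default limits of 512 entries and 64 bytes.

```c
#include <assert.h>
#include <stddef.h>

#define HASH_MAX_ZIPLIST_ENTRIES 512 /* default hash-max-ziplist-entries */
#define HASH_MAX_ZIPLIST_VALUE    64 /* default hash-max-ziplist-value, bytes */

typedef enum { ENC_ZIPLIST, ENC_HT } toy_encoding;

/* Tracks only the encoding and entry count of a hypothetical hash. */
typedef struct {
    toy_encoding enc;
    size_t len;
} toy_hash;

/* Record a new field; switch to a hashtable if either limit is exceeded. */
static void toy_hash_add(toy_hash *h, size_t field_len, size_t value_len) {
    h->len++;
    if (h->len > HASH_MAX_ZIPLIST_ENTRIES ||
        field_len > HASH_MAX_ZIPLIST_VALUE ||
        value_len > HASH_MAX_ZIPLIST_VALUE)
        h->enc = ENC_HT;
}

/* Record a deleted field. There is deliberately no path back to ENC_ZIPLIST. */
static void toy_hash_del(toy_hash *h) {
    assert(h->len > 0);
    h->len--;
}
```

Adding 513 small fields and then deleting all but 10 leaves the toy hash in `ENC_HT`, even though it is far below the entry limit.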

Eureka moment

Based on this understanding, we checked the encoding of the problematic hash in staging.

stg-bookings-qu-002.pcxebj.0001.apse1.cache.amazonaws.com:6379> object encoding queue_stats
"hashtable"

stg-bookings-qu-001.pcxebj.0001.apse1.cache.amazonaws.com:6379> object encoding queue_stats
"ziplist"

To our surprise, the encodings of the Hash object on the master and its replica were different. This means that even after identical additions and deletions, the keys come back in a different order, because a hashtable and a ziplist order their entries differently.
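To see why the order differs, here is a toy comparison; it is not Redis's dict or ziplist code. The "ziplist" is modelled as an array kept in insertion order, and the "hashtable" as an 8-slot open-addressing table whose slot is chosen by a deliberately simple hash (sum of bytes modulo the table size). Iteration walks the slots in index order. Redis's real hash function and table sizing differ, so only the principle carries over. All names are hypothetical, and keys are assumed to be distinct.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define NSLOTS 8

/* Toy "ziplist": entries stay in insertion order. */
typedef struct { const char *keys[NSLOTS]; size_t n; } toy_list;

/* Toy "hashtable": open addressing with linear probing. */
typedef struct { const char *slots[NSLOTS]; size_t n; } toy_table;

/* Deliberately simple hash: sum of bytes modulo the table size. */
static unsigned toy_hash_fn(const char *s) {
    unsigned h = 0;
    while (*s) h += (unsigned char)*s++;
    return h % NSLOTS;
}

static void list_add(toy_list *l, const char *key) {
    assert(l->n < NSLOTS);
    l->keys[l->n++] = key;
}

static void table_add(toy_table *t, const char *key) {
    assert(t->n < NSLOTS); /* guarantees a free slot, so probing ends */
    unsigned i = toy_hash_fn(key);
    while (t->slots[i] != NULL) i = (i + 1) % NSLOTS;
    t->slots[i] = key;
    t->n++;
}

/* HGETALL-style iteration: copy keys in the order each structure yields. */
static size_t list_keys(const toy_list *l, const char **out) {
    for (size_t i = 0; i < l->n; i++) out[i] = l->keys[i];
    return l->n;
}

static size_t table_keys(const toy_table *t, const char **out) {
    size_t n = 0;
    for (size_t i = 0; i < NSLOTS; i++)
        if (t->slots[i] != NULL) out[n++] = t->slots[i];
    return n;
}
```

Inserting `"b"`, `"a"`, `"c"` into both structures yields `b, a, c` from the list but `a, b, c` from the table, because the table's order follows slot positions rather than insertion history.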

Although we had identified the root cause, we were still curious about why the encoding differed between the master and the replica.

How could the underlying representations be different?

We reasoned: “If the master and its replica's write operations are exactly the same and in the same order, why are the underlying representations still different?”

To answer this, we further looked through the Redis source to find all the possible places that a Hash object’s representation could be changed and soon found the following code snippet:

/* Load a Redis object of the specified type from the specified file.
 * On success a newly allocated object is returned, otherwise NULL. */
robj *rdbLoadObject(int rdbtype, rio *rdb) {
  //...
  if (rdbtype == RDB_TYPE_HASH) {
    //...
    o = createHashObject();  // ziplist

    /* Too many entries? Use a hash table. */
    if (len > server.hash_max_ziplist_entries)
        hashTypeConvert(o, OBJ_ENCODING_HT);

    //...
  }
}

Reading through the code, we understood the following behaviour:

  • When restoring from an RDB file, Redis first creates a ziplist for a Hash object.
  • The ziplist is converted to a hashtable only when the number of entries is greater than hash_max_ziplist_entries.

So, if a Hash object on the master is encoded as a hashtable but has no more than hash_max_ziplist_entries (512) entries, a replica that loads it from an RDB file during synchronisation encodes it as a ziplist.

We were able to verify this behaviour in our local setup as well.
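The sequence that leaves the master and the replica with different encodings can be simulated with a hypothetical model; this is not Redis code. `master_encoding_after` replays the entry count after each live write and applies the one-way conversion, while `rdb_load_encoding` mimics only the length check in the rdbLoadObject excerpt above and ignores any element-size checks.

```c
#include <assert.h>
#include <stddef.h>

#define HASH_MAX_ZIPLIST_ENTRIES 512 /* Redis default */

typedef enum { ENC_ZIPLIST, ENC_HT } toy_encoding;

/* Master: `lens` holds the entry count after each write. Once the count
 * exceeds the limit, the encoding becomes a hashtable and never reverts. */
static toy_encoding master_encoding_after(const size_t *lens, size_t nwrites) {
    toy_encoding enc = ENC_ZIPLIST;
    assert(lens != NULL || nwrites == 0);
    for (size_t i = 0; i < nwrites; i++)
        if (lens[i] > HASH_MAX_ZIPLIST_ENTRIES) enc = ENC_HT;
    return enc;
}

/* Replica: an RDB load sees only the current length, as in the excerpt:
 * start as a ziplist and convert only if there are too many entries. */
static toy_encoding rdb_load_encoding(size_t len) {
    return len > HASH_MAX_ZIPLIST_ENTRIES ? ENC_HT : ENC_ZIPLIST;
}
```

With a history of 100, 600, 300 and finally 20 entries, the master ends up with `ENC_HT` while a freshly synced replica loading the 20-entry hash gets `ENC_ZIPLIST`, the same kind of mismatch the OBJECT ENCODING output above shows.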

How did we fix it?

We considered the following two approaches to address this issue:

  • Enable script effects replication (by calling redis.replicate_commands() in the script before it performs any writes). This tells Redis to replicate the commands generated by the script instead of running the whole script on the replica. The disadvantage of this approach is that it adds network traffic between the master and the replica.
  • Ensure the behaviour of the Lua monitor script is deterministic. In our case, we can do this by sorting the outputs of HKEYS/HGETALL.
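In the Lua script itself, the second approach amounts to sorting the field list (for example with Lua's table.sort) before acting on it. To stay in the same language as the other code in this post, here is the idea as a C sketch: sort the field names with qsort and strcmp so every node processes them in the same order regardless of encoding. `sort_fields` is a hypothetical helper, and its input array stands in for a real HKEYS reply.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* qsort comparator for an array of C strings. */
static int cmp_field(const void *a, const void *b) {
    const char *const *fa = a;
    const char *const *fb = b;
    return strcmp(*fa, *fb);
}

/* Sort field names in place so that iteration order no longer depends on
 * how the hash happens to be encoded on a given node. */
static void sort_fields(const char **fields, size_t n) {
    assert(fields != NULL || n == 0);
    qsort(fields, n, sizeof fields[0], cmp_field);
}
```

After sorting, the master's `queue:b, queue:a, queue:c` and the replica's `queue:c, queue:b, queue:a` both become `queue:a, queue:b, queue:c`, so any step that depends on order behaves identically on both nodes.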

We chose the latter approach because:

  • The Hash object is small (fewer than 30 elements), so the sorting overhead is low: less than 1 ms for 100 elements in our tests.
  • Replicating our script effect would end up replicating thousands of Redis writing commands on the secondary causing a much higher overhead compared to replicating just the script.

After the fix, the replica's CPU usage stayed within its normal range after each deployment. This also prevented the Redis cluster from becoming unusable in the event of a master failover.

Key takeaways

In addition to writing clear and maintainable code, it’s equally important to understand the underlying storage layer that you are dealing with to produce efficient and bug-free code.

The following are some of the key learnings on Redis:

  • Redis does not guarantee consistency between the master and its replicas when Lua scripts are replicated verbatim. You have to ensure that the behaviour of your scripts is deterministic to avoid data inconsistency.
  • In the Redis versions we were running, Redis replicated the whole Lua script to the replica instead of the resulting commands. This was the default behaviour, and you can switch to replicating the script's effects by calling redis.replicate_commands(); since Redis 5.0, effects replication is the default.
  • To save memory, Redis uses different representations for Hash objects: a ziplist or a hashtable. The representation is not guaranteed to be the same on the master and its replicas.

Join us

Grab is more than just the leading ride-hailing and mobile payments platform in Southeast Asia. We use data and technology to improve everything from transportation to payments and financial services across a region of more than 620 million people. We aspire to unlock the true potential of Southeast Asia and look for like-minded individuals to join us on this ride.

If you share our vision of driving Southeast Asia forward, apply to join our team today.