Tag Archives: Optimization

A multi-dimensional approach helps you proactively prepare for failures, Part 2: Infrastructure layer

Post Syndicated from Piyali Kamra original https://aws.amazon.com/blogs/architecture/a-multi-dimensional-approach-helps-you-proactively-prepare-for-failures-part-2-infrastructure-layer/

The resiliency of a distributed application is the cumulative resiliency of its applications, infrastructure, and operational processes. Part 1 of this series explored application layer resiliency. In Part 2, we discuss how using Amazon Web Services (AWS) managed services, redundancy, high availability, and infrastructure failover patterns based on recovery time and recovery point objectives (RTO and RPO, respectively) can help in building more resilient infrastructures.

Pattern 1: Recognize high impact/likelihood infrastructure failures

To ensure cloud infrastructure resilience, we need to understand the likelihood and impact of various infrastructure failures, so we can mitigate them. Figure 1 illustrates that most of the failures with high likelihood happen because of operator error or poor deployments.

Automated testing, automated deployments, and solid design patterns can mitigate these failures. There could be datacenter failures—like whole rack failures—but deploying applications using auto scaling and multi-availability zone (multi-AZ) deployment, plus resilient AWS cloud native services, can mitigate the impact.

Figure 1. Likelihood and impact of failure events

As demonstrated in Figure 1, infrastructure resiliency is a combination of high availability (HA) and disaster recovery (DR). HA involves increasing the availability of the system by implementing redundancy among the application components and removing single points of failure.

Application layer decisions, like creating stateless applications, make it simpler to implement HA at the infrastructure layer by allowing it to scale using Auto Scaling groups and distributing the redundant applications across multiple AZs.

Pattern 2: Understanding and controlling infrastructure failures

Building a resilient infrastructure requires understanding which infrastructure failures are under control and which ones are not, as demonstrated in Figure 2.

These insights allow us to automate failure detection, control failures, and employ proactive patterns such as static stability, which over-provisions infrastructure in advance so that the system does not need to scale up while a failure is in progress.

Figure 2. Proactively designing systems in the event of failure

The infrastructure decisions under our control that can increase the resiliency of our system include:

  • AWS services have control and data planes designed for minimum blast radius. Data planes typically have higher availability design goals than control planes and are usually less complex. When implementing recovery or mitigation responses to events that can affect resiliency, using control plane operations can lower the overall resiliency of your architectures. For example, Amazon Route 53 (Route 53) has a data plane designed for a 100% availability SLA. A good failover mechanism should rely on the data plane and not the control plane, as explained in Creating Disaster Recovery Mechanisms Using Amazon Route 53 (see the sketch after this list).
  • Understanding the networking design and routes implemented in a virtual private cloud (VPC) is critical when testing the flow of traffic in our application. Understanding the flow of traffic helps us design better applications and see how one component failure can affect overall ingress/egress traffic. To achieve better network resiliency, it’s important to implement a good subnet strategy and manage our IP addresses to avoid failover issues and asymmetric routing in hybrid architectures. Use IP address management tools to support your subnet strategy and routing decisions.
  • When designing VPCs and AZs, understand the service limits and deploy independent routing tables and components in each zone to increase availability. For example, highly available NAT gateways are preferred over NAT instances, as noted in the comparison provided in the Amazon VPC documentation.
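Returning to the Route 53 point above, here is a minimal boto3 sketch of a failover setup that relies on the data plane at failure time. The hosted zone ID, record name, IP addresses, and health check settings are illustrative placeholders; the key idea is that everything is configured ahead of time (a control plane operation), so that when the primary fails only health checks and DNS resolution, both data plane functions, are involved.

import boto3

route53 = boto3.client("route53")
HOSTED_ZONE_ID = "Z0123456789EXAMPLE"  # placeholder hosted zone

# Health check that Route 53 evaluates continuously (data plane).
health_check_id = route53.create_health_check(
    CallerReference="app-primary-hc-1",
    HealthCheckConfig={
        "IPAddress": "203.0.113.10",   # primary endpoint (placeholder)
        "Port": 443,
        "Type": "HTTPS",
        "ResourcePath": "/health",
        "RequestInterval": 30,
        "FailureThreshold": 3,
    },
)["HealthCheck"]["Id"]

def failover_record(identifier, role, value, hc_id=None):
    record = {
        "Name": "app.example.com",
        "Type": "A",
        "SetIdentifier": identifier,
        "Failover": role,              # PRIMARY or SECONDARY
        "TTL": 60,
        "ResourceRecords": [{"Value": value}],
    }
    if hc_id:
        record["HealthCheckId"] = hc_id
    return {"Action": "UPSERT", "ResourceRecordSet": record}

# One-time configuration (control plane); the failover itself needs no API calls.
route53.change_resource_record_sets(
    HostedZoneId=HOSTED_ZONE_ID,
    ChangeBatch={"Changes": [
        failover_record("primary", "PRIMARY", "203.0.113.10", health_check_id),
        failover_record("secondary", "SECONDARY", "198.51.100.20"),
    ]},
)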

Pattern 3: Considering different ways of increasing HA at the infrastructure layer

As already detailed, infrastructure resiliency = HA + DR.

Different ways by which system availability can be increased include:

  • Building for redundancy: Redundancy is the duplication of application components to increase the overall availability of the distributed system. After following application layer best practices, we can build auto healing mechanisms at the infrastructure layer.

We can take advantage of auto scaling features and use Amazon CloudWatch metrics and alarms to set up auto scaling triggers and deploy redundant copies of our applications across multiple AZs. This protects workloads from AZ failures, as shown in Figure 3 (see the sketch after the figure).

Figure 3. Redundancy increases availability
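As a rough illustration of that trigger, the following boto3 sketch attaches a CloudWatch CPU alarm to a simple scaling policy on an existing multi-AZ Auto Scaling group; the group name, threshold, and evaluation periods are illustrative placeholders.

import boto3

ASG_NAME = "web-asg"  # existing multi-AZ Auto Scaling group (placeholder)

autoscaling = boto3.client("autoscaling")
cloudwatch = boto3.client("cloudwatch")

# Simple scaling policy: add one instance when the alarm fires.
policy_arn = autoscaling.put_scaling_policy(
    AutoScalingGroupName=ASG_NAME,
    PolicyName="scale-out-on-cpu",
    PolicyType="SimpleScaling",
    AdjustmentType="ChangeInCapacity",
    ScalingAdjustment=1,
    Cooldown=300,
)["PolicyARN"]

# CloudWatch alarm on the group's average CPU drives the scaling policy.
cloudwatch.put_metric_alarm(
    AlarmName=f"{ASG_NAME}-high-cpu",
    Namespace="AWS/EC2",
    MetricName="CPUUtilization",
    Dimensions=[{"Name": "AutoScalingGroupName", "Value": ASG_NAME}],
    Statistic="Average",
    Period=300,
    EvaluationPeriods=2,
    Threshold=70.0,
    ComparisonOperator="GreaterThanThreshold",
    AlarmActions=[policy_arn],
)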

  • Auto scale your infrastructure: When there are AZ failures, infrastructure auto scaling maintains the desired number of redundant components, which helps maintain the base level application throughput. This way, HA is maintained and costs are managed. Auto scaling uses metrics to scale in and out appropriately, as shown in Figure 4.

Figure 4. How auto scaling improves availability

  • Implement resilient network connectivity patterns: In a highly resilient distributed system, network access to AWS infrastructure also needs to be highly resilient. When deploying hybrid applications, the capacity they need to communicate with their cloud native counterparts is an important consideration in designing network access using AWS Direct Connect or VPNs.

Testing failover and fallback scenarios helps validate that network paths operate as expected and that routes fail over to meet RTO objectives. As the number of connection points between the data center and AWS VPCs increases, a hub and spoke configuration provided by the Direct Connect gateway and transit gateways simplifies network topology, testing, and failover. For more information, visit the AWS Direct Connect Resiliency Recommendations.

  • Whenever possible, use the AWS networking backbone to increase security and resiliency and to lower cost. AWS PrivateLink provides secure access to AWS services and exposes the application’s functionality and APIs to other business units or partner accounts hosted on AWS.
  • Security appliances need to be set up in HA configuration, so that even if one AZ is unavailable, security inspection can be taken over by the redundant appliances in the other AZs.
  • Think ahead about DNS resolution: DNS is a critical infrastructure component; hybrid DNS resolution should be designed carefully with Route 53 HA inbound and outbound resolver endpoints instead of using self-managed proxies.

Implement a good strategy to share DNS resolver rules across AWS accounts and VPCs with AWS Resource Access Manager. Network failover tests are an important part of Disaster Recovery and Business Continuity Plans. To learn more, visit Set up integrated DNS resolution for hybrid networks in Amazon Route 53.

Additionally, Elastic Load Balancing (ELB) uses health checks to make sure that requests route to another component if the underlying application component fails. This improves the distributed system’s availability, which is the cumulative availability of all the different layers in our system. Figure 5 details the advantages of some AWS managed services.

Figure 5. AWS managed services help in building resilient infrastructures

Pattern 4: Use RTO and RPO requirements to determine the correct failover strategy for your application

Capture RTO and RPO requirements early on to determine solid failover strategies (Figure 6). Disaster recovery strategies within AWS range from low cost and low complexity (like backup and restore) to more complex strategies when lower values of RTO and RPO are required.

In both pilot light and warm standby, only the primary region receives traffic. With pilot light, only critical infrastructure components run in the backup region. Automation is used to detect failures in the primary region using health checks and other metrics.

When health checks fail, use a combination of auto scaling groups, automation, and Infrastructure as Code (IaC) for quick deployment of other infrastructure components.

Note: This strategy depends on control plane availability in the secondary region for deploying the resources; keep this point in mind if you don’t have compute pre-provisioned in the secondary region. Carefully consider the business requirements and a distributed system’s application-level characteristics before deciding on a failover strategy. To understand all the factors and complexities involved in each of these disaster recovery strategies, refer to disaster recovery options in the cloud.
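As a hedged sketch of that scale-up step, assuming a pre-created but scaled-down Auto Scaling group already exists in the secondary region (the region, group name, and sizes below are placeholders), the failover automation might simply raise its capacity. Note that this call is itself a control plane operation in the secondary region, consistent with the note above.

import boto3

# Pre-created Auto Scaling group in the secondary region (pilot light).
secondary = boto3.client("autoscaling", region_name="us-west-2")

def activate_pilot_light(asg_name="app-asg-secondary", desired=4):
    """Scale out the standby Auto Scaling group after primary health checks fail."""
    secondary.update_auto_scaling_group(
        AutoScalingGroupName=asg_name,
        MinSize=desired,
        MaxSize=desired * 2,
        DesiredCapacity=desired,
    )

activate_pilot_light()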

Figure 6. Relationship between RTO, RPO, cost, data loss, and length of service interruption

Conclusion

In Part 2 of this series, we discussed how infrastructure resiliency is a combination of HA and DR. It is important to consider the likelihood and impact of different failure events on availability requirements. Building in application layer resiliency patterns (Part 1 of this series), along with early discovery of the RTO/RPO requirements and the operational and process resiliency of an organization, helps in choosing the right managed services and putting the appropriate failover strategies in place for distributed systems.

It’s important to differentiate between normal and abnormal load thresholds for applications in order to put automation, alerts, and alarms in place. This allows us to auto scale our infrastructure for normal expected load, plus implement corrective action and automation to root out issues in case of abnormal load. Use IaC for quick failover, and test your failover processes.

Stay tuned for Part 3, in which we discuss operational resiliency!

From centralized architecture to decentralized architecture: How data sharing fine-tunes Amazon Redshift workloads

Post Syndicated from Jingbin Ma original https://aws.amazon.com/blogs/big-data/from-centralized-architecture-to-decentralized-architecture-how-data-sharing-fine-tunes-amazon-redshift-workloads/

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Today, Amazon Redshift has become the most widely used cloud data warehouse.

With the significant growth of data for big data analytics over the years, some customers have asked how they should optimize Amazon Redshift workloads. In this post, we explore how to optimize workloads on Amazon Redshift clusters using Amazon Redshift RA3 nodes, data sharing, and pausing and resuming clusters. For more cost-optimization methods, refer to Getting the most out of your analytics stack with Amazon Redshift.

Key features of Amazon Redshift

First, let’s review some key features:

  • RA3 nodes – Amazon Redshift RA3 nodes are backed by a new managed storage model that gives you the power to separately optimize your compute power and your storage. They bring a few very important features, one of which is data sharing. RA3 nodes also support the ability to pause and resume, which allows you to easily suspend on-demand billing while the cluster is not being used.
  • Data sharing – Amazon Redshift data sharing allows you to extend the ease of use, performance, and cost benefits of Amazon Redshift in a single cluster to multi-cluster deployments while being able to share data. Data sharing enables instant, granular, and fast data access across Redshift clusters without the need to copy or move it. You can securely share live data with Amazon Redshift clusters in the same or different AWS accounts, and across regions. You can share data at many levels, including schemas, tables, views, and user-defined functions. You can also share the most up-to-date and consistent information as it’s updated in Amazon Redshift Serverless. It also provides fine-grained access controls that you can tailor for different users and businesses that all need access to the data. However, data sharing in Amazon Redshift has a few limitations.

Solution overview

In this use case, our customer is heavily using Amazon Redshift as the data warehouse for their analytics workloads, and they have been enjoying the capabilities and convenience that Amazon Redshift brings to their business. They mainly use Amazon Redshift to store and process user behavioral data for BI purposes. The data has increased by hundreds of gigabytes daily in recent months, and employees from different departments continuously run queries against the Amazon Redshift cluster on their BI platform during business hours.

The company runs four major analytics workloads on a single Amazon Redshift cluster, because some data is used by all workloads:

  • Queries from the BI platform – Various queries run mainly during business hours.
  • Hourly ETL – This extract, transform, and load (ETL) job runs in the first few minutes of each hour. It generally takes about 40 minutes.
  • Daily ETL – This job runs twice a day during business hours, because the operation team needs to get daily reports before the end of the day. Each job normally takes between 1.5–3 hours. It’s the second-most resource-heavy workload.
  • Weekly ETL – This job runs in the early morning every Sunday. It’s the most resource-heavy workload. The job normally takes 3–4 hours.

Over the years, the analytics team has migrated to the RA3 family and increased the number of nodes of the Amazon Redshift cluster to 12 in order to keep the average runtime of queries from their BI tool within an acceptable range as the data size grew, especially when other workloads are running.

However, they have noticed that performance is reduced while running ETL tasks, and the duration of ETL tasks is long. Therefore, the analytics team wants to explore solutions to optimize their Amazon Redshift cluster.

Because CPU utilization spikes appear while the ETL tasks are running, the AWS team’s first thought was to separate workloads and relevant data into multiple Amazon Redshift clusters with different cluster sizes. By reducing the total number of nodes, we hoped to reduce the cost of Amazon Redshift.

After a series of conversations, the AWS team found that one of the reasons that the customer keeps all workloads on the 12-node Amazon Redshift cluster is to manage the performance of queries from their BI platform, especially while running ETL workloads, which have a big impact on the performance of all workloads on the Amazon Redshift cluster. The obstacle is that many tables in the data warehouse are required to be read and written by multiple workloads, and only the producer of a data share can update the shared data.

The challenge of dividing the Amazon Redshift cluster into multiple clusters is data consistency. Some tables need to be read by ETL workloads and written by BI workloads, and some tables are the opposite. Therefore, if we duplicate data into two Amazon Redshift clusters or only create a data share from the BI cluster to the reporting cluster, the customer will have to develop a data synchronization process to keep the data consistent between all Amazon Redshift clusters, and this process could be very complicated and unmaintainable.

After more analysis to gain an in-depth understanding of the customer’s workloads, the AWS team found that we could put tables into four groups, and proposed a multi-cluster, two-way data sharing solution. The purpose of the solution is to divide the workloads into separate Amazon Redshift clusters so that we can use Amazon Redshift to pause and resume clusters for periodic workloads to reduce the Amazon Redshift running costs, because clusters can still access a single copy of data that is required for workloads. The solution should meet the data consistency requirements without building a complicated data synchronization process.

The following diagram illustrates the old architecture (left) compared to the new multi-cluster solution (right).

Improve the old architecture (left) to the new multi-cluster solution (right)

Dividing workloads and data

Due to the characteristics of the four major workloads, we categorized workloads into two categories: long-running workloads and periodic-running workloads.

The long-running workloads are for the BI platform and hourly ETL jobs. Because the hourly ETL workload requires about 40 minutes to run, the gain is small even if we migrate it to an isolated Amazon Redshift cluster and pause and resume it every hour. Therefore, we leave it with the BI platform.

The periodic-running workloads are the daily and weekly ETL jobs. The daily job generally takes about 1 hour and 40 minutes to 3 hours, and the weekly job generally takes 3–4 hours.

Data sharing plan

The next step is identifying all data (tables) access patterns of each category. We identified four types of tables:

  • Type 1 – Tables are only read and written by long-running workloads
  • Type 2 – Tables are read and written by long-running workloads, and are also read by periodic-running workloads
  • Type 3 – Tables are read and written by periodic-running workloads, and are also read by long-running workloads
  • Type 4 – Tables are only read and written by periodic-running workloads

Fortunately, there is no table that is required to be written by all workloads. Therefore, we can separate the Amazon Redshift cluster into two Amazon Redshift clusters: one for the long-running workloads, and the other for periodic-running workloads with 20 RA3 nodes.

We created a two-way data share between the long-running cluster and the periodic-running cluster. For type 2 tables, we created a data share on the long-running cluster as the producer and the periodic-running cluster as the consumer. For type 3 tables, we created a data share on the periodic-running cluster as the producer and the long-running cluster as the consumer.

The following diagram illustrates this data sharing configuration.

The long-running cluster (producer) shares type 2 tables to the periodic-running cluster (consumer). The periodic-running cluster (producer’) shares type 3 tables to the long-running cluster (consumer’)

Build two-way data share across Amazon Redshift clusters

In this section, we walk through the steps to build a two-way data share across Amazon Redshift clusters. First, let’s take a snapshot of the original Amazon Redshift cluster, which became the long-running cluster later.

Take a snapshot of the long-running-cluster from the Amazon Redshift console

Now, let’s create a new Amazon Redshift cluster with 20 RA3 nodes for periodic-running workloads. Then we migrate the type 3 and type 4 tables to the periodic-running cluster. Make sure you choose the ra3 node type. (Amazon Redshift Serverless supports data sharing too, and it became generally available in July 2022, so it is also an option now.)

Create the periodic-running-cluster. Make sure you select the ra3 node type.

Create the long-to-periodic data share

The next step is to create the long-to-periodic data share. Complete the following steps:

  1. On the periodic-running cluster, get the namespace by running the following query:
SELECT current_namespace;

Make sure to record the namespace.

  2. On the long-running cluster, we run queries similar to the following:
CREATE DATASHARE ltop_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ltop_share ADD SCHEMA public_long;
ALTER DATASHARE ltop_share ADD ALL TABLES IN SCHEMA public_long;
GRANT USAGE ON DATASHARE ltop_share TO NAMESPACE '[periodic-running-cluster-namespace]';
  3. We can validate the long-to-periodic data share using the following command:
SHOW datashares;
  4. After we validate the data share, we get the long-running cluster namespace with the following query:
SELECT current_namespace;

Make sure to record the namespace.

  5. On the periodic-running cluster, run the following command to load the data from the long-to-periodic data share with the long-running cluster namespace:
CREATE DATABASE ltop FROM DATASHARE ltop_share OF NAMESPACE '[long-running-cluster-namespace]';
  6. Confirm that we have read access to tables in the long-to-periodic data share.

Create the periodic-to-long data share

The next step is to create the periodic-to-long data share. We use the namespaces of the long-running cluster and the periodic-running cluster that we collected in the previous step.

  1. On the periodic-running cluster, run queries similar to the following to create the periodic-to-long data share:
CREATE DATASHARE ptol_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ptol_share ADD SCHEMA public_periodic;
ALTER DATASHARE ptol_share ADD ALL TABLES IN SCHEMA public_periodic;
GRANT USAGE ON DATASHARE ptol_share TO NAMESPACE '[long-running-cluster-namespace]';
  2. Validate the data share using the following command:
SHOW datashares;
  3. On the long-running cluster, run the following command to load the data from the periodic-to-long data share using the periodic-running cluster namespace:
CREATE DATABASE ptol FROM DATASHARE ptol_share OF NAMESPACE '[periodic-running-cluster-namespace]';
  4. Check that we have read access to the tables in the periodic-to-long data share.

At this stage, we have separated workloads into two Amazon Redshift clusters and built a two-way data share across two Amazon Redshift clusters.

The next step is updating the code of different workloads to use the correct endpoints of two Amazon Redshift clusters and perform consolidated tests.

Pause and resume the periodic-running Amazon Redshift cluster

Let’s update the crontab scripts, which run periodic-running workloads. We make two updates.

  1. When the scripts start, check the cluster status and call the Amazon Redshift resume cluster API to resume the periodic-running Amazon Redshift cluster if it is paused (see the sketch after this list):
    aws redshift resume-cluster --cluster-identifier [periodic-running-cluster-id]

  2. After the workloads are finished, call the Amazon Redshift pause cluster API with the cluster ID to pause the cluster:
    aws redshift pause-cluster --cluster-identifier [periodic-running-cluster-id]
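A hedged boto3 equivalent of these two steps, including the status check by polling describe-clusters; the cluster identifier, timeout, and polling interval are placeholders:

import time
import boto3

redshift = boto3.client("redshift")
CLUSTER_ID = "periodic-running-cluster"  # placeholder cluster identifier

def cluster_status(cluster_id):
    cluster = redshift.describe_clusters(ClusterIdentifier=cluster_id)["Clusters"][0]
    return cluster["ClusterStatus"]

def resume_if_paused(cluster_id, timeout_s=900):
    if cluster_status(cluster_id) == "paused":
        redshift.resume_cluster(ClusterIdentifier=cluster_id)
    deadline = time.time() + timeout_s
    while cluster_status(cluster_id) != "available" and time.time() < deadline:
        time.sleep(30)  # resuming takes several minutes

def pause_after_workloads(cluster_id):
    redshift.pause_cluster(ClusterIdentifier=cluster_id)

resume_if_paused(CLUSTER_ID)
# ... run the daily/weekly ETL workloads here ...
pause_after_workloads(CLUSTER_ID)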

Results

After we migrated the workloads to the new architecture, the company’s analytics team ran some tests to verify the results.

According to tests, the performance of all workloads improved:

  • The BI workload is about 100% faster during the ETL workload running periods
  • The hourly ETL workload is about 50% faster
  • The daily workload duration reduced to approximately 40 minutes, from a maximum of 3 hours
  • The weekly workload duration reduced to approximately 1.5 hours, from a maximum of 4 hours

All functionality works properly, and the cost of the new architecture increased by only approximately 13%, even though over 10% of new data had been added during the testing period.

Learnings and limitations

After we separated the workloads into different Amazon Redshift clusters, we discovered a few things:

  • The performance of the BI workloads was 100% faster because there was no resource competition with daily and weekly ETL workloads anymore
  • The duration of ETL workloads on the periodic-running cluster was reduced significantly because there were more nodes and no resource competition from the BI and hourly ETL workloads
  • Even though over 10% of new data was added, the overall cost of the Amazon Redshift clusters only increased by 13%, due to using the cluster pause and resume function of the Amazon Redshift RA3 family

As a result, we saw a 70% price-performance improvement of the Amazon Redshift cluster.

However, there are some limitations of the solution:

  • To use the Amazon Redshift pause and resume function, the code for calling the Amazon Redshift pause and resume APIs must be added to all scheduled scripts that run ETL workloads on the periodic-running cluster
  • Amazon Redshift clusters require several minutes to finish pausing and resuming, although you’re not charged during these processes
  • The size of Amazon Redshift clusters can’t automatically scale in and out depending on workloads

Next steps

After improving performance significantly, we can explore the possibility of reducing the number of nodes of the long-running cluster to reduce Amazon Redshift costs.

Another possible optimization is using Amazon Redshift Spectrum to reduce the cost of Amazon Redshift on cluster storage. With Redshift Spectrum, multiple Amazon Redshift clusters can concurrently query and retrieve the same structured and semistructured dataset in Amazon Simple Storage Service (Amazon S3) without the need to make copies of the data for each cluster or having to load the data into Amazon Redshift tables.

Amazon Redshift Serverless was announced for preview at AWS re:Invent 2021 and became generally available in July 2022. Redshift Serverless automatically provisions and intelligently scales your data warehouse capacity to deliver best-in-class performance for all your analytics. You only pay for the compute used for the duration of the workloads on a per-second basis. You can benefit from this simplicity without making any changes to your existing analytics and BI applications. You can also share data for read purposes across different Amazon Redshift Serverless instances within or across AWS accounts.

Therefore, we can explore the possibility of removing the need to script for pausing and resuming the periodic-running cluster by using Redshift Serverless to make the management easier. We can also explore the possibility of improving the granularity of workloads.

Conclusion

In this post, we discussed how to optimize workloads on Amazon Redshift clusters using RA3 nodes, data sharing, and pausing and resuming clusters. We also explored a use case implementing a multi-cluster two-way data share solution to improve workload performance with a minimum code change. If you have any questions or feedback, please leave them in the comments section.


About the authors

Jingbin Ma

Jingbin Ma is a Sr. Solutions Architect at Amazon Web Services. He helps customers build well-architected applications using AWS services. He has many years of experience working in the internet industry, and his last role was CTO of a New Zealand IT company before joining AWS. He is passionate about serverless and infrastructure as code.

Chao Pan

Chao Pan is a Data Analytics Solutions Architect at Amazon Web Services. He’s responsible for the consultation and design of customers’ big data solution architectures. He has extensive experience in open-source big data. Outside of work, he enjoys hiking.

Optimizing TCP for high WAN throughput while preserving low latency

Post Syndicated from Mike Freemon original https://blog.cloudflare.com/optimizing-tcp-for-high-throughput-and-low-latency/


Here at Cloudflare we’re constantly working on improving our service. Our engineers are looking at hundreds of parameters of our traffic, making sure that we get better all the time.

One of the core numbers we keep a close eye on is HTTP request latency, which is important for many of our products. We regard latency spikes as bugs to be fixed. One example is the 2017 story of “Why does one NGINX worker take all the load?”, where we optimized our TCP Accept queues to improve overall latency of TCP sockets waiting for accept().

Performance tuning is a holistic endeavor, and we monitor and continuously improve a range of other performance metrics as well, including throughput. Sometimes, tradeoffs have to be made. Such a case occurred in 2015, when a latency spike was discovered in our processing of HTTP requests. The solution at the time was to set tcp_rmem to 4 MiB, which minimizes the amount of time the kernel spends on TCP collapse processing. It was this collapse processing that was causing the latency spikes. Later in this post we discuss TCP collapse processing in more detail.

The tradeoff is that using a low value for tcp_rmem limits TCP throughput over high latency links. The following graph shows the maximum throughput as a function of network latency for a window size of 2 MiB. Note that the 2 MiB corresponds to a tcp_rmem value of 4 MiB due to the tcp_adv_win_scale setting in effect at the time.


For the Cloudflare products then in existence, this was not a major problem, as connections terminate and content is served from nearby servers due to our BGP anycast routing.

Since then, we have added new products, such as Magic WAN, WARP, Spectrum, Gateway, and others. These represent new types of use cases and traffic flows.

For example, imagine you’re a typical Magic WAN customer. You have connected all of your worldwide offices together using the Cloudflare global network. While Time to First Byte still matters, Magic WAN office-to-office traffic also needs good throughput. For example, a lot of traffic over these corporate connections will be file sharing using protocols such as SMB. These are elephant flows over long fat networks. Throughput is the metric every eyeball watches as they are downloading files.

We need to continue to provide world-class low latency while simultaneously providing high throughput over high-latency connections.

Before we begin, let’s introduce the players in our game.

TCP receive window is the maximum number of unacknowledged user payload bytes the sender should transmit (bytes-in-flight) at any point in time. The size of the receive window can and does go up and down during the course of a TCP session. It is a mechanism whereby the receiver can tell the sender to stop sending if the sent packets cannot be successfully received because the receive buffers are full. It is this receive window that often limits throughput over high-latency networks.

net.ipv4.tcp_adv_win_scale is a (non-intuitive) number used to account for the overhead needed by Linux to process packets. The receive window is specified in terms of user payload bytes. Linux needs additional memory beyond that to track other data associated with packets it is processing.

The value of the receive window changes during the lifetime of a TCP session, depending on a number of factors. The maximum value that the receive window can be is limited by the amount of free memory available in the receive buffer, according to this table:

tcp_adv_win_scale   TCP window size
 4                  15/16 * available memory in receive buffer
 3                  7/8 * available memory in receive buffer
 2                  3/4 * available memory in receive buffer
 1                  1/2 * available memory in receive buffer
 0                  available memory in receive buffer
-1                  1/2 * available memory in receive buffer
-2                  1/4 * available memory in receive buffer
-3                  1/8 * available memory in receive buffer
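A small sketch of that calculation, mirroring the way the table above maps available buffer space to a window size (the example values are illustrative):

def tcp_window_from_space(space_bytes, tcp_adv_win_scale):
    """Approximate TCP window for a given amount of available receive buffer
    space, following the table above."""
    if tcp_adv_win_scale <= 0:
        # Zero or negative scale: the window is space >> |scale|.
        return space_bytes >> (-tcp_adv_win_scale)
    # Positive scale: reserve space >> scale for packet-processing overhead.
    return space_bytes - (space_bytes >> tcp_adv_win_scale)

# 4 MiB of available buffer space with tcp_adv_win_scale = -2 -> 1 MiB window.
print(tcp_window_from_space(4 * 1024 * 1024, -2))  # 1048576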

We can intuitively (and correctly) understand that the amount of available memory in the receive buffer is the difference between the used memory and the maximum limit. But what is the maximum size a receive buffer can be? The answer is sk_rcvbuf.

sk_rcvbuf is a per-socket field that specifies the maximum amount of memory that a receive buffer can allocate. This can be set programmatically with the socket option SO_RCVBUF. This can sometimes be useful to do, for localhost TCP sessions, for example, but in general the use of SO_RCVBUF is not recommended.

So how is sk_rcvbuf set? The most appropriate value for that depends on the latency of the TCP session and other factors. This makes it difficult for L7 applications to know how to set these values correctly, as they will be different for every TCP session. The solution to this problem is Linux autotuning.

Linux autotuning

Linux autotuning is logic in the Linux kernel that adjusts the buffer size limits and the receive window based on actual packet processing. It takes into consideration a number of things including TCP session RTT, L7 read rates, and the amount of available host memory.

Autotuning can sometimes seem mysterious, but it is actually fairly straightforward.

The central idea is that Linux can track the rate at which the local application is reading data off of the receive queue. It also knows the session RTT. Because Linux knows these things, it can automatically increase the buffers and receive window until it reaches the point at which the application layer or network bottleneck links are the constraint on throughput (and not host buffer settings). At the same time, autotuning prevents slow local readers from having excessively large receive queues. The way autotuning does that is by limiting the receive window and its corresponding receive buffer to an appropriate size for each socket.

The values set by autotuning can be seen via the Linux “ss” command from the iproute package (e.g. “ss -tmi”).  The relevant output fields from that command are:

Recv-Q is the number of user payload bytes not yet read by the local application.

rcv_ssthresh is the window clamp, a.k.a. the maximum receive window size. This value is not known to the sender. The sender receives only the current window size, via the TCP header field. A closely-related field in the kernel, tp->window_clamp, is the maximum window size allowable based on the amount of available memory. rcv_ssthresh is the receiver-side slow-start threshold value.

skmem_r is the actual amount of memory that is allocated, which includes not only user payload (Recv-Q) but also additional memory needed by Linux to process the packet (packet metadata). This is known within the kernel as sk_rmem_alloc.

Note that there are other buffers associated with a socket, so skmem_r does not represent the total memory that a socket might have allocated. Those other buffers are not involved in the issues presented in this post.

skmem_rb is the maximum amount of memory that could be allocated by the socket for the receive buffer. This is higher than rcv_ssthresh to account for memory needed for packet processing that is not packet data. Autotuning can increase this value (up to tcp_rmem max) based on how fast the L7 application is able to read data from the socket and the RTT of the session. This is known within the kernel as sk_rcvbuf.

rcv_space is the high water mark of the rate of the local application reading from the receive buffer during any RTT. This is used internally within the kernel to adjust sk_rcvbuf.

Earlier we mentioned a setting called tcp_rmem. net.ipv4.tcp_rmem consists of three values, but in this document we are always referring to the third value (except where noted). It is a global setting that specifies the maximum amount of memory that any TCP receive buffer can allocate, i.e. the maximum permissible value that autotuning can use for sk_rcvbuf. This is essentially just a failsafe for autotuning, and under normal circumstances should play only a minor role in TCP memory management.

It’s worth mentioning that receive buffer memory is not preallocated. Memory is allocated based on actual packets arriving and sitting in the receive queue. It’s also important to realize that filling up a receive queue is not one of the criteria that autotuning uses to increase sk_rcvbuf. Indeed, preventing this type of excessive buffering (bufferbloat) is one of the benefits of autotuning.

What’s the problem?

The problem is that we must have a large TCP receive window for high BDP sessions. This is directly at odds with the latency spike problem mentioned above.

Something has to give. The laws of physics (speed of light in glass, etc.) dictate that we must use large window sizes. There is no way to get around that. So we are forced to solve the latency spikes differently.

A brief recap of the latency spike problem

Sometimes a TCP session will fill up its receive buffers. When that happens, the Linux kernel will attempt to reduce the amount of memory the receive queue is using by performing what amounts to a “defragmentation” of memory. This is called collapsing the queue. Collapsing the queue takes time, which is what drives up HTTP request latency.

We do not want to spend time collapsing TCP queues.

Why do receive queues fill up to the point where they hit the maximum memory limit? The usual situation is when the local application starts out reading data from the receive queue at one rate (triggering autotuning to raise the max receive window), followed by the local application slowing down its reading from the receive queue. This is valid behavior, and we need to handle it correctly.

Selecting sysctl values

Before exploring solutions, let’s first decide what we need as the maximum TCP window size.

As we have seen above in the discussion about BDP, the window size is determined based upon the RTT and desired throughput of the connection.

Because Linux autotuning will adjust correctly for sessions with lower RTTs and bottleneck links with lower throughput, all we need to be concerned about are the maximums.

For latency, we have chosen 300 ms as the maximum expected latency, as that is the measured latency between our Zurich and Sydney facilities. It seems reasonable enough as a worst-case latency under normal circumstances.

For throughput, although we have very fast and modern hardware on the Cloudflare global network, we don’t expect a single TCP session to saturate the hardware. We have arbitrarily chosen 3500 mbps as the highest supported throughput for our highest latency TCP sessions.

The calculation for those numbers results in a BDP of 131 MB, which we round to the more aesthetic value of 128 MiB.

Recall that allocation of TCP memory includes metadata overhead in addition to packet data. The ratio of actual amount of memory allocated to user payload size varies, depending on NIC driver settings, packet size, and other factors. For full-sized packets on some of our hardware, we have measured average allocations up to 3 times the packet data size. In order to reduce the frequency of TCP collapse on our servers, we set tcp_adv_win_scale to -2. From the table above, we know that the max window size will be ¼ of the max buffer space.

We end up with the following sysctl values:

net.ipv4.tcp_rmem = 8192 262144 536870912
net.ipv4.tcp_wmem = 4096 16384 536870912
net.ipv4.tcp_adv_win_scale = -2

A tcp_rmem of 512MiB and tcp_adv_win_scale of -2 results in a maximum window size that autotuning can set of 128 MiB, our desired value.
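A quick arithmetic check of those numbers, using the 300 ms and 3500 mbps figures chosen above:

# Bandwidth-delay product for the chosen worst case.
rtt_s = 0.300                    # Zurich <-> Sydney worst-case RTT
throughput_bps = 3500 * 10**6    # 3500 mbps target
bdp_bytes = throughput_bps * rtt_s / 8
print(bdp_bytes / 2**20)         # ~125 MiB (~131 MB), rounded up to 128 MiB

# With tcp_adv_win_scale = -2, the max window is 1/4 of the receive buffer,
# so a 512 MiB tcp_rmem max yields the desired 128 MiB window.
tcp_rmem_max = 536870912
print((tcp_rmem_max >> 2) / 2**20)  # 128.0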

Disabling TCP collapse

Patient: Doctor, it hurts when we collapse the TCP receive queue.

Doctor: Then don’t do that!

Generally speaking, when a packet arrives at a buffer when the buffer is full, the packet gets dropped. In the case of these receive buffers, Linux tries to “save the packet” when the buffer is full by collapsing the receive queue. Frequently this is successful, but it is not guaranteed to be, and it takes time.

There are no problems created by immediately just dropping the packet instead of trying to save it. The receive queue is full anyway, so the local receiver application still has data to read. The sender’s congestion control will notice the drop and/or ZeroWindow and will respond appropriately. Everything will continue working as designed.

At present, there is no setting provided by Linux to disable the TCP collapse. We developed an in-house patch to the kernel to disable the TCP collapse logic.

Kernel patch – Attempt #1

The kernel patch for our first attempt was straightforward. At the top of tcp_try_rmem_schedule(), if the memory allocation fails, we simply return (after pred_flag = 0 and tcp_sack_reset()), thus completely skipping the tcp_collapse and related logic.

It didn’t work.

Although we eliminated the latency spikes while using large buffer limits, we did not observe the throughput we expected.

One of the realizations we made as we investigated the situation was that standard network benchmarking tools such as iperf3 and similar do not expose the problem we are trying to solve. iperf3 does not fill the receive queue. Linux autotuning does not open the TCP window large enough. Autotuning is working perfectly for our well-behaved benchmarking program.

We need application-layer software that is slightly less well-behaved, one that exercises the autotuning logic under test. So we wrote one.

A new benchmarking tool

Anomalies were seen during our “Attempt #1” that negatively impacted throughput. The anomalies were seen only under certain specific conditions, and we realized we needed a better benchmarking tool to detect and measure the performance impact of those anomalies.

This tool has turned into an invaluable resource during the development of this patch and raised confidence in our solution.

It consists of two Python programs. The reader opens a TCP session to the daemon, at which point the daemon starts sending user payload as fast as it can, and never stops sending.

The reader, on the other hand, starts and stops reading in a way that opens the TCP receive window wide and then repeatedly causes the buffers to fill up completely. More specifically, the reader implemented this logic (a simplified sketch of the reader follows the list):

  1. reads as fast as it can, for five seconds
    • this is called fast mode
    • opens up the window
  2. calculates 5% of the high watermark of the bytes read during any previous one second
  3. for each second of the next 15 seconds:
    • this is called slow mode
    • reads that 5% number of bytes, then stops reading
    • sleeps for the remainder of that particular second
    • most of the second consists of no reading at all
  4. steps 1-3 are repeated in a loop three times, so the entire run is 60 seconds
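A simplified Python sketch of that reader, assuming the sending daemon is already listening at HOST:PORT (placeholders); the real tool's accounting and error handling are omitted:

import socket
import time

HOST, PORT = "192.0.2.1", 9000   # placeholder address of the sending daemon
CHUNK = 1 << 20                  # read up to 1 MiB at a time

def run_reader():
    sock = socket.create_connection((HOST, PORT))
    for _ in range(3):                           # three 20-second cycles (60 s total)
        # Fast mode: read as fast as possible for 5 seconds, tracking the
        # per-second high watermark of bytes read (opens up the window).
        high_watermark = 0
        for _ in range(5):
            second_end = time.monotonic() + 1
            bytes_this_second = 0
            while time.monotonic() < second_end:
                bytes_this_second += len(sock.recv(CHUNK))
            high_watermark = max(high_watermark, bytes_this_second)
        # Slow mode: for each of the next 15 seconds, read only 5% of that
        # high watermark, then sleep for the rest of the second, so the
        # receive buffers repeatedly fill up.
        slow_target = int(high_watermark * 0.05)
        for _ in range(15):
            second_end = time.monotonic() + 1
            remaining = slow_target
            while remaining > 0:
                remaining -= len(sock.recv(min(CHUNK, remaining)))
            time.sleep(max(0.0, second_end - time.monotonic()))

run_reader()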

This has the effect of highlighting any issues in the handling of packets when the buffers repeatedly hit the limit.

Revisiting default Linux behavior

Taking a step back, let’s look at the default Linux behavior. The following is kernel v5.15.16.


The Linux kernel is effective at freeing up space in order to make room for incoming packets when the receive buffer memory limit is hit. As documented previously, the cost for saving these packets (i.e. not dropping them) is latency.

However, the latency spikes, in milliseconds, for tcp_try_rmem_schedule(), are:

tcp_rmem 170 MiB, tcp_adv_win_scale +2 (170p2):

@ms:
[0]       27093 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[1]           0 |
[2, 4)        0 |
[4, 8)        0 |
[8, 16)       0 |
[16, 32)      0 |
[32, 64)     16 |

tcp_rmem 146 MiB, tcp_adv_win_scale +3 (146p3):

@ms:
(..., 16)  25984 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[16, 20)       0 |
[20, 24)       0 |
[24, 28)       0 |
[28, 32)       0 |
[32, 36)       0 |
[36, 40)       0 |
[40, 44)       1 |
[44, 48)       6 |
[48, 52)       6 |
[52, 56)       3 |

tcp_rmem 137 MiB, tcp_adv_win_scale +4 (137p4):

@ms:
(..., 16)  37222 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[16, 20)       0 |
[20, 24)       0 |
[24, 28)       0 |
[28, 32)       0 |
[32, 36)       0 |
[36, 40)       1 |
[40, 44)       8 |
[44, 48)       2 |

These are the latency spikes we cannot have on the Cloudflare global network.

Kernel patch – Attempt #2

So the “something” that was not working in Attempt #1 was that the receive queue memory limit was hit early on as the flow was just ramping up (when the values for sk_rmem_alloc and sk_rcvbuf were small, ~800KB). This occurred at about the two-second mark for the 137p4 test (about 2.25 seconds for 170p2).

In hindsight, we should have noticed that tcp_prune_queue() actually raises sk_rcvbuf when it can. So we modified the patch in response to that and added a guard that allows the collapse to execute when sk_rmem_alloc is less than the threshold value.

net.ipv4.tcp_collapse_max_bytes = 6291456

The next section discusses how we arrived at this value for tcp_collapse_max_bytes.

The patch is available here.

The results with the new patch are as follows:

oscil – 300 ms tests

oscil – 20 ms tests

oscil – 0 ms tests

iperf3 – 300 ms tests

iperf3 – 20 ms tests

iperf3 – 0 ms tests

All tests are successful.

Setting tcp_collapse_max_bytes

In order to determine this setting, we need to understand what the biggest queue we can collapse without incurring unacceptable latency.


Using 6 MiB should result in a maximum latency of no more than 2 ms.

Cloudflare production network results

Current production settings (“Old”)

net.ipv4.tcp_rmem = 8192 2097152 16777216
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_adv_win_scale = -2
net.ipv4.tcp_collapse_max_bytes = 0
net.ipv4.tcp_notsent_lowat = 4294967295

tcp_collapse_max_bytes of 0 means that the custom feature is disabled and that the vanilla kernel logic is used for TCP collapse processing.

New settings under test (“New”)

net.ipv4.tcp_rmem = 8192 262144 536870912
net.ipv4.tcp_wmem = 4096 16384 536870912
net.ipv4.tcp_adv_win_scale = -2
net.ipv4.tcp_collapse_max_bytes = 6291456
net.ipv4.tcp_notsent_lowat = 131072

The tcp_notsent_lowat setting is discussed in the last section of this post.

The middle value of tcp_rmem was changed as a result of separate work that found that Linux autotuning was setting receive buffers too high for localhost sessions. This updated setting reduces TCP memory usage for those sessions, but does not change anything about the type of TCP sessions that is the focus of this post.

For the following benchmarks, we used non-Cloudflare host machines in Iowa, US, and Melbourne, Australia, performing data transfers to the Cloudflare data center in Marseille, France. In Marseille, we have some hosts configured with the existing production settings, and others with the system settings described in this post. Software used is iperf3 version 3.9 on kernel 5.15.32.

Throughput results


Route                    RTT (ms)   Throughput with current settings (Mbps)   Throughput with new settings (Mbps)   Increase factor
Iowa to Marseille        121        276                                       6600                                  24x
Melbourne to Marseille   282        120                                       3800                                  32x

Iowa-Marseille throughput

Iowa-Marseille receive window and bytes-in-flight

Melbourne-Marseille throughput

Melbourne-Marseille receive window and bytes-in-flight

Even with the new settings in place, the Melbourne to Marseille performance is limited by the receive window on the Cloudflare host. This means that further adjustments to these settings could yield even higher throughput.
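That receive window limit lines up with a back-of-the-envelope check: a 128 MiB maximum window divided by the 282 ms RTT is roughly the throughput measured above.

window_bytes = 128 * 2**20             # max receive window under the new settings
rtt_s = 0.282                          # Melbourne to Marseille RTT
print(window_bytes * 8 / rtt_s / 1e6)  # ~3800 Mbps, matching the table above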

Latency results

The Y-axis on these charts are the 99th percentile time for TCP collapse in seconds.

Cloudflare hosts in Marseille running the current production settings

Cloudflare hosts in Marseille running the new settings

The takeaway in looking at these graphs is that maximum TCP collapse time for the new settings is no worse than with the current production settings. This is the desired result.

Send Buffers

What we have shown so far is that the receiver side seems to be working well, but what about the sender side?

As part of this work, we are setting tcp_wmem max to 512 MiB. For oscillating reader flows, this can cause the send buffer to become quite large. This represents bufferbloat and wasted kernel memory, both things that nobody likes or wants.

Fortunately, there is already a solution: tcp_notsent_lowat. This setting limits the size of unsent bytes in the write queue. More details can be found at https://lwn.net/Articles/560082.

The results are significant:


The RTT for these tests was 466ms. Throughput is not negatively affected. Throughput is at full wire speed in all cases (1 Gbps). Memory usage is as reported by /proc/net/sockstat, TCP mem.

Our web servers already set tcp_notsent_lowat to 131072 for their sockets. All other senders are using 4 GiB, the default value. We are changing the sysctl so that 131072 is in effect for all senders running on the server.
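For reference, a sketch of how an application might set this per socket rather than relying on the sysctl; the constant 25 is the TCP_NOTSENT_LOWAT value from the Linux UAPI headers, used as a fallback where the Python socket module does not export it:

import socket

# TCP_NOTSENT_LOWAT is 25 in the Linux UAPI headers; fall back to that value
# if this Python build does not expose the constant.
TCP_NOTSENT_LOWAT = getattr(socket, "TCP_NOTSENT_LOWAT", 25)

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Keep at most 128 KiB of not-yet-sent data queued in the kernel for this socket.
sock.setsockopt(socket.IPPROTO_TCP, TCP_NOTSENT_LOWAT, 131072)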

Conclusion

The goal of this work is to open the throughput floodgates for high BDP connections while simultaneously ensuring very low HTTP request latency.

We have accomplished that goal.

Optimizing data warehouse storage

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/optimizing-data-warehouse-storage-7b94a48fdcbe

By Anupom Syam

Background

At Netflix, our current data warehouse contains hundreds of Petabytes of data stored in AWS S3, and each day we ingest and create additional Petabytes. At this scale, we can gain a significant amount of performance and cost benefits by optimizing the storage layout (records, objects, partitions) as the data lands into our warehouse.

There are several benefits of such optimizations like saving on storage, faster query time, cheaper downstream processing, and an increase in developer productivity by removing additional ETLs written only for query performance improvement. On the other hand, these optimizations themselves need to be sufficiently inexpensive to justify their own processing cost over the gains they bring.

We built AutoOptimize to efficiently and transparently optimize the data and metadata storage layout while maximizing their cost and performance benefits.

This article will list some of the use cases of AutoOptimize, discuss the design principles that help enhance efficiency, and present the high-level architecture. We then deep dive into the merging use case of AutoOptimize and share some results and benefits.

Use cases

We found several use cases where a system like AutoOptimize can bring tons of value. Some of the optimizations are prerequisites for a high-performance data warehouse. Sometimes Data Engineers write downstream ETLs on ingested data to optimize the data/metadata layouts to make other ETL processes cheaper and faster. The goal of AutoOptimize is to centralize such optimizations, which removes duplicate work while doing it more efficiently than vanilla ETLs.

Merge

As the data lands into the data warehouse through real-time data ingestion systems, it comes in different sizes. This results in a perpetually increasing number of small files across the partitions. Merging those numerous smaller files into a handful of larger files can make query processing faster and reduce storage space.

Sort

Presorted records and files in partitions make queries faster and save significant amounts of storage space as it enables a higher level of compression. We already had some existing tables with sorting stages to reduce table storage and improve downstream query performance.

Compaction

Modern data warehouses allow updating and deleting pre-existing records. Iceberg plans to enable this in the form of delta files. Over time, the number of delta files grows, and compacting them to their source files can make the read operations more optimal.

Metadata optimization

In Iceberg, the physical partitioning is decoupled from logical partitioning by keeping a map to file locations in the metadata. This enables us to add additional indexes in the metadata to make point queries more optimal. We can also reorganize the metadata to make file scanning much faster.

Design Principles

For AutoOptimize to efficiently optimize the data layout, we’ve made the following choices:

  1. Just in time vs. periodic optimization
    Only optimize a given data set when required (based on what changed) instead of blind periodic runs.
  2. Essential vs. complete optimization
    Allow users to optimize at the point of diminishing returns instead of a binary setting. For example, we allow a partition to have a few small files instead of always merging files in perfect sizes.
  3. Minimum replacement vs. full overwrite
    Only replace the required minimum amount of files instead of a full sweep overwrite.

These principles reduce resource usage by being more efficient and effective while lowering the end-to-end latency in data processing.

Other than these principles, there are some other design considerations to support and enable:

  • Multi-tenancy with database and table prioritization.
  • Both automatic (event-driven) as well as manual (ad-hoc) optimization.
  • Transparency to end-users.

High-Level Design

AutoOptimize High-Level Design

AutoOptimize is split into 2 subsystems (Service and Actors) to decouple the decisions from the actions at a high level. This decoupling of responsibilities helps us to design, manage, use, and scale the subsystems independently.

AutoOptimize Service

The service is the decision-maker. It decides what to do and when to do it in response to an incoming event. It is responsible for listening to incoming events and requests and prioritizing different tables and actions to make the best use of the available resources.

The work done in the service can be further broken down into the following 3 steps:

Observe: Listen to changes in the warehouse in near real-time. Also, respond to ad-hoc requests created manually by end-users.

Orient: Gather tuning parameters for a particular table that changed. Also, adjust the resource allocation for the table or the number of actors depending on the backlog.

Decide: Determine the highest value action with the right parameters for this particular change and when to act depending on how the action falls in the global priority across all tables and actions.

In AutoOptimize, the service is a cluster of Java (Spring Boot) applications using Redis to keep the states.

AutoOptimize Actors

Actors in AutoOptimize are responsible for the actual work (merging/sorting/compaction etc.). The AutoOptimize Service sends commands to the actors that specify what to do. The job of Actors is to perform those commands in a distributed and fault-tolerant manner.

Actors in AutoOptimize are a pool of long-running Spark jobs managed by the AutoOptimize service.

This was not intentional but we found that the way we modularized AutoOptimize’s decision-making workflow is very similar to the OODA loop and decided to use the same taxonomy.

Other Components

Iceberg
We use Apache Iceberg as the table format. AutoOptimize relies on some of the Iceberg specific features such as snapshot and atomic operations to perform the optimizations in an accurate and scalable manner.

AutoAnalyze
In short, AutoAnalyze finds the best tuning/configuration parameters for a table. It uses “What-If” experiments and previous experiences and heuristics to find the most fitting attributes for a table. We will publish a follow-up blog post about AutoAnalyze in the future. For AutoOptimize, it may find if a table needs file merging or suggest a target file size and other parameters.

Deep Dive into File Merge

File merge is the first use-case that we built for AutoOptimize. Previously we had our homegrown system called Ursula responsible for data ingestion into the Hive based warehouse. The Ursula based pipeline also performed file merges on the ingested table partitions periodically. Since then, we have moved our ingestion to Keystone and our table layout to Iceberg.

The migration out of Ursula to Keystone/Iceberg based ingestion initiated the need for a replacement for Ursula file merge. File merging is necessary for a low latency streaming ingestion pipeline as data often arrive late and unevenly. The number of small files creeps up across partitions over time and can have some serious side effects like:

  1. Slower queries.
  2. Higher processing resource consumption.
  3. Increased storage usage.

The goal of file merge in AutoOptimize is to efficiently reduce these side effects without adding latency to the data pipeline.

Solutions

This section will discuss some of the solutions that helped us achieve the previously stated goals.

Just in time optimization

AutoOptimize file merge gets triggered via table change events. This allows AutoOptimize to act right away with minimal lag. But the problem with being event-driven is that it’s expensive to scan the changed partitions every time they change. If we can determine “how noisy” a partition is from the changesets in a rolling manner, we can eliminate unnecessary full partition scans using early signals from snapshots.

Essential work

After a full partition scan, AutoOptimize gets a more comprehensive and accurate view of the partition’s state, which lets it avoid non-essential work.

Partition Entropy
We introduced a concept called Partition Entropy (PE), which is used for early pruning at each step to reduce the actual work. It’s a set of stats about the state of the partition. We calculate it in a rolling manner after each snapshot scan and more exhaustively after each partition scan.

The parts of PE that deal with file sizes are called File Size Entropy (FSE). FSE of a partition is derived from the Mean Squared Error (MSE) of file sizes in a partition. We will use the terms FSE and MSE interchangeably.

We use the standard Mean Squared Error formula:

MSE = ( Σ (Target - Actual)² ) / N, summed over all N files in the partition

Where,

N = Number of files in the partition
Target = Target File Size
Actual = min(Actual File Size, Target)

When a partition is scanned, it’s easy to calculate the MSE using the above formula as we know the sizes of all files in that partition. We store the MSE and N for each partition in Redis for later use.

At the snapshot scan stage, we get a commit definition containing the list of files and their metadata (like size, number of records, etc.) that got added and deleted in the commit. We calculate the new MSE’ of a changed partition in a rolling manner from the snapshot information and the previously stored stats using this formula:

MSE’ = ( N × MSE + Σ (Target - Actual)² ) / (N + M), summed over the M added files

Where,

M = Number of files added in the snapshot
Target = Target File Size
Actual = min(Actual File Size, Target)
N = Previously stored number of files in the partition
MSE = Previously stored MSE

We have a tolerance threshold (T) for each partition and skip further processing of the partition if MSE < T². This helps us significantly reduce the number of full partition scans at the snapshot scan step and the number of actual merges in the partition scan stage.
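
The rolling update and the tolerance check boil down to a few lines. The sketch below is illustrative only; the real service is a Java application that keeps these stats in Redis, and the struct and function names here are assumptions:

#include <stddef.h>
#include <stdint.h>

// Rolling File Size Entropy (MSE) stats kept per partition.
typedef struct {
    double   mse; // previously stored mean squared error
    uint64_t n;   // previously stored number of files
} partition_stats;

// Fold the files added by one snapshot into the stored stats.
static void fse_rolling_update(partition_stats *stats,
                               const uint64_t *added_sizes, size_t m,
                               uint64_t target) {
    double sq_err_sum = stats->mse * (double)stats->n;
    for (size_t j = 0; j < m; j++) {
        uint64_t actual = added_sizes[j] < target ? added_sizes[j] : target;
        double delta = (double)target - (double)actual;
        sq_err_sum += delta * delta;
    }
    stats->n += m;
    stats->mse = stats->n ? sq_err_sum / (double)stats->n : 0.0;
}

// Skip further processing while the partition is still "even enough":
// only act when MSE >= T², i.e., MSE < T² means skip.
static int needs_processing(const partition_stats *stats, double tolerance_t) {
    return stats->mse >= tolerance_t * tolerance_t;
}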

Entropy-Based Filtering

The actual formulas are a little more complicated than what is stated here, as we need to take care of deleted files and some other edge cases. We could also use Mean Absolute Error, but we want to be biased towards outliers, as the goal is to have more even file sizes in a partition rather than a mixed bag of different sizes with some perfectly sized files.

Minimum replacement

Once we start processing a partition, we find the minimum amount of work needed to reduce the File Size Entropy and thus reduce the number of small files.

We use 2 different packing algorithms to achieve this:

Knuth/Plass line breaking algorithm
We use this strategy when the sort order among files is important. With an appropriate error function (e.g., squared error), this algorithm helps minimize the MSE with a run time bounded by O(n²).

First Fit Decreasing bin packing algorithm
We use a modified version of the original FFD algorithm when we can ignore the sort order. This helps reduce the number of replacements with an O(n log n) running time.
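
As an illustration, a minimal First Fit Decreasing sketch in C could look like the following; the real implementation is a modified FFD running inside a Spark job, and the names and in-memory layout here are assumptions:

#include <stdint.h>
#include <stdlib.h>

static int desc_cmp(const void *a, const void *b) {
    uint64_t x = *(const uint64_t *)a, y = *(const uint64_t *)b;
    return (x < y) - (x > y); // sort descending
}

// First Fit Decreasing: pack file sizes into bins of `target` bytes.
// Returns the number of bins (i.e., output files) used.
static size_t ffd_pack(uint64_t *sizes, size_t n, uint64_t target) {
    qsort(sizes, n, sizeof(uint64_t), desc_cmp);
    uint64_t *bin_free = malloc(n * sizeof(uint64_t)); // worst case: one bin per file
    size_t bins = 0;
    for (size_t i = 0; i < n; i++) {
        size_t b = 0;
        // naive first-fit scan; a production version would use a tree to find
        // the first fitting bin in O(log n), giving O(n log n) overall
        while (b < bins && bin_free[b] < sizes[i]) b++;
        if (b == bins) { bin_free[bins++] = target; } // open a new bin
        bin_free[b] -= sizes[i] < target ? sizes[i] : target;
    }
    free(bin_free);
    return bins;
}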

These methods help us smooth out the file size histogram optimally, with minimal file replacement.

Multi-tenancy

AutoOptimize is multi-tenant; that is, it runs on many different databases and tables. When running the optimizations, it also needs to prioritize and allocate resources at different levels for different tasks. It requires answering questions like which table should be processed first, which should get more resource bandwidth, and which optimization gives the most ROI.

To support multi-tenancy and task prioritization, it needs to have the following properties:

  • Weighted resource sharing across different priorities.
  • Fair resource sharing across different tables and tasks with the same priority.
  • Burst handling to prevent starvation.

We use different types of Weighted Fair Queue implementations inside AutoOptimize, including different combinations of the following (a sketch of the deficit-based variant appears after the list):

  1. Weighted Round Robin
  2. Deficit Weighted Round Robin
  3. Fixed Priority Preemptive
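
To make the idea concrete, here is a minimal Deficit Weighted Round Robin sketch in C; the actual scheduler is part of the Java service, and the structure, names, and cost model below are assumptions:

#include <stddef.h>
#include <stdint.h>

typedef struct {
    uint32_t weight;   // relative share of this tenant/table
    uint64_t deficit;  // accumulated credit, in cost units
    // peek_cost returns the cost of the next pending task, or 0 if the queue is empty
    uint64_t (*peek_cost)(void *q);
    void     (*pop)(void *q);
    void     *q;
} dwrr_queue;

// One scheduling round: every queue earns credit proportional to its weight
// and may run tasks as long as it has enough credit to "pay" for them.
static void dwrr_round(dwrr_queue *queues, size_t n, uint64_t quantum) {
    for (size_t i = 0; i < n; i++) {
        dwrr_queue *dq = &queues[i];
        dq->deficit += quantum * dq->weight;
        uint64_t cost;
        while ((cost = dq->peek_cost(dq->q)) != 0 && cost <= dq->deficit) {
            dq->pop(dq->q);      // hand the task to an actor
            dq->deficit -= cost; // spend the credit
        }
        if (dq->peek_cost(dq->q) == 0) dq->deficit = 0; // don't hoard credit while idle
    }
}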

Reliable Priority Queue
To support prioritization and fair resource usage, we introduced a concept called Reliable Priority Queue (RPQ) in AutoOptimize. A reliable queue does not lose items if the subscriber fails to process the items after a dequeue. An RPQ also has a sense of prioritization across different items while being reliable. The concept is fairly similar to the default Redis RPOPLPUSH reliable queue pattern. But for AutoOptimize’s use case, we use Sorted Sets instead of lists to enable prioritization.
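
A rough sketch of that sorted-set-based reliable queue pattern, shown with the hiredis C client purely for illustration; the real implementation is part of the Java service, the key names are made up, and a production version would make the pop-and-park step atomic (for example with a Lua script):

#include <hiredis/hiredis.h>
#include <stdio.h>
#include <time.h>

// Enqueue an item with a priority (lower score = higher priority).
static void rpq_push(redisContext *c, const char *item, double priority) {
    char score[32];
    snprintf(score, sizeof score, "%.6f", priority);
    redisReply *r = redisCommand(c, "ZADD rpq:pending %s %s", score, item);
    if (r) freeReplyObject(r);
}

// Pop the highest-priority item and park it in an "in-flight" set so it is
// not lost if the consumer crashes before finishing its work.
static int rpq_pop(redisContext *c, char *buf, size_t buflen) {
    redisReply *r = redisCommand(c, "ZPOPMIN rpq:pending");
    if (!r || r->type != REDIS_REPLY_ARRAY || r->elements < 2) {
        if (r) freeReplyObject(r);
        return 0; // nothing pending
    }
    snprintf(buf, buflen, "%s", r->element[0]->str);
    freeReplyObject(r);
    char ts[32];
    snprintf(ts, sizeof ts, "%ld", (long)time(NULL));
    redisReply *p = redisCommand(c, "ZADD rpq:inflight %s %s", ts, buf);
    if (p) freeReplyObject(p);
    return 1;
}

// Acknowledge success; a janitor can re-queue rpq:inflight entries whose
// timestamps are too old (i.e., whose consumer presumably died).
static void rpq_ack(redisContext *c, const char *item) {
    redisReply *r = redisCommand(c, "ZREM rpq:inflight %s", item);
    if (r) freeReplyObject(r);
}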

The goal of AutoOptimize is to optimize the warehouse with a holistic perspective. Making it multi-tenant with a notion of different priorities helps us make the most optimal resource allocation.

Results

22% reduction in partition scans

2% reduction in merge actions

72% reduction in file replacements

These savings are stacked on top of each other as they are applied in sequence in the AutoOptimize pipeline. This results in a massive reduction in actual processing need while reducing the number of files by 80%.

80% reduction in the number of files

70% saving in compute

We are using 70% fewer compute instances than our previous merge implementation.

We also see up to 60% improvement in query performance and an additional 1% saving in storage.

Benefits

Increased processing efficiency: As AutoOptimize uses minimal file replacement and can avoid work by filtering early, it saves processing costs by skipping files that do not need to be merged.

Increased storage efficiency: AutoOptimize helps save storage costs by applying AutoAnalyze recommendations, such as sorting the records.

Reduced lag: Periodic overwrite ETLs take more time as they work in batches. AutoOptimize reduces end-to-end lag in data processing by optimizing as we go.

Faster queries: A smaller number of files means less file scanning and fewer network calls, which makes queries faster.

Ease of use: AutoOptimize provides a frictionless way to set up optimization with minimal maintenance overhead for Data Engineering.

Developer productivity: Instead of adding an ETL per table for merging, which adds ongoing incremental maintenance cost, we have a single solution that can transparently scale to many tables.

Conclusion

We believe the problems we faced at Netflix are not unique, and some of the techniques and design considerations we made can be applied more generally. By laying out the data intelligently as they are ingested into the warehouse, we are removing complexities for Data Engineers and accelerating the end-to-end pipeline. At the same time, we are gaining a significant amount of performance and cost improvement by optimizing only when it makes sense. We plan to extend AutoOptimize into other use cases and integrate it more with the Iceberg ecosystem in the future.


Optimizing data warehouse storage was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Computing Euclidean distance on 144 dimensions

Post Syndicated from Marek Majkowski original https://blog.cloudflare.com/computing-euclidean-distance-on-144-dimensions/


Late last year I read a blog post about our CSAM image scanning tool. I remember thinking: this is so cool! Image processing is always hard, and deploying a real image identification system at Cloudflare is no small achievement!

Some time later, I was chatting with Kornel: “We have all the pieces in the image processing pipeline, but we are struggling with the performance of one component.” Scaling to Cloudflare needs ain’t easy!

The problem was in the speed of the matching algorithm itself. Let me elaborate. As John explained in his blog post, the image matching algorithm creates a fuzzy hash from a processed image. The hash is exactly 144 bytes long. For example, it might look like this:

00e308346a494a188e1043333147267a 653a16b94c33417c12b433095c318012
5612442030d14a4ce82c623f4e224733 1dd84436734e4a5d6e25332e507a8218
6e3b89174e30372d

The hash is designed to be used in a fuzzy matching algorithm that can find “nearby”, related images. The specific algorithm is well defined, but making it fast is left to the programmer — and at Cloudflare we need the matching to be done super fast. We want to match thousands of hashes per second, of images passing through our network, against a database of millions of known images. To make this work, we need to seriously optimize the matching algorithm.

Naive quadratic algorithm

The first algorithm that comes to mind has O(K*N) complexity: for each query, go through every hash in the database. In a naive implementation, this creates a lot of work. But how much work exactly?

First, we need to explain how fuzzy matching works.

Given a query hash, the fuzzy match is the “closest” hash in a database. This requires us to define a distance. We treat each hash as a vector containing 144 numbers, identifying a point in a 144-dimensional space. Given two such points, we can calculate the distance using the standard Euclidean formula.

For our particular problem, though, we are interested in the “closest” match in a database only if the distance is lower than some predefined threshold. Otherwise, when the distance is large,  we can assume the images aren’t similar. This is the expected result — most of our queries will not have a related image in the database.

The Euclidean distance equation used by the algorithm is standard:

distance(p, q) = sqrt( (p1 - q1)² + (p2 - q2)² + … + (p144 - q144)² )

To calculate the distance between two 144-byte hashes, we take each byte, calculate the delta, square it, sum it to an accumulator, do a square root, and ta-dah! We have the distance!

Here’s how to count the squared distance in C:

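A minimal reconstruction of such a routine, following that description (the function name is an assumption, not the original code), could look like this:

#include <stdint.h>

// Squared Euclidean distance between two 144-byte fuzzy hashes.
static uint32_t distance_sq(const uint8_t a[144], const uint8_t b[144]) {
    uint32_t sum = 0;
    for (int i = 0; i < 144; i++) {
        int32_t d = (int32_t)a[i] - (int32_t)b[i];
        sum += (uint32_t)(d * d); // max total is 144 * 255², well within uint32
    }
    return sum;
}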

This function returns the squared distance. We avoid computing the actual distance to save us from running the square root function – it’s slow. Inside the code, for performance and simplicity, we’ll mostly operate on the squared value. We don’t need the actual distance value, we just need to find the vector with the smallest one. In our case it doesn’t matter whether we compare distances or squared distances!

As you can see, fuzzy matching is basically a standard problem of finding the closest point in a multi-dimensional space. Surely this has been solved in the past — but let’s not jump ahead.

While this code might be simple, we expect it to be rather slow. Finding the smallest hash distance in a database of, say, 1M entries, would require going over all records, and would need at least:

  1. 144 * 1M subtractions
  2. 144 * 1M multiplications
  3. 144 * 1M additions

And more. This alone adds up to 432 million operations! How does it look in practice? To illustrate this blog post we prepared a full test suite. The large database of known hashes can be well emulated by random data. The query hashes can’t be random and must be slightly more sophisticated, otherwise the exercise wouldn’t be that interesting. We generated the test smartly by byte-swaps of the actual data from the database — this allows us to precisely control the distance between test hashes and database hashes. Take a look at the scripts for details. Here’s our first run of the first, naive, algorithm:

$ make naive
< test-vector.txt ./mmdist-naive > test-vector.tmp
Total: 85261.833ms, 1536 items, avg 55.509ms per query, 18.015 qps

We matched 1,536 test hashes against a database of 1 million random vectors in 85 seconds. It took 55ms of CPU time on average to find the closest neighbour. This is rather slow for our needs.

SIMD for help

An obvious improvement is to use more complex SIMD instructions. SIMD is a way to instruct the CPU to process multiple data points using one instruction. This is a perfect strategy when dealing with vector problems — as is the case for our task.

We settled on using AVX2, with 256 bit vectors. We did this for a simple reason — newer AVX versions are not supported by our AMD CPUs. Additionally, in the past, we were not thrilled by the AVX-512 frequency scaling.

Using AVX2 is easier said than done. There is no single instruction to count the Euclidean distance between two uint8 vectors! The fastest way we could find of counting the full distance between two 144-byte vectors with AVX2 was authored by Vlad:

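The sketch below follows the same idea but is a reconstruction, not Vlad’s exact code; the intrinsic sequence mirrors the description in the next paragraph, and the function name is assumed:

#include <immintrin.h>
#include <stdint.h>

// Squared distance of two 144-byte hashes, 16 bytes per iteration.
static uint32_t distance_sq_avx2(const uint8_t *a, const uint8_t *b) {
    __m256i acc = _mm256_setzero_si256();
    for (int i = 0; i < 144; i += 16) {
        // load 16 bytes and widen uint8 -> int16
        __m256i va = _mm256_cvtepu8_epi16(_mm_loadu_si128((const __m128i *)(a + i)));
        __m256i vb = _mm256_cvtepu8_epi16(_mm_loadu_si128((const __m128i *)(b + i)));
        __m256i d  = _mm256_sub_epi16(va, vb);
        // square the 16-bit deltas and sum adjacent pairs into int32 lanes
        acc = _mm256_add_epi32(acc, _mm256_madd_epi16(d, d));
    }
    // horizontal reduction of the eight int32 partial sums
    __m128i lo = _mm256_castsi256_si128(acc);
    __m128i hi = _mm256_extracti128_si256(acc, 1);
    __m128i s  = _mm_add_epi32(lo, hi);
    s = _mm_add_epi32(s, _mm_shuffle_epi32(s, _MM_SHUFFLE(1, 0, 3, 2)));
    s = _mm_add_epi32(s, _mm_shuffle_epi32(s, _MM_SHUFFLE(2, 3, 0, 1)));
    return (uint32_t)_mm_cvtsi128_si32(s);
}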

It’s actually simpler than it looks: load 16 bytes, convert the vector from uint8 to int16, subtract the vectors, accumulate intermediate sums as int32, repeat. At the end, we need four more complex instructions to extract the partial sums into the final sum. This AVX2 code improves the performance around 3x:

$ make naive-avx2 
Total: 25911.126ms, 1536 items, avg 16.869ms per query, 59.280 qps

We measured 17ms per item, which is still below our expectations. Unfortunately, we can’t push it much further without major changes. The problem is that this code is limited by memory bandwidth. The measurements come from my Intel i7-5557U CPU, which has the max theoretical memory bandwidth of just 25GB/s. The database of 1 million entries takes 137MiB, so it takes at least 5ms to feed the database to my CPU. With this naive algorithm we won’t be able to go below that.

Vantage Point Tree algorithm

Since the naive brute force approach failed, we tried using more sophisticated algorithms. My colleague Kornel Lesiński implemented a super cool Vantage Point algorithm. After a few ups and downs, optimizations and rewrites, we gave up. Our problem turned out to be unusually hard for this kind of algorithm.

We observed “the curse of dimensionality”. Space partitioning algorithms don’t work well in problems with large dimensionality, and in our case we have an enormous 144 dimensions. K-D trees are doomed. Locality-sensitive hashing is also doomed. It’s a bizarre situation in which the space is unimaginably vast, but everything is close together. The volume of the space is a 347-digit-long number, but the maximum distance between points is just 3060 (the square root of 255*255*144).

Space partitioning algorithms are fast, because they gradually narrow the search space as they get closer to finding the closest point. But in our case, the common query is never close to any point in the set, so the search space can’t be narrowed to a meaningful degree.

A VP-tree was a promising candidate, because it operates only on distances, subdividing space into near and far partitions, like a binary tree. When it has a close match, it can be very fast, and doesn’t need to visit more than O(log(N)) nodes. For non-matches, its speed drops dramatically. The algorithm ends up visiting nearly half of the nodes in the tree. Everything is close together in 144 dimensions! Even though the algorithm avoided visiting more than half of the nodes in the tree, the cost of visiting remaining nodes was higher, so the search ended up being slower overall.

Smarter brute force?

This experience got us thinking. Since space partitioning algorithms can’t narrow down the search, and still need to go over a very large number of items, maybe we should focus on going over all the hashes, extremely quickly. We must be smarter about memory bandwidth though — it was the limiting factor in the naive brute force approach before.

Perhaps we don’t need to fetch all the data from memory.

Short distance

The breakthrough came from the realization that we don’t need to count the full distance between hashes. Instead, we can compute only a subset of dimensions, say 32 out of the total of 144. If this distance is already large, then there is no need to compute the full one! Computing more points is not going to reduce the Euclidean distance.

The proposed algorithm works as follows (a scalar sketch follows the list):

1. Take the query hash and extract a 32-byte short hash from it

2. Go over all the 1 million 32-byte short hashes from the database. They must be densely packed in the memory to allow the CPU to perform good prefetching and avoid reading data we won’t need.

3. If the distance of the 32-byte short hash is greater than or equal to the best score so far, move on

4. Otherwise, investigate the hash thoroughly and compute the full distance.
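
Here is a scalar sketch of that loop; the layout of the packed short hashes, the choice of the first 32 bytes as the short hash, and all names are assumptions:

#include <stddef.h>
#include <stdint.h>

#define SHORT_LEN 32
#define FULL_LEN  144

// Same naive squared distance as before, over the full 144 bytes.
static uint32_t full_dist_sq(const uint8_t *a, const uint8_t *b) {
    uint32_t s = 0;
    for (int i = 0; i < FULL_LEN; i++) {
        int32_t d = (int32_t)a[i] - (int32_t)b[i];
        s += (uint32_t)(d * d);
    }
    return s;
}

// Scan densely packed short hashes; only promising entries get the full check.
// Initializing best_sq with the squared threshold gives the thresholded variant.
static long scan_with_short_hashes(const uint8_t *query,    // 144-byte query hash
                                   const uint8_t *short_db, // n * SHORT_LEN bytes
                                   const uint8_t *full_db,  // n * FULL_LEN bytes
                                   size_t n, uint32_t best_sq) {
    long best = -1;
    for (size_t i = 0; i < n; i++) {
        const uint8_t *s = short_db + i * SHORT_LEN;
        uint32_t partial = 0;
        for (int k = 0; k < SHORT_LEN; k++) {  // short hash: first 32 dimensions here
            int32_t d = (int32_t)query[k] - (int32_t)s[k];
            partial += (uint32_t)(d * d);
        }
        if (partial >= best_sq) continue;      // step 3: cannot beat the best score
        uint32_t full = full_dist_sq(query, full_db + i * FULL_LEN); // step 4
        if (full < best_sq) { best_sq = full; best = (long)i; }
    }
    return best;
}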

Even though this algorithm needs to do less arithmetic and memory work, it’s not faster than the previous naive one. See make short-avx2. The problem is: we still need to compute a full distance for hashes that are promising, and there are quite a lot of them. Computing the full distance for promising hashes adds enough work, both in ALU and memory latency, to offset the gains of this algorithm.

There is one detail of our particular application of the image matching problem that will help us a lot moving forward. As we described earlier, the problem is less about finding the closest neighbour and more about proving that the neighbour with a reasonable distance doesn’t exist. Remember — in practice, we don’t expect to find many matches! We expect almost every image we feed into the algorithm to be unrelated to image hashes stored in the database.

It’s sufficient for our algorithm to prove that no neighbour exists within a predefined distance threshold. Let’s assume we are not interested in hashes more distant than, say, 220, which squared is 48,400. This makes our short-distance algorithm variation work much better:

$ make short-avx2-threshold
Total: 4994.435ms, 1536 items, avg 3.252ms per query, 307.542 qps

Origin distance variation


At some point, John noted that the threshold allows additional optimization. We can order the hashes by their distance from some origin point. Given a query hash with an origin distance of A, we only need to inspect hashes whose origin distance lies between A-threshold and A+threshold. This is pretty much how each level of a Vantage Point Tree works, just simplified. This optimization (ordering items in the database by their distance from an origin point) is relatively simple and can help save us a bit of work.
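
A sketch of how that range restriction could look, assuming the database is kept sorted by each hash’s distance from the chosen origin point (the helper and names are assumptions):

#include <stddef.h>

// Standard lower-bound binary search over a sorted array of origin distances.
static size_t lower_bound(const double *a, size_t n, double v) {
    size_t lo = 0, hi = n;
    while (lo < hi) {
        size_t mid = lo + (hi - lo) / 2;
        if (a[mid] < v) lo = mid + 1; else hi = mid;
    }
    return lo;
}

// db_origin_dist[] is sorted ascending. By the triangle inequality, any hash
// within `threshold` of the query must have an origin distance within
// `threshold` of the query's own origin distance A.
static void candidate_range(const double *db_origin_dist, size_t n,
                            double a, double threshold,
                            size_t *first, size_t *last) {
    *first = lower_bound(db_origin_dist, n, a - threshold);
    *last  = lower_bound(db_origin_dist, n, a + threshold); // exclusive end
}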

While great on paper, this method doesn’t introduce much gain in practice, as the vectors are not grouped in clusters — they are pretty much random! For the threshold values we are interested in, the origin distance algorithm variation gives us ~20% speed boost, which is okay but not breathtaking. This change might bring more benefits if we ever decide to reduce the threshold value, so it might be worth doing for production implementation. However, it doesn’t work well with query batching.

Transposing data for better AVX

But we’re not done with AVX optimizations! The usual problem with AVX is that the instructions don’t normally fit a specific problem. Some serious mind twisting is required to adapt the right instruction to the problem, or to reverse the problem so that a specific instruction can be used. AVX2 doesn’t have useful “horizontal” uint16 subtract, multiply and add operations. For example, _mm_hadd_epi16 exists, but it’s slow and cumbersome.

Instead, we can twist the problem to make use of fast available uint16 operands. For example we can use:

  1. _mm256_sub_epi16
  2. _mm256_mullo_epi16
  3. and _mm256_add_epi16 (plain 16-bit addition works the same for signed and unsigned lanes).

The add would overflow in our case, but fortunately there is add-saturate _mm256_adds_epu16.

The saturated add is great and saves us conversion to uint32. It just adds a small limitation: the threshold passed to the program (i.e., the max squared distance) must fit into uint16. However, this is fine for us.

To effectively use these instructions we need to transpose the data in the database. Instead of storing hashes in rows, we can store them in columns:


So instead of:

  1. [a1, a2, a3],
  2. [b1, b2, b3],
  3. [c1, c2, c3],

We can lay it out in memory transposed:

  1. [a1, b1, c1],
  2. [a2, b2, c2],
  3. [a3, b3, c3],

Now we can load the first byte of 16 different hashes using one memory operation. In the next step, we can subtract the first byte of the query hash using a single instruction, and so on. The algorithm stays exactly the same as defined above; we just make the data easier to load and easier to process for AVX.

The hot loop code even looks relatively pretty:

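The sketch below reconstructs the idea rather than the actual hot loop; the batch size, the exact data layout, and the scalar threshold check at the end are assumptions:

#include <immintrin.h>
#include <stddef.h>
#include <stdint.h>

#define SHORT_LEN 32 // dimensions in the short hash
#define BATCH     16 // database hashes processed per iteration

// `cols` is the transposed database: cols[d] points to n bytes, one per hash,
// holding dimension d of every short hash. Marks entries in `hit` whose partial
// squared distance to `query` stays below `threshold_sq` (which fits in uint16).
static void scan_batch(const uint8_t *const cols[SHORT_LEN],
                       size_t batch_start, const uint8_t query[SHORT_LEN],
                       uint16_t threshold_sq, uint8_t hit[BATCH]) {
    __m256i acc = _mm256_setzero_si256();
    for (int d = 0; d < SHORT_LEN; d++) {
        // one load fetches dimension d of 16 different database hashes
        __m128i raw = _mm_loadu_si128((const __m128i *)(cols[d] + batch_start));
        __m256i db  = _mm256_cvtepu8_epi16(raw);
        __m256i q   = _mm256_set1_epi16((short)query[d]);
        __m256i diff = _mm256_sub_epi16(db, q);
        __m256i sq   = _mm256_mullo_epi16(diff, diff); // each square fits in 16 bits
        acc = _mm256_adds_epu16(acc, sq);              // saturating add, no wraparound
    }
    uint16_t sums[BATCH];
    _mm256_storeu_si256((__m256i *)sums, acc);
    for (int i = 0; i < BATCH; i++)
        hit[i] = sums[i] < threshold_sq; // survivors get the full-distance check
}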

With well-tuned batch size and short-distance size parameters, we see the following performance for this algorithm:

$ make short-inv-avx2
Total: 1118.669ms, 1536 items, avg 0.728ms per query, 1373.062 qps

Whoa! This is pretty awesome. We started from 55ms per query, and we finished with just 0.73ms. There are further micro-optimizations possible, like memory prefetching or using huge pages to reduce page faults, but they have diminishing returns at this point.

Roofline model from Denis Bakhvalov’s book

If you are interested in architectural tuning such as this, take a look at the new performance book by Denis Bakhvalov. It discusses roofline model analysis, which is pretty much what we did here.

Do take a look at our code and tell us if we missed some optimization!

Summary

What an optimization journey! We jumped between memory and ALU bottlenecked code. We discussed more sophisticated algorithms, but in the end, a brute force algorithm — although tuned — gave us the best results.

To get even better numbers, I experimented with an Nvidia GPU using CUDA. The CUDA intrinsics like vabsdiff4 and dp4a fit the problem perfectly. The V100 gave us some amazing numbers, but I wasn’t fully satisfied with it. Considering how many AMD Ryzen cores with AVX2 we can get for the cost of a single server-grade GPU, we leaned towards general purpose computing for this particular problem.

This is a great example of the type of complexities we deal with every day. Making even the best technologies work “at Cloudflare scale” requires thinking outside the box. Sometimes we rewrite the solution dozens of times before we find the optimal one. And sometimes we settle on a brute-force algorithm, just very very optimized.

The computation of hashes and image matching are challenging problems that require running very CPU-intensive operations. The CPU we have available on the edge is scarce, and workloads like this are incredibly expensive. Even with the optimization work talked about in this blog post, running the CSAM scanner at scale is a challenge and has required a huge engineering effort. And we’re not done! We need to solve more hard problems before we’re satisfied. If you want to help, consider applying!

My internship: Brotli compression using a reduced dictionary

Post Syndicated from Felix Hanau original https://blog.cloudflare.com/brotli-compression-using-a-reduced-dictionary/


Brotli is a state of the art lossless compression format, supported by all major browsers. It is capable of achieving considerably better compression ratios than the ubiquitous gzip, and is rapidly gaining in popularity. Cloudflare uses the Google brotli library to dynamically compress web content whenever possible. In 2015, we took an in-depth look at how brotli works and its compression advantages.

One of the more interesting features of the brotli file format, in the context of textual web content compression, is the inclusion of a built-in static dictionary. The dictionary is quite large, and in addition to containing various strings in multiple languages, it also supports the option to apply multiple transformations to those words, increasing its versatility.

The open-source brotli library, which implements an encoder and decoder for brotli, has 11 predefined quality levels for the encoder, with higher quality levels demanding more CPU in exchange for a better compression ratio. The static dictionary feature is used to a limited extent starting with level 5, and to the full extent only at levels 10 and 11, due to the high CPU cost of this feature.

We improve on the limited dictionary use approach and add optimizations that boost compression at levels 5 through 9 with negligible performance impact when compressing web content.

Brotli Static Dictionary

Brotli primarily uses the LZ77 algorithm to compress its data. Our previous blog post about brotli provides an introduction.

To improve compression on text files and web content, brotli also includes a static, predefined dictionary. If a byte sequence cannot be matched with an earlier sequence using LZ77 the encoder will try to match the sequence with a reference to the static dictionary, possibly using one of the multiple transforms. For example, every HTML file contains the opening <html> tag that cannot be compressed with LZ77, as it is unique, but it is contained in the brotli static dictionary and will be replaced by a reference to it. The reference generally takes less space than the sequence itself, which decreases the compressed file size.

The dictionary contains 13,504 words in six languages, with lengths from 4 to 24 characters. To improve the compression of real-world text and web data, some dictionary words are common phrases (“The current”) or strings common in web content (‘type=”text/javascript”’). Unlike usual LZ77 compression, a word from the dictionary can only be matched as a whole. Starting a match in the middle of a dictionary word, ending it before the end of a word or even extending into the next word is not supported by the brotli format.

Instead, the dictionary supports 120 transforms of dictionary words to support a larger number of matches and find longer matches. The transforms include adding suffixes (“work” becomes “working”), adding prefixes (“book” => “ the book”), making the first character uppercase (“process” => “Process”), or converting the whole word to uppercase (“html” => “HTML”). In addition to transforms that make words longer or capitalize them, the cut transform allows a shortened match (“consistently” => “consistent”), which makes it possible to find even more matches.

Methods

With the transforms included, the static dictionary contains 1,633,984 different words – too many for exhaustive search, except when used with the slow brotli compression levels 10 and 11. When used at a lower compression level, brotli either disables the dictionary or only searches through a subset of roughly 5,500 words to find matches in an acceptable time frame. It also only considers matches at positions where no LZ77 match can be found and only uses the cut transform.

Our approach to the brotli dictionary uses a larger, but more specialized subset of the dictionary than the default, using more aggressive heuristics to improve the compression ratio with negligible cost to performance. In order to provide a more specialized dictionary, we provide the compressor with a content type hint from our servers, relying on the Content-Type header to tell the compressor if it should use a dictionary for HTML, JavaScript or CSS. The dictionaries can be furthermore refined by colocation language in the future.

Fast dictionary lookup

To improve compression without sacrificing performance, we needed a fast way to find matches if we want to search the dictionary more thoroughly than brotli does by default. Our approach uses three data structures to find a matching word directly. The radix trie is responsible for finding the word while the hash table and bloom filter are used to speed up the radix trie and quickly eliminate many words that can’t be matched using the dictionary.

Lookup for a position starting with “type”

The radix trie easily finds the longest matching word without having to try matching several words. To find the match, we traverse the graph based on the text at the current position and remember the last node with a matching word. The radix trie supports compressed nodes (having more than one character as an edge label), which greatly reduces the number of nodes that need to be traversed for typical dictionary words.
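
A simplified sketch of such a longest-match lookup over a compressed radix trie; the node layout and names are assumptions, not the actual data structures used:

#include <stddef.h>
#include <string.h>

typedef struct trie_node {
    const char *label;                 // compressed edge label leading into this node
    const struct trie_node **children;
    size_t n_children;
    int is_word;                       // a dictionary word ends at this node
} trie_node;

// Walk the trie along `text`, remembering the deepest node that ends a word.
// Returns the length of the longest dictionary match at this position, or 0.
static size_t longest_match(const trie_node *root, const char *text, size_t len) {
    const trie_node *node = root;
    size_t pos = 0, best = 0;
    for (;;) {
        if (node->is_word) best = pos;
        const trie_node *next = NULL;
        for (size_t i = 0; i < node->n_children; i++) {
            const trie_node *c = node->children[i];
            size_t l = strlen(c->label);
            if (pos + l <= len && memcmp(text + pos, c->label, l) == 0) {
                next = c;
                pos += l;
                break;
            }
        }
        if (!next) return best;
        node = next;
    }
}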

The radix trie is slowed down by the large number of positions where we can’t find a match. An important finding is that most mismatching strings have a mismatching character in the first four bytes. Even for positions where a match exists, a lot of time is spent traversing nodes for the first four bytes since the nodes close to the tree root usually have many children.

Luckily, we can use a hash table to look up the node equivalent to the first four bytes if it exists, or reject the possibility of a match. We thus look up the first four bytes of the string; if there is a matching node, we traverse the trie from there, which will be fast, as each four-byte prefix usually only has a few corresponding dictionary words. If there is no matching node, there will not be a matching word at this position and we do not need to consider it further.

While the hash table is designed to reject mismatches quickly and avoid cache misses and high search costs in the trie, it still suffers from similar problems: We might search through several 4-byte prefixes with the hash value of the given position, only to learn that no match can be found. Additionally, hash lookups can be expensive due to cache misses.

To quickly reject words that do not match the dictionary, but might still cause cache misses, we use a k=1 bloom filter to quickly rule out most non-matching positions. In the k=1 case, the filter is simply a lookup table with one bit indicating whether any matching 4-byte prefixes exist for a given hash value. If the hash value for the given bit is 0, there won’t be a match. Since the bloom filter uses at most one bit for each four-byte prefix while the hash table requires 16 bytes, cache misses are much less likely. (The actual size of the structures is a bit different since there are many empty spaces in both structures and the bloom filter has twice as many elements to reject more non-matching positions.)

This is very useful for performance as a bloom filter lookup requires a single memory access. The bloom filter is designed to be fast and simple, but still rejects more than half of all non-matching positions and thus allows us to save a full hash lookup, which would often mean a cache miss.
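
A minimal sketch of that k=1 filter; the hash function and table size below are placeholders, not the values used in production:

#include <stdint.h>

#define FILTER_BITS (1u << 18) // illustrative size; the real size differs
static uint8_t filter[FILTER_BITS / 8];

// Any reasonable hash of the 4-byte prefix works here; FNV-1a as a stand-in.
static uint32_t prefix_hash(const uint8_t p[4]) {
    uint32_t h = 2166136261u;
    for (int i = 0; i < 4; i++) { h ^= p[i]; h *= 16777619u; }
    return h & (FILTER_BITS - 1);
}

static void filter_add(const uint8_t p[4]) {
    uint32_t h = prefix_hash(p);
    filter[h >> 3] |= (uint8_t)(1u << (h & 7));
}

// One memory access: if the bit is clear, no dictionary word can start here.
static int filter_maybe_match(const uint8_t p[4]) {
    uint32_t h = prefix_hash(p);
    return (filter[h >> 3] >> (h & 7)) & 1;
}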

Heuristics

To improve the compression ratio without sacrificing performance, we employed a number of heuristics:

Only search the dictionary at some positions
This is also done using the stock dictionary, but we search more aggressively. While the stock dictionary only considers positions where the LZ77 match finder did not find a match, we also consider positions that have a bad match according to the brotli cost model: LZ77 matches that are short or have a long distance between the current position and the reference usually only offer a small compression improvement, so it is worth trying to find a better match in the static dictionary.

Only consider the longest match and then transform it
Instead of finding and transforming all matches at a position, the radix trie only gives us the longest match which we then transform. This approach results in a vast performance improvement. In most cases, this results in finding the best match.

Only include some transforms
While all transformations can improve the compression ratio, we only included those that work well with the data structures. The suffix transforms can easily be applied after finding a non-transformed match. For the upper case transforms, we include both the non-transformed and the upper case version of a word in the radix trie. The prefix and cut transforms do not play well with the radix trie, therefore a cut of more than 1 byte and prefix transforms are not supported.

Generating the reduced dictionary

At low compression levels, brotli searches a subset of ~5,500 out of 13,504 words of the dictionary, negatively impacting compression. To store the entire dictionary, we would need to store ~31,700 words in the trie considering the upper case transformed output of ASCII sequences and ~11,000 four-byte prefixes in the hash. This would slow down hash table and radix trie, so we needed to find a different subset of the dictionary that works well for web content.

For this purpose, we used a large data set containing representative content. We made sure to use web content from several world regions to reflect language diversity and optimize compression. Based on this data set, we identified which words are most common and result in the largest compression improvement according to the brotli cost model. We only include the most useful words based on this calculation. Additionally, we remove some words if they slow down hash table lookups of other, more common words based on their hash value.

We have generated separate dictionaries for HTML, CSS and JavaScript content and use the MIME type to identify the right dictionary to use. The dictionaries we currently use include about 15-35% of the entire dictionary including uppercase transforms. Depending on the type of data and the desired compression/speed tradeoff, different options for the size of the dictionary can be useful. We have also developed code that automatically gathers statistics about matches and generates a reduced dictionary based on this, which makes it easy to extend this to other textual formats, perhaps data that is majority non-English or XML data and achieve better results for this type of data.

Results

We tested the reduced dictionary on a large data set of HTML, CSS and JavaScript files.

The improvement is especially big for small files, as LZ77 compression is less effective on them. Since the improvement on large files is a lot smaller, we only tested files up to 256KB. We used compression level 5, the same compression level we currently use for dynamic compression on our edge, and tested on an Intel Core i7-7820HQ CPU.

Compression improvement is defined as 1 – (compressed size using the reduced dictionary / compressed size without dictionary). This ratio is then averaged for each input size range. We also provide an average value weighted by file size. Our data set mirrors typical web traffic, covering a wide range of file sizes with small files being more common, which explains the large difference between the weighted and unweighted average.


With the improved dictionary approach, we are now able to compress HTML, JavaScript and CSS files as well as, or sometimes even better than, a higher compression level would allow, all while using only 1% to 3% more CPU. For reference, using compression level 6 instead of 5 would increase CPU usage by up to 12%.

Introducing support for the AVIF image format

Post Syndicated from Kornel Lesiński original https://blog.cloudflare.com/generate-avif-images-with-image-resizing/


We’ve added support for the new AVIF image format in Image Resizing. It compresses images significantly better than older-generation formats such as WebP and JPEG. It’s supported in Chrome desktop today, and support is coming to other Chromium-based browsers, as well as Firefox.

What’s the benefit?

More than half of an average website’s bandwidth is spent on images. Improved image compression can save bandwidth and improve the overall performance of the web. The compression in AVIF is so good that images can shrink to half the size of JPEG and WebP.

What is AVIF?

AVIF is a combination of the HEIF ISO standard and the royalty-free AV1 codec developed by Mozilla, Xiph, Google, Cisco, and many others.

Currently JPEG is the most popular image format on the Web. It’s doing remarkably well for its age, and it will likely remain popular for years to come thanks to its excellent compatibility. There have been many previous attempts at replacing JPEG, such as JPEG 2000, JPEG XR and WebP. However, these formats offered only modest compression improvements, and didn’t always beat JPEG on image quality. Compression and image quality in AVIF is better than in all of them, and by a wide margin.

Side-by-side comparison at the same file size: JPEG (17KB), WebP (17KB), AVIF (17KB)

Why a new image format?

One of the big things AVIF does is fix WebP’s biggest flaws. WebP is over 10 years old, and AVIF is a major upgrade over the WebP format. These two formats are technically related: they’re both based on a video codec from the VPx family. WebP uses the old VP8 version, while AVIF is based on AV1, which is the next generation after VP10.

WebP is limited to 8-bit color depth, and in its best-compressing mode of operation, it can only store color at half of the image’s resolution (known as chroma subsampling). This causes edges of saturated colors to be smudged or pixelated in WebP. AVIF supports 10- and 12-bit color at full resolution, and high dynamic range (HDR).

Comparison of the same image as JPEG, WebP, WebP (sharp YUV option), and AVIF

AV1 also uses a new compression technique: chroma-from-luma. Most image formats store brightness separately from color hue. AVIF uses the brightness channel to guess what the color channel may look like. They are usually correlated, so a good guess gives smaller file sizes and sharper edges.

Adoption of AV1 and AVIF

VP8 and WebP suffered from reluctant industry adoption. Safari only added WebP support very recently, 10 years after Chrome.

Chrome 85 supports AVIF already. It’s a work in progress in other Chromium-based browsers, and Firefox. Apple hasn’t announced whether Safari will support AVIF. However, this time Apple is one of the companies in the Alliance for Open Media, creators of AVIF. The AV1 codec is already seeing faster adoption than the previous royalty-free codecs. Latest GPUs from Nvidia, AMD, and Intel already have hardware decoding support for AV1.

AVIF uses the same HEIF container as the HEIC format used in iOS’s camera. AVIF and HEIC offer similar compression performance. The difference is that HEIC is based on a commercial, patent-encumbered H.265 format. In countries that allow software patents, H.265 is illegal to use without obtaining patent licenses. It’s covered by thousands of patents, owned by hundreds of companies, which have fragmented into two competing licensing organizations. Costs and complexity of patent licensing used to be acceptable when videos were published by big studios, and the cost could be passed on to the customer in the price of physical discs and hardware players. Nowadays, when video is mostly consumed via free browsers and apps, the old licensing model has become unsustainable. This has created a huge incentive for Web giants like Google, Netflix, and Amazon to get behind the royalty-free alternative. AV1 is free to use by anyone, and the alliance of tech giants behind it will defend it from patent troll’s lawsuits.

Non-standard image formats usually can only be created using their vendor’s own implementation. AVIF and AV1 are already ahead with multiple independent implementations: libaom, Intel SVT-AV1, libgav1, dav1d, and rav1e. This is a sign of a healthy adoption and a prerequisite to be a Web standard. Our Image Resizing is implemented in Rust, so we’ve chosen the rav1e encoder to create a pure-Rust implementation of AVIF.

Caveats

AVIF is a feature-rich format. Most of its features are for smartphone cameras, such as “live” photos, depth maps, bursts, HDR, and lossless editing. Browsers will support only a fraction of these features. In our implementation in Image Resizing we’re supporting only still standard-range images.

The two biggest problems in AVIF are encoding speed and the lack of progressive rendering.

AVIF images don’t show anything on screen until they’re fully downloaded. In contrast, a progressive JPEG can display a lower-quality approximation of the image very quickly, while it’s still loading. When progressive JPEGs are delivered well, they make websites appear to load much faster. Progressive JPEG can look loaded at half of its file size. AVIF can fully load at half of JPEG’s size, so it somewhat overcomes the lack of progressive rendering with the sheer compression advantage. In this case only WebP is left behind, which has neither progressive rendering nor strong compression.

Decoding AVIF images for display takes relatively more CPU power than other codecs, but it should be fast enough in practice. Even low-end Android devices can play AV1 videos in full HD without help of hardware acceleration, and AVIF images are just a single frame of an AV1 video. However, encoding of AVIF images is much slower. It may take even a few seconds to create a single image. AVIF supports tiling, which accelerates encoding on multi-core CPUs. We have lots of CPU cores, so we take advantage of this to make encoding faster. Image Resizing doesn’t use the maximum compression level possible in AVIF to further increase compression speed. Resized images are cached, so the encoding speed is noticeable only on a cache miss.

We will be adding AVIF support to Polish as well. Polish converts images asynchronously in the background, which completely hides the encoding latency, and it will be able to compress AVIF images better than Image Resizing.

Note about benchmarking

It’s surprisingly difficult to fairly and accurately judge which lossy codec is better. Lossy codecs are specifically designed to mainly compress image details that are too subtle for the naked eye to see, so “looks almost the same, but the file size is smaller!” is a common feature of all lossy codecs, and not specific enough to draw conclusions about their performance.

Lossy codecs can’t be compared by comparing just file sizes. Lossy codecs can easily make files of any size. Their effectiveness is in the ratio between file size and visual quality they can achieve.

The best way to compare codecs is to make each compress an image to the exact same file size, and then to compare the actual visual quality they’ve achieved. If the file sizes differ, any judgement may be unfair, because the codec that generated the larger file may have done so only because it was asked to preserve more details, not because it can’t compress better.

How and when to enable AVIF today?

We recommend AVIF for websites that need to deliver high-quality images with as little bandwidth as possible. This is important for users of slow networks and in countries where the bandwidth is expensive.

Browsers that support the AVIF format announce it by adding image/avif to their Accept request header. To request the AVIF format from Image Resizing, set the format option to avif.

The best method to detect and enable support for AVIF is to use image resizing in Workers:

addEventListener('fetch', event => {
  const imageURL = "https://jpeg.speedcf.com/cat/4.jpg";

  const resizingOptions = {
    // You can set any options here, see:
    // https://developers.cloudflare.com/images/worker
    width: 800,
    sharpen: 1.0,
  };

  const accept = event.request.headers.get("accept");
  const isAVIFSupported = /image\/avif/.test(accept);
  if (isAVIFSupported) {
    resizingOptions.format = "avif";
  }
  // pass the resizing options to fetch so the image is actually resized
  event.respondWith(fetch(imageURL, {cf: {image: resizingOptions}}))
})

The above script will auto-detect the supported format, and serve AVIF automatically. Alternatively, you can use URL-based resizing together with the <picture> element:

<picture>
    <source type="image/avif" 
            srcset="/cdn-cgi/image/format=avif/image.jpg">
    <img src="/image.jpg">
</picture>

This uses our /cdn-cgi/image/… endpoint to perform the conversion, and the alternative source will be picked only by browsers that support the AVIF format.

We have the format=auto option, but it won’t choose AVIF yet. We’re cautious, and we’d like to test the new format more before enabling it by default.