Tag Archives: Optimization

A multi-dimensional approach helps you proactively prepare for failures, Part 2: Infrastructure layer

Post Syndicated from Piyali Kamra original https://aws.amazon.com/blogs/architecture/a-multi-dimensional-approach-helps-you-proactively-prepare-for-failures-part-2-infrastructure-layer/

The resiliency of a distributed application is the cumulative resiliency of its applications, infrastructure, and operational processes. Part 1 of this series explored application layer resiliency. In Part 2, we discuss how using Amazon Web Services (AWS) managed services, redundancy, high availability, and infrastructure failover patterns based on recovery time and point objectives (RTO and RPO, respectively) can help in building more resilient infrastructures.

Pattern 1: Recognize high impact/likelihood infrastructure failures

To ensure cloud infrastructure resilience, we need to understand the likelihood and impact of various infrastructure failures, so we can mitigate them. Figure 1 illustrates that most of the failures with high likelihood happen because of operator error or poor deployments.

Automated testing, automated deployments, and solid design patterns can mitigate these failures. There could be datacenter failures—like whole rack failures—but deploying applications using auto scaling and multi-availability zone (multi-AZ) deployment, plus resilient AWS cloud native services, can mitigate the impact.

Figure 1. Likelihood and impact of failure events

As demonstrated in Figure 1, infrastructure resiliency is a combination of high availability (HA) and disaster recovery (DR). HA involves increasing the availability of the system by implementing redundancy among the application components and removing single points of failure.

Application layer decisions, like creating stateless applications, make it simpler to implement HA at the infrastructure layer by allowing it to scale using Auto Scaling groups and distributing the redundant applications across multiple AZs.

Pattern 2: Understanding and controlling infrastructure failures

Building a resilient infrastructure requires understanding which infrastructure failures are under control and which ones are not, as demonstrated in Figure 2.

These insights allow us to automate the detection of failures, control them, and employ proactive patterns such as static stability, which over-provisions the infrastructure in advance so that it does not need to scale up during a failure.
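As a rough illustration of static stability, the sketch below computes how many instances to run in each AZ so that the surviving AZs can carry the full load without scaling up during a failure. The function name and the example numbers are assumptions made for illustration; they are not figures from this post.

```python
import math


def instances_per_az(required_instances: int, az_count: int, az_failures: int = 1) -> int:
    """Instances to pre-provision in each AZ so that the required capacity
    is still available after `az_failures` AZs are lost, with no scale-up."""
    surviving = az_count - az_failures
    if surviving <= 0:
        raise ValueError("at least one AZ must survive")
    # Surviving AZs must hold the whole load between them.
    return math.ceil(required_instances / surviving)


# Hypothetical workload: 12 instances needed at peak, spread over 3 AZs.
per_az = instances_per_az(12, 3)
print(per_az, per_az * 3)  # instances per AZ, total instances provisioned
```

The trade-off is explicit: tolerating the loss of one AZ out of three means paying for more capacity than the steady-state load needs, in exchange for not depending on scaling operations while the failure is in progress.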

Figure 2. Proactively designing systems in the event of failure

The infrastructure decisions under our control that can increase the resiliency of our system include:

  • AWS services have control and data planes designed for minimum blast radius. Data planes typically have higher availability design goals than control planes and are usually less complex. When implementing recovery or mitigation responses to events that can affect resiliency, using control plane operations can lower the overall resiliency of your architectures. For example, Amazon Route 53 (Route 53) has a data plane designed for a 100% availability SLA. A good fail-over mechanism should rely on the data plane and not the control plane, as explained in Creating Disaster Recovery Mechanisms Using Amazon Route 53.
  • Understanding the networking design and routes implemented in a virtual private cloud (VPC) is critical when testing the flow of traffic in our application. Understanding the flow of traffic helps us design better applications and see how one component failure can affect overall ingress/egress traffic. To achieve better network resiliency, it’s important to implement a good subnet strategy and manage our IP addresses to avoid fail-over issues and asymmetric routing in hybrid architectures. Use IP address management tools for established subnet strategies and routing decisions.
  • When designing VPCs and AZs, understanding the service limits and deploying independent routing tables and components in each zone increase availability. For example, highly available NAT gateways are preferred over NAT instances, as noted in the comparison provided in the Amazon VPC documentation.

Pattern 3: Considering different ways of increasing HA at the infrastructure layer

As already detailed, infrastructure resiliency = HA + DR.

Different ways by which system availability can be increased include:

  • Building for redundancy: Redundancy is the duplication of application components to increase the overall availability of the distributed system. After following application layer best practices, we can build auto healing mechanisms at the infrastructure layer.

We can take advantage of auto scaling features and use Amazon CloudWatch metrics and alarms to set up auto scaling triggers and deploy redundant copies of our applications across multiple AZs. This protects workloads from AZ failures, as shown in Figure 3.

Figure 3. Redundancy increases availability

  • Auto scale your infrastructure: When there are AZ failures, infrastructure auto scaling maintains the desired number of redundant components, which helps maintain the base level of application throughput. This way, the system stays highly available and costs stay under control. Auto scaling uses metrics to scale in and out appropriately, as shown in Figure 4.

Figure 4. How auto scaling improves availability

  • Implement resilient network connectivity patterns: While building highly resilient distributed systems, network access to AWS infrastructure also needs to be highly resilient. While deploying hybrid applications, the capacity needed for hybrid applications to communicate with their cloud native application counterparts is an important consideration in designing the network access using AWS Direct Connect or VPNs.

Testing failover and fallback scenarios helps validate that network paths operate as expected and routes fail over as expected to meet RTO objectives. As the number of connection points between the data center and AWS VPCs increases, a hub and spoke configuration provided by the Direct Connect gateway and transit gateways simplifies network topology, testing, and failover. For more information, visit the AWS Direct Connect Resiliency Recommendations.

  • Whenever possible, use the AWS networking backbone to increase security and resiliency and to lower cost. AWS PrivateLink provides secure access to AWS services and exposes the application’s functionalities and APIs to other business units or partner accounts hosted on AWS.
  • Security appliances need to be set up in HA configuration, so that even if one AZ is unavailable, security inspection can be taken over by the redundant appliances in the other AZs.
  • Think ahead about DNS resolution: DNS is a critical infrastructure component; hybrid DNS resolution should be designed carefully with Route 53 HA inbound and outbound resolver endpoints instead of using self-managed proxies.

Implement a good strategy to share DNS resolver rules across AWS accounts and VPCs with Resource Access Manager. Network failover tests are an important part of Disaster Recovery and Business Continuity Plans. To learn more, visit Set up integrated DNS resolution for hybrid networks in Amazon Route 53.

Additionally, Elastic Load Balancing (ELB) uses health checks to make sure that requests are routed to another component if the underlying application component fails. This improves the distributed system’s availability, which is the cumulative availability of all the layers in our system. Figure 5 details advantages of some AWS managed services.

Figure 5. AWS managed services help in building resilient infrastructures (click the image to enlarge)
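The point about cumulative availability can be made concrete with a small calculation. The sketch below assumes that components fail independently, which real systems only approximate, and every number in it is hypothetical rather than taken from this post or from any AWS SLA.

```python
from math import prod


def serial_availability(availabilities):
    """Every layer must work for the request to succeed, so the
    availabilities of the layers multiply."""
    return prod(availabilities)


def redundant_availability(availability, copies):
    """The group works if at least one of `copies` independent replicas works."""
    return 1 - (1 - availability) ** copies


# Hypothetical numbers for illustration only.
single_az_app = 0.99
app_across_three_azs = redundant_availability(single_az_app, 3)
# Load balancer, redundant application tier, and database in series.
system = serial_availability([0.9999, app_across_three_azs, 0.9995])
print(f"application tier: {app_across_three_azs:.6f}, whole system: {system:.6f}")
```

Redundancy raises the availability of one layer, but the system as a whole can never be more available than its weakest layer in the serial chain, which is why each layer needs its own HA design.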

Pattern 4: Use RTO and RPO requirements to determine the correct failover strategy for your application

Capture RTO and RPO requirements early on to determine solid failover strategies (Figure 6). Disaster recovery strategies within AWS range from low cost and complexity (like backup and restore), to more complex strategies when lower values of RTO and RPO are required.

In pilot light and warm standby, only the primary region receives traffic. In pilot light, only critical infrastructure components run in the backup region. Automation is used to check failures in the primary region using health checks and other metrics.

When health checks fail, use a combination of auto scaling groups, automation, and Infrastructure as Code (IaC) for quick deployment of other infrastructure components.

Note: This strategy depends on control plane availability in the secondary region for deploying the resources; keep this point in mind if you don’t have compute pre-provisioned in the secondary region. Carefully consider the business requirements and a distributed system’s application-level characteristics before deciding on a failover strategy. To understand all the factors and complexities involved in each of these disaster recovery strategies, refer to disaster recovery options in the cloud.

Figure 6. Relationship between RTO, RPO, cost, data loss, and length of service interruption

Conclusion

In Part 2 of this series, we discovered that infrastructure resiliency is a combination of HA and DR. It is important to consider likelihood and impact of different failure events on availability requirements. Building in application layer resiliency patterns (Part 1 of this series), along with early discovery of the RTO/RPO requirements, as well as operational and process resiliency of an organization helps in choosing the right managed services and putting in place the appropriate failover strategies for distributed systems.

It’s important to differentiate between normal and abnormal load threshold for applications in order to put automation, alerts, and alarms in place. This allows us to auto scale our infrastructure for normal expected load, plus implement corrective action and automation to root out issues in case of abnormal load. Use IaC for quick failover and test failover processes.

Stay tuned for Part 3, in which we discuss operational resiliency!

From centralized architecture to decentralized architecture: How data sharing fine-tunes Amazon Redshift workloads

Post Syndicated from Jingbin Ma original https://aws.amazon.com/blogs/big-data/from-centralized-architecture-to-decentralized-architecture-how-data-sharing-fine-tunes-amazon-redshift-workloads/

Amazon Redshift is a fully managed, petabyte-scale, massively parallel data warehouse that offers simple operations and high performance. It makes it fast, simple, and cost-effective to analyze all your data using standard SQL and your existing business intelligence (BI) tools. Today, Amazon Redshift has become the most widely used cloud data warehouse.

With the significant growth of data for big data analytics over the years, some customers have asked how they should optimize Amazon Redshift workloads. In this post, we explore how to optimize workloads on Amazon Redshift clusters using Amazon Redshift RA3 nodes, data sharing, and pausing and resuming clusters. For more cost-optimization methods, refer to Getting the most out of your analytics stack with Amazon Redshift.

Key features of Amazon Redshift

First, let’s review some key features:

  • RA3 nodes – Amazon Redshift RA3 nodes are backed by a new managed storage model that gives you the power to separately optimize your compute power and your storage. They bring a few very important features, one of which is data sharing. RA3 nodes also support the ability to pause and resume, which allows you to easily suspend on-demand billing while the cluster is not being used.
  • Data sharing – Amazon Redshift data sharing lets you extend the ease of use, performance, and cost benefits of Amazon Redshift in a single cluster to multi-cluster deployments while being able to share data. Data sharing enables instant, granular, and fast data access across Redshift clusters without the need to copy or move it. You can securely share live data with Amazon Redshift clusters in the same or different AWS accounts, and across regions. You can share data at many levels, including schemas, tables, views, and user-defined functions. You can also share the most up-to-date and consistent information as it’s updated in Amazon Redshift Serverless. It also provides fine-grained access controls that you can tailor for different users and businesses that all need access to the data. However, data sharing in Amazon Redshift has a few limitations.

Solution overview

In this use case, our customer is heavily using Amazon Redshift as their data warehouse for their analytics workloads, and they have been enjoying the possibility and convenience that Amazon Redshift brought to their business. They mainly use Amazon Redshift to store and process user behavioral data for BI purposes. The data has increased by hundreds of gigabytes daily in recent months, and employees from departments continuously run queries against the Amazon Redshift cluster on their BI platform during business hours.

The company runs four major analytics workloads on a single Amazon Redshift cluster, because some data is used by all workloads:

  • Queries from the BI platform – Various queries run mainly during business hours.
  • Hourly ETL – This extract, transform, and load (ETL) job runs in the first few minutes of each hour. It generally takes about 40 minutes.
  • Daily ETL – This job runs twice a day during business hours, because the operation team needs to get daily reports before the end of the day. Each job normally takes between 1.5 and 3 hours. It’s the second-most resource-heavy workload.
  • Weekly ETL – This job runs in the early morning every Sunday. It’s the most resource-heavy workload. The job normally takes 3–4 hours.

The analytics team has migrated to the RA3 family and, as the data grew over the years, increased the number of nodes of the Amazon Redshift cluster to 12 to keep the average runtime of queries from their BI tool within an acceptable range, especially when other workloads are running.

However, they have noticed that performance is reduced while running ETL tasks, and the duration of ETL tasks is long. Therefore, the analytics team wants to explore solutions to optimize their Amazon Redshift cluster.

Because CPU utilization spikes appear while the ETL tasks are running, the AWS team’s first thought was to separate workloads and relevant data into multiple Amazon Redshift clusters with different cluster sizes. By reducing the total number of nodes, we hoped to reduce the cost of Amazon Redshift.

After a series of conversations, the AWS team found that one of the reasons that the customer keeps all workloads on the 12-node Amazon Redshift cluster is to manage the performance of queries from their BI platform, especially while running ETL workloads, which have a big impact on the performance of all workloads on the Amazon Redshift cluster. The obstacle is that many tables in the data warehouse are required to be read and written by multiple workloads, and only the producer of a data share can update the shared data.

The challenge of dividing the Amazon Redshift cluster into multiple clusters is data consistency. Some tables need to be read by ETL workloads and written by BI workloads, and some tables are the opposite. Therefore, if we duplicate data into two Amazon Redshift clusters or only create a data share from the BI cluster to the reporting cluster, the customer will have to develop a data synchronization process to keep the data consistent between all Amazon Redshift clusters, and this process could be very complicated and unmaintainable.

After more analysis to gain an in-depth understanding of the customer’s workloads, the AWS team found that we could put tables into four groups, and proposed a multi-cluster, two-way data sharing solution. The purpose of the solution is to divide the workloads into separate Amazon Redshift clusters so that we can pause and resume the clusters that serve periodic workloads and reduce the Amazon Redshift running costs, while all clusters still access a single copy of the data that the workloads require. The solution should meet the data consistency requirements without building a complicated data synchronization process.

The following diagram illustrates the old architecture (left) compared to the new multi-cluster solution (right).

Improve the old architecture (left) to the new multi-cluster solution (right)

Dividing workloads and data

Due to the characteristics of the four major workloads, we categorized workloads into two categories: long-running workloads and periodic-running workloads.

The long-running workloads are for the BI platform and hourly ETL jobs. Because the hourly ETL workload requires about 40 minutes to run, the gain is small even if we migrate it to an isolated Amazon Redshift cluster and pause and resume it every hour. Therefore, we leave it with the BI platform.

The periodic-running workloads are the daily and weekly ETL jobs. The daily job generally takes about 1 hour and 40 minutes to 3 hours, and the weekly job generally takes 3–4 hours.
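A back-of-the-envelope calculation shows why pausing pays off for the daily and weekly jobs but not for the hourly one. The sketch below uses the durations quoted above, taking the upper bound where a range is given. It assumes the daily job runs seven days a week, that jobs do not overlap, and it ignores the minutes spent pausing and resuming; these are simplifying assumptions, not statements from the post.

```python
HOURS_PER_WEEK = 7 * 24


def weekly_running_hours(runs_per_week: float, hours_per_run: float) -> float:
    """Hours per week a cluster must be running to serve one workload."""
    return runs_per_week * hours_per_run


# Hourly ETL: one run every hour, about 40 minutes each.
hourly_etl = weekly_running_hours(runs_per_week=HOURS_PER_WEEK, hours_per_run=40 / 60)
# Daily ETL: twice a day, up to 3 hours each (7 days a week is an assumption).
daily_etl = weekly_running_hours(runs_per_week=2 * 7, hours_per_run=3)
# Weekly ETL: once a week, up to 4 hours.
weekly_etl = weekly_running_hours(runs_per_week=1, hours_per_run=4)

hourly_share = hourly_etl / HOURS_PER_WEEK
periodic_share = (daily_etl + weekly_etl) / HOURS_PER_WEEK
print(f"isolated hourly ETL cluster would run {hourly_share:.0%} of the week")
print(f"daily + weekly ETL cluster would run {periodic_share:.0%} of the week")
```

Under these assumptions an isolated hourly cluster would be running about two thirds of the time, while a cluster dedicated to the daily and weekly jobs would be running a little over a quarter of the time, so pausing it between runs removes most of its on-demand compute hours.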

Data sharing plan

The next step is identifying all data (tables) access patterns of each category. We identified four types of tables:

  • Type 1 – Tables are only read and written by long-running workloads
  • Type 2 – Tables are read and written by long-running workloads, and are also read by periodic-running workloads
  • Type 3 – Tables are read and written by periodic-running workloads, and are also read by long-running workloads
  • Type 4 – Tables are only read and written by periodic-running workloads

Fortunately, there is no table that is required to be written by all workloads. Therefore, we can separate the Amazon Redshift cluster into two Amazon Redshift clusters: one for the long-running workloads, and the other for periodic-running workloads with 20 RA3 nodes.

We created a two-way data share between the long-running cluster and the periodic-running cluster. For type 2 tables, we created a data share on the long-running cluster as the producer and the periodic-running cluster as the consumer. For type 3 tables, we created a data share on the periodic-running cluster as the producer and the long-running cluster as the consumer.
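The classification and the resulting producer and consumer roles can be expressed as a small decision function. This is a sketch for illustration: the function, the constants, and the dictionary layout are hypothetical names, not part of the customer’s tooling or of Amazon Redshift.

```python
LONG, PERIODIC = "long-running", "periodic-running"


def classify_table(writers: set, readers: set) -> dict:
    """Map a table's access pattern to its type (1-4) and, when the other
    workload category reads it, to the data share producer and consumer."""
    if len(writers) != 1 or not writers <= {LONG, PERIODIC}:
        # Only the producer of a data share can update shared data, so a
        # table written by both categories cannot be placed on either side.
        raise ValueError("a table must be written by exactly one workload category")
    owner = next(iter(writers))
    other = PERIODIC if owner == LONG else LONG
    shared = other in readers
    if owner == LONG:
        table_type = 2 if shared else 1
    else:
        table_type = 3 if shared else 4
    return {"type": table_type, "producer": owner, "consumer": other if shared else None}


print(classify_table({LONG}, {LONG, PERIODIC}))  # a type 2 table
print(classify_table({PERIODIC}, {PERIODIC}))    # a type 4 table
```

The error branch corresponds to the case the team was relieved not to find: a table written by both categories would still require a synchronization process.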

The following diagram illustrates this data sharing configuration.

The long-running cluster (producer) shares type 2 tables to the periodic-running cluster (consumer). The periodic-running cluster (producer’) shares type 3 tables to the long-running cluster (consumer’)

Build two-way data share across Amazon Redshift clusters

In this section, we walk through the steps to build a two-way data share across Amazon Redshift clusters. First, let’s take a snapshot of the original Amazon Redshift cluster, which became the long-running cluster later.

Take a snapshot of the long-running-cluster from the Amazon Redshift console

Now, let’s create a new Amazon Redshift cluster with 20 RA3 nodes for periodic-running workloads. Make sure you choose the ra3 node type. Then we migrate the type 3 and type 4 tables to the periodic-running cluster. (Amazon Redshift Serverless supports data sharing too, and it became generally available in July 2022, so it is also an option now.)

Create the periodic-running-cluster. Make sure you select the ra3 node type.

Create the long-to-periodic data share

The next step is to create the long-to-periodic data share. Complete the following steps:

  1. On the periodic-running cluster, get the namespace by running the following query:
SELECT current_namespace;

Make sure to record the namespace.

  2. On the long-running cluster, we run queries similar to the following:
CREATE DATASHARE ltop_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ltop_share ADD SCHEMA public_long;
ALTER DATASHARE ltop_share ADD ALL TABLES IN SCHEMA public_long;
GRANT USAGE ON DATASHARE ltop_share TO NAMESPACE '[periodic-running-cluster-namespace]';
  3. We can validate the long-to-periodic data share using the following command:
SHOW datashares;
  4. After we validate the data share, we get the long-running cluster namespace with the following query:
SELECT current_namespace;

Make sure to record the namespace.

  5. On the periodic-running cluster, run the following command to load the data from the long-to-periodic data share with the long-running cluster namespace:
CREATE DATABASE ltop FROM DATASHARE ltop_share OF NAMESPACE '[long-running-cluster-namespace]';
  6. Confirm that we have read access to tables in the long-to-periodic data share.

Create the periodic-to-long data share

The next step is to create the periodic-to-long data share. We use the namespaces of the long-running cluster and the periodic-running cluster that we collected in the previous step.

  1. On the periodic-running cluster, run queries similar to the following to create the periodic-to-long data share:
CREATE DATASHARE ptol_share SET PUBLICACCESSIBLE TRUE;
ALTER DATASHARE ptol_share ADD SCHEMA public_periodic;
ALTER DATASHARE ptol_share ADD ALL TABLES IN SCHEMA public_periodic;
GRANT USAGE ON DATASHARE ptol_share TO NAMESPACE '[long-running-cluster-namespace]';
  2. Validate the data share using the following command:
SHOW datashares;
  3. On the long-running cluster, run the following command to load the data from the periodic-to-long data share using the periodic-running cluster namespace:
CREATE DATABASE ptol FROM DATASHARE ptol_share OF NAMESPACE '[periodic-running-cluster-namespace]';
  4. Check that we have read access to the tables in the periodic-to-long data share.

At this stage, we have separated workloads into two Amazon Redshift clusters and built a two-way data share across two Amazon Redshift clusters.
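Because the two directions of the data share use the same statements with the roles swapped, the SQL can be generated from one template. The sketch below only renders the statements shown in the steps above as strings; the helper names are hypothetical, the namespaces are left as placeholders, and the inputs are assumed to be trusted identifiers (no quoting or validation is attempted).

```python
def producer_statements(share: str, schema: str, consumer_namespace: str) -> list:
    """SQL to run on the producer cluster to create and grant a data share."""
    return [
        f"CREATE DATASHARE {share} SET PUBLICACCESSIBLE TRUE;",
        f"ALTER DATASHARE {share} ADD SCHEMA {schema};",
        f"ALTER DATASHARE {share} ADD ALL TABLES IN SCHEMA {schema};",
        f"GRANT USAGE ON DATASHARE {share} TO NAMESPACE '{consumer_namespace}';",
    ]


def consumer_statement(database: str, share: str, producer_namespace: str) -> str:
    """SQL to run on the consumer cluster to expose the share as a database."""
    return f"CREATE DATABASE {database} FROM DATASHARE {share} OF NAMESPACE '{producer_namespace}';"


# Placeholders stand in for the namespaces recorded from SELECT current_namespace.
long_ns = "[long-running-cluster-namespace]"
periodic_ns = "[periodic-running-cluster-namespace]"

long_to_periodic = producer_statements("ltop_share", "public_long", periodic_ns)
periodic_to_long = producer_statements("ptol_share", "public_periodic", long_ns)
for statement in long_to_periodic + [consumer_statement("ltop", "ltop_share", long_ns)]:
    print(statement)
```

Generating both directions from one function makes it harder to grant a share to the wrong namespace, which is the easiest mistake to make when the steps are typed by hand.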

The next step is updating the code of different workloads to use the correct endpoints of two Amazon Redshift clusters and perform consolidated tests.

Pause and resume the periodic-running Amazon Redshift cluster

Let’s update the crontab scripts, which run periodic-running workloads. We make two updates.

  1. When the scripts start, call the Amazon Redshift check and resume cluster APIs to resume the periodic-running Amazon Redshift cluster when the cluster is paused:
    aws redshift resume-cluster --cluster-identifier [periodic-running-cluster-id]

  2. After the workloads are finished, call the Amazon Redshift pause cluster API with the cluster ID to pause the cluster:
    aws redshift pause-cluster --cluster-identifier [periodic-running-cluster-id]
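The same two updates can be wrapped around a workload in one place. The following is a minimal sketch, not the customer’s script: it assumes a client object exposing describe_clusters, resume_cluster, and pause_cluster in the shape of the boto3 Redshift client, and it assumes the status strings "paused" and "available". The function name, polling interval, and timeout are hypothetical.

```python
import time


def run_on_periodic_cluster(client, cluster_id, workload,
                            poll_seconds=30, timeout_seconds=1800):
    """Resume the cluster if it is paused, wait until it is available,
    run the workload, and pause the cluster again afterwards."""
    def status():
        response = client.describe_clusters(ClusterIdentifier=cluster_id)
        return response["Clusters"][0]["ClusterStatus"]

    if status() == "paused":
        client.resume_cluster(ClusterIdentifier=cluster_id)

    # Resuming takes several minutes, so poll until the cluster is usable.
    deadline = time.monotonic() + timeout_seconds
    while status() != "available":
        if time.monotonic() > deadline:
            raise TimeoutError(f"cluster {cluster_id} did not become available")
        time.sleep(poll_seconds)

    try:
        return workload()
    finally:
        # Pause even if the workload raised, so the cluster does not keep billing.
        client.pause_cluster(ClusterIdentifier=cluster_id)


# Possible usage (assumes boto3 is installed and credentials are configured):
#   import boto3
#   run_on_periodic_cluster(boto3.client("redshift"),
#                           "[periodic-running-cluster-id]", run_daily_etl)
```

Putting the pause in a finally block is a design choice: a failed ETL run still pauses the cluster. If several scheduled jobs can overlap on the same cluster, they would need some coordination so that one job does not pause the cluster while another is still running.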

Results

After we migrated the workloads to the new architecture, the company’s analytics team ran some tests to verify the results.

According to tests, the performance of all workloads improved:

  • The BI workload is about 100% faster during the ETL workload running periods
  • The hourly ETL workload is about 50% faster
  • The daily workload duration reduced to approximately 40 minutes, from a maximum of 3 hours
  • The weekly workload duration reduced to approximately 1.5 hours, from a maximum of 4 hours

All functionalities work properly, and the cost of the new architecture increased by only approximately 13%, even though over 10% new data was added during the testing period.

Learnings and limitations

After we separated the workloads into different Amazon Redshift clusters, we discovered a few things:

  • The BI workloads ran about 100% faster because they no longer competed for resources with the daily and weekly ETL workloads
  • The duration of ETL workloads on the periodic-running cluster was reduced significantly because there were more nodes and no resource competition from the BI and hourly ETL workloads
  • Even when over 10% new data was added, the overall cost of the Amazon Redshift clusters only increased by 13%, due to using the cluster pause and resume function of the Amazon Redshift RA3 family

As a result, we saw a 70% price-performance improvement of the Amazon Redshift cluster.

However, there are some limitations of the solution:

  • To use the Amazon Redshift pause and resume function, the code for calling the Amazon Redshift pause and resume APIs must be added to all scheduled scripts that run ETL workloads on the periodic-running cluster
  • Amazon Redshift clusters require several minutes to finish pausing and resuming, although you’re not charged during these processes
  • The size of Amazon Redshift clusters can’t automatically scale in and out depending on workloads

Next steps

After improving performance significantly, we can explore the possibility of reducing the number of nodes of the long-running cluster to reduce Amazon Redshift costs.

Another possible optimization is using Amazon Redshift Spectrum to reduce the cost of Amazon Redshift on cluster storage. With Redshift Spectrum, multiple Amazon Redshift clusters can concurrently query and retrieve the same structured and semistructured dataset in Amazon Simple Storage Service (Amazon S3) without the need to make copies of the data for each cluster or having to load the data into Amazon Redshift tables.

Amazon Redshift Serverless was announced for preview at AWS re:Invent 2021 and became generally available in July 2022. Redshift Serverless automatically provisions and intelligently scales your data warehouse capacity to deliver best-in-class performance for all your analytics. You only pay for the compute used for the duration of the workloads on a per-second basis. You can benefit from this simplicity without making any changes to your existing analytics and BI applications. You can also share data for read purposes across different Amazon Redshift Serverless instances within or across AWS accounts.

Therefore, we can explore using Redshift Serverless to remove the need to script the pausing and resuming of the periodic-running cluster, which would make management easier. We can also explore the possibility of improving the granularity of workloads.

Conclusion

In this post, we discussed how to optimize workloads on Amazon Redshift clusters using RA3 nodes, data sharing, and pausing and resuming clusters. We also explored a use case implementing a multi-cluster two-way data share solution to improve workload performance with a minimum code change. If you have any questions or feedback, please leave them in the comments section.


About the authors

Jingbin Ma

Jingbin Ma is a Sr. Solutions Architect at Amazon Web Services. He helps customers build well-architected applications using AWS services. He has many years of experience working in the internet industry, and his last role was CTO of a New Zealand IT company before joining AWS. He is passionate about serverless and infrastructure as code.

Chao Pan

Chao Pan is a Data Analytics Solutions Architect at Amazon Web Services. He’s responsible for the consultation and design of customers’ big data solution architectures. He has extensive experience in open-source big data. Outside of work, he enjoys hiking.

Optimizing TCP for high WAN throughput while preserving low latency

Post Syndicated from Mike Freemon original https://blog.cloudflare.com/optimizing-tcp-for-high-throughput-and-low-latency/

Here at Cloudflare we’re constantly working on improving our service. Our engineers are looking at hundreds of parameters of our traffic, making sure that we get better all the time.

One of the core numbers we keep a close eye on is HTTP request latency, which is important for many of our products. We regard latency spikes as bugs to be fixed. One example is the 2017 story of “Why does one NGINX worker take all the load?”, where we optimized our TCP Accept queues to improve overall latency of TCP sockets waiting for accept().

Performance tuning is a holistic endeavor, and we monitor and continuously improve a range of other performance metrics as well, including throughput. Sometimes, tradeoffs have to be made. Such a case occurred in 2015, when a latency spike was discovered in our processing of HTTP requests. The solution at the time was to set tcp_rmem to 4 MiB, which minimizes the amount of time the kernel spends on TCP collapse processing. It was this collapse processing that was causing the latency spikes. Later in this post we discuss TCP collapse processing in more detail.

The tradeoff is that using a low value for tcp_rmem limits TCP throughput over high latency links. The following graph shows the maximum throughput as a function of network latency for a window size of 2 MiB. Note that the 2 MiB corresponds to a tcp_rmem value of 4 MiB due to the tcp_adv_win_scale setting in effect at the time.

Graph: maximum TCP throughput as a function of network latency for a window size of 2 MiB
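The shape of that graph follows from a simple bound: a sender can have at most one receive window of data in flight per round trip, so throughput cannot exceed window size divided by RTT. The sketch below evaluates that bound for the 2 MiB window mentioned above; the function name and the sample latencies are chosen for illustration.

```python
def max_throughput_mbps(window_bytes: int, rtt_ms: float) -> float:
    """Upper bound on TCP throughput in Mbit/s: at most one full
    window can be delivered per round trip."""
    return window_bytes * 8 / (rtt_ms / 1000) / 1_000_000


WINDOW = 2 * 1024 * 1024  # 2 MiB receive window, as in the text

for rtt in (10, 50, 100, 200):  # example round-trip times in milliseconds
    print(f"{rtt:>4} ms RTT -> at most {max_throughput_mbps(WINDOW, rtt):8.1f} Mbit/s")
```

Doubling the latency halves the achievable throughput for a fixed window, which is why a window that is ample for nearby connections becomes the bottleneck on long paths.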

For the Cloudflare products then in existence, this was not a major problem, as connections terminate and content is served from nearby servers due to our BGP anycast routing.

Since then, we have added new products, such as Magic WAN, WARP, Spectrum, Gateway, and others. These represent new types of use cases and traffic flows.

For example, imagine you’re a typical Magic WAN customer. You have connected all of your worldwide offices together using the Cloudflare global network. While Time to First Byte still matters, Magic WAN office-to-office traffic also needs good throughput. For example, a lot of traffic over these corporate connections will be file sharing using protocols such as SMB. These are elephant flows over long fat networks. Throughput is the metric every eyeball watches as they are downloading files.

We need to continue to provide world-class low latency while simultaneously providing high throughput over high-latency connections.

Before we begin, let’s introduce the players in our game.

TCP receive window is the maximum number of unacknowledged user payload bytes the sender should transmit (bytes-in-flight) at any point in time. The size of the receive window can and does go up and down during the course of a TCP session. It is a mechanism whereby the receiver can tell the sender to stop sending if the sent packets cannot be successfully received because the receive buffers are full. It is this receive window that often limits throughput over high-latency networks.

net.ipv4.tcp_adv_win_scale is a (non-intuitive) number used to account for the overhead needed by Linux to process packets. The receive window is specified in terms of user payload bytes. Linux needs additional memory beyond that to track other data associated with packets it is processing.

The value of the receive window changes during the lifetime of a TCP session, depending on a number of factors. The maximum value that the receive window can be is limited by the amount of free memory available in the receive buffer, according to this table:

tcp_adv_win_scale    TCP window size
 4                   15/16 * available memory in receive buffer
 3                   ⅞ * available memory in receive buffer
 2                   ¾ * available memory in receive buffer
 1                   ½ * available memory in receive buffer
 0                   available memory in receive buffer
-1                   ½ * available memory in receive buffer
-2                   ¼ * available memory in receive buffer
-3                   ⅛ * available memory in receive buffer
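The table can be reproduced with a couple of bit shifts. The sketch below is modelled on the table above rather than copied from any particular kernel version, so treat it as an illustration of the arithmetic; the function name is hypothetical.

```python
def window_from_space(space: int, tcp_adv_win_scale: int) -> int:
    """Largest receive window (payload bytes) for `space` bytes of free
    receive buffer memory, following the tcp_adv_win_scale table."""
    if tcp_adv_win_scale <= 0:
        # Zero or negative: the window is space / 2^(-scale).
        return space >> (-tcp_adv_win_scale)
    # Positive: reserve space / 2^scale as overhead, the rest is window.
    return space - (space >> tcp_adv_win_scale)


FOUR_MIB = 4 * 1024 * 1024
for scale in (4, 3, 2, 1, 0, -1, -2, -3):
    print(f"{scale:>2}: {window_from_space(FOUR_MIB, scale)} bytes")
```

With a setting of 1, a 4 MiB buffer yields a 2 MiB window, which matches the relationship between the tcp_rmem value and the window size described earlier in the post.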

We can intuitively (and correctly) understand that the amount of available memory in the receive buffer is the difference between the used memory and the maximum limit. But what is the maximum size a receive buffer can be? The answer is sk_rcvbuf.

sk_rcvbuf is a per-socket field that specifies the maximum amount of memory that a receive buffer can allocate. This can be set programmatically with the socket option SO_RCVBUF. This can sometimes be useful to do, for localhost TCP sessions, for example, but in general the use of SO_RCVBUF is not recommended.

So how is sk_rcvbuf set? The most appropriate value for that depends on the latency of the TCP session and other factors. This makes it difficult for L7 applications to know how to set these values correctly, as they will be different for every TCP session. The solution to this problem is Linux autotuning.

Linux autotuning

Linux autotuning is logic in the Linux kernel that adjusts the buffer size limits and the receive window based on actual packet processing. It takes into consideration a number of things including TCP session RTT, L7 read rates, and the amount of available host memory.

Autotuning can sometimes seem mysterious, but it is actually fairly straightforward.

The central idea is that Linux can track the rate at which the local application is reading data off of the receive queue. It also knows the session RTT. Because Linux knows these things, it can automatically increase the buffers and receive window until it reaches the point at which the application layer or network bottleneck links are the constraint on throughput (and not host buffer settings). At the same time, autotuning prevents slow local readers from having excessively large receive queues. The way autotuning does that is by limiting the receive window and its corresponding receive buffer to an appropriate size for each socket.

The values set by autotuning can be seen via the Linux “ss” command from the iproute package (e.g. “ss -tmi”).  The relevant output fields from that command are:

Recv-Q is the number of user payload bytes not yet read by the local application.

rcv_ssthresh is the window clamp, a.k.a. the maximum receive window size. This value is not known to the sender. The sender receives only the current window size, via the TCP header field. A closely-related field in the kernel, tp->window_clamp, is the maximum window size allowable based on the amount of available memory. rcv_ssthresh is the receiver-side slow-start threshold value.

skmem_r is the actual amount of memory that is allocated, which includes not only user payload (Recv-Q) but also additional memory needed by Linux to process the packet (packet metadata). This is known within the kernel as sk_rmem_alloc.

Note that there are other buffers associated with a socket, so skmem_r does not represent the total memory that a socket might have allocated. Those other buffers are not involved in the issues presented in this post.

skmem_rb is the maximum amount of memory that could be allocated by the socket for the receive buffer. This is higher than rcv_ssthresh to account for memory needed for packet processing that is not packet data. Autotuning can increase this value (up to tcp_rmem max) based on how fast the L7 application is able to read data from the socket and the RTT of the session. This is known within the kernel as sk_rcvbuf.

rcv_space is the high water mark of the rate of the local application reading from the receive buffer during any RTT. This is used internally within the kernel to adjust sk_rcvbuf.
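
For monitoring, these fields can be pulled out of the “ss” output with a few regular expressions. The sketch below is one way to do it, under some assumptions: the sample line only imitates the style of “ss -tmi” output with made-up numbers, and the function name parse_receive_fields is hypothetical.

```python
import re

# Illustrative line in the style of `ss -tmi` output; the numbers are made up.
SAMPLE = (
    "skmem:(r0,rb131072,t0,tb87040,f0,w0,o0,bl0,d0) cubic rtt:0.05/0.02 "
    "rcv_space:14600 rcv_ssthresh:64076"
)


def parse_receive_fields(text: str) -> dict:
    """Extract the receive-side fields discussed above from one socket's ss output."""
    fields = {}
    # skmem:(r<allocated>,rb<limit>,...) holds skmem_r and skmem_rb.
    skmem = re.search(r"skmem:\(r(\d+),rb(\d+)", text)
    if skmem:
        fields["skmem_r"] = int(skmem.group(1))
        fields["skmem_rb"] = int(skmem.group(2))
    # rcv_space and rcv_ssthresh are printed as name:value pairs.
    for name in ("rcv_space", "rcv_ssthresh"):
        match = re.search(rf"\b{name}:(\d+)", text)
        if match:
            fields[name] = int(match.group(1))
    return fields


if __name__ == "__main__":
    print(parse_receive_fields(SAMPLE))
```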

Earlier we mentioned a setting called tcp_rmem. net.ipv4.tcp_rmem consists of three values, but in this document we are always referring to the third value (except where noted). It is a global setting that specifies the maximum amount of memory that any TCP receive buffer can allocate, i.e. the maximum permissible value that autotuning can use for sk_rcvbuf. This is essentially just a failsafe for autotuning, and under normal circumstances should play only a minor role in TCP memory management.

It’s worth mentioning that receive buffer memory is not preallocated. Memory is allocated based on actual packets arriving and sitting in the receive queue. It’s also important to realize that filling up a receive queue is not one of the criteria that autotuning uses to increase sk_rcvbuf. Indeed, preventing this type of excessive buffering (bufferbloat) is one of the benefits of autotuning.

What’s the problem?

The problem is that we must have a large TCP receive window for high BDP sessions. This is directly at odds with the latency spike problem mentioned above.

Something has to give. The laws of physics (speed of light in glass, etc.) dictate that we must use large window sizes. There is no way to get around that. So we are forced to solve the latency spikes differently.

A brief recap of the latency spike problem

Sometimes a TCP session will fill up its receive buffers. When that happens, the Linux kernel will attempt to reduce the amount of memory the receive queue is using by performing what amounts to a “defragmentation” of memory. This is called collapsing the queue. Collapsing the queue takes time, which is what drives up HTTP request latency.

We do not want to spend time collapsing TCP queues.

Why do receive queues fill up to the point where they hit the maximum memory limit? The usual situation is when the local application starts out reading data from the receive queue at one rate (triggering autotuning to raise the max receive window), followed by the local application slowing down its reading from the receive queue. This is valid behavior, and we need to handle it correctly.

Selecting sysctl values

Before exploring solutions, let’s first decide what we need as the maximum TCP window size.

As we have seen above in the discussion about BDP, the window size is determined based upon the RTT and desired throughput of the connection.

Because Linux autotuning will adjust correctly for sessions with lower RTTs and bottleneck links with lower throughput, all we need to be concerned about are the maximums.

For latency, we have chosen 300 ms as the maximum expected latency, as that is the measured latency between our Zurich and Sydney facilities. It seems reasonable enough as a worst-case latency under normal circumstances.

For throughput, although we have very fast and modern hardware on the Cloudflare global network, we don’t expect a single TCP session to saturate the hardware. We have arbitrarily chosen 3500 Mbps as the highest supported throughput for our highest latency TCP sessions.

The calculation for those numbers results in a BDP of 131 MB, which we round to the more aesthetic value of 128 MiB.
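
As a quick check of that arithmetic, here is a small sketch that multiplies throughput by RTT and converts bits to bytes. The helper name bdp_bytes is ours; the inputs are the 3500 Mbps and 300 ms chosen above.

```python
def bdp_bytes(throughput_mbps: float, rtt_ms: float) -> float:
    """Bandwidth-delay product in bytes for a throughput in Mbps and an RTT in ms."""
    bits_in_flight = throughput_mbps * 1_000_000 * (rtt_ms / 1000)
    return bits_in_flight / 8


if __name__ == "__main__":
    bdp = bdp_bytes(3500, 300)
    print(f"{bdp / 1e6:.2f} MB")      # decimal megabytes
    print(f"{bdp / 2**20:.2f} MiB")   # binary mebibytes
```

In binary units the result is a little over 125 MiB, so 128 MiB is the nearest power of two above it.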

Recall that allocation of TCP memory includes metadata overhead in addition to packet data. The ratio of actual amount of memory allocated to user payload size varies, depending on NIC driver settings, packet size, and other factors. For full-sized packets on some of our hardware, we have measured average allocations up to 3 times the packet data size. In order to reduce the frequency of TCP collapse on our servers, we set tcp_adv_win_scale to -2. From the table above, we know that the max window size will be ¼ of the max buffer space.

We end up with the following sysctl values:

net.ipv4.tcp_rmem = 8192 262144 536870912
net.ipv4.tcp_wmem = 4096 16384 536870912
net.ipv4.tcp_adv_win_scale = -2

A tcp_rmem of 512MiB and tcp_adv_win_scale of -2 results in a maximum window size that autotuning can set of 128 MiB, our desired value.

Disabling TCP collapse

Patient: Doctor, it hurts when we collapse the TCP receive queue.

Doctor: Then don’t do that!

Generally speaking, when a packet arrives at a buffer when the buffer is full, the packet gets dropped. In the case of these receive buffers, Linux tries to “save the packet” when the buffer is full by collapsing the receive queue. Frequently this is successful, but it is not guaranteed to be, and it takes time.

There are no problems created by immediately just dropping the packet instead of trying to save it. The receive queue is full anyway, so the local receiver application still has data to read. The sender’s congestion control will notice the drop and/or ZeroWindow and will respond appropriately. Everything will continue working as designed.

At present, there is no setting provided by Linux to disable the TCP collapse. We developed an in-house patch to the kernel to disable the TCP collapse logic.

Kernel patch – Attempt #1

The kernel patch for our first attempt was straightforward. At the top of tcp_try_rmem_schedule(), if the memory allocation fails, we simply return (after pred_flag = 0 and tcp_sack_reset()), thus completely skipping the tcp_collapse and related logic.

It didn’t work.

Although we eliminated the latency spikes while using large buffer limits, we did not observe the throughput we expected.

One of the realizations we made as we investigated the situation was that standard network benchmarking tools such as iperf3 and similar do not expose the problem we are trying to solve. iperf3 does not fill the receive queue. Linux autotuning does not open the TCP window large enough. Autotuning is working perfectly for our well-behaved benchmarking program.

We need application-layer software that is slightly less well-behaved, one that exercises the autotuning logic under test. So we wrote one.

A new benchmarking tool

Anomalies were seen during our “Attempt #1” that negatively impacted throughput. The anomalies were seen only under certain specific conditions, and we realized we needed a better benchmarking tool to detect and measure the performance impact of those anomalies.

This tool has turned into an invaluable resource during the development of this patch and raised confidence in our solution.

It consists of two Python programs. The reader opens a TCP session to the daemon, at which point the daemon starts sending user payload as fast as it can, and never stops sending.

The reader, on the other hand, starts and stops reading in a way that opens the TCP receive window wide and then repeatedly causes the buffers to fill up completely. More specifically, the reader implements this logic:

  1. reads as fast as it can, for five seconds
    • this is called fast mode
    • opens up the window
  2. calculates 5% of the high watermark of the bytes read during any previous one second
  3. for each second of the next 15 seconds:
    • this is called slow mode
    • reads that 5% number of bytes, then stops reading
    • sleeps for the remainder of that particular second
    • most of the second consists of no reading at all
  4. steps 1-3 are repeated in a loop three times, so the entire run is 60 seconds
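
The steps above can be sketched as follows. This is not the actual tool, only a minimal reader that follows the same schedule; all function names, the chunk size, and the host/port arguments are assumptions made for illustration.

```python
import socket
import time

FAST_SECONDS = 5     # step 1: read as fast as possible
SLOW_SECONDS = 15    # step 3: read a small budget each second
CYCLES = 3           # step 4: repeat three times
SLOW_PERCENT = 5     # step 2: share of the high watermark read per slow second


def phase_plan():
    """Return the (mode, seconds) phases of one full run."""
    return [("fast", FAST_SECONDS), ("slow", SLOW_SECONDS)] * CYCLES


def slow_mode_budget(bytes_per_second):
    """5% of the best one-second total seen so far (0 if nothing was read yet)."""
    return max(bytes_per_second, default=0) * SLOW_PERCENT // 100


def read_for_one_second(sock, budget=None, chunk=65536):
    """Read for one second; with a budget, stop at the budget and sleep out the second."""
    deadline = time.monotonic() + 1.0
    total = 0
    while time.monotonic() < deadline:
        if budget is not None and total >= budget:
            time.sleep(max(0.0, deadline - time.monotonic()))
            break
        want = chunk if budget is None else min(chunk, budget - total)
        data = sock.recv(want)
        if not data:          # sender closed the connection
            break
        total += len(data)
    return total


def run_reader(host, port):
    """Connect to the sending daemon and run the fast/slow schedule once."""
    per_second = []
    with socket.create_connection((host, port)) as sock:
        for mode, seconds in phase_plan():
            budget = None if mode == "fast" else slow_mode_budget(per_second)
            for _ in range(seconds):
                per_second.append(read_for_one_second(sock, budget))
    return per_second
```

Keeping the schedule and the budget calculation in separate functions makes them easy to check without a network connection.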

This has the effect of highlighting any issues in the handling of packets when the buffers repeatedly hit the limit.

Revisiting default Linux behavior

Taking a step back, let’s look at the default Linux behavior. The following is kernel v5.15.16.

The Linux kernel is effective at freeing up space in order to make room for incoming packets when the receive buffer memory limit is hit. As documented previously, the cost for saving these packets (i.e. not dropping them) is latency.

However, the latency spikes, in milliseconds, for tcp_try_rmem_schedule(), are:

tcp_rmem 170 MiB, tcp_adv_win_scale +2 (170p2):

@ms:
[0]       27093 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[1]           0 |
[2, 4)        0 |
[4, 8)        0 |
[8, 16)       0 |
[16, 32)      0 |
[32, 64)     16 |

tcp_rmem 146 MiB, tcp_adv_win_scale +3 (146p3):

@ms:
(..., 16)  25984 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[16, 20)       0 |
[20, 24)       0 |
[24, 28)       0 |
[28, 32)       0 |
[32, 36)       0 |
[36, 40)       0 |
[40, 44)       1 |
[44, 48)       6 |
[48, 52)       6 |
[52, 56)       3 |

tcp_rmem 137 MiB, tcp_adv_win_scale +4 (137p4):

@ms:
(..., 16)  37222 |@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
[16, 20)       0 |
[20, 24)       0 |
[24, 28)       0 |
[28, 32)       0 |
[32, 36)       0 |
[36, 40)       1 |
[40, 44)       8 |
[44, 48)       2 |

These are the latency spikes we cannot have on the Cloudflare global network.

Kernel patch – Attempt #2

So the “something” that was not working in Attempt #1 was that the receive queue memory limit was hit early on as the flow was just ramping up (when the values for sk_rmem_alloc and sk_rcvbuf were small, ~800KB). This occurred at about the two second mark for the 137p4 test (about 2.25 seconds for 170p2).

In hindsight, we should have noticed that tcp_prune_queue() actually raises sk_rcvbuf when it can. So we modified the patch in response to that and added a guard to allow the collapse to execute when sk_rmem_alloc is less than the threshold value.

net.ipv4.tcp_collapse_max_bytes = 6291456

The next section discusses how we arrived at this value for tcp_collapse_max_bytes.

The patch is available here.

The results with the new patch are as follows:

oscil – 300ms tests

oscil – 20ms tests

oscil – 0ms tests

iperf3 – 300 ms tests

iperf3 – 20 ms tests

iperf3 – 0ms tests

All tests are successful.

Setting tcp_collapse_max_bytes

In order to determine this setting, we need to understand what the biggest queue is that we can collapse without incurring unacceptable latency.

Using 6 MiB should result in a maximum latency of no more than 2 ms.

Cloudflare production network results

Current production settings (“Old”)

net.ipv4.tcp_rmem = 8192 2097152 16777216
net.ipv4.tcp_wmem = 4096 16384 33554432
net.ipv4.tcp_adv_win_scale = -2
net.ipv4.tcp_collapse_max_bytes = 0
net.ipv4.tcp_notsent_lowat = 4294967295

tcp_collapse_max_bytes of 0 means that the custom feature is disabled and that the vanilla kernel logic is used for TCP collapse processing.
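
Put together, the guard from Attempt #2 and the meaning of 0 amount to a very small decision. The following is a model of that decision in Python, not the kernel patch itself; the function name should_collapse is hypothetical.

```python
def should_collapse(sk_rmem_alloc: int, collapse_max_bytes: int) -> bool:
    """Decide whether the receive queue should be collapsed when memory runs out."""
    if collapse_max_bytes == 0:
        # Custom feature disabled: vanilla behavior, always attempt the collapse.
        return True
    # Feature enabled: collapse only queues below the threshold.
    return sk_rmem_alloc < collapse_max_bytes


if __name__ == "__main__":
    six_mib = 6291456
    print(should_collapse(800 * 1024, six_mib))     # small queue while ramping up
    print(should_collapse(300 * 2**20, six_mib))    # large queue on a high BDP session
```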

New settings under test (“New”)

net.ipv4.tcp_rmem = 8192 262144 536870912
net.ipv4.tcp_wmem = 4096 16384 536870912
net.ipv4.tcp_adv_win_scale = -2
net.ipv4.tcp_collapse_max_bytes = 6291456
net.ipv4.tcp_notsent_lowat = 131072

The tcp_notsent_lowat setting is discussed in the last section of this post.

The middle value of tcp_rmem was changed as a result of separate work that found that Linux autotuning was setting receive buffers too high for localhost sessions. This updated setting reduces TCP memory usage for those sessions, but does not change anything about the type of TCP sessions that is the focus of this post.

For the following benchmarks, we used non-Cloudflare host machines in Iowa, US, and Melbourne, Australia performing data transfers to the Cloudflare data center in Marseille, France. In Marseille, we have some hosts configured with the existing production settings, and others with the system settings described in this post. Software used is iperf3 version 3.9, kernel 5.15.32.

Throughput results

Route                    RTT (ms)   Throughput with current settings (Mbps)   Throughput with new settings (Mbps)   Increase factor
Iowa to Marseille        121        276                                       6600                                  24x
Melbourne to Marseille   282        120                                       3800                                  32x

Iowa-Marseille throughput

Iowa-Marseille receive window and bytes-in-flight

Melbourne-Marseille throughput

Melbourne-Marseille receive window and bytes-in-flight

Even with the new settings in place, the Melbourne to Marseille performance is limited by the receive window on the Cloudflare host. This means that further adjustments to these settings could yield even higher throughput.
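
A back-of-the-envelope check supports this. A sender can have at most one receive window of data in flight per round trip, so the window divided by the RTT is a ceiling on throughput. The sketch below applies that to both sets of settings, assuming the maximum window is ¼ of tcp_rmem max (tcp_adv_win_scale = -2); the helper name window_limited_mbps is ours.

```python
MIB = 1 << 20


def window_limited_mbps(window_bytes: int, rtt_ms: float) -> float:
    """Throughput ceiling in Mbps when one window is delivered per round trip."""
    return window_bytes * 8 / (rtt_ms / 1000) / 1_000_000


old_window = (16 * MIB) >> 2     # old tcp_rmem max of 16 MiB  -> 4 MiB window
new_window = (512 * MIB) >> 2    # new tcp_rmem max of 512 MiB -> 128 MiB window

if __name__ == "__main__":
    for route, rtt_ms in (("Iowa to Marseille", 121), ("Melbourne to Marseille", 282)):
        print(
            route,
            round(window_limited_mbps(old_window, rtt_ms)),
            round(window_limited_mbps(new_window, rtt_ms)),
        )
```

With the old settings the ceilings come out at roughly 277 Mbps and 119 Mbps, close to the measured 276 and 120. With the new settings the Melbourne ceiling is roughly 3808 Mbps, right at the measured 3800, while the Iowa ceiling is well above the measured 6600, so that path is limited by something other than the receive window.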

Latency results

The Y-axis on these charts is the 99th percentile time for TCP collapse in seconds.

Cloudflare hosts in Marseille running the current production settings

Cloudflare hosts in Marseille running the new settings

The takeaway in looking at these graphs is that maximum TCP collapse time for the new settings is no worse than with the current production settings. This is the desired result.

Send Buffers

What we have shown so far is that the receiver side seems to be working well, but what about the sender side?

As part of this work, we are setting tcp_wmem max to 512 MiB. For oscillating reader flows, this can cause the send buffer to become quite large. This represents bufferbloat and wasted kernel memory, both things that nobody likes or wants.

Fortunately, there is already a solution: tcp_notsent_lowat. This setting limits the size of unsent bytes in the write queue. More details can be found at https://lwn.net/Articles/560082.

The results are significant:

The RTT for these tests was 466ms. Throughput is not negatively affected. Throughput is at full wire speed in all cases (1 Gbps). Memory usage is as reported by /proc/net/sockstat, TCP mem.

Our web servers already set tcp_notsent_lowat to 131072 for their sockets. All other senders are using 4 GiB, the default value. We are changing the sysctl so that 131072 is in effect for all senders running on the server.
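
For an application that wants this limit on its own sockets without relying on the sysctl, the per-socket option can be set directly. The sketch below assumes a Linux host; the fallback value 25 is the Linux option number for TCP_NOTSENT_LOWAT and is only used if the Python build does not export the constant. The function name limit_unsent_bytes is hypothetical.

```python
import socket

# Linux option number for TCP_NOTSENT_LOWAT, used only if this Python build
# does not export the constant (assumption: the code runs on Linux).
TCP_NOTSENT_LOWAT = getattr(socket, "TCP_NOTSENT_LOWAT", 25)


def limit_unsent_bytes(sock: socket.socket, lowat: int = 131072) -> int:
    """Cap the unsent bytes queued on this socket and return the value now in effect."""
    sock.setsockopt(socket.IPPROTO_TCP, TCP_NOTSENT_LOWAT, lowat)
    return sock.getsockopt(socket.IPPROTO_TCP, TCP_NOTSENT_LOWAT)


if __name__ == "__main__":
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        print(limit_unsent_bytes(s))
```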

Conclusion

The goal of this work is to open the throughput floodgates for high BDP connections while simultaneously ensuring very low HTTP request latency.

We have accomplished that goal.

Optimizing data warehouse storage

Post Syndicated from Netflix Technology Blog original https://netflixtechblog.com/optimizing-data-warehouse-storage-7b94a48fdcbe

By Anupom Syam

Background

At Netflix, our current data warehouse contains hundreds of Petabytes of data stored in AWS S3, and each day we ingest and create additional Petabytes. At this scale, we can gain a significant amount of performance and cost benefits by optimizing the storage layout (records, objects, partitions) as the data lands into our warehouse.

There are several benefits of such optimizations like saving on storage, faster query time, cheaper downstream processing, and an increase in developer productivity by removing additional ETLs written only for query performance improvement. On the other hand, these optimizations themselves need to be sufficiently inexpensive to justify their own processing cost over the gains they bring.

We built AutoOptimize to efficiently and transparently optimize the data and metadata storage layout while maximizing their cost and performance benefits.

This article will list some of the use cases of AutoOptimize, discuss the design principles that help enhance efficiency, and present the high-level architecture. We will then dive deep into the merging use case of AutoOptimize and share some results and benefits.

Use cases

We found several use cases where a system like AutoOptimize can bring tons of value. Some of the optimizations are prerequisites for a high-performance data warehouse. Sometimes Data Engineers write downstream ETLs on ingested data to optimize the data/metadata layouts to make other ETL processes cheaper and faster. The goal of AutoOptimize is to centralize such optimizations, removing duplicate work while performing them more efficiently than vanilla ETLs.

Merge

As the data lands into the data warehouse through real-time data ingestion systems, it comes in different sizes. This results in a perpetually increasing number of small files across the partitions. Merging those numerous smaller files into a handful of larger files can make query processing faster and reduce storage space.

Sort

Presorted records and files in partitions make queries faster and save significant amounts of storage space, as they enable a higher level of compression. We already had some existing tables with sorting stages to reduce table storage and improve downstream query performance.

Compaction

Modern data warehouses allow updating and deleting pre-existing records. Iceberg plans to enable this in the form of delta files. Over time, the number of delta files grows, and compacting them to their source files can make the read operations more optimal.

Metadata optimization

In Iceberg, the physical partitioning is decoupled from logical partitioning by keeping a map to file locations in the metadata. This enables us to add additional indexes in the metadata to make point queries more optimal. We can also reorganize the metadata to make file scanning much faster.

Design Principles

For AutoOptimize to efficiently optimize the data layout, we’ve made the following choices:

  1. Just in time vs. periodic optimization
    Only optimize a given data set when required (based on what changed) instead of blind periodic runs.
  2. Essential vs. complete optimization
    Allow users to optimize at the point of diminishing returns instead of a binary setting. For example, we allow a partition to have a few small files instead of always merging files in perfect sizes.
  3. Minimum replacement vs. full overwrite
    Only replace the required minimum amount of files instead of a full sweep overwrite.

These principles reduce resource usage by being more efficient and effective while lowering the end-to-end latency in data processing.

Other than these principles, there are some other design considerations to support and enable:

  • Multi-tenancy with database and table prioritization.
  • Both automatic (event-driven) as well as manual (ad-hoc) optimization.
  • Transparency to end-users.

High-Level Design

AutoOptimize High-Level Design

AutoOptimize is split into 2 subsystems (Service and Actors) to decouple the decisions from the actions at a high level. This decoupling of responsibilities helps us to design, manage, use, and scale the subsystems independently.

AutoOptimize Service

The service is the decision-maker. It decides what to do and when to do it in response to an incoming event. It is responsible for listening to incoming events and requests and prioritizing different tables and actions to make the best usage of the available resources.

The work done in the service can be further broken down into the following 3 steps:

Observe: Listen to changes in the warehouse in near real-time. Also, respond to ad-hoc requests created manually by end-users.

Orient: Gather tuning parameters for a particular table that changed. Also, adjust the resource allocation for the table or the number of actors depending on the backlog.

Decide: Determine the highest value action with the right parameters for this particular change and when to act depending on how the action falls in the global priority across all tables and actions.

In AutoOptimize, the service is a cluster of Java (Spring Boot) applications using Redis to keep the states.

AutoOptimize Actors

Actors in AutoOptimize are responsible for the actual work (merging/sorting/compaction etc.). The AutoOptimize Service sends commands to the actors that specify what to do. The job of Actors is to perform those commands in a distributed and fault-tolerant manner.

Actors in AutoOptimize are a pool of long-running Spark jobs managed by the AutoOptimize service.

This was not intentional but we found that the way we modularized AutoOptimize’s decision-making workflow is very similar to the OODA loop and decided to use the same taxonomy.

Other Components

Iceberg
We use Apache Iceberg as the table format. AutoOptimize relies on some of the Iceberg specific features such as snapshot and atomic operations to perform the optimizations in an accurate and scalable manner.

AutoAnalyze
In short, AutoAnalyze finds the best tuning/configuration parameters for a table. It uses “What-If” experiments and previous experiences and heuristics to find the most fitting attributes for a table. We will publish a follow-up blog post about AutoAnalyze in the future. For AutoOptimize, it may find if a table needs file merging or suggest a target file size and other parameters.

Deep Dive into File Merge

File merge is the first use-case that we built for AutoOptimize. Previously we had our homegrown system called Ursula responsible for data ingestion into the Hive based warehouse. The Ursula based pipeline also performed file merges on the ingested table partitions periodically. Since then, we have moved our ingestion to Keystone and our table layout to Iceberg.

The migration out of Ursula to Keystone/Iceberg based ingestion initiated the need for a replacement for Ursula file merge. File merging is necessary for a low latency streaming ingestion pipeline as data often arrives late and unevenly. The number of small files grows across partitions over time and can have some serious side effects like:

  1. Slowing down queries.
  2. Requiring more processing resources.
  3. Increasing storage space.

The goal of File merge in AutoOptimize is to efficiently reduce the side effects while not adding additional latency to the data pipeline.

Solutions

This section will discuss some of the solutions that helped us achieve the previously stated goals.

Just in time optimization

AutoOptimize file merge gets triggered via table change events. This allows AutoOptimize to act right away with a minimum lag. But the problem with being event-driven is it’s expensive to scan the changed partitions every time they change. If we can determine “how noisy” a partition is from the changesets in a rolling manner, we will eliminate unnecessary full partition scanning with early signals from snapshots.

Essential work

After a full partition scan, AutoOptimize gets a more comprehensive view of the state of the partition. We can get a more accurate state of the partition at this stage and avoid non-essential work.

Partition Entropy
We introduced a concept called Partition Entropy (PE) used for early pruning at each step to reduce actual work. It’s a set of stats about the state of the partition. We calculate this in a rolling manner after each snapshot scan and more exhaustively after each partition scan.

The parts of PE that deal with file sizes are called File Size Entropy (FSE). FSE of a partition is derived from the Mean Squared Error (MSE) of file sizes in a partition. We will use the terms FSE and MSE interchangeably.

We use the standard Mean Squared Error formula:

$$\mathrm{MSE} = \frac{1}{N} \sum_{i=1}^{N} \left(\mathrm{Target} - \mathrm{Actual}_i\right)^2$$

Where,

N = Number of files in the partition
Target = Target File Size
Actual = min(Actual File Size, Target)

When a partition is scanned, it’s easy to calculate the MSE using the above formula as we know the sizes of all files in that partition. We store the MSE and N for each partition in Redis for later use.

At the snapshot scan stage, we get a commit definition containing the list of files and their metadata (like size, number of records, etc.) that got added and deleted in the commit. We calculate the new MSE’ of a changed partition in a rolling manner from the snapshot information and the previously stored stats using this formula:

$$\mathrm{MSE}' = \frac{N \cdot \mathrm{MSE} + \sum_{i=1}^{M} \left(\mathrm{Target} - \mathrm{Actual}_i\right)^2}{N + M}$$

Where,

M = Number of files added in the snapshot.
Target = Target File Size.
Actual = min(Actual File Size, Target)
N = Previously stored number of files in the partition.
MSE = Previously stored MSE.

We have a tolerance threshold (T) for each partition and skip further processing of the partition if MSE < T². This helps us significantly reduce the number of full partition scans at the snapshot scan step and the number of actual merges in the partition scan stage.
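
To make the bookkeeping concrete, here is a minimal sketch of the full and rolling calculations and the threshold check. It assumes the standard MSE definition and covers only added files (no deletions); the function names are ours and the sizes are arbitrary units.

```python
def squared_error(size, target):
    """Squared shortfall of one file against the target; oversized files count as on target."""
    actual = min(size, target)
    return (target - actual) ** 2


def partition_mse(sizes, target):
    """MSE over all files of a fully scanned partition."""
    if not sizes:
        return 0.0
    return sum(squared_error(s, target) for s in sizes) / len(sizes)


def rolling_mse(prev_mse, prev_n, added_sizes, target):
    """Update the stored (MSE, N) with the files added by a snapshot."""
    new_n = prev_n + len(added_sizes)
    if new_n == 0:
        return 0.0, 0
    total = prev_mse * prev_n + sum(squared_error(s, target) for s in added_sizes)
    return total / new_n, new_n


def can_skip(mse, tolerance):
    """Skip further processing when MSE < T^2."""
    return mse < tolerance ** 2


if __name__ == "__main__":
    target = 128
    mse, n = partition_mse([128, 64], target), 2      # stats stored after a partition scan
    mse, n = rolling_mse(mse, n, [200], target)       # a snapshot adds one large file
    print(mse, n, can_skip(mse, tolerance=40))
```

The rolling update gives the same value as rescanning all three files, which is what lets the snapshot stage avoid a full partition scan.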

Entropy-Based Filtering

The actual formulas are a little bit more complicated than what is stated here, as we need to take care of deleted files and some other edge cases. We could also use Mean Absolute Error, but we want to be biased towards outliers, as the goal is to have more even file sizes in a partition rather than a mixed bag of different sizes with some perfectly sized files.

Minimum replacement

Once we start processing a partition, we find the minimum amount of work needed to reduce the File Size Entropy and thus reduce the number of small files.

We use 2 different packing algorithms to achieve this:

Knuth/Plass line breaking algorithm
We use this strategy when the sort order among files is important. With a suitable error function (e.g., Error²), this algorithm helps minimize MSE with a running time bounded by O(n²).
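
The idea can be illustrated with a small dynamic program in the spirit of least-squares line breaking: files stay in order, each output file is a run of consecutive inputs, and the cost of a run is its squared shortfall against the target. This is a simplified sketch, not the full Knuth/Plass algorithm and not the AutoOptimize implementation; the function name pack_in_order is hypothetical and sizes are assumed to be non-negative.

```python
def pack_in_order(sizes, target):
    """Split sizes into consecutive groups minimizing the sum of (target - group total)^2."""
    n = len(sizes)
    best = [0] + [float("inf")] * n   # best[j]: minimum cost for the first j files
    cut = [0] * (n + 1)               # cut[j]: start index of the last group in that solution
    for j in range(1, n + 1):
        total = 0
        for i in range(j - 1, -1, -1):        # candidate last group is sizes[i:j]
            total += sizes[i]
            if total > target and i < j - 1:
                break                         # overflow; a lone oversized file is still allowed
            cost = best[i] + (target - min(total, target)) ** 2
            if cost < best[j]:
                best[j], cut[j] = cost, i
    groups, j = [], n
    while j > 0:                              # walk the cuts backwards to rebuild the groups
        groups.append(sizes[cut[j]:j])
        j = cut[j]
    return groups[::-1]


if __name__ == "__main__":
    print(pack_in_order([60, 60, 30, 90, 10, 100], 128))
```

The two nested loops give the O(n²) bound in the worst case; the early break keeps the inner loop short when files are not tiny compared to the target.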

First Fit Decreasing bin packing algorithm
We use a modified version of the original FFD algorithm if we can ignore the sort order. This helps reduce the number of replacements with an O(n log n) running time.
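
For reference, the textbook First Fit Decreasing algorithm looks like the sketch below. It is the unmodified version, not the variant used in AutoOptimize, and it scans bins linearly for simplicity, so only the sort is O(n log n); a tree over the remaining capacities would be needed to reach that bound overall. The function name is ours.

```python
def first_fit_decreasing(sizes, capacity):
    """Pack sizes into bins of the given capacity, placing the largest items first."""
    bins = []    # each bin is a list of file sizes
    free = []    # remaining capacity of each bin, in the same order as bins
    for size in sorted(sizes, reverse=True):
        for idx, room in enumerate(free):
            if size <= room:              # first bin with enough room wins
                bins[idx].append(size)
                free[idx] -= size
                break
        else:                             # no existing bin fits: open a new one
            bins.append([size])
            free.append(max(capacity - size, 0))
    return bins


if __name__ == "__main__":
    print(first_fit_decreasing([60, 60, 30, 90, 10, 100], 128))
```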

These methods help us smooth out the file size histogram while doing it optimally with minimal file replacement.

Multi-tenancy

AutoOptimize is multi-tenant; that is, it runs on many different databases and tables. When running the optimizations, it also needs to prioritize and allocate resources at different levels for different tasks. It requires answering questions like which table should be processed first or get more resource bandwidth or what optimization gives the most ROI.

To support multi-tenancy and tasks prioritization, it needs to have the following properties:

  • Weighted resource sharing across different priorities.
  • Fair resource sharing across different tables and tasks with the same priority.
  • Handle bursts to prevent starvation.

We use different types of Weighted Fair Queue implementations inside AutoOptimize, including different combinations of the following:

  1. Weighted Round Robin
  2. Deficit Weighted Round Robin
  3. Fixed Priority Preemptive
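
As an illustration of the second item, here is a minimal Deficit Weighted Round Robin sketch. It is a generic version of the technique, not the AutoOptimize scheduler; the queue names, task costs, and quanta are made-up values.

```python
from collections import deque


def deficit_weighted_round_robin(queues, quanta, rounds):
    """Serve queues of (task, cost) pairs; each queue earns quanta[name] credit per round."""
    deficit = {name: 0 for name in queues}
    order = []
    for _ in range(rounds):
        for name, queue in queues.items():
            if not queue:
                deficit[name] = 0             # idle queues do not accumulate credit
                continue
            deficit[name] += quanta[name]
            while queue and queue[0][1] <= deficit[name]:
                task, cost = queue.popleft()
                deficit[name] -= cost         # pay for the task out of the credit
                order.append(task)
            if not queue:
                deficit[name] = 0
    return order


if __name__ == "__main__":
    queues = {
        "high": deque([("h1", 2), ("h2", 2), ("h3", 2)]),
        "low": deque([("l1", 2), ("l2", 2)]),
    }
    print(deficit_weighted_round_robin(queues, {"high": 4, "low": 2}, rounds=2))
```

The larger quantum gives the high-priority queue twice the throughput per round, while the low-priority queue still makes progress every round, which is the starvation protection listed above.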

Reliable Priority Queue
To support prioritization and fair resource usage, we introduced a concept called Reliable Priority Queue (RPQ) in AutoOptimize. A reliable queue does not lose items if the subscriber fails to process the items after a dequeue. An RPQ also has a sense of prioritization across different items while being reliable. The concept is fairly similar to the default Redis RPOPLPUSH reliable queue pattern. But for AutoOptimize’s use case, we use Sorted Sets instead of lists to enable prioritization.
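
The behavior described here can be modeled in a few lines. The sketch below is an in-memory stand-in that uses a heap where the real system uses Redis Sorted Sets; the class and method names are hypothetical, a lower number means a higher priority, and items are assumed to be hashable.

```python
import heapq
import itertools


class ReliablePriorityQueue:
    """Priority queue that keeps dequeued items until they are acknowledged."""

    def __init__(self):
        self._pending = []              # heap of (priority, sequence, item)
        self._in_flight = {}            # item -> (priority, sequence)
        self._seq = itertools.count()   # tie-breaker that preserves insertion order

    def enqueue(self, item, priority):
        heapq.heappush(self._pending, (priority, next(self._seq), item))

    def dequeue(self):
        """Hand out the highest-priority item and remember it as in flight."""
        if not self._pending:
            return None
        priority, seq, item = heapq.heappop(self._pending)
        self._in_flight[item] = (priority, seq)
        return item

    def ack(self, item):
        """The subscriber finished the item, so it can be forgotten."""
        self._in_flight.pop(item, None)

    def requeue_unacked(self):
        """Return items whose subscriber failed to the pending set."""
        for item, (priority, seq) in self._in_flight.items():
            heapq.heappush(self._pending, (priority, seq, item))
        self._in_flight.clear()


if __name__ == "__main__":
    q = ReliablePriorityQueue()
    q.enqueue("merge table_b", priority=2)
    q.enqueue("merge table_a", priority=1)
    first = q.dequeue()      # subscriber takes the item but crashes before ack
    q.requeue_unacked()
    print(first, q.dequeue())
```

An item that is dequeued but never acknowledged comes back with its original priority, so a failed subscriber neither loses work nor reorders it.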

The goal of AutoOptimize is to optimize the warehouse with a holistic perspective. Making it multi-tenant with a notion of different priorities helps us make the most optimal resource allocation.

Results

22% reduction in partition scans

2% reduction in merge actions

72% reduction in file replacements

These savings are stacked on top of each other as they are applied in sequence in the AutoOptimize pipeline. This results in a massive reduction in actual processing need while reducing the number of files by 80%.

80% reduction in the number of files

70% saving in compute

We are using 70% fewer compute instances than our previous merge implementation.

We also see up to 60% improvement in query performance and an additional 1% saving in storage.

Benefits

Increase processing efficiency: As AutoOptimize uses file replacement and can avoid processing by filtering early, it can save processing costs by skipping files that are not required to be merged.

Increase storage efficiency: AutoOptimize helps save storage costs by enabling AutoAnalyze recommendations to sort the records.

Reduce lag: Periodic overwrite ETLs take more time as they work in batches. AutoOptimize reduces end to end lag in data processing by optimizing as we go.

Faster query: A smaller number of files results in less file scanning and fewer network calls, and makes queries faster.

Ease of use: AutoOptimize provides a frictionless way to set up optimization with minimum maintenance overhead from Data Engineering.

Developer productivity: Instead of adding an ETL per table for merging, which adds ongoing incremental maintenance cost, we have a single solution that can transparently scale to many tables.

Conclusion

We believe the problems we faced at Netflix are not unique, and some of the techniques and design considerations we made can be applied more generally. By laying out the data intelligently as it is ingested into the warehouse, we are removing complexities for Data Engineers and accelerating the end-to-end pipeline. At the same time, we are gaining a significant amount of performance and cost improvement by optimizing only when it makes sense. We plan to extend AutoOptimize into other use cases and integrate it more with the Iceberg ecosystem in the future.


Optimizing data warehouse storage was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Computing Euclidean distance on 144 dimensions

Post Syndicated from Marek Majkowski original https://blog.cloudflare.com/computing-euclidean-distance-on-144-dimensions/

Late last year I read a blog post about our CSAM image scanning tool. I remember thinking: this is so cool! Image processing is always hard, and deploying a real image identification system at Cloudflare is no small achievement!

Some time later, I was chatting with Kornel: “We have all the pieces in the image processing pipeline, but we are struggling with the performance of one component.” Scaling to Cloudflare needs ain’t easy!

The problem was in the speed of the matching algorithm itself. Let me elaborate. As John explained in his blog post, the image matching algorithm creates a fuzzy hash from a processed image. The hash is exactly 144 bytes long. For example, it might look like this:

00e308346a494a188e1043333147267a 653a16b94c33417c12b433095c318012
5612442030d14a4ce82c623f4e224733 1dd84436734e4a5d6e25332e507a8218
6e3b89174e30372d

The hash is designed to be used in a fuzzy matching algorithm that can find “nearby”, related images. The specific algorithm is well defined, but making it fast is left to the programmer — and at Cloudflare we need the matching to be done super fast. We want to match thousands of hashes per second, of images passing through our network, against a database of millions of known images. To make this work, we need to seriously optimize the matching algorithm.

Naive quadratic algorithm

The first algorithm that comes to mind has O(K*N) complexity: for each query, go through every hash in the database. In a naive implementation, this creates a lot of work. But how much work exactly?

First, we need to explain how fuzzy matching works.

Given a query hash, the fuzzy match is the “closest” hash in a database. This requires us to define a distance. We treat each hash as a vector containing 144 numbers, identifying a point in a 144-dimensional space. Given two such points, we can calculate the distance using the standard Euclidean formula.

For our particular problem, though, we are interested in the “closest” match in a database only if the distance is lower than some predefined threshold. Otherwise, when the distance is large, we can assume the images aren’t similar. This is the expected result: most of our queries will not have a related image in the database.

The Euclidean distance equation used by the algorithm is standard:

$$d(p, q) = \sqrt{\sum_{i=1}^{144} (p_i - q_i)^2}$$

To calculate the distance between two 144-byte hashes, we take each pair of bytes, calculate the delta, square it, and add it to an accumulator. Then we take the square root of the total, and ta-dah! We have the distance!

Here’s how to count the squared distance in C:

[Image: C listing of the squared distance function]

This function returns the squared distance. We avoid computing the actual distance to save us from running the square root function, which is slow. Inside the code, for performance and simplicity, we’ll mostly operate on the squared value. We don’t need the actual distance value, we just need to find the vector with the smallest one. Because squaring preserves the ordering of non-negative numbers, it doesn’t matter whether we compare distances or squared distances!
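The original listing is not reproduced in this copy. As a minimal sketch of what such a function might look like (the name `distance_sq` and the constant `HASH_LEN` are assumptions for illustration, not the original code):

```c
#include <stdint.h>

#define HASH_LEN 144 /* bytes per fuzzy hash, as described above */

/* Squared Euclidean distance between two 144-byte hashes.
 * The largest possible result is 144 * 255 * 255 = 9,363,600,
 * which fits comfortably in a uint32_t. */
uint32_t distance_sq(const uint8_t *a, const uint8_t *b)
{
    uint32_t sum = 0;
    for (int i = 0; i < HASH_LEN; i++) {
        int32_t delta = (int32_t)a[i] - (int32_t)b[i];
        sum += (uint32_t)(delta * delta);
    }
    return sum;
}
```

The bytes are widened to a signed type before subtracting so that a negative delta does not wrap around.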

As you can see, fuzzy matching is basically a standard problem of finding the closest point in a multi-dimensional space. Surely this has been solved in the past — but let’s not jump ahead.

While this code might be simple, we expect it to be rather slow. Finding the smallest hash distance in a database of, say, 1M entries, would require going over all records, and would need at least:

  1. 144 * 1M subtractions
  2. 144 * 1M multiplications
  3. 144 * 1M additions

And more. This alone adds up to 432 million operations! How does it look in practice?

To illustrate this blog post we prepared a full test suite. The large database of known hashes can be well emulated by random data. The query hashes can’t be random and must be slightly more sophisticated, otherwise the exercise wouldn’t be that interesting. We generated the test queries by swapping bytes in actual hashes from the database, which allows us to precisely control the distance between test hashes and database hashes. Take a look at the scripts for details. Here’s our first run of the first, naive, algorithm:

$ make naive
< test-vector.txt ./mmdist-naive > test-vector.tmp
Total: 85261.833ms, 1536 items, avg 55.509ms per query, 18.015 qps

We matched 1,536 test hashes against a database of 1 million random vectors in 85 seconds. It took 55ms of CPU time on average to find the closest neighbour. This is rather slow for our needs.

SIMD for help

An obvious improvement is to use more complex SIMD instructions. SIMD is a way to instruct the CPU to process multiple data points using one instruction. This is a perfect strategy when dealing with vector problems — as is the case for our task.

We settled on using AVX2, with 256 bit vectors. We did this for a simple reason — newer AVX versions are not supported by our AMD CPUs. Additionally, in the past, we were not thrilled by the AVX-512 frequency scaling.

Using AVX2 is easier said than done. There is no single instruction to count Euclidean distance between two uint8 vectors! The fastest way of counting the full distance of two 144-byte vectors with AVX2 we could find is authored by Vlad:

[Image: C listing of the AVX2 distance function]

It’s actually simpler than it looks: load 16 bytes, convert the vector from uint8 to int16, subtract the vectors, store intermediate sums as int32, and repeat. At the end, we need four more complex instructions to fold the partial sums into the final sum. This AVX2 code improves the performance by around 3x:
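As a rough illustration of the steps just described, and not Vlad’s actual code, a function along these lines can be written with standard AVX2 intrinsics. The function names, the runtime dispatch, and the scalar fallback are assumptions made for this sketch. It uses `_mm256_madd_epi16` to square the int16 deltas and sum adjacent pairs into int32 lanes.

```c
#include <stdint.h>

#define HASH_LEN 144

/* Portable reference, also used when AVX2 is unavailable. */
static uint32_t distance_sq_scalar(const uint8_t *a, const uint8_t *b)
{
    uint32_t sum = 0;
    for (int i = 0; i < HASH_LEN; i++) {
        int32_t d = (int32_t)a[i] - (int32_t)b[i];
        sum += (uint32_t)(d * d);
    }
    return sum;
}

#if (defined(__x86_64__) || defined(__i386__)) && \
    (defined(__GNUC__) || defined(__clang__))
#include <immintrin.h>

__attribute__((target("avx2")))
static uint32_t distance_sq_avx2(const uint8_t *a, const uint8_t *b)
{
    __m256i acc = _mm256_setzero_si256(); /* eight int32 partial sums */
    for (int i = 0; i < HASH_LEN; i += 16) {
        /* Load 16 bytes from each hash and widen uint8 -> int16. */
        __m256i va = _mm256_cvtepu8_epi16(
            _mm_loadu_si128((const __m128i *)(a + i)));
        __m256i vb = _mm256_cvtepu8_epi16(
            _mm_loadu_si128((const __m128i *)(b + i)));
        __m256i d = _mm256_sub_epi16(va, vb);
        /* Square each delta; adjacent pairs are summed into int32 lanes. */
        acc = _mm256_add_epi32(acc, _mm256_madd_epi16(d, d));
    }
    /* Fold the eight partial sums into a single value. */
    __m128i s = _mm_add_epi32(_mm256_castsi256_si128(acc),
                              _mm256_extracti128_si256(acc, 1));
    s = _mm_add_epi32(s, _mm_shuffle_epi32(s, _MM_SHUFFLE(1, 0, 3, 2)));
    s = _mm_add_epi32(s, _mm_shuffle_epi32(s, _MM_SHUFFLE(2, 3, 0, 1)));
    return (uint32_t)_mm_cvtsi128_si32(s);
}

uint32_t distance_sq(const uint8_t *a, const uint8_t *b)
{
    if (__builtin_cpu_supports("avx2"))
        return distance_sq_avx2(a, b);
    return distance_sq_scalar(a, b);
}
#else
uint32_t distance_sq(const uint8_t *a, const uint8_t *b)
{
    return distance_sq_scalar(a, b);
}
#endif
```

Since 144 is a multiple of 16, the loop needs no tail handling: nine iterations cover the whole hash.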

$ make naive-avx2 
Total: 25911.126ms, 1536 items, avg 16.869ms per query, 59.280 qps

We measured 17ms per query, which still falls short of our expectations. Unfortunately, we can’t push it much further without major changes. The problem is that this code is limited by memory bandwidth. The measurements come from my Intel i7-5557U CPU, which has a maximum theoretical memory bandwidth of just 25GB/s. The database of 1 million entries takes 137MiB, so it takes at least 5ms to feed the database to my CPU. With this naive algorithm we won’t be able to go below that.
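As a back-of-the-envelope check of those figures, taking 25GB/s to mean 25 × 10^9 bytes per second (an assumption about the units):

$$144 \times 10^{6}\ \text{bytes} \approx 137.3\ \text{MiB}, \qquad \frac{144 \times 10^{6}\ \text{bytes}}{25 \times 10^{9}\ \text{bytes/s}} \approx 5.8\ \text{ms}$$

A single full pass over the database cannot finish faster than that, however fast the arithmetic is.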

Vantage Point Tree algorithm

Since the naive brute force approach failed, we tried using more sophisticated algorithms. My colleague Kornel Lesiński implemented a super cool Vantage Point algorithm. After a few ups and downs, optimizations and rewrites, we gave up. Our problem turned out to be unusually hard for this kind of algorithm.

We observed “the curse of dimensionality”. Space partitioning algorithms don’t work well in problems with large dimensionality, and in our case we have a whopping 144 dimensions. K-D trees are doomed. Locality-sensitive hashing is also doomed. It’s a bizarre situation in which the space is unimaginably vast, but everything is close together. The volume of the space is a 347-digit-long number, but the maximum distance between points is just 3060, which is sqrt(255*255*144).
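Both numbers follow directly from the hash format, treating each of the 144 bytes as a coordinate with 256 possible values:

$$256^{144} = 2^{1152} \approx 10^{346.8}\ \text{(a 347-digit number)}, \qquad \sqrt{144 \cdot 255^{2}} = 12 \cdot 255 = 3060$$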

Space partitioning algorithms are fast, because they gradually narrow the search space as they get closer to finding the closest point. But in our case, the common query is never close to any point in the set, so the search space can’t be narrowed to a meaningful degree.

A VP-tree was a promising candidate, because it operates only on distances, subdividing space into near and far partitions, like a binary tree. When it has a close match, it can be very fast, and doesn’t need to visit more than O(log(N)) nodes. For non-matches, its speed drops dramatically. The algorithm ends up visiting nearly half of the nodes in the tree. Everything is close together in 144 dimensions! Even though the algorithm avoided visiting more than half of the nodes in the tree, the cost of visiting remaining nodes was higher, so the search ended up being slower overall.

Smarter brute force?

This experience got us thinking. Since space partitioning algorithms can’t narrow down the search, and still need to go over a very large number of items, maybe we should focus on going over all the hashes, extremely quickly. We must be smarter about memory bandwidth though — it was the limiting factor in the naive brute force approach before.

Perhaps we don’t need to fetch all the data from memory.

Short distance

The breakthrough came from the realization that we don’t need to count the full distance between hashes. Instead, we can compute only a subset of dimensions, say 32 out of the total of 144. If this distance is already large, then there is no need to compute the full one! Every additional dimension adds a non-negative term to the sum, so including more of them can never reduce the Euclidean distance.

The proposed algorithm works as follows:

1. Take the query hash and extract a 32-byte short hash from it

2. Go over all the 1 million 32-byte short hashes from the database. They must be densely packed in the memory to allow the CPU to perform good prefetching and avoid reading data we won’t need.

3. If the distance of the 32-byte short hash is greater than or equal to the best score so far, move on

4. Otherwise, investigate the hash thoroughly and compute the full distance.

Even though this algorithm needs to do less arithmetic and memory work, it’s not faster than the previous naive one. See make short-avx2. The problem is: we still need to compute a full distance for hashes that are promising, and there are quite a lot of them. Computing the full distance for promising hashes adds enough work, both in ALU and memory latency, to offset the gains of this algorithm.

There is one detail of our particular application of the image matching problem that will help us a lot moving forward. As we described earlier, the problem is less about finding the closest neighbour and more about proving that the neighbour with a reasonable distance doesn’t exist. Remember — in practice, we don’t expect to find many matches! We expect almost every image we feed into the algorithm to be unrelated to image hashes stored in the database.

It’s sufficient for our algorithm to prove that no neighbour exists within a predefined distance threshold. Let’s assume we are not interested in hashes more distant than, say, 220, which squared is 48,400. This makes our short-distance algorithm variation work much better:

$ make short-avx2-threshold
Total: 4994.435ms, 1536 items, avg 3.252ms per query, 307.542 qps
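To make the four steps and the threshold concrete, here is a minimal scalar sketch. The function and parameter names, and the choice of the first 32 bytes as the short hash, are assumptions for illustration. The measured implementation uses AVX2 and is not reproduced here.

```c
#include <stddef.h>
#include <stdint.h>

#define HASH_LEN 144
#define SHORT_LEN 32 /* assumption: the short hash is the first 32 bytes */

static uint32_t partial_distance_sq(const uint8_t *a, const uint8_t *b, int len)
{
    uint32_t sum = 0;
    for (int i = 0; i < len; i++) {
        int32_t d = (int32_t)a[i] - (int32_t)b[i];
        sum += (uint32_t)(d * d);
    }
    return sum;
}

/*
 * short_db: n densely packed 32-byte short hashes.
 * full_db:  n full 144-byte hashes, in the same order.
 * Returns the index of the closest hash whose squared distance is below
 * threshold_sq, or -1 if no such hash exists.
 */
long find_match(const uint8_t *short_db, const uint8_t *full_db, size_t n,
                const uint8_t *query, uint32_t threshold_sq)
{
    uint32_t best = threshold_sq; /* only strictly closer hashes can win */
    long best_idx = -1;

    for (size_t i = 0; i < n; i++) {
        /* Step 3: cheap rejection using the short hash only. */
        uint32_t short_d =
            partial_distance_sq(short_db + i * SHORT_LEN, query, SHORT_LEN);
        if (short_d >= best)
            continue;

        /* Step 4: promising candidate, compute the full distance. */
        uint32_t full_d =
            partial_distance_sq(full_db + i * HASH_LEN, query, HASH_LEN);
        if (full_d < best) {
            best = full_d;
            best_idx = (long)i;
        }
    }
    return best_idx;
}
```

Seeding `best` with the threshold is what makes the difference: for a query with no related image, almost every short distance already exceeds it, so the expensive full computation is rarely reached.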

Origin distance variation

At some point, John noted that the threshold allows additional optimization. We can order the hashes by their distance from some origin point. Given a query hash which has an origin distance of A, we only need to inspect hashes whose origin distance lies between A-threshold and A+threshold: by the triangle inequality, a hash outside that range cannot be within the threshold of the query. This is pretty much how each level of a Vantage Point Tree works, just simplified. This optimization, ordering items in the database by their distance from an origin point, is relatively simple and can help save us a bit of work.
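One way to sketch this idea is shown below. It assumes the origin is the all-zero vector and that origin distances are rounded down to integers and kept in an ascending array. Both choices, along with all names, are assumptions of this example.

```c
#include <stddef.h>
#include <stdint.h>

#define HASH_LEN 144

/* Integer square root (rounded down), avoiding a dependency on libm. */
static uint32_t isqrt_u32(uint32_t v)
{
    uint32_t lo = 0, hi = 65535;
    while (lo < hi) {
        uint32_t mid = lo + (hi - lo + 1) / 2;
        if ((uint64_t)mid * mid <= v)
            lo = mid;
        else
            hi = mid - 1;
    }
    return lo;
}

/* Distance of a hash from the all-zero origin, rounded down. */
uint32_t origin_distance(const uint8_t *hash)
{
    uint32_t sum = 0;
    for (int i = 0; i < HASH_LEN; i++)
        sum += (uint32_t)hash[i] * hash[i];
    return isqrt_u32(sum);
}

/* First index in the ascending array whose value is >= key. */
static size_t lower_bound(const uint32_t *sorted, size_t n, uint32_t key)
{
    size_t lo = 0, hi = n;
    while (lo < hi) {
        size_t mid = lo + (hi - lo) / 2;
        if (sorted[mid] < key)
            lo = mid + 1;
        else
            hi = mid;
    }
    return lo;
}

/*
 * Given origin distances sorted ascending, compute the half-open index
 * range [*first, *last) of hashes whose origin distance lies within
 * `threshold` of the query's origin distance `a`. Hashes outside this
 * range cannot be within `threshold` of the query.
 */
void candidate_range(const uint32_t *sorted, size_t n, uint32_t a,
                     uint32_t threshold, size_t *first, size_t *last)
{
    uint32_t low = a > threshold ? a - threshold : 0;
    *first = lower_bound(sorted, n, low);
    *last = lower_bound(sorted, n, a + threshold + 1);
}
```

Only the hashes inside the returned range then need the short and full distance checks. With random data and a large threshold, that range still covers most of the database.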

While great on paper, this method doesn’t introduce much gain in practice, as the vectors are not grouped in clusters. They are pretty much random! For the threshold values we are interested in, the origin distance algorithm variation gives us a ~20% speed boost, which is okay but not breathtaking. This change might bring more benefits if we ever decide to reduce the threshold value, so it might be worth doing for a production implementation. However, it doesn’t work well with query batching.

Transposing data for better AVX

But we’re not done with AVX optimizations! The usual problem with AVX is that the instructions don’t normally fit a specific problem. Some serious mind twisting is required to adapt the right instruction to the problem, or to reverse the problem so that a specific instruction can be used. AVX2 doesn’t have useful “horizontal” uint16 subtract, multiply and add operations. For example, _mm_hadd_epi16 exists, but it’s slow and cumbersome.

Instead, we can twist the problem to make use of the fast uint16 operations that are available. For example we can use:

  1. _mm256_sub_epi16
  2. _mm256_mullo_epi16
  3. and _mm256_add_epi16.

A plain add would overflow in our case, but fortunately there is a saturating add, _mm256_adds_epu16.

The saturated add is great and saves us conversion to uint32. It just adds a small limitation: the threshold passed to the program (i.e., the max squared distance) must fit into uint16. However, this is fine for us.

To effectively use these instructions we need to transpose the data in the database. Instead of storing hashes in rows, we can store them in columns:

So instead of:

  [a1, a2, a3],
  [b1, b2, b3],
  [c1, c2, c3],

We can lay it out in memory transposed:

  [a1, b1, c1],
  [a2, b2, c2],
  [a3, b3, c3],

Now we can load the first bytes of 16 hashes using one memory operation. In the next step, we can subtract the first byte of the query hash from all of them using a single instruction, and so on. The algorithm stays exactly the same as defined above; we just make the data easier to load and easier to process for AVX.
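The following scalar sketch models the transposed layout and the saturating uint16 accumulation without using intrinsics, so it can run anywhere. Each iteration of the inner loop corresponds to one lane of a 16-lane AVX2 register. The block size of 16 hashes and all names are assumptions for illustration, not the production layout.

```c
#include <stdint.h>

#define HASH_LEN 144
#define LANES 16 /* sixteen uint16 lanes fit in one 256-bit register */

/* Scalar model of a saturating unsigned 16-bit add. */
static uint16_t adds_u16(uint16_t a, uint16_t b)
{
    uint32_t s = (uint32_t)a + b;
    return s > 0xFFFF ? 0xFFFF : (uint16_t)s;
}

/* rows: LANES hashes stored one after another (row-major).
 * cols: the same data transposed, cols[d * LANES + h] = byte d of hash h. */
void transpose_block(const uint8_t *rows, uint8_t *cols)
{
    for (int h = 0; h < LANES; h++)
        for (int d = 0; d < HASH_LEN; d++)
            cols[d * LANES + h] = rows[h * HASH_LEN + d];
}

/* Squared distances from `query` to all LANES hashes of a transposed
 * block over the first `dims` dimensions, saturating at 65535. */
void block_distances(const uint8_t *cols, const uint8_t *query, int dims,
                     uint16_t out[LANES])
{
    for (int h = 0; h < LANES; h++)
        out[h] = 0;
    for (int d = 0; d < dims; d++) {
        /* Byte d of all 16 hashes sits in one contiguous 16-byte run. */
        const uint8_t *col = cols + d * LANES;
        for (int h = 0; h < LANES; h++) {
            int delta = (int)col[h] - (int)query[d];
            /* 255 * 255 = 65025 still fits in uint16. */
            out[h] = adds_u16(out[h], (uint16_t)(delta * delta));
        }
    }
}
```

A saturated result simply means "at least 65535". That is enough to reject a candidate as long as the squared threshold itself fits into uint16.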

The hot loop code even looks relatively pretty:

[Image: C listing of the hot loop]

With well-tuned batch size and short distance size parameters, we can see the performance of this algorithm:

$ make short-inv-avx2
Total: 1118.669ms, 1536 items, avg 0.728ms per query, 1373.062 qps

Whoa! This is pretty awesome. We started from 55ms per query, and we finished with just 0.73ms. There are further micro-optimizations possible, like memory prefetching or using huge pages to reduce page faults, but they have diminishing returns at this point.

Figure: Roofline model from Denis Bakhvalov’s book

If you are interested in architectural tuning such as this, take a look at the new performance book by Denis Bakhvalov. It discusses roofline model analysis, which is pretty much what we did here.

Do take a look at our code and tell us if we missed some optimization!

Summary

What an optimization journey! We jumped between memory and ALU bottlenecked code. We discussed more sophisticated algorithms, but in the end, a brute force algorithm — although tuned — gave us the best results.

To get even better numbers, I experimented with an Nvidia GPU using CUDA. CUDA intrinsics like vabsdiff4 and dp4a fit the problem perfectly. The V100 gave us some amazing numbers, but I wasn’t fully satisfied with it. Considering how many AMD Ryzen cores with AVX2 we can get for the cost of a single server-grade GPU, we leaned towards general purpose computing for this particular problem.

This is a great example of the type of complexities we deal with every day. Making even the best technologies work “at Cloudflare scale” requires thinking outside the box. Sometimes we rewrite the solution dozens of times before we find the optimal one. And sometimes we settle on a brute-force algorithm, just very very optimized.

The computation of hashes and image matching are challenging problems that require running very CPU-intensive operations. The CPU we have available on the edge is scarce and workloads like this are incredibly expensive. Even with the optimization work described in this blog post, running the CSAM scanner at scale is a challenge and has required a huge engineering effort. And we’re not done! We need to solve more hard problems before we’re satisfied. If you want to help, consider applying!

My internship: Brotli compression using a reduced dictionary

Post Syndicated from Felix Hanau original https://blog.cloudflare.com/brotli-compression-using-a-reduced-dictionary/

Brotli is a state of the art lossless compression format, supported by all major browsers. It is capable of achieving considerably better compression ratios than the ubiquitous gzip, and is rapidly gaining in popularity. Cloudflare uses the Google brotli library to dynamically compress web content whenever possible. In 2015, we took an in-depth look at how brotli works and its compression advantages.

One of the more interesting features of the brotli file format, in the context of textual web content compression, is the inclusion of a built-in static dictionary. The dictionary is quite large, and in addition to containing various strings in multiple languages, it also supports the option to apply multiple transformations to those words, increasing its versatility.

The open-source brotli library, which implements an encoder and decoder for brotli, has 11 predefined quality levels for the encoder, with higher quality levels demanding more CPU in exchange for a better compression ratio. The static dictionary feature is used to a limited extent starting with level 5, and to the full extent only at levels 10 and 11, due to the high CPU cost of this feature.

We improve on the limited dictionary use approach and add optimizations to improve the compression at levels 5 through 9 at a negligible performance impact when compressing web content.

Brotli Static Dictionary

Brotli primarily uses the LZ77 algorithm to compress its data. Our previous blog post about brotli provides an introduction.

To improve compression on text files and web content, brotli also includes a static, predefined dictionary. If a byte sequence cannot be matched with an earlier sequence using LZ77 the encoder will try to match the sequence with a reference to the static dictionary, possibly using one of the multiple transforms. For example, every HTML file contains the opening <html> tag that cannot be compressed with LZ77, as it is unique, but it is contained in the brotli static dictionary and will be replaced by a reference to it. The reference generally takes less space than the sequence itself, which decreases the compressed file size.

The dictionary contains 13,504 words in six languages, with lengths from 4 to 24 characters. To improve the compression of real-world text and web data, some dictionary words are common phrases (“The current”) or strings common in web content (‘type=”text/javascript”’). Unlike usual LZ77 compression, a word from the dictionary can only be matched as a whole. Starting a match in the middle of a dictionary word, ending it before the end of a word or even extending into the next word is not supported by the brotli format.

Instead, the dictionary supports 120 transforms of dictionary words to support a larger number of matches and find longer matches. The transforms include adding suffixes (“work” => “working”), adding prefixes (“book” => “ the book”), making the first character uppercase (“process” => “Process”), and converting the whole word to uppercase (“html” => “HTML”). In addition to transforms that make words longer or capitalize them, the cut transform allows a shortened match (“consistently” => “consistent”), which makes it possible to find even more matches.

Methods

With the transforms included, the static dictionary contains 1,633,984 different words (13,504 base words in 121 forms each, counting the unmodified word alongside its 120 transformed variants). That is too many for exhaustive search, except when used with the slow brotli compression levels 10 and 11. When used at a lower compression level, brotli either disables the dictionary or only searches through a subset of roughly 5,500 words to find matches in an acceptable time frame. It also only considers matches at positions where no LZ77 match can be found and only uses the cut transform.

Our approach to the brotli dictionary uses a larger, but more specialized subset of the dictionary than the default, using more aggressive heuristics to improve the compression ratio with negligible cost to performance. In order to provide a more specialized dictionary, we provide the compressor with a content type hint from our servers, relying on the Content-Type header to tell the compressor if it should use a dictionary for HTML, JavaScript or CSS. In the future, the dictionaries could be refined further based on the language prevalent at each colocation.

Fast dictionary lookup

To improve compression without sacrificing performance, we needed a fast way to find matches while searching the dictionary more thoroughly than brotli does by default. Our approach uses three data structures to find a matching word directly. The radix trie is responsible for finding the word while the hash table and bloom filter are used to speed up the radix trie and quickly eliminate many words that can’t be matched using the dictionary.

Figure: Lookup for a position starting with “type”

The radix trie easily finds the longest matching word without having to try matching several words. To find the match, we traverse the graph based on the text at the current position and remember the last node with a matching word. The radix trie supports compressed nodes (having more than one character as an edge label), which greatly reduces the number of nodes that need to be traversed for typical dictionary words.

The radix trie is slowed down by the large number of positions where we can’t find a match. An important finding is that most mismatching strings have a mismatching character in the first four bytes. Even for positions where a match exists, a lot of time is spent traversing nodes for the first four bytes since the nodes close to the tree root usually have many children.

Luckily, we can use a hash table to look up the trie node corresponding to the first four bytes, if it exists, or to reject the possibility of a match. We thus look up the first four bytes of the string. If there is a matching node, we traverse the trie from there, which will be fast as each four-byte prefix usually only has a few corresponding dictionary words. If there is no matching node, there will not be a matching word at this position and we do not need to consider it further.

While the hash table is designed to reject mismatches quickly and avoid cache misses and high search costs in the trie, it still suffers from similar problems: We might search through several 4-byte prefixes with the hash value of the given position, only to learn that no match can be found. Additionally, hash lookups can be expensive due to cache misses.

To quickly reject words that do not match the dictionary, but might still cause cache misses, we use a k=1 bloom filter to quickly rule out most non-matching positions. In the k=1 case, the filter is simply a lookup table with one bit indicating whether any matching 4-byte prefixes exist for a given hash value. If the bit for the given hash value is 0, there won’t be a match. Since the bloom filter uses at most one bit for each four-byte prefix while the hash table requires 16 bytes, cache misses are much less likely. (The actual size of the structures is a bit different since there are many empty spaces in both structures and the bloom filter has twice as many elements to reject more non-matching positions.)

This is very useful for performance as a bloom filter lookup requires a single memory access. The bloom filter is designed to be fast and simple, but still rejects more than half of all non-matching positions and thus allows us to save a full hash lookup, which would often mean a cache miss.
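A k=1 bloom filter of this kind can be sketched as a plain bit array indexed by a hash of the four-byte prefix. The table size, the multiplicative hash, and all names below are assumptions for illustration; they are not the values used in the actual implementation.

```c
#include <stdint.h>
#include <string.h>

#define FILTER_BITS (1u << 16) /* assumed table size, a power of two */

typedef struct {
    uint8_t bits[FILTER_BITS / 8];
} prefix_filter;

/* Hypothetical hash of a 4-byte prefix into [0, FILTER_BITS). */
static uint32_t prefix_hash(const uint8_t *p)
{
    uint32_t v;
    memcpy(&v, p, 4); /* unaligned-safe load of the first four bytes */
    return (v * 0x9E3779B1u) >> 16; /* keep the top 16 bits */
}

void filter_init(prefix_filter *f)
{
    memset(f->bits, 0, sizeof f->bits);
}

/* Record that some dictionary word starts with these four bytes. */
void filter_add(prefix_filter *f, const uint8_t *prefix)
{
    uint32_t h = prefix_hash(prefix);
    f->bits[h >> 3] |= (uint8_t)(1u << (h & 7));
}

/* Returns 0 if no dictionary word can start here (definite reject),
 * 1 if the hash table and trie still need to be consulted. */
int filter_may_match(const prefix_filter *f, const uint8_t *text)
{
    uint32_t h = prefix_hash(text);
    return (f->bits[h >> 3] >> (h & 7)) & 1;
}
```

A result of 1 can be a false positive, because two different prefixes may share a bit. A result of 0 is always correct, which is what makes it safe to skip the hash table lookup.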

Heuristics

To improve the compression ratio without sacrificing performance, we employed a number of heuristics:

Only search the dictionary at some positions
This is also done using the stock dictionary, but we search more aggressively. While the stock dictionary only considers positions where the LZ77 match finder did not find a match, we also consider positions that have a bad match according to the brotli cost model: LZ77 matches that are short or have a long distance between the current position and the reference usually only offer a small compression improvement, so it is worth trying to find a better match in the static dictionary.

Only consider the longest match and then transform it
Instead of finding and transforming all matches at a position, the radix trie only gives us the longest match which we then transform. This approach results in a vast performance improvement. In most cases, this results in finding the best match.

Only include some transforms
While all transformations can improve the compression ratio, we only included those that work well with the data structures. The suffix transforms can easily be applied after finding a non-transformed match. For the upper case transforms, we include both the non-transformed and the upper case version of a word in the radix trie. The prefix and cut transforms do not play well with the radix trie, therefore a cut of more than 1 byte and prefix transforms are not supported.

Generating the reduced dictionary

At low compression levels, brotli searches a subset of ~5,500 out of 13,504 words of the dictionary, negatively impacting compression. To store the entire dictionary, we would need to store ~31,700 words in the trie considering the upper case transformed output of ASCII sequences and ~11,000 four-byte prefixes in the hash. This would slow down hash table and radix trie, so we needed to find a different subset of the dictionary that works well for web content.

For this purpose, we used a large data set containing representative content. We made sure to use web content from several world regions to reflect language diversity and optimize compression. Based on this data set, we identified which words are most common and result in the largest compression improvement according to the brotli cost model. We only include the most useful words based on this calculation. Additionally, we remove some words if they slow down hash table lookups of other, more common words based on their hash value.

We have generated separate dictionaries for HTML, CSS and JavaScript content and use the MIME type to identify the right dictionary to use. The dictionaries we currently use include about 15-35% of the entire dictionary including uppercase transforms. Depending on the type of data and the desired compression/speed tradeoff, different options for the size of the dictionary can be useful. We have also developed code that automatically gathers statistics about matches and generates a reduced dictionary based on them. This makes it easy to extend the approach to other textual formats, such as XML or data that is mostly non-English, and achieve better results for that type of data.

Results

We tested the reduced dictionary on a large data set of HTML, CSS and JavaScript files.

The improvement is especially big for small files as the LZ77 compression is less effective on them. Since the improvement on large files is a lot smaller, we only tested files up to 256KB. We used compression level 5, the same compression level we currently use for dynamic compression on our edge, and tested on an Intel Core i7-7820HQ CPU.

Compression improvement is defined as 1 - (compressed size using the reduced dictionary / compressed size without dictionary). This ratio is then averaged for each input size range. We also provide an average value weighted by file size. Our data set mirrors typical web traffic, covering a wide range of file sizes with small files being more common, which explains the large difference between the weighted and unweighted average.
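In code, the two averages might be computed as follows. This is a sketch only: the struct layout and names are hypothetical, and weighting by the uncompressed input size is an assumption about what “weighted by file size” means.

```c
#include <stddef.h>

typedef struct {
    double input_size;    /* uncompressed size in bytes */
    double size_reduced;  /* compressed size using the reduced dictionary */
    double size_baseline; /* compressed size without the dictionary */
} sample;

/* Improvement for one file: 1 - (reduced / baseline). */
static double improvement(const sample *s)
{
    return 1.0 - s->size_reduced / s->size_baseline;
}

/* Plain mean over files: every file counts equally. */
double mean_improvement(const sample *s, size_t n)
{
    double sum = 0.0;
    for (size_t i = 0; i < n; i++)
        sum += improvement(&s[i]);
    return n ? sum / (double)n : 0.0;
}

/* Mean weighted by input size: large files dominate the result. */
double weighted_improvement(const sample *s, size_t n)
{
    double num = 0.0, den = 0.0;
    for (size_t i = 0; i < n; i++) {
        num += improvement(&s[i]) * s[i].input_size;
        den += s[i].input_size;
    }
    return den > 0.0 ? num / den : 0.0;
}
```

Because small files both benefit the most and are the most common, the plain mean comes out higher than the size-weighted one.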

[Figure: compression improvement for each input size range]

With the improved dictionary approach, we are now able to compress HTML, JavaScript and CSS files as well as, or sometimes even better than, a higher compression level would allow, all while using only 1% to 3% more CPU. For reference, using compression level 6 over 5 would increase CPU usage by up to 12%.

Introducing support for the AVIF image format

Post Syndicated from Kornel Lesiński original https://blog.cloudflare.com/generate-avif-images-with-image-resizing/

We’ve added support for the new AVIF image format in Image Resizing. It compresses images significantly better than older-generation formats such as WebP and JPEG. It’s supported in Chrome desktop today, and support is coming to other Chromium-based browsers, as well as Firefox.

What’s the benefit?

More than half of an average website’s bandwidth is spent on images. Improved image compression can save bandwidth and improve overall performance of the web. The compression in AVIF is so good that images can shrink to half the size of their JPEG and WebP equivalents.

What is AVIF?

AVIF is a combination of the HEIF ISO standard, and a royalty-free AV1 codec by Mozilla, Xiph, Google, Cisco, and many others.

Currently JPEG is the most popular image format on the Web. It’s doing remarkably well for its age, and it will likely remain popular for years to come thanks to its excellent compatibility. There have been many previous attempts at replacing JPEG, such as JPEG 2000, JPEG XR and WebP. However, these formats offered only modest compression improvements, and didn’t always beat JPEG on image quality. Compression and image quality in AVIF are better than in all of them, and by a wide margin.

Figure: the same image as JPEG (17KB), WebP (17KB), and AVIF (17KB)

Why a new image format?

One of the big things AVIF does is fix WebP’s biggest flaws. WebP is over 10 years old, and AVIF is a major upgrade over the WebP format. These two formats are technically related: they’re both based on a video codec from the VPx family. WebP uses the old VP8 version, while AVIF is based on AV1, which is the next generation after VP10.

WebP is limited to 8-bit color depth, and in its best-compressing mode of operation, it can only store color at half of the image’s resolution (known as chroma subsampling). This causes edges of saturated colors to be smudged or pixelated in WebP. AVIF supports 10- and 12-bit color at full resolution, and high dynamic range (HDR).

Figure: comparison of JPEG, WebP, WebP (sharp YUV option), and AVIF

AV1 also uses a new compression technique: chroma-from-luma. Most image formats store brightness separately from color hue. AVIF uses the brightness channel to guess what the color channel may look like. They are usually correlated, so a good guess gives smaller file sizes and sharper edges.
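As a highly simplified illustration of the idea (this is not the AV1 bitstream algorithm, which works in fixed point on subsampled blocks): the encoder signals a scaling factor, and the decoder predicts each chroma sample from the block’s average chroma plus the scaled deviation of luma from its own average. All names here are hypothetical.

```c
#include <stddef.h>

/* Predict chroma samples from reconstructed luma samples of the same block.
 * dc:    predicted average chroma value for the block
 * alpha: scaling factor chosen by the encoder and sent in the stream */
void predict_chroma_from_luma(const float *luma, size_t n, float dc,
                              float alpha, float *chroma_out)
{
    float luma_avg = 0.0f;
    for (size_t i = 0; i < n; i++)
        luma_avg += luma[i];
    if (n)
        luma_avg /= (float)n;

    /* Chroma follows the shape of luma around its mean, scaled by alpha. */
    for (size_t i = 0; i < n; i++)
        chroma_out[i] = dc + alpha * (luma[i] - luma_avg);
}
```

Only the difference between this prediction and the real chroma values then needs to be stored, and that difference is small wherever brightness and color change together.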

Adoption of AV1 and AVIF

VP8 and WebP suffered from reluctant industry adoption. Safari only added WebP support very recently, 10 years after Chrome.

Chrome 85 supports AVIF already. It’s a work in progress in other Chromium-based browsers, and Firefox. Apple hasn’t announced whether Safari will support AVIF. However, this time Apple is one of the companies in the Alliance for Open Media, creators of AVIF. The AV1 codec is already seeing faster adoption than the previous royalty-free codecs. Latest GPUs from Nvidia, AMD, and Intel already have hardware decoding support for AV1.

AVIF uses the same HEIF container as the HEIC format used in iOS’s camera. AVIF and HEIC offer similar compression performance. The difference is that HEIC is based on a commercial, patent-encumbered H.265 format. In countries that allow software patents, H.265 is illegal to use without obtaining patent licenses. It’s covered by thousands of patents, owned by hundreds of companies, which have fragmented into two competing licensing organizations. Costs and complexity of patent licensing used to be acceptable when videos were published by big studios, and the cost could be passed on to the customer in the price of physical discs and hardware players. Nowadays, when video is mostly consumed via free browsers and apps, the old licensing model has become unsustainable. This has created a huge incentive for Web giants like Google, Netflix, and Amazon to get behind the royalty-free alternative. AV1 is free to use by anyone, and the alliance of tech giants behind it will defend it from patent trolls’ lawsuits.

Non-standard image formats usually can only be created using their vendor’s own implementation. AVIF and AV1 are already ahead with multiple independent implementations: libaom, Intel SVT-AV1, libgav1, dav1d, and rav1e. This is a sign of a healthy adoption and a prerequisite to be a Web standard. Our Image Resizing is implemented in Rust, so we’ve chosen the rav1e encoder to create a pure-Rust implementation of AVIF.

Caveats

AVIF is a feature-rich format. Most of its features are for smartphone cameras, such as “live” photos, depth maps, bursts, HDR, and lossless editing. Browsers will support only a fraction of these features. In our implementation in Image Resizing we’re supporting only still standard-range images.

The two biggest problems with AVIF are encoding speed and the lack of progressive rendering.

AVIF images don’t show anything on screen until they’re fully downloaded. In contrast, a progressive JPEG can display a lower-quality approximation of the image very quickly, while it’s still loading. When progressive JPEGs are delivered well, they make websites appear to load much faster. Progressive JPEG can look loaded at half of its file size. AVIF can fully load at half of JPEG’s size, so it somewhat overcomes the lack of progressive rendering with the sheer compression advantage. In this case only WebP is left behind, which has neither progressive rendering nor strong compression.

Decoding AVIF images for display takes relatively more CPU power than other codecs, but it should be fast enough in practice. Even low-end Android devices can play AV1 videos in full HD without the help of hardware acceleration, and AVIF images are just a single frame of an AV1 video. However, encoding of AVIF images is much slower. It may take even a few seconds to create a single image. AVIF supports tiling, which accelerates encoding on multi-core CPUs. We have lots of CPU cores, so we take advantage of this to make encoding faster. To further increase encoding speed, Image Resizing doesn’t use the maximum compression level possible in AVIF. Resized images are cached, so the encoding speed is noticeable only on a cache miss.

We will be adding AVIF support to Polish as well. Polish converts images asynchronously in the background, which completely hides the encoding latency, and it will be able to compress AVIF images better than Image Resizing.

Note about benchmarking

It’s surprisingly difficult to fairly and accurately judge which lossy codec is better. Lossy codecs are specifically designed to mainly compress image details that are too subtle for the naked eye to see, so “looks almost the same, but the file size is smaller!” is a common feature of all lossy codecs, and not specific enough to draw conclusions about their performance.

Lossy codecs can’t be compared by comparing just file sizes. Lossy codecs can easily make files of any size. Their effectiveness is in the ratio between file size and visual quality they can achieve.

The best way to compare codecs is to make each compress an image to the exact same file size, and then to compare the actual visual quality they’ve achieved. If the file sizes differ, any judgement may be unfair, because the codec that generated the larger file may have done so only because it was asked to preserve more details, not because it can’t compress better.
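One way to get two codecs to the same file size is to search over each encoder's quality setting. The sketch below is a minimal illustration of that idea, not how any particular benchmark was run: `encode` is a hypothetical callable wrapping whichever codec is being tested, and it is assumed to produce larger output as quality increases.

```python
from typing import Callable, Tuple


def encode_to_target_size(
    encode: Callable[[int], bytes],
    target_bytes: int,
    lo: int = 1,
    hi: int = 100,
) -> Tuple[int, bytes]:
    """Binary-search the quality setting for the largest output that fits.

    `encode` is a hypothetical wrapper around a real encoder. If even the
    lowest quality exceeds the target, the lowest-quality output is returned.
    """
    best_quality, best_data = lo, encode(lo)
    while lo <= hi:
        mid = (lo + hi) // 2
        data = encode(mid)
        if len(data) <= target_bytes:
            best_quality, best_data = mid, data  # fits: try a higher quality
            lo = mid + 1
        else:
            hi = mid - 1  # too large: try a lower quality
    return best_quality, best_data


# Stand-in encoder for demonstration only: output grows linearly with quality.
def fake_encoder(quality: int) -> bytes:
    return b"x" * (quality * 100)


quality, data = encode_to_target_size(fake_encoder, target_bytes=4250)
print(quality, len(data))
```

Running the same search for each codec against the same byte budget leaves visual quality as the only variable to judge.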

How and when to enable AVIF today?

We recommend AVIF for websites that need to deliver high-quality images with as little bandwidth as possible. This is important for users of slow networks and in countries where the bandwidth is expensive.

Browsers that support the AVIF format announce it by adding image/avif to their Accept request header. To request the AVIF format from Image Resizing, set the format option to avif.

The best method to detect and enable support for AVIF is to use image resizing in Workers:

addEventListener('fetch', event => {
  const imageURL = "https://jpeg.speedcf.com/cat/4.jpg";

  const resizingOptions = {
    // You can set any options here, see:
    // https://developers.cloudflare.com/images/worker
    width: 800,
    sharpen: 1.0,
  };

  const accept = event.request.headers.get("accept");
  const isAVIFSupported = /image\/avif/.test(accept);
  if (isAVIFSupported) {
    resizingOptions.format = "avif";
  }
  // The resizing options belong to fetch(), not to respondWith()
  event.respondWith(fetch(imageURL, {cf: {image: resizingOptions}}))
})

The above script will auto-detect the supported format, and serve AVIF automatically. Alternatively, you can use URL-based resizing together with the <picture> element:

<picture>
    <source type="image/avif" 
            srcset="/cdn-cgi/image/format=avif/image.jpg">
    <img src="/image.jpg">
</picture>

This uses our /cdn-cgi/image/… endpoint to perform the conversion, and the alternative source will be picked only by browsers that support the AVIF format.

We have the format=auto option, but it won’t choose AVIF yet. We’re cautious, and we’d like to test the new format more before enabling it by default.

When Bloom filters don’t bloom

Post Syndicated from Marek Majkowski original https://blog.cloudflare.com/when-bloom-filters-dont-bloom/

I’ve known about Bloom filters (named after Burton Bloom) since university, but I haven’t had an opportunity to use them in anger. Last month this changed – I became fascinated with the promise of this data structure, but I quickly realized it had some drawbacks. This blog post is the tale of my brief love affair with Bloom filters.

While doing research about IP spoofing, I needed to examine whether the source IP addresses extracted from packets reaching our servers were legitimate, depending on the geographical location of our data centers. For example, source IPs belonging to a legitimate Italian ISP should not arrive in a Brazilian datacenter. This problem might sound simple, but in the ever-evolving landscape of the internet this is far from easy. Suffice it to say I ended up with many large text files with data like this:

This reads as: the IP 192.0.2.1 was recorded reaching Cloudflare data center number 107 with a legitimate request. This data came from many sources, including our active and passive probes, logs of certain domains we own (like cloudflare.com), public sources (like BGP table), etc. The same line would usually be repeated across multiple files.

I ended up with a gigantic collection of data of this kind. At some point I counted 1 billion lines across all the harvested sources. I usually write bash scripts to pre-process the inputs, but at this scale this approach wasn’t working. For example, removing duplicates from this tiny file of a meager 600MiB and 40M lines, took… about an eternity:

Enough to say that deduplicating lines using the usual bash commands like ‘sort’ in various configurations (see ‘--parallel’, ‘--buffer-size’ and ‘--unique’) was not optimal for such a large data set.

Bloom filters to the rescue

Image by David Eppstein Public Domain

Then I had a brainwave – it’s not necessary to sort the lines! I just need to remove duplicated lines – using some kind of "set" data structure should be much faster. Furthermore, I roughly know the cardinality of the input file (number of unique lines), and I can live with some data points being lost – using a probabilistic data structure is fine!

Bloom-filters are a perfect fit!

While you should go and read Wikipedia on Bloom Filters, here is how I look at this data structure.

How would you implement a "set"? Given a perfect hash function, and infinite memory, we could just create an infinite bit array and set bit number ‘hash(item)’ for each item we encounter. This would give us a perfect "set" data structure. Right? Trivial. Sadly, hash functions have collisions and infinite memory doesn’t exist, so we have to compromise in our reality. But we can calculate and manage the probability of collisions. For example, imagine we have a good hash function, and 128GiB of memory. We can calculate that the probability of the second item added to the bit array colliding with the first is 1 in 1099511627776. The probability of collision when adding more items worsens as we fill up the bit array.
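The 1 in 1099511627776 figure is simply the number of bits in 128GiB. A small sketch of that arithmetic, assuming a uniform hash function and one bit set per item (the function names are illustrative only):

```python
def bits_in_gib(gib: int) -> int:
    """Number of bits in `gib` gibibytes of memory."""
    return gib * 2**30 * 8


def collision_probability(bits_already_set: int, total_bits: int) -> float:
    """Chance that the next item lands on an already-set bit.

    Assumes a uniform hash and that every earlier item set a distinct bit.
    """
    return bits_already_set / total_bits


total = bits_in_gib(128)
print(total)                               # 2**40 bits
print(collision_probability(1, total))     # second item colliding with the first
print(collision_probability(10**9, total)) # much worse after a billion items
```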

Furthermore, we could use more than one hash function, and end up with a denser bit array. This is exactly what Bloom filters optimize for. A Bloom filter is a bunch of math on top of the four variables:

  • ‘n’ – The number of input elements (cardinality)
  • ‘m’ – Memory used by the bit-array
  • ‘k’ – Number of hash functions counted for each input
  • ‘p’ – Probability of a false positive match

Given the ‘n’ input cardinality and the ‘p’ desired probability of false positive, the Bloom filter math returns the ‘m’ memory required and ‘k’ number of hash functions needed.
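The relationship between the four variables can be written down with the standard textbook formulas for an optimally tuned Bloom filter. The sketch below uses those formulas; it is not taken from the ‘mmuniq-bloom’ source, and the function names are made up for illustration.

```python
import math


def bloom_parameters(n: int, p: float):
    """Return (m, k): bits of memory and number of hash functions.

    Textbook formulas: m = -n * ln(p) / (ln 2)^2 and k = (m / n) * ln 2.
    """
    m = math.ceil(-n * math.log(p) / (math.log(2) ** 2))
    k = max(1, round(m / n * math.log(2)))
    return m, k


def false_positive_rate(n: int, m: int, k: int) -> float:
    """Approximate false positive probability after inserting n items."""
    return (1.0 - math.exp(-k * n / m)) ** k


m, k = bloom_parameters(40_000_000, 1e-4)
print(m / 8 / 2**20, "MiB with k =", k)

# A fixed configuration can be checked the other way around:
print(false_positive_rate(40_000_000, 2**30, 8))
```

For 40M items and a target of one false positive in 10k, the optimum comes out at roughly 91 MiB with 13 hash functions. A 128MiB bit array with 8 hash functions works out to about 2 false positives in 100,000 under the same approximation.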

Check out this excellent visualization by Thomas Hurst showing how parameters influence each other:

mmuniq-bloom

Guided by this intuition, I set out on a journey to add a new tool to my toolbox – ‘mmuniq-bloom’, a probabilistic tool that, given input on STDIN, returns only unique lines on STDOUT, hopefully much faster than ‘sort’ + ‘uniq’ combo!

Here it is:

For simplicity and speed I designed ‘mmuniq-bloom’ with a couple of assumptions. First, unless otherwise instructed, it uses 8 hash functions k=8. This seems to be a close to optimal number for the data sizes I’m working with, and the hash function can quickly output 8 decent hashes. Then we align ‘m’, number of bits in the bit array, to be a power of two. This is to avoid the pricey % modulo operation, which compiles down to slow assembly ‘div’. With power-of-two sizes we can just do bitwise AND. (For a fun read, see how compilers can optimize some divisions by using multiplication by a magic constant.)
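To make the design concrete, here is a minimal Bloom filter deduplicator in Python. It is a sketch of the ideas described above, not the actual ‘mmuniq-bloom’ code: `blake2b` from the standard library stands in for ‘siphash24’, and the class and function names are invented for this example. The bit array size is a power of two, so each index is derived with a bitwise AND rather than a modulo.

```python
import hashlib


class BloomFilter:
    def __init__(self, log2_bits: int, k: int = 8):
        if log2_bits < 3 or not 1 <= k <= 8:
            raise ValueError("need log2_bits >= 3 and 1 <= k <= 8")
        self.k = k
        self.mask = (1 << log2_bits) - 1            # m is a power of two
        self.bits = bytearray((1 << log2_bits) // 8)

    def _indexes(self, item: bytes):
        # One 64-byte digest is split into up to eight 64-bit words.
        # blake2b is a stand-in here; the original tool uses siphash24.
        digest = hashlib.blake2b(item, digest_size=64).digest()
        for i in range(self.k):
            word = int.from_bytes(digest[8 * i:8 * i + 8], "little")
            yield word & self.mask                  # same as word % m

    def add(self, item: bytes) -> bool:
        """Set the item's bits. Return True if it was possibly seen before."""
        seen = True
        for idx in self._indexes(item):
            byte, bit = idx >> 3, 1 << (idx & 7)
            if not self.bits[byte] & bit:
                seen = False
                self.bits[byte] |= bit
        return seen


def unique_lines(lines, log2_bits: int = 20):
    """Keep the first occurrence of each line (with a small false positive risk)."""
    bloom = BloomFilter(log2_bits)
    return [line for line in lines if not bloom.add(line)]


print(unique_lines([b"a", b"b", b"a", b"c", b"b"]))
```

Note that a false positive here means a genuinely new line is dropped, which is the "data points being lost" trade-off accepted earlier.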

We can now run it against the same data file we used before:

Oh, this is so much better! 12 seconds is much more manageable than 2 minutes before. But hold on… The program is using an optimized data structure, relatively limited memory footprint, optimized line-parsing and good output buffering… 12 seconds is still eternity compared to ‘wc -l’ tool:

What is going on? I understand that counting lines by ‘wc’ is easier than figuring out unique lines, but is it really worth the 26x difference? Where does all the CPU in ‘mmuniq-bloom’ go?

It must be my hash function. ‘wc’ doesn’t need to spend all this CPU performing all this strange math for each of the 40M lines on input. I’m using a pretty non-trivial ‘siphash24’ hash function, so it surely burns the CPU, right? Let’s check by running the code computing hash function but not doing any Bloom filter operations:

This is strange. Counting the hash function indeed costs about 2s, but the program took 12s in the previous run. The Bloom filter alone takes 10 seconds? How is that possible? It’s such a simple data structure…

A secret weapon – a profiler

It was time to use a proper tool for the task – let’s fire up a profiler and see where the CPU goes. First, let’s fire an ‘strace’ to confirm we are not running any unexpected syscalls:

Everything looks good. The 10 calls to ‘mmap’ each taking 4ms (3971 us) is intriguing, but it’s fine. We pre-populate memory up front with ‘MAP_POPULATE’ to save on page faults later.

What is the next step? Of course Linux’s ‘perf’!

Then we can see the results:

Right, so we indeed burn 87.2% of cycles in our hot code. Let’s see where exactly. Doing ‘perf annotate process_line --source’ quickly shows something I didn’t expect.

You can see 26.90% of CPU burned in the ‘mov’, but that’s not all of it! The compiler correctly inlined the function, and unrolled the loop 8-fold. Summed up that ‘mov’ or ‘uint64_t v = *p’ line adds up to a great majority of cycles!

Clearly ‘perf’ must be mistaken, how can such a simple line cost so much? We can repeat the benchmark with any other profiler and it will show us the same problem. For example, I like using ‘google-perftools’ with kcachegrind since they emit eye-candy charts:

The rendered result looks like this:

Allow me to summarise what we found so far.

The generic ‘wc’ tool takes 0.45s CPU time to process 600MiB file. Our optimized ‘mmuniq-bloom’ tool takes 12 seconds. CPU is burned on one ‘mov’ instruction, dereferencing memory….

Image by Jose Nicdao CC BY/2.0

Oh! How could I have forgotten? Random memory access is slow! It’s very, very, very slow!

According to the rule of thumb "latency numbers every programmer should know about", one RAM fetch is about 100ns. Let’s do the math: 40 million lines, 8 hashes counted for each line. Since our Bloom filter is 128MiB, on our older hardware it doesn’t fit into L3 cache! The hashes are uniformly distributed across the large memory range – each hash generates a memory miss. Adding it together that’s…
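Spelled out as a quick calculation, using only the numbers quoted above (40 million lines, 8 hashes per line, about 100ns per RAM fetch) and the pessimistic assumption that every hash lookup misses the cache:

```python
LINES = 40_000_000
HASHES_PER_LINE = 8
RAM_FETCH_NS = 100  # rule-of-thumb latency of one main memory fetch


def estimated_fetch_seconds(lines: int, hashes: int, fetch_ns: int) -> float:
    """Worst case: every hash lookup is a cache miss served from RAM."""
    return lines * hashes * fetch_ns / 1e9


print(LINES * HASHES_PER_LINE, "memory fetches")
print(estimated_fetch_seconds(LINES, HASHES_PER_LINE, RAM_FETCH_NS), "seconds")
```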

That suggests 32 seconds burned just on memory fetches. The real program is faster, taking only 12s. This is because, although the Bloom filter data does not completely fit into L3 cache, it still gets some benefit from caching. It’s easy to see with ‘perf stat -d’:

Right, so we should have had at least 320M LLC-load-misses, but we had only 280M. This still doesn’t explain why the program was running only 12 seconds. But it doesn’t really matter. What matters is that the number of cache misses is a real problem and we can only fix it by reducing the number of memory accesses. Let’s try tuning Bloom filter to use only one hash function:

Ouch! That really hurt! The Bloom filter required 64 GiB of memory to get our desired false positive probability ratio of 1-error-per-10k-lines. This is terrible!

Also, it doesn’t seem like we improved much. It took the OS 22 seconds to prepare memory for us, but we still burned 11 seconds in userspace. I guess this time any benefits from hitting memory less often were offset by lower cache-hit probability due to drastically increased memory size. In previous runs we required only 128MiB for the Bloom filter!
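The 64 GiB figure can be reproduced from the usual Bloom filter approximation with a single hash function. This is a sketch under two assumptions: the standard formula p = 1 - exp(-n/m) for k=1, and rounding the bit array up to a power of two as described earlier for ‘mmuniq-bloom’. The function names are illustrative.

```python
import math


def single_hash_bits(n: int, p: float) -> int:
    """Bits needed for false positive rate p with one hash function.

    From p = 1 - exp(-n / m), solved for m.
    """
    return math.ceil(-n / math.log1p(-p))


def next_power_of_two(x: int) -> int:
    return 1 << (x - 1).bit_length()


bits = single_hash_bits(40_000_000, 1e-4)
aligned = next_power_of_two(bits)
print(bits / 8 / 2**30, "GiB before alignment")
print(aligned // 8 // 2**30, "GiB after rounding up to a power of two")
```

About 400 billion bits are needed before alignment, which is just under 47 GiB; the next power of two is 2^39 bits, or 64 GiB.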

Dumping Bloom filters altogether

This is getting ridiculous. To get the same false positive guarantees we either must use many hashes in Bloom filter (like 8) and therefore many memory operations, or we can have 1 hash function, but enormous memory requirements.

We aren’t really constrained by available memory, instead we want to optimize for reduced memory accesses. All we need is a data structure that requires at most 1 memory miss per item, and uses less than 64 Gigs of RAM…

While we could think of more sophisticated data structures like Cuckoo filter, maybe we can be simpler. How about a good old simple hash table with linear probing?

Image by Vadims Podāns

Welcome mmuniq-hash

Here you can find a tweaked version of mmuniq-bloom, but using hash table:

Instead of storing bits as for the Bloom-filter, we are now storing 64-bit hashes from the ‘siphash24’ function. This gives us much stronger probability guarantees, with probability of false positives much better than one error in 10k lines.

Let’s do the math. Adding a new item to a hash table containing, say, 40M entries has ‘40M/2^64’ chances of hitting a hash collision. This is about one in 461 billion – a reasonably low probability. But we are not adding one item to a pre-filled set! Instead we are adding 40M lines to the initially empty set. As per the birthday paradox this has much higher chances of hitting a collision at some point. A decent approximation is ‘~n^2/2m’, which in our case is ‘~(40M^2)/(2*(2^64))’. This is a chance of one in 23000. In other words, assuming we are using a good hash function, one in every 23 thousand random sets of 40M items will have a hash collision. This practical chance of hitting a collision is non-negligible, but it’s still better than a Bloom filter and totally acceptable for my use case.
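Both figures are easy to verify. The snippet below just evaluates the two expressions from the paragraph above; the variable names are illustrative.

```python
N = 40_000_000     # items in the set
SPACE = 2**64      # possible 64-bit hash values

# One new item against a table that already holds N entries
single_insert = N / SPACE

# Birthday approximation for inserting all N items into an empty table
any_collision = N**2 / (2 * SPACE)

print(f"single insert: one in {1 / single_insert:,.0f}")
print(f"whole set:     one in {1 / any_collision:,.0f}")
```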

The hash table code runs faster, has better memory access patterns and better false positive probability than the Bloom filter approach.

Don’t be scared about the "hash conflicts" line, it just indicates how full the hash table was. We are using linear probing, so when a bucket is already used, we just pick up the next empty bucket. In our case we had to skip over 0.7 buckets on average to find an empty slot in the table. This is fine and, since we iterate over the buckets in linear order, we can expect the memory to be nicely prefetched.
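For readers who have not met linear probing before, here is a minimal sketch of the idea in Python. It is not the ‘mmuniq-hash’ code: `blake2b` stands in for ‘siphash24’, the table has a fixed size, and the class name and the `skipped` counter are invented for this example.

```python
import hashlib


class HashSet64:
    """Fixed-size set of 64-bit hashes using open addressing with linear probing."""

    def __init__(self, log2_slots: int):
        self.mask = (1 << log2_slots) - 1
        self.slots = [0] * (1 << log2_slots)  # 0 marks an empty slot
        self.count = 0
        self.skipped = 0                      # occupied buckets stepped over

    def add(self, item: bytes) -> bool:
        """Insert the item's hash. Return True if it was not present yet."""
        digest = hashlib.blake2b(item, digest_size=8).digest()
        h = int.from_bytes(digest, "little") or 1  # keep 0 reserved for "empty"
        i = h & self.mask
        # One slot is always kept empty, so this loop is guaranteed to stop.
        while self.slots[i] != 0:
            if self.slots[i] == h:
                return False                  # same 64-bit hash: treat as duplicate
            i = (i + 1) & self.mask           # try the neighbouring bucket
            self.skipped += 1
        if self.count + 1 >= len(self.slots):
            raise RuntimeError("hash table is full")
        self.slots[i] = h
        self.count += 1
        return True


def unique_lines(lines, log2_slots: int = 16):
    table = HashSet64(log2_slots)
    return [line for line in lines if table.add(line)]


print(unique_lines([b"a", b"b", b"a", b"c", b"b"]))
```

Each lookup touches one starting bucket and then only its immediate neighbours, which is what keeps the access pattern friendly to the prefetcher.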

From the previous exercise we know our hash function takes about 2 seconds of this. Therefore, it’s fair to say 40M memory hits take around 4 seconds.

Lessons learned

Modern CPUs are really good at sequential memory access when it’s possible to predict memory fetch patterns (see Cache prefetching). Random memory access on the other hand is very costly.

Advanced data structures are very interesting, but beware. Modern computers require cache-optimized algorithms. When working with large datasets that don’t fit in L3, prefer optimizing for a reduced number of loads over optimizing the amount of memory used.

I guess it’s fair to say that Bloom filters are great, as long as they fit into the L3 cache. The moment this assumption is broken, they are terrible. This is not news, Bloom filters optimize for memory usage, not for memory access. For example, see the Cuckoo Filters paper.

Another thing is the ever-lasting discussion about hash functions. Frankly – in most cases it doesn’t matter. The cost of counting even complex hash functions like ‘siphash24’ is small compared to the cost of random memory access. In our case simplifying the hash function will bring only small benefits. The CPU time is simply spent somewhere else – waiting for memory!

One colleague often says: "You can assume modern CPUs are infinitely fast. They run at infinite speed until they hit the memory wall".

Finally, don’t follow my mistakes – everyone should start profiling with ‘perf stat -d’ and look at the "Instructions per cycle" (IPC) counter. If it’s below 1, it generally means the program is stuck on waiting for memory. Values above 2 would be great, it would mean the workload is mostly CPU-bound. Sadly, I’m yet to see high values in the workloads I’m dealing with…

Improved mmuniq

With the help of my colleagues I’ve prepared a further improved version of the ‘mmuniq’ hash table based tool. See the code:

It is able to dynamically resize the hash table, to support inputs of unknown cardinality. Then, by using batching, it can effectively use the ‘prefetch’ CPU hint, speeding up the program by 35-40%. Beware, sprinkling the code with ‘prefetch’ rarely works. Instead, I specifically changed the flow of algorithms to take advantage of this instruction. With all the improvements I got the run time down to 2.1 seconds:

The end

Writing this basic tool which tries to be faster than ‘sort | uniq’ combo revealed some hidden gems of modern computing. With a bit of work we were able to speed it up from more than two minutes to 2 seconds. During this journey we learned about random memory access latency, and the power of cache friendly data structures. Fancy data structures are exciting, but in practice reducing random memory loads often brings better results.

AWS Compute Optimizer – Your Customized Resource Optimization Service

Post Syndicated from Sébastien Stormacq original https://aws.amazon.com/blogs/aws/aws-compute-optimizer-your-customized-resource-optimization-service/

When I speak publicly about Amazon Elastic Compute Cloud (EC2) instance types, one question I frequently receive is “How can I be sure I choose the right instance type for my application?” Choosing the correct instance type is somewhere between art and science. It usually involves knowing your application performance characteristics under normal circumstances (the baseline) and the expected daily variations, and picking an instance type that matches these characteristics. After that, you monitor key metrics to validate your choice, and you iterate over time to adjust to the instance type that best suits the cost vs performance ratio for your application. Over-provisioning resources results in paying too much for your infrastructure, and under-provisioning resources results in lower application performance, possibly impacting customer experience.

Earlier this year, we launched Cost Explorer Rightsizing Recommendations, which help you identify under-utilized Amazon Elastic Compute Cloud (EC2) instances that may be downsized within the same family to save money. We received great feedback and customers are asking for more recommendations beyond just downsizing within the same instance family.

Today, we are announcing a new service to help you optimize compute resources for your workloads: AWS Compute Optimizer. AWS Compute Optimizer uses machine learning techniques to analyze the history of resource consumption on your account, and make well-articulated and actionable recommendations tailored to your resource usage. AWS Compute Optimizer is integrated with AWS Organizations, so you can view recommendations for multiple accounts from your master AWS Organizations account.

To get started with AWS Compute Optimizer, I navigate to the AWS Management Console, select AWS Compute Optimizer and activate the service. It immediately starts to analyze my resource usage and history using Amazon CloudWatch metrics and delivers the first recommendations a few hours later.

I can see the first recommendations on the AWS Compute Optimizer dashboard:

I click Over-provisioned: 8 instances to get the details:

I click on one of the eight links to get the actionable findings:

AWS Compute Optimizer offers multiple options. I scroll down to the bottom of that page to verify the impact of applying this recommendation:

I can also access the recommendation from the AWS Command Line Interface (CLI):

$ aws compute-optimizer get-ec2-instance-recommendations --instance-arns arn:aws:ec2:us-east-1:012345678912:instance/i-0218a45abd8b53658
{
    "instanceRecommendations": [
        {
            "instanceArn": "arn:aws:ec2:us-east-1:012345678912:instance/i-0218a45abd8b53658",
            "accountId": "012345678912",
            "currentInstanceType": "m5.xlarge",
            "finding": "OVER_PROVISIONED",
            "utilizationMetrics": [
                {
                    "name": "CPU",
                    "statistic": "MAXIMUM",
                    "value": 2.0
                }
            ],
            "lookBackPeriodInDays": 14.0,
            "recommendationOptions": [
                {
                    "instanceType": "r5.large",
                    "projectedUtilizationMetrics": [
                        {
                            "name": "CPU",
                            "statistic": "MAXIMUM",
                            "value": 3.2
                        }
                    ],
                    "performanceRisk": 1.0,
                    "rank": 1
                },
                {
                    "instanceType": "t3.xlarge",
                    "projectedUtilizationMetrics": [
                        {
                            "name": "CPU",
                            "statistic": "MAXIMUM",
                            "value": 2.0
                        }
                    ],
                    "performanceRisk": 3.0,
                    "rank": 2
                },
                {
                    "instanceType": "m5.xlarge",
                    "projectedUtilizationMetrics": [
                        {
                            "name": "CPU",
                            "statistic": "MAXIMUM",
                            "value": 2.0
                        }
                    ],
                    "performanceRisk": 1.0,
                    "rank": 3
                }
            ],
            "recommendationSources": [
                {
                    "recommendationSourceArn": "arn:aws:ec2:us-east-1:012345678912:instance/i-0218a45abd8b53658",
                    "recommendationSourceType": "Ec2Instance"
                }
            ],
            "lastRefreshTimestamp": 1575006953.102
        }
    ],
    "errors": []
}
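If you save that CLI output to a file, a few lines of Python are enough to pull out the headline recommendation per instance. This is a minimal sketch: the sample is a trimmed copy of the response above, the function name is made up, and it assumes that rank 1 is the top recommendation, as the output suggests.

```python
import json

# Trimmed copy of the CLI response shown above
SAMPLE = """
{
  "instanceRecommendations": [
    {
      "instanceArn": "arn:aws:ec2:us-east-1:012345678912:instance/i-0218a45abd8b53658",
      "currentInstanceType": "m5.xlarge",
      "finding": "OVER_PROVISIONED",
      "recommendationOptions": [
        {"instanceType": "t3.xlarge", "performanceRisk": 3.0, "rank": 2},
        {"instanceType": "r5.large", "performanceRisk": 1.0, "rank": 1},
        {"instanceType": "m5.xlarge", "performanceRisk": 1.0, "rank": 3}
      ]
    }
  ],
  "errors": []
}
"""


def summarize(response: dict) -> list:
    """Return (instance id, finding, current type, top-ranked type) tuples."""
    rows = []
    for rec in response["instanceRecommendations"]:
        # Assumption: the option with the lowest rank is the preferred one.
        top = min(rec["recommendationOptions"], key=lambda opt: opt["rank"])
        instance_id = rec["instanceArn"].rsplit("/", 1)[-1]
        rows.append((instance_id, rec["finding"],
                     rec["currentInstanceType"], top["instanceType"]))
    return rows


for row in summarize(json.loads(SAMPLE)):
    print(" ".join(row))
```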

Keep in mind that AWS Compute Optimizer uses Amazon CloudWatch metrics as the basis for its recommendations. By default, CloudWatch metrics are the ones it can observe from a hypervisor point of view, such as CPU utilization, disk IO, and network IO. If I want AWS Compute Optimizer to take into account operating system level metrics, such as memory usage, I need to install a CloudWatch agent on my EC2 instance. AWS Compute Optimizer automatically recognizes these metrics when available and takes them into account when creating recommendations. Otherwise, it shows “Data Unavailable” in the console.

AWS customers told us performance is not the only metric they look at when choosing a resource; the price vs performance ratio is important too. For example, it might make sense to use a new generation instance family, such as m5, rather than the older generation (m3 or m4), even when the new generation seems over-provisioned for the workload. This is why, after AWS Compute Optimizer identifies a list of optimal AWS resources for your workload, it presents on-demand pricing, reserved instance pricing, reserved instance utilization, and reserved instance coverage, along with expected resource efficiency, with its recommendations.

AWS Compute Optimizer makes it easy to right-size your resource. However, keep in mind that while it is relatively easy to right-size resources for modern applications, or stateless applications that scale horizontally, it might be very difficult to right-size older apps. Some older apps might not run correctly under different hardware architecture, or need different drivers, or not be supported by the application vendor at all. Be sure to check with your vendor before trying to optimize cloud resources for packaged or older apps.

We strongly advise you to thoroughly test your applications on the new recommended instance type before applying any recommendation into production.

Compute Optimizer is free to use and available initially in these AWS Regions: US East (N. Virginia), US West (Oregon), Europe (Ireland), US East (Ohio), South America (São Paulo). Connect to the AWS Management Console today and discover how much you can save by choosing the right resource size for your cloud applications.

— seb

Predictive CPU isolation of containers at Netflix

Post Syndicated from Netflix Technology Blog original https://medium.com/netflix-techblog/predictive-cpu-isolation-of-containers-at-netflix-91f014d856c7?source=rss----2615bd06b42e---4

By Benoit Rostykus, Gabriel Hartmann

Noisy Neighbors

We’ve all had noisy neighbors at one point in our life. Whether it’s at a cafe or through a wall of an apartment, it is always disruptive. The need for good manners in shared spaces turns out to be important not just for people, but for your Docker containers too.

When you’re running in the cloud your containers are in a shared space; in particular they share the CPU’s memory hierarchy of the host instance.

Because microprocessors are so fast, computer architecture design has evolved towards adding various levels of caching between compute units and the main memory, in order to hide the latency of bringing the bits to the brains. However, the key insight here is that these caches are partially shared among the CPUs, which means that perfect performance isolation of co-hosted containers is not possible. If the container running on the core next to your container suddenly decides to fetch a lot of data from the RAM, it will inevitably result in more cache misses for you (and hence a potential performance degradation).

Linux to the rescue?

Traditionally it has been the responsibility of the operating system’s task scheduler to mitigate this performance isolation problem. In Linux, the current mainstream solution is CFS (Completely Fair Scheduler). Its goal is to assign running processes to time slices of the CPU in a “fair” way.

CFS is widely used and therefore well tested and Linux machines around the world run with reasonable performance. So why mess with it? As it turns out, for the large majority of Netflix use cases, its performance is far from optimal. Titus is Netflix’s container platform. Every month, we run millions of containers on thousands of machines on Titus, serving hundreds of internal applications and customers. These applications range from critical low-latency services powering our customer-facing video streaming service, to batch jobs for encoding or machine learning. Maintaining performance isolation between these different applications is critical to ensuring a good experience for internal and external customers.

We were able to meaningfully improve both the predictability and performance of these containers by taking some of the CPU isolation responsibility away from the operating system and moving towards a data driven solution involving combinatorial optimization and machine learning.

The idea

CFS operates by very frequently (every few microseconds) applying a set of heuristics which encapsulate a general concept of best practices around CPU hardware use.

Instead, what if we reduced the frequency of interventions (to every few seconds) but made better data-driven decisions regarding the allocation of processes to compute resources in order to minimize collocation noise?

One traditional way of mitigating CFS performance issues is for application owners to manually cooperate through the use of core pinning or nice values. However, we can automatically make better global decisions by detecting collocation opportunities based on actual usage information. For example if we predict that container A is going to become very CPU intensive soon, then maybe we should run it on a different NUMA socket than container B which is very latency-sensitive. This avoids thrashing caches too much for B and evens out the pressure on the L3 caches of the machine.

Optimizing placements through combinatorial optimization

What the OS task scheduler is doing is essentially solving a resource allocation problem: I have X threads to run but only Y CPUs available, how do I allocate the threads to the CPUs to give the illusion of concurrency?

As an illustrative example, let’s consider a toy instance of 16 hyperthreads. It has 8 physical hyperthreaded cores, split on 2 NUMA sockets. Each hyperthread shares its L1 and L2 caches with its neighbor, and shares its L3 cache with the 7 other hyperthreads on the socket:

If we want to run container A on 4 threads and container B on 2 threads on this instance, we can look at what “bad” and “good” placement decisions look like:

The first placement is intuitively bad because we potentially create collocation noise between A and B on the first 2 cores through their L1/L2 caches, and on the socket through the L3 cache while leaving a whole socket empty. The second placement looks better as each CPU is given its own L1/L2 caches, and we make better use of the two L3 caches available.

Resource allocation problems can be efficiently solved through a branch of mathematics called combinatorial optimization, used for example for airline scheduling or logistics problems.

We formulate the problem as a Mixed Integer Program (MIP). Given a set of K containers each requesting a specific number of CPUs on an instance possessing d threads, the goal is to find a binary assignment matrix M of size (d, K) such that each container gets the number of CPUs it requested. The loss function and constraints contain various terms expressing a priori good placement decisions such as:

  • avoid spreading a container across multiple NUMA sockets (to avoid potentially slow cross-sockets memory accesses or page migrations)
  • don’t use hyper-threads unless you need to (to reduce L1/L2 thrashing)
  • try to even out pressure on the L3 caches (based on potential measurements of the container’s hardware usage)
  • don’t shuffle things too much between placement decisions
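To make the terms above concrete, here is a minimal sketch that brute-forces the toy 16-hyperthread instance instead of solving a MIP. The thread numbering, the three penalty weights and the cost function itself are assumptions made for this example, not the loss used in the real system, and exhaustive search is only viable because the instance is tiny.

```javascript
// Toy topology from the text: 16 hyperthreads, 2 per core, 4 cores per socket.
// Assumed numbering: threads 2c and 2c+1 share core c; threads 0-7 are socket 0.
const THREADS = 16;
const coreOf = (t) => Math.floor(t / 2);
const socketOf = (t) => Math.floor(t / 8);

// Hypothetical weights; the real loss terms are not given in the text.
const W_SPAN = 10;        // container spread across NUMA sockets
const W_SHARED_CORE = 5;  // both hyperthreads of a core in use
const W_IMBALANCE = 1;    // uneven number of busy threads per socket (L3 pressure)

// placement: one array of thread ids per container.
function cost(placement) {
  let total = 0;
  const coreUse = new Map();
  const socketLoad = [0, 0];
  for (const threads of placement) {
    total += W_SPAN * (new Set(threads.map(socketOf)).size - 1);
    for (const t of threads) {
      coreUse.set(coreOf(t), (coreUse.get(coreOf(t)) || 0) + 1);
      socketLoad[socketOf(t)] += 1;
    }
  }
  for (const used of coreUse.values()) {
    if (used > 1) total += W_SHARED_CORE;
  }
  return total + W_IMBALANCE * Math.abs(socketLoad[0] - socketLoad[1]);
}

// Yield every k-element subset of items, in lexicographic order.
function* combinations(items, k, start = 0, picked = []) {
  if (picked.length === k) {
    yield picked.slice();
    return;
  }
  for (let i = start; i < items.length; i++) {
    picked.push(items[i]);
    yield* combinations(items, k, i + 1, picked);
    picked.pop();
  }
}

// requests: number of threads requested by each container.
function bestPlacement(requests) {
  let best = null;
  const search = (index, free, placement) => {
    if (index === requests.length) {
      const c = cost(placement);
      if (best === null || c < best.cost) {
        best = { cost: c, placement: placement.map((p) => p.slice()) };
      }
      return;
    }
    for (const combo of combinations(free, requests[index])) {
      placement.push(combo);
      search(index + 1, free.filter((t) => !combo.includes(t)), placement);
      placement.pop();
    }
  };
  search(0, [...Array(THREADS).keys()], []);
  return best;
}

// Container A wants 4 threads, container B wants 2.
console.log(bestPlacement([4, 2]));
```

With these weights the cheapest placement keeps each container on its own socket and gives every thread a core to itself, which matches the intuition behind the "good" placement described earlier. The fourth bullet (limiting shuffling between decisions) is left out of this sketch.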

Given the low-latency and low-compute requirements of the system (we certainly don’t want to spend too many CPU cycles figuring out how containers should use CPU cycles!), can we actually make this work in practice?

Implementation

We decided to implement the strategy through Linux cgroups since they are fully supported by CFS, by modifying each container’s cpuset cgroup based on the desired mapping of containers to hyper-threads. In this way a user-space process defines a “fence” within which CFS operates for each container. In effect we remove the impact of CFS heuristics on performance isolation while retaining its core scheduling capabilities.

This user-space process is a Titus subsystem called titus-isolate which works as follows. On each instance, we define three events that trigger a placement optimization:

  • add: A new container was allocated by the Titus scheduler to this instance and needs to be run
  • remove: A running container just finished
  • rebalance: CPU usage may have changed in the containers so we should reevaluate our placement decisions

We periodically enqueue rebalance events when no other event has recently triggered a placement decision.

Every time a placement event is triggered, titus-isolate queries a remote optimization service (running as a Titus service, hence also isolating itself… turtles all the way down) which solves the container-to-threads placement problem.

This service then queries a local GBRT model (retrained every couple of hours on weeks of data collected from the whole Titus platform) predicting the P95 CPU usage of each container in the coming 10 minutes (conditional quantile regression). The model contains both contextual features (metadata associated with the container: who launched it, image, memory and network configuration, app name…) as well as time-series features extracted from the last hour of historical CPU usage of the container collected regularly by the host from the kernel CPU accounting controller.
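For readers unfamiliar with quantile regression, the sketch below shows the pinball loss, the standard objective for predicting a quantile such as P95. The text does not say which loss the GBRT model is trained with, so treat this as background rather than a description of the actual model; the sample values are made up.

```javascript
// Pinball (quantile) loss for one observation at quantile q.
// Under-predictions are weighted by q, over-predictions by (1 - q).
function pinballLoss(actual, predicted, q) {
  const diff = actual - predicted;
  return diff >= 0 ? q * diff : (q - 1) * diff;
}

// Average loss of a single constant prediction over a set of observations.
function meanPinballLoss(actuals, predicted, q) {
  const sum = actuals.reduce((acc, y) => acc + pinballLoss(y, predicted, q), 0);
  return sum / actuals.length;
}

// Hypothetical CPU usage samples: 1, 2, ..., 20.
const samples = Array.from({ length: 20 }, (_, i) => i + 1);

// At q = 0.95, predicting near the top of the range costs far less than
// predicting near the middle.
console.log(meanPinballLoss(samples, 19, 0.95));
console.log(meanPinballLoss(samples, 10, 0.95));
```

At q = 0.95, under-predicting by one unit costs 19 times as much as over-predicting by one unit, which pushes the model towards the upper end of the usage distribution.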

The predictions are then fed into a MIP which is solved on the fly. We’re using cvxpy as a nice generic symbolic front-end to represent the problem which can then be fed into various open-source or proprietary MIP solver backends. Since MIPs are NP-hard, some care needs to be taken. We impose a hard time budget on the solver to drive the branch-and-cut strategy into a low-latency regime, with guardrails around the MIP gap to control overall quality of the solution found.

The service then returns the placement decision to the host, which executes it by modifying the cpusets of the containers.
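As a small illustration of that last step, a cpuset is configured with a list of CPU ids such as "0-3,8". The helper below is a hypothetical sketch that turns the thread ids of a placement into that list format; where the resulting string is written depends on the host's cgroup layout and is not shown here.

```javascript
// Collapse thread ids into the cpuset list format, e.g. [3, 0, 1, 2, 8] -> "0-3,8".
function toCpusetList(threadIds) {
  const sorted = [...new Set(threadIds)].sort((a, b) => a - b);
  const parts = [];
  let start = null;
  let prev = null;
  const flush = () => parts.push(start === prev ? `${start}` : `${start}-${prev}`);
  for (const id of sorted) {
    if (start === null) {
      start = prev = id;       // first id opens the first range
    } else if (id === prev + 1) {
      prev = id;               // extend the current range
    } else {
      flush();                 // close the range and open a new one
      start = prev = id;
    }
  }
  if (start !== null) flush();
  return parts.join(",");
}

console.log(toCpusetList([0, 2, 4, 6]));
console.log(toCpusetList([3, 0, 1, 2, 8]));
```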

For example, at any moment in time, an r4.16xlarge with 64 logical CPUs might look like this (the color scale represents CPU usage):

Results

The first version of the system led to surprisingly good results. We reduced overall runtime of batch jobs by multiple percent on average while most importantly reducing job runtime variance (a reasonable proxy for isolation), as illustrated below. Here we see a real-world batch job runtime distribution with and without improved isolation:

Notice how we mostly made the problem of long-running outliers disappear. The right-tail of unlucky noisy-neighbors runs is now gone.

For services, the gains were even more impressive. One specific Titus middleware service serving the Netflix streaming service saw a capacity reduction of 13% (a decrease of more than 1000 containers) needed at peak traffic to serve the same load with the required P99 latency SLA! We also noticed a sharp reduction of the CPU usage on the machines, since far less time was spent by the kernel in cache invalidation logic. Our containers are now more predictable, faster and the machine is less used! It’s not often that you can have your cake and eat it too.

Next Steps

We are excited about the strides made so far in this area. We are working on multiple fronts to extend the solution presented here.

We want to extend the system to support CPU oversubscription. Most of our users have trouble knowing how to properly size the number of CPUs their app needs, and in fact this number varies during the lifetime of their containers. Since we already predict future CPU usage of the containers, we want to automatically detect and reclaim unused resources. For example, one could decide to auto-assign a specific container to a shared cgroup of underutilized CPUs, to improve overall isolation and machine utilization, if we can detect the sensitivity threshold of our users along the various axes of the following graph.

We also want to leverage kernel PMC events to more directly optimize for minimal cache noise. One possible avenue is to use the Intel based bare metal instances recently introduced by Amazon that allow deep access to performance analysis tools. We could then feed this information directly into the optimization engine to move towards a more supervised learning approach. This would require a proper continuous randomization of the placements to collect unbiased counterfactuals, so we could build some sort of interference model (“what would be the performance of container A in the next minute, if I were to colocate one of its threads on the same core as container B, knowing that there’s also C running on the same socket right now?”).

Conclusion

If any of this piques your interest, reach out to us! We’re looking for ML engineers to help us push the boundary of container performance and “machine learning for systems”, and systems engineers for our core infrastructure and compute platform.


Predictive CPU isolation of containers at Netflix was originally published in Netflix TechBlog on Medium, where people are continuing the conversation by highlighting and responding to this story.

Announcing Cloudflare Image Resizing: Simplifying Optimal Image Delivery

Post Syndicated from Isaac Specter original https://blog.cloudflare.com/announcing-cloudflare-image-resizing-simplifying-optimal-image-delivery/

In the past three years, the amount of image data on the median mobile webpage has doubled. Growing images translate directly to users hitting data transfer caps, experiencing slower websites, and even leaving if a website doesn’t load in a reasonable amount of time. The crime is that many of these images are slow because they are larger than they need to be, sending data over the wire which has absolutely no (positive) impact on the user’s experience.

To provide a concrete example, let’s consider this photo of Cloudflare’s Lava Lamp Wall:

On the left you see the photo, scaled to 300 pixels wide. On the right you see the same image delivered in its original high resolution, scaled in a desktop web browser. They both look exactly the same, yet the image on the right takes more than twenty times as much data to load. Even for the best and most conscientious developers, resizing every image to handle every possible device geometry consumes valuable time, and it’s exceptionally easy to forget to do this resizing altogether.

Today we are launching a new product, Image Resizing, to fix this problem once and for all.

Announcing Image Resizing

With Image Resizing, Cloudflare adds another important product to its suite of available image optimizations.  This product allows customers to perform a rich set of key actions on images:

  • Resize – The source image will be resized to the specified height and width.  This action allows multiple different sized variants to be created for each specific use.
  • Crop – The source image will be resized to a new size that does not maintain the original aspect ratio and a portion of the image will be removed.  This can be especially helpful for headshots and product images where different formats must be achieved by keeping only a portion of the image.
  • Compress – The source image will have its file size reduced by applying lossy compression.  This should be used when slight quality reduction is an acceptable trade for file size reduction.
  • Convert to WebP – When the user’s browser supports it, the source image will be converted to WebP.  Delivering a WebP image takes advantage of the modern, highly optimized image format.

By using a combination of these actions, customers store a single high quality image on their server, and Image Resizing can be leveraged to create specialized variants for each specific use case.  Without any additional effort, each variant will also automatically benefit from Cloudflare’s global caching.

Examples

Ecommerce Thumbnails

Ecommerce sites typically store a high-quality image of each product.  From that image, they need to create different variants depending on how that product will be displayed.  One example is creating thumbnails for a catalog view.  Using Image Resizing, if the high quality image is located here:

https://example.com/images/shoe123.jpg

This is how to display a 75×75 pixel thumbnail using Image Resizing:

<img src="/cdn-cgi/image/width=75,height=75/images/shoe123.jpg">
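If you generate these URLs in code, a small helper keeps the option string consistent. This is a sketch based only on the URL format shown above; the function name is made up for this example, and it does no validation of option names or values.

```javascript
// Build an Image Resizing path of the form
//   /cdn-cgi/image/<option>=<value>,<option>=<value>/<path to original image>
function resizedImagePath(options, imagePath) {
  const optionString = Object.entries(options)
    .map(([name, value]) => `${name}=${value}`)
    .join(",");
  const path = imagePath.startsWith("/") ? imagePath : `/${imagePath}`;
  return `/cdn-cgi/image/${optionString}${path}`;
}

console.log(resizedImagePath({ width: 75, height: 75 }, "/images/shoe123.jpg"));
```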

Responsive Images

When tailoring a site to work on various device types and sizes, it’s important to always use correctly sized images.  This can be difficult when images are intended to fill a particular percentage of the screen.  To solve this problem, <img srcset sizes> can be used.

Without Image Resizing, multiple versions of the same image would need to be created and stored.  In this example, a single high quality copy of hero.jpg is stored, and Image Resizing is used to resize for each particular size as needed.

<img width="100%"
     srcset="/cdn-cgi/image/fit=contain,width=320/assets/hero.jpg 320w,
             /cdn-cgi/image/fit=contain,width=640/assets/hero.jpg 640w,
             /cdn-cgi/image/fit=contain,width=960/assets/hero.jpg 960w,
             /cdn-cgi/image/fit=contain,width=1280/assets/hero.jpg 1280w,
             /cdn-cgi/image/fit=contain,width=2560/assets/hero.jpg 2560w"
     src="/cdn-cgi/image/width=960/assets/hero.jpg">
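Writing a long srcset by hand is error-prone, so it can be generated from a list of widths. The helper below is a hypothetical sketch that reproduces the candidate format used in the example above; its name and parameters are not part of any Cloudflare API.

```javascript
// Build a srcset value with one Image Resizing candidate per width.
function buildSrcset(imagePath, widths) {
  return widths
    .map((w) => `/cdn-cgi/image/fit=contain,width=${w}${imagePath} ${w}w`)
    .join(", ");
}

console.log(buildSrcset("/assets/hero.jpg", [320, 640, 960, 1280, 2560]));
```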

Enforce Maximum Size Without Changing URLs

Image Resizing is also available from within a Cloudflare Worker. Workers allow you to write code which runs close to your users all around the world. For example, you might wish to add Image Resizing to your images while keeping the same URLs. Your users and clients would be able to use the same image URLs as always, but the images will be transparently modified in whatever way you need.

You can install a Worker on a route which matches your image URLs, and resize any images larger than a limit:

addEventListener('fetch', event => {
  event.respondWith(handleRequest(event.request))
})

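async function handleRequest(request) {
  // Fetch the original image and ask Cloudflare to resize it on the way.
  // 'scale-down' shrinks images larger than 800×800 and never enlarges smaller ones.
  return fetch(request, {
    cf: {
      image: {
        width: 800,
        height: 800,
        fit: 'scale-down'
      }
    }
  });
}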

As a Worker is just code, it is also easy to run this worker only on URLs with image extensions, or even to only resize images being delivered to mobile clients.
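One way to express both conditions is a small decision function that the Worker consults before fetching. This is a sketch: the extension list and the "Mobi" User-Agent check are assumptions chosen for illustration, and real device detection may need to be more careful.

```javascript
// Hypothetical list of extensions treated as images.
const IMAGE_EXTENSIONS = /\.(jpe?g|png|gif|webp)$/i;

// Return resize options for image requests from mobile clients, or null
// when the request should be passed through untouched.
function imageOptionsFor(url, userAgent) {
  const { pathname } = new URL(url);
  if (!IMAGE_EXTENSIONS.test(pathname)) return null;
  // Assumption: treat any User-Agent containing "Mobi" as a mobile client.
  if (!/Mobi/i.test(userAgent || "")) return null;
  return { width: 800, height: 800, fit: "scale-down" };
}

// Inside the Worker's handleRequest, this could be used roughly as:
//   const image = imageOptionsFor(request.url, request.headers.get("User-Agent"));
//   return image ? fetch(request, { cf: { image } }) : fetch(request);
```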

Cloudflare and Images

Cloudflare has a long history building tools to accelerate images. Our caching has always helped reduce latency by storing a copy of images closer to the user.  Polish automates options for both lossless and lossy image compression to remove unnecessary bytes from images.  Mirage accelerates image delivery based on device type. We are continuing to invest in all of these tools, as they all serve a unique role in improving the image experience on the web.

Image Resizing is different because it is the first image product at Cloudflare to give developers full control over how their images are served. You should choose Image Resizing if you are comfortable defining the sizes you wish your images to be served at in advance or within a Cloudflare Worker.

Next Steps and Simple Pricing

Image Resizing is available today for Business and Enterprise Customers.  To enable it, log in to the Cloudflare Dashboard and navigate to the Speed Tab.  There you’ll find the section for Image Resizing which you can enable with one click.

This product is included in the Business and Enterprise plans at no additional cost with generous usage limits.  Business Customers have a limit of 100k requests per month and will be charged $10 for each additional 100k requests per month.  Enterprise Customers have a limit of 10M requests per month with discounted tiers for higher usage.  Requests are defined as a hit on a URI that contains Image Resizing or a call to Image Resizing from a Worker.
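For a rough budget estimate on the Business plan, the arithmetic can be sketched as follows. The text does not say how a partially used block of 100k requests is billed; this example assumes it is charged as a full block, so check your invoice or the documentation before relying on it.

```javascript
const INCLUDED_REQUESTS = 100000; // Business plan monthly allowance
const BLOCK_SIZE = 100000;        // additional requests are billed per 100k
const BLOCK_PRICE_USD = 10;

// Estimated overage charge in USD for one month.
// Assumption: a partially used block counts as a whole block.
function businessOverageUsd(monthlyRequests) {
  const extra = Math.max(0, monthlyRequests - INCLUDED_REQUESTS);
  return Math.ceil(extra / BLOCK_SIZE) * BLOCK_PRICE_USD;
}

console.log(businessOverageUsd(350000));
```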

Now that you’ve enabled Image Resizing, it’s time to resize your first image.

  1. Using your existing site, store an image here: https://yoursite.com/images/yourimage.jpg
  2. Use this URL to resize that image:
    https://yoursite.com/cdn-cgi/image/width=100,height=100,quality=75/images/yourimage.jpg
  3. Experiment with changing width=, height=, and quality=.

The instructions above use the Default URL Format for Image Resizing.  For details on options, use cases, and compatibility, refer to our Developer Documentation.

Parallel streaming of progressive images

Post Syndicated from Andrew Galloni original https://blog.cloudflare.com/parallel-streaming-of-progressive-images/

Progressive image rendering and HTTP/2 multiplexing technologies have existed for a while, but now we’ve combined them in a new way that makes them much more powerful. With Cloudflare progressive streaming, images appear to load in half the time, and browsers can start rendering pages sooner.


In HTTP/1.1 connections, servers didn’t have any choice about the order in which resources were sent to the client; they had to send responses, as a whole, in the exact order they were requested by the web browser. HTTP/2 improved this by adding multiplexing and prioritization, which allows servers to decide exactly what data is sent and when. We’ve taken advantage of these new HTTP/2 capabilities to improve perceived speed of loading of progressive images by sending the most important fragments of image data sooner.

This feature is compatible with all major browsers, and doesn’t require any changes to page markup, so it’s very easy to adopt. Sign up for the Beta to enable it on your site!

What is progressive image rendering?

Basic images load strictly from top to bottom. If a browser has received only half of an image file, it can show only the top half of the image. Progressive images have their content arranged not from top to bottom, but from a low level of detail to a high level of detail. Receiving a fraction of image data allows browsers to show the entire image, only with a lower fidelity. As more data arrives, the image becomes clearer and sharper.

This works great in the JPEG format, where only about 10-15% of the data is needed to display a preview of the image, and at 50% of the data the image looks almost as good as when the whole file is delivered. Progressive JPEG images contain exactly the same data as baseline images, merely reshuffled in a more useful order, so progressive rendering doesn’t add any cost to the file size. This is possible, because JPEG doesn’t store the image as pixels. Instead, it represents the image as frequency coefficients, which are like a set of predefined patterns that can be blended together, in any order, to reconstruct the original image. The inner workings of JPEG are really fascinating, and you can learn more about them from my recent performance.now() conference talk.
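Whether a JPEG file is progressive can be read from its header: the frame header segment uses the SOF2 marker (0xFFC2) for progressive encoding and SOF0 (0xFFC0) for baseline, and that same segment carries the image dimensions. The function below is a simplified sketch for illustration, not the code Cloudflare runs; it only treats SOF2 as progressive and ignores rarer variants.

```javascript
// Scan JPEG segments until the frame header (SOFn) and report what it says.
// bytes: Uint8Array or array holding the start of a JPEG file.
function readJpegFrameInfo(bytes) {
  // Every JPEG starts with the SOI marker FF D8.
  if (bytes.length < 4 || bytes[0] !== 0xff || bytes[1] !== 0xd8) return null;
  let pos = 2;
  while (pos + 4 <= bytes.length) {
    if (bytes[pos] !== 0xff) return null;        // lost sync with segment markers
    const marker = bytes[pos + 1];
    if (marker === 0xff) { pos += 1; continue; } // skip fill byte
    if (marker === 0xda) return null;            // scan data reached before a frame header
    const length = (bytes[pos + 2] << 8) | bytes[pos + 3];
    if (length < 2) return null;                 // malformed segment
    // SOF markers are C0..CF except C4 (DHT), C8 (JPG) and CC (DAC).
    const isFrameHeader = marker >= 0xc0 && marker <= 0xcf &&
      marker !== 0xc4 && marker !== 0xc8 && marker !== 0xcc;
    if (isFrameHeader) {
      if (pos + 9 > bytes.length) return null;   // header truncated
      return {
        progressive: marker === 0xc2,
        height: (bytes[pos + 5] << 8) | bytes[pos + 6],
        width: (bytes[pos + 7] << 8) | bytes[pos + 8],
      };
    }
    pos += 2 + length;                           // jump to the next segment
  }
  return null;
}
```

Because the frame header sits near the start of the file, a browser can learn the image size after receiving only a small number of bytes.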

The end result is that the images can look almost fully loaded in half of the time, for free! The page appears to be visually complete and can be used much sooner. The rest of the image data arrives shortly after, upgrading images to their full quality, before visitors have time to notice anything is missing.

HTTP/2 progressive streaming

But there’s a catch. Websites have more than one image (sometimes even hundreds of images). When the server sends image files naïvely, one after another, the progressive rendering doesn’t help that much, because overall the images still load sequentially:

Having complete data for half of the images (and no data for the other half) doesn’t look as good as having half of the data for all images.

And there’s another problem: when the browser doesn’t know image sizes yet, it lays the page out with placeholders instead, and relays out the page when each image loads. This can make pages jump during loading, which is inelegant, distracting and annoying for the user.

Our new progressive streaming feature greatly improves the situation: we can send all of the images at once, in parallel. This way the browser gets size information for all of the images as soon as possible, can paint a preview of all images without having to wait for a lot of data, and large images don’t delay loading of styles, scripts and other more important resources.
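A back-of-the-envelope calculation shows why the order matters. The sketch below counts how many bytes must be delivered before every image on a page can show a preview, assuming a preview needs 15% of each file (the upper end of the 10-15% range mentioned earlier). It ignores headers, other resources and real network behavior, and the file sizes are made up.

```javascript
const PREVIEW_PERCENT = 15; // assumed share of a file needed for a preview

function previewBytes(size) {
  return Math.ceil((size * PREVIEW_PERCENT) / 100);
}

// Sequential: each file is sent whole before the next one starts, so the last
// preview appears only after all earlier files have been fully delivered.
function bytesUntilAllPreviewsSequential(sizes) {
  const earlierFiles = sizes.slice(0, -1).reduce((sum, n) => sum + n, 0);
  return earlierFiles + previewBytes(sizes[sizes.length - 1]);
}

// Phased: the preview portion of every image is sent before any remainder.
function bytesUntilAllPreviewsPhased(sizes) {
  return sizes.reduce((sum, n) => sum + previewBytes(n), 0);
}

// Four hypothetical images of 100,000 bytes each.
const imageSizes = [100000, 100000, 100000, 100000];
console.log(bytesUntilAllPreviewsSequential(imageSizes));
console.log(bytesUntilAllPreviewsPhased(imageSizes));
```

For these four images, every preview is available after 60,000 bytes in the phased order versus 315,000 bytes in the sequential order, even though the total transferred is identical.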

This idea of streaming of progressive images in parallel is as old as HTTP/2 itself, but it needs special handling in low-level parts of web servers, and so far this hasn’t been implemented at a large scale.

When we were improving our HTTP/2 prioritization, we realized it can also be used to implement this feature. Image files as a whole are neither high nor low priority. The priority changes within each file, and dynamic re-prioritization gives us the behavior we want:

  • The image header that contains the image size is very high priority, because the browser needs to know the size as soon as possible to do page layout. The image header is small, so it doesn’t hurt to send it ahead of other data.

  • The minimum amount of data in the image required to show a preview of the image has a medium priority (we’d like to plug "holes" left for unloaded images as soon as possible, but also leave some bandwidth available for scripts, fonts and other resources)

  • The remainder of the image data is low priority. Browsers can stream it last to refine image quality once there’s no rush, since the page is already fully usable.

Knowing the exact amount of data to send in each phase requires understanding the structure of image files, but it seemed weird to us to make our web server parse image responses and have a format-specific behavior hardcoded at a protocol level. By framing the problem as a dynamic change of priorities, we were able to elegantly separate low-level networking code from knowledge of image formats. We can use Workers or offline image processing tools to analyze the images, and instruct our server to change HTTP/2 priorities accordingly.

The great thing about parallel streaming of images is that it doesn’t add any overhead. We’re still sending the same data, and the same amount of it; we’re just sending it in a smarter order. This technique takes advantage of existing web standards, so it’s compatible with all browsers.

The waterfall

Here are waterfall charts from WebPageTest showing comparison of regular HTTP/2 responses and progressive streaming. In both cases the files were exactly the same, the amount of data transferred was the same, and the overall page loading time was the same (within measurement noise). In the charts, blue segments show when data was transferred, and green shows when each request was idle.

The first chart shows a typical server behavior that makes images load mostly sequentially. The chart itself looks neat, but the actual experience of loading that page was not great — the last image didn’t start loading until almost the end.

The second chart shows images loaded in parallel. The blue vertical streaks throughout the chart are image headers sent early followed by a couple of stages of progressive rendering. You can see that useful data arrived sooner for all of the images. You may notice that one of the images has been sent in one chunk, rather than split like all the others. That’s because at the very beginning of a TCP/IP connection we don’t know the true speed of the connection yet, and we have to sacrifice some opportunity to do prioritization in order to maximize the connection speed.

The metrics compared to other solutions

There are other techniques intended to provide image previews quickly, such as low-quality image placeholder (LQIP), but they have several drawbacks. They add unnecessary data for the placeholders, usually interfere with browsers’ preload scanner, and delay loading of full-quality images because they depend on JavaScript to upgrade the previews to full images.

  • Our solution doesn’t cause any additional requests, and doesn’t add any extra data. Overall page load time is not delayed.
  • Our solution doesn’t require any JavaScript. It takes advantage of functionality supported natively in the browsers.
  • Our solution doesn’t require any changes to page’s markup, so it’s very safe and easy to deploy site-wide.

The improvement in user experience is reflected in performance metrics such as SpeedIndex and time to visually complete. Notice that with regular image loading the visual progress is linear, but with progressive streaming it quickly jumps to mostly complete:

Getting the most out of progressive rendering

Avoid ruining the effect with JavaScript. Scripts that hide images and wait until the onload event to reveal them (with a fade in, etc.) will defeat progressive rendering. Progressive rendering works best with the good old <img> element.

Is it JPEG-only?

Our implementation is format-independent, but progressive streaming is useful only for certain file types. For example, it wouldn’t make sense to apply it to scripts or stylesheets: these resources are rendered as all-or-nothing.

Prioritizing of image headers (containing image size) works for all file formats.

The benefits of progressive rendering are unique to JPEG (supported in all browsers) and JPEG 2000 (supported in Safari). GIF and PNG have interlaced modes, but these modes come at a cost of worse compression. WebP doesn’t support progressive rendering at all. This creates a dilemma: WebP is usually 20%-30% smaller than a JPEG of equivalent quality, but progressive JPEG appears to load 50% faster. There are next-generation image formats that support progressive rendering better than JPEG, and compress better than WebP, but they’re not supported in web browsers yet. In the meantime you can choose between the bandwidth savings of WebP or the better perceived performance of progressive JPEG by changing Polish settings in your Cloudflare dashboard.

Custom header for experimentation

We also support a custom HTTP header that allows you to experiment with, and optimize streaming of other resources on your site. For example, you could make our servers send the first frame of animated GIFs with high priority and deprioritize the rest. Or you could prioritize loading of resources mentioned in <head> of HTML documents before <body> is loaded.

The custom header can be set only from a Worker. The syntax is a comma-separated list of file positions with priority and concurrency. The priority and concurrency are the same as in the whole-file cf-priority header described in the previous blog post.

cf-priority-change: <offset in bytes>:<priority>/<concurrency>, ...

For example, for a progressive JPEG we use something like (this is a fragment of JS to use in a Worker):

let headers = new Headers(response.headers);
headers.set("cf-priority", "30/0");
headers.set("cf-priority-change", "512:20/1, 15000:10/n");
return new Response(response.body, {headers});

This instructs the server to use priority 30 initially, while it sends the first 512 bytes. It then switches to priority 20 with some concurrency (/1), and finally, after sending 15000 bytes of the file, switches to low priority and high concurrency (/n) to deliver the rest of the file.
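To check that a header value says what you intend, it can help to resolve which priority applies at a given byte offset. The helper below is a debugging sketch written for this post's example only: it parses the syntax shown above, does no validation, and assumes that a change takes effect at exactly the listed offset.

```javascript
// Parse "512:20/1, 15000:10/n" into a list of { offset, priority, concurrency }.
function parsePriorityChanges(header) {
  return header
    .split(",")
    .map((entry) => {
      const [offset, rest] = entry.trim().split(":");
      const [priority, concurrency] = rest.split("/");
      return { offset: Number(offset), priority: Number(priority), concurrency };
    })
    .sort((a, b) => a.offset - b.offset);
}

// Resolve the priority in effect at byteOffset, starting from the cf-priority value.
function priorityAt(initial, changeHeader, byteOffset) {
  const [priority, concurrency] = initial.split("/");
  let current = { priority: Number(priority), concurrency };
  for (const change of parsePriorityChanges(changeHeader)) {
    // Assumption: the change applies from its offset onwards.
    if (byteOffset >= change.offset) {
      current = { priority: change.priority, concurrency: change.concurrency };
    }
  }
  return current;
}

console.log(priorityAt("30/0", "512:20/1, 15000:10/n", 0));
console.log(priorityAt("30/0", "512:20/1, 15000:10/n", 600));
console.log(priorityAt("30/0", "512:20/1, 15000:10/n", 20000));
```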

We’ll try to split HTTP/2 frames to match the offsets specified in the header to change the sending priority as soon as possible. However, priorities don’t guarantee that data of different streams will be multiplexed exactly as instructed, since the server can prioritize only when it has data of multiple streams waiting to be sent at the same time. If some of the responses arrive much sooner from the upstream server or the cache, the server may send them right away, without waiting for other responses.

Try it!

You can use our Polish tool to convert your images to progressive JPEG. Sign up for the beta to have them elegantly streamed in parallel.