Tag Archives: sql

R2 SQL: a deep dive into our new distributed query engine

Post Syndicated from Yevgen Safronov original https://blog.cloudflare.com/r2-sql-deep-dive/

How do you run SQL queries over petabytes of data… without a server?

We have an answer for that: R2 SQL, a serverless query engine that can sift through enormous datasets and return results in seconds.

This post details the architecture and techniques that make this possible. We’ll walk through our Query Planner, which uses R2 Data Catalog to prune terabytes of data before reading a single byte, and explain how we distribute the work across Cloudflare’s global network, Workers and R2 for massively parallel execution.

From catalog to query

During Developer Week 2025, we launched R2 Data Catalog, a managed Apache Iceberg catalog built directly into your Cloudflare R2 bucket. Iceberg is an open table format that provides critical database features like transactions and schema evolution for petabyte-scale object storage. It gives you a reliable catalog of your data, but it doesn’t provide a way to query it.

Until now, reading your R2 Data Catalog required setting up a separate service like Apache Spark or Trino. Operating these engines at scale is not easy: you need to provision clusters, manage resource usage, and be responsible for their availability, none of which contributes to the primary goal of getting value from your data.

R2 SQL removes that step entirely. It’s a serverless query engine that executes retrieval SQL queries against your Iceberg tables, right where your data lives.

Designing a query engine for petabytes

Object storage is fundamentally different from a traditional database’s storage. A database is structured by design; R2 is an ocean of objects, where a single logical table can be composed of potentially millions of individual files, large and small, with more arriving every second.

Apache Iceberg provides a powerful layer of logical organization on top of this reality. It works by managing the table’s state as an immutable series of snapshots, creating a reliable, structured view of the table by manipulating lightweight metadata files instead of rewriting the data files themselves.

However, this logical structure doesn’t change the underlying physical challenge: an efficient query engine must still find the specific data it needs within that vast collection of files, and this requires overcoming two major technical hurdles:

The I/O problem: A core challenge for query efficiency is minimizing the amount of data read from storage. A brute-force approach of reading every object is simply not viable. The primary goal is to read only the data that is absolutely necessary.

The Compute problem: The amount of data that does need to be read can still be enormous. We need a way to give the right amount of compute power to a query, which might be massive, for just a few seconds, and then scale it down to zero instantly to avoid waste.

Our architecture for R2 SQL is designed to solve these two problems with a two-phase approach: a Query Planner that uses metadata to intelligently prune the search space, and a Query Execution system that distributes the work across Cloudflare’s global network to process the data in parallel.

Query Planner

The most efficient way to process data is to avoid reading it in the first place. This is the core strategy of the R2 SQL Query Planner. Instead of exhaustively scanning every file, the planner makes use of the metadata structure provided by R2 Data Catalog to prune the search space, that is, to avoid reading huge swathes of data irrelevant to a query.

This is a top-down investigation where the planner navigates the hierarchy of Iceberg metadata layers, using stats at each level to build a fast plan, specifying exactly which byte ranges the query engine needs to read.

What do we mean by “stats”?

When we say the planner uses “stats” we are referring to summary metadata that Iceberg stores about the contents of the data files. These statistics create a coarse map of the data, allowing the planner to make decisions about which files to read, and which to ignore, without opening them.

There are two primary levels of statistics the planner uses for pruning:

Partition-level stats: Stored in the Iceberg manifest list, these stats describe the range of partition values for all the data in a given Iceberg manifest file. For a partition on day(event_timestamp), this would be the earliest and latest day present in the files tracked by that manifest.

Column-level stats: Stored in the manifest files, these are more granular stats about each individual data file. Data files in R2 Data Catalog are formatted using the Apache Parquet. For every column of a Parquet file, the manifest stores key information like:

  • The minimum and maximum values. If a query asks for http_status = 500, and a file’s stats show its http_status column has a min of 200 and a max of 404, that entire file can be skipped.

  • A count of null values. This allows the planner to skip files when a query specifically looks for non-null values (e.g., WHERE error_code IS NOT NULL) and the file’s metadata reports that all values for error_code are null.

Now, let’s see how the planner uses these stats as it walks through the metadata layers.

Pruning the search space

The pruning process is a top-down investigation that happens in three main steps:

  1. Table metadata and the current snapshot

The planner begins by asking the catalog for the location of the current table metadata. This is a JSON file containing the table’s current schema, partition specs, and a log of all historical snapshots. The planner then fetches the latest snapshot to work with.

2. Manifest list and partition pruning

The current snapshot points to a single Iceberg manifest list. The planner reads this file and uses the partition-level stats for each entry to perform the first, most powerful pruning step, discarding any manifests whose partition value ranges don’t satisfy the query. For a table partitioned by day(event_timestamp), the planner can use the min/max values in the manifest list to immediately discard any manifests that don’t contain data for the days relevant to the query.

3. Manifests and file-level pruning

For the remaining manifests, the planner reads each one to get a list of the actual Parquet data files. These manifest files contain more granular, column-level stats for each individual data file they track. This allows for a second pruning step, discarding entire data files that cannot possibly contain rows matching the query’s filters.

4. File row-group pruning

Finally, for the specific data files that are still candidates, the Query Planner uses statistics stored inside Parquet file’s footers to skip over entire row groups.

The result of this multi-layer pruning is a precise list of Parquet files, and of row groups within those Parquet files. These become the query work units that are dispatched to the Query Execution system for processing.


The Planning pipeline

In R2 SQL, the multi-layer pruning we’ve described so far isn’t a monolithic process. For a table with millions of files, the metadata can be too large to process before starting any real work. Waiting for a complete plan would introduce significant latency.

Instead, R2 SQL treats planning and execution together as a concurrent pipeline. The planner’s job is to produce a stream of work units for the executor to consume as soon as they are available.

The planner’s investigation begins with two fetches to get a map of the table’s structure: one for the table’s snapshot and another for the manifest list.

Starting execution as early as possible

From that point on, the query is processed in a streaming fashion. As the Query Planner reads through the manifest files and subsequently the data files they point to and prunes them, it immediately emits any matching data files/row groups as work units to the execution queue.

This pipeline structure ensures the compute nodes can begin the expensive work of data I/O almost instantly, long before the planner has finished its full investigation.

On top of this pipeline model, the planner adds a crucial optimization: deliberate ordering. The manifest files are not streamed in an arbitrary sequence. Instead, the planner processes them in an order matching by the query’s ORDER BY clause, guided by the metadata stats. This ensures that the data most likely to contain the desired results is processed first.

These two concepts work together to address query latency from both ends of the query pipeline.

The streamed planning pipeline lets us start crunching data as soon as possible, minimizing the delay before the first byte is processed. At the other end of the pipeline, the deliberate ordering of that work lets us finish early by finding a definitive result without scanning the entire dataset.

The next section explains the mechanics behind this “finish early” strategy.

Stopping early: how to finish without reading everything

Thanks to the Query Planner streaming work units in an order matching the ORDER BY clause, the Query Execution system first processes the data that is most likely to be in the final result set.

This prioritization happens at two levels of the metadata hierarchy:

Manifest ordering: The planner first inspects the manifest list. Using the partition stats for each manifest (e.g., the latest timestamp in that group of files), it decides which entire manifest files to stream first.

Parquet file ordering: As it reads each manifest, it then uses the more granular column-level stats to decide the processing order of the individual Parquet files within that manifest.

This ensures a constantly prioritized stream of work units is sent to the execution engine. This prioritized stream is what allows us to stop the query early.

For instance, with a query like … ORDER BY timestamp DESC LIMIT 5, as the execution engine processes work units and sends back results, the planner does two things concurrently:

It maintains a bounded heap of the best 5 results seen so far, constantly comparing new results to the oldest timestamp in the heap.

It keeps a “high-water mark” on the stream itself. Thanks to the metadata, it always knows the absolute latest timestamp of any data file that has not yet been processed.

The planner is constantly comparing the state of the heap to the water mark of the remaining stream. The moment the oldest timestamp in our Top 5 heap is newer than the high-water mark of the remaining stream, the entire query can be stopped.

At that point, we can prove no remaining work unit could possibly contain a result that would make it into the top 5. The pipeline is halted, and a complete, correct result is returned to the user, often after reading only a fraction of the potentially matching data.

Currently, R2 SQL supports ordering on columns that are part of the table’s partition key only. This is a limitation we are working on lifting in the future.


Architecture


Query Execution

Query Planner streams the query work in bite-sized pieces called row groups. A single Parquet file usually contains multiple row groups, but most of the time only a few of them contain relevant data. Splitting query work into row groups allows R2 SQL to only read small parts of potentially multi-GB Parquet files.

The server that receives the user’s request and performs query planning assumes the role of query coordinator. It distributes the work across query workers and aggregates results before returning them to the user.

Cloudflare’s network is vast, and many servers can be in maintenance at the same time. The query coordinator contacts Cloudflare’s internal API to make sure only healthy, fully functioning servers are picked for query execution. Connections between coordinator and query worker go through Cloudflare Argo Smart Routing to ensure fast, reliable connectivity.

Servers that receive query execution requests from the coordinator assume the role of query workers. Query workers serve as a point of horizontal scalability in R2 SQL. With a higher number of query workers, R2 SQL can process queries faster by distributing the work among many servers. That’s especially true for queries covering large amounts of files.

Both the coordinator and query workers run on Cloudflare’s distributed network, ensuring R2 SQL has plenty of compute power and I/O throughput to handle analytical workloads.

Each query worker receives a batch of row groups from the coordinator as well as an SQL query to run on it. Additionally, the coordinator sends serialized metadata about Parquet files containing the row groups. Thanks to that, query workers know exact byte offsets where each row group is located in the Parquet file without the need to read this information from R2.

Apache DataFusion

Internally, each query worker uses Apache DataFusion to run SQL queries against row groups. DataFusion is an open-source analytical query engine written in Rust. It is built around the concept of partitions. A query is split into multiple concurrent independent streams, each working on its own partition of data.

Partitions in DataFusion are similar to partitions in Iceberg, but serve a different purpose. In Iceberg, partitions are a way to physically organize data on object storage. In DataFusion, partitions organize in-memory data for query processing. While logically they are similar – rows grouped together based on some logic – in practice, a partition in Iceberg doesn’t always correspond to a partition in DataFusion.

DataFusion partitions map perfectly to the R2 SQL query worker’s data model because each row group can be considered its own independent partition. Thanks to that, each row group is processed in parallel.

At the same time, since row groups usually contain at least 1000 rows, R2 SQL benefits from vectorized execution. Each DataFusion partition stream can execute the SQL query on multiple rows in one go, amortizing the overhead of query interpretation.

There are two ends of the spectrum when it comes to query execution: processing all rows sequentially in one big batch and processing each individual row in parallel. Sequential processing creates a so-called “tight loop”, which is usually more CPU cache friendly. In addition to that, we can significantly reduce interpretation overhead, as processing a large number of rows at a time in batches means that we go through the query plan less often. Completely parallel processing doesn’t allow us to do these things, but makes use of multiple CPU cores to finish the query faster.

DataFusion’s architecture allows us to achieve a balance on this scale, reaping benefits from both ends. For each data partition, we gain better CPU cache locality and amortized interpretation overhead. At the same time, since many partitions are processed in parallel, we distribute the workload between multiple CPUs, cutting the execution time further.


In addition to the smart query execution model, DataFusion also provides first-class Parquet support.

As a file format, Parquet has multiple optimizations designed specifically for query engines. Parquet is a column-based format, meaning that each column is physically separated from others. This separation allows better compression ratios, but it also allows the query engine to read columns selectively. If the query only ever uses five columns, we can only read them and skip reading the remaining fifty. This massively reduces the amount of data we need to read from R2 and the CPU time spent on decompression.

DataFusion does exactly that. Using R2 ranged reads, it is able to read parts of the Parquet files containing the requested columns, skipping the rest.

DataFusion’s optimizer also allows us to push down any filters to the lowest levels of the query plan. In other words, we can apply filters right as we are reading values from Parquet files. This allows us to skip materialization of results we know for sure won’t be returned to the user, cutting the query execution time further.

Returning query results

Once the query worker finishes computing results, it returns them to the coordinator through the gRPC protocol.

R2 SQL uses Apache Arrow for internal representation of query results. Arrow is an in-memory format that efficiently represents arrays of structured data. It is also used by DataFusion during query execution to represent partitions of data.

In addition to being an in-memory format, Arrow also defines the Arrow IPC serialization format. Arrow IPC isn’t designed for long-term storage of the data, but for inter-process communication, which is exactly what query workers and the coordinator do over the network. The query worker serializes all the results into the Arrow IPC format and embeds them into the gRPC response. The coordinator in turn deserializes results and can return to working on Arrow arrays.

Future plans

While R2 SQL is currently quite good at executing filter queries, we also plan to rapidly add new capabilities over the coming months. This includes, but is not limited to, adding:

  • Support for complex aggregations in a distributed and scalable fashion;

  • Tools to help provide visibility in query execution to help developers improve performance;

  • Support for many of the configuration options Apache Iceberg supports.

In addition to that, we have plans to improve our developer experience by allowing users to query their R2 Data Catalogs using R2 SQL from the Cloudflare Dashboard.

Given Cloudflare’s distributed compute, network capabilities, and ecosystem of developer tools, we have the opportunity to build something truly unique here. We are exploring different kinds of indexes to make R2 SQL queries even faster and provide more functionality such as full text search, geospatial queries, and more. 

Try it now!

It’s early days for R2 SQL, but we’re excited for users to get their hands on it. R2 SQL is available in open beta today! Head over to our getting started guide to learn how to create an end-to-end data pipeline that processes and delivers events to an R2 Data Catalog table, which can then be queried with R2 SQL.

We’re excited to see what you build! Come share your feedback with us on our Developer Discord.

Picking Servers CPUs for Databases in 2025 is Still Complex

Post Syndicated from Patrick Kennedy original https://www.servethehome.com/picking-servers-cpus-for-databases-in-2025-is-still-complex-amd-oracle-microsoft/

Picking CPUs for databases is still a topic of great complexity in 2025 with CPU vendors making different chips catering to database licenses

The post Picking Servers CPUs for Databases in 2025 is Still Complex appeared first on ServeTheHome.

Sequential consistency without borders: how D1 implements global read replication

Post Syndicated from Justin Mazzola Paluska original https://blog.cloudflare.com/d1-read-replication-beta/

Read replication of D1 databases is in public beta!

D1 read replication makes read-only copies of your database available in multiple regions across Cloudflare’s network.  For busy, read-heavy applications like e-commerce websites, content management tools, and mobile apps:

  • D1 read replication lowers average latency by routing user requests to read replicas in nearby regions.

  • D1 read replication increases overall throughput by offloading read queries to read replicas, allowing the primary database to handle more write queries.

The main copy of your database is called the primary database and the read-only copies are called read replicas.  When you enable replication for a D1 database, the D1 service automatically creates and maintains read replicas of your primary database.  As your users make requests, D1 routes those requests to an appropriate copy of the database (either the primary or a replica) based on performance heuristics, the type of queries made in those requests, and the query consistency needs as expressed by your application.

All of this global replica creation and request routing is handled by Cloudflare at no additional cost.

To take advantage of read replication, your Worker needs to use the new D1 Sessions API. Click the button below to run a Worker using D1 read replication with this code example to see for yourself!

D1 Sessions API

D1’s read replication feature is built around the concept of database sessions.  A session encapsulates all the queries representing one logical session for your application. For example, a session might represent all requests coming from a particular web browser or all requests coming from a mobile app used by one of your users. If you use sessions, your queries will use the appropriate copy of the D1 database that makes the most sense for your request, be that the primary database or a nearby replica.

The sessions implementation ensures sequential consistency for all queries in the session, no matter what copy of the database each query is routed to.  The sequential consistency model has important properties like “read my own writes” and “writes follow reads,” as well as a total ordering of writes. The total ordering of writes means that every replica will see transactions committed in the same order, which is exactly the behavior we want in a transactional system.  Said another way, sequential consistency guarantees that the reads and writes are executed in the order in which you write them in your code.

Some examples of consistency implications in real-world applications:

  • You are using an online store and just placed an order (write query), followed by a visit to the account page to list all your orders (read query handled by a replica). You want the newly placed order to be listed there as well.

  • You are using your bank’s web application and make a transfer to your electricity provider (write query), and then immediately navigate to the account balance page (read query handled by a replica) to check the latest balance of your account, including that last payment.

Why do we need the Sessions API? Why can we not just query replicas directly?

Applications using D1 read replication need the Sessions API because D1 runs on Cloudflare’s global network and there’s no way to ensure that requests from the same client get routed to the same replica for every request. For example, the client may switch from WiFi to a mobile network in a way that changes how their requests are routed to Cloudflare. Or the data center that handled previous requests could be down because of an outage or maintenance.

D1’s read replication is asynchronous, so it’s possible that when you switch between replicas, the replica you switch to lags behind the replica you were using. This could mean that, for example, the new replica hasn’t learned of the writes you just completed.  We could no longer guarantee useful properties like “read your own writes”.  In fact, in the presence of shifty routing, the only consistency property we could guarantee is that what you read had been committed at some point in the past (read committed consistency), which isn’t very useful at all!

Since we can’t guarantee routing to the same replica, we flip the script and use the information we get from the Sessions API to make sure whatever replica we land on can handle the request in a sequentially-consistent manner.

Here’s what the Sessions API looks like in a Worker:

export default {
  async fetch(request: Request, env: Env) {
    // A. Create the session.
    // When we create a D1 session, we can continue where we left off from a previous    
    // session if we have that session's last bookmark or use a constraint.
    const bookmark = request.headers.get('x-d1-bookmark') ?? 'first-unconstrained'
    const session = env.DB.withSession(bookmark)

    // Use this session for all our Workers' routes.
    const response = await handleRequest(request, session)

    // B. Return the bookmark so we can continue the session in another request.
    response.headers.set('x-d1-bookmark', session.getBookmark())

    return response
  }
}

async function handleRequest(request: Request, session: D1DatabaseSession) {
  const { pathname } = new URL(request.url)

  if (request.method === "GET" && pathname === '/api/orders') {
    // C. Session read query.
    const { results } = await session.prepare('SELECT * FROM Orders').all()
    return Response.json(results)

  } else if (request.method === "POST" && pathname === '/api/orders') {
    const order = await request.json<Order>()

    // D. Session write query.
    // Since this is a write query, D1 will transparently forward it to the primary.
    await session
      .prepare('INSERT INTO Orders VALUES (?, ?, ?)')
      .bind(order.orderId, order.customerId, order.quantity)
      .run()

    // E. Session read-after-write query.
    // In order for the application to be correct, this SELECT statement must see
    // the results of the INSERT statement above.
    const { results } = await session
      .prepare('SELECT * FROM Orders')
      .all()

    return Response.json(results)
  }

  return new Response('Not found', { status: 404 })
}

To use the Session API, you first need to create a session using the withSession method (step A).  The withSession method takes a bookmark as a parameter, or a constraint.  The provided constraint instructs D1 where to forward the first query of the session. Using first-unconstrained allows the first query to be processed by any replica without any restriction on how up-to-date it is. Using first-primary ensures that the first query of the session will be forwarded to the primary.

// A. Create the session.
const bookmark = request.headers.get('x-d1-bookmark') ?? 'first-unconstrained'
const session = env.DB.withSession(bookmark)

Providing an explicit bookmark instructs D1 that whichever database instance processes the query has to be at least as up-to-date as the provided bookmark (in case of a replica; the primary database is always up-to-date by definition).  Explicit bookmarks are how we can continue from previously-created sessions and maintain sequential consistency across user requests.

Once you’ve created the session, make queries like you normally would with D1.  The session object ensures that the queries you make are sequentially consistent with regards to each other.

// C. Session read query.
const { results } = await session.prepare('SELECT * FROM Orders').all()

For example, in the code example above, the session read query for listing the orders (step C) will return results that are at least as up-to-date as the bookmark used to create the session (step A).

More interesting is the write query to add a new order (step D) followed by the read query to list all orders (step E). Because both queries are executed on the same session, it is guaranteed that the read query will observe a database copy that includes the write query, thus maintaining sequential consistency.

// D. Session write query.
await session
  .prepare('INSERT INTO Orders VALUES (?, ?, ?)')
  .bind(order.orderId, order.customerId, order.quantity)
  .run()

// E. Session read-after-write query.
const { results } = await session
  .prepare('SELECT * FROM Orders')
  .all()

Note that we could make a single batch query to the primary including both the write and the list, but the benefit of using the new Sessions API is that you can use the extra read replica databases for your read queries and allow the primary database to handle more write queries.

The session object does the necessary bookkeeping to maintain the latest bookmark observed across all queries executed using that specific session, and always includes that latest bookmark in requests to D1. Note that any query executed without using the session object is not guaranteed to be sequentially consistent with the queries executed in the session.

When possible, we suggest continuing sessions across requests by including bookmarks in your responses to clients (step B), and having clients passing previously received bookmarks in their future requests.

// B. Return the bookmark so we can continue the session in another request.
response.headers.set('x-d1-bookmark', session.getBookmark())

This allows all of a client’s requests to be in the same session. You can do this by grabbing the session’s current bookmark at the end of the request (session.getBookmark()) and sending the bookmark in the response back to the client in HTTP headers, in HTTP cookies, or in the response body itself.

Consistency with and without Sessions API

In this section, we will explore the classic scenario of a read-after-write query to showcase how using the new D1 Sessions API ensures that we get sequential consistency and avoid any issues with inconsistent results in our application.


The Client, a user Worker, sends a D1 write query that gets processed by the database primary and gets the results back. However, the subsequent read query ends up being processed by a database replica. If the database replica is lagging far enough behind the database primary, such that it does not yet include the first write query, then the returned results will be inconsistent, and probably incorrect for your application business logic.


Using the Sessions API fixes the inconsistency issue. The first write query is again processed by the database primary, and this time the response includes “Bookmark 100”. The session object will store this bookmark for you transparently.

The subsequent read query is processed by database replica as before, but now since the query includes the previously received “Bookmark 100”, the database replica will wait until its database copy is at least up-to-date as “Bookmark 100”. Only once it’s up-to-date, the read query will be processed and the results returned, including the replica’s latest bookmark “Bookmark 104”.

Notice that the returned bookmark for the read query is “Bookmark 104”, which is different from the one passed in the query request. This can happen if there were other writes from other client requests that also got replicated to the database replica in-between the two queries our own client executed.

Enabling read replication

To start using D1 read replication:

  1. Update your Worker to use the D1 Sessions API to tell D1 what queries are part of the same database session. The Sessions API works with databases that do not have read replication enabled as well, so it’s safe to ship this code even before you enable replicas. Here’s an example.

  2. Enable replicas for your database via Cloudflare dashboard > Select D1 database > Settings.

D1 read replication is built into D1, and you don’t pay extra storage or compute costs for replicas. You incur the exact same D1 usage with or without replicas, based on rows_read and rows_written by your queries. Unlike other traditional database systems with replication, you don’t have to manually create replicas, including where they run, or decide how to route requests between the primary database and read replicas. Cloudflare handles this when using the Sessions API while ensuring sequential consistency.

Since D1 read replication is in beta, we recommend trying D1 read replication on a non-production database first, and migrate to your production workloads after validating read replication works for your use case.

If you don’t have a D1 database and want to try out D1 read replication, create a test database in the Cloudflare dashboard.

Observing your replicas

Once you’ve enabled D1 read replication, read queries will start to be processed by replica database instances. The response of each query includes information in the nested meta object relevant to read replication, like served_by_region and served_by_primary. The first denotes the region of the database instance that processed the query, and the latter will be true if-and-only-if your query was processed by the primary database instance.

In addition, the D1 dashboard overview for a database now includes information about the database instances handling your queries. You can see how many queries are handled by the primary instance or by a replica, and a breakdown of the queries processed by region. The example screenshots below show graphs displaying the number of queries executed and number of rows read by each region.



Under the hood: how D1 read replication is implemented

D1 is implemented on top of SQLite-backed Durable Objects running on top of Cloudflare’s Storage Relay Service.


D1 is structured with a 3-layer architecture.  First is the binding API layer that runs in the customer’s Worker.  Next is a stateless Worker layer that routes requests based on database ID to a layer of Durable Objects that handle the actual SQL operations behind D1.  This is similar to how most applications using Cloudflare Workers and Durable Objects are structured.

For a non-replicated database, there is exactly one Durable Object per database.  When a user’s Worker makes a request with the D1 binding for the database, that request is first routed to a D1 Worker running in the same location as the user’s Worker.  The D1 Worker figures out which D1 Durable Object backs the user’s D1 database and fetches an RPC stub to that Durable Object.  The Durable Objects routing layer figures out where the Durable Object is located, and opens an RPC connection to it.  Finally, the D1 Durable Object then handles the query on behalf of the user’s Worker using the Durable Objects SQL API.

In the Durable Objects SQL API, all queries go to a SQLite database on the local disk of the server where the Durable Object is running.  Durable Objects run SQLite in WAL mode.  In WAL mode, every write query appends to a write-ahead log (the WAL).  As SQLite appends entries to the end of the WAL file, a database-specific component called the Storage Relay Service leader synchronously replicates the entries to 5 durability followers on servers in different datacenters.  When a quorum (at least 3 out of 5) of the durability followers acknowledge that they have safely stored the data, the leader allows SQLite’s write queries to commit and opens the Durable Object’s output gate, so that the Durable Object can respond to requests.

Our implementation of WAL mode allows us to have a complete log of all of the committed changes to the database. This enables a couple of important features in SQLite-backed Durable Objects and D1:

  • We identify each write with a Lamport timestamp we call a bookmark.

  • We construct databases anywhere in the world by downloading all of the WAL entries from cold storage and replaying each WAL entry in order.

  • We implement Point-in-time recovery (PITR) by replaying WAL entries up to a specific bookmark rather than to the end of the log.

Unfortunately, having the main data structure of the database be a log is not ideal.  WAL entries are in write order, which is often neither convenient nor fast.  In order to cut down on the overheads of the log, SQLite checkpoints the log by copying the WAL entries back into the main database file.  Read queries are serviced directly by SQLite using files on disk — either the main database file for checkpointed queries, or the WAL file for writes more recent than the last checkpoint.  Similarly, the Storage Relay Service snapshots the database to cold storage so that we can replay a database by downloading the most recent snapshot and replaying the WAL from there, rather than having to download an enormous number of individual WAL entries.

WAL mode is the foundation for implementing read replication, since we can stream writes to locations other than cold storage in real time.


We implemented read replication in 5 major steps.

First, we made it possible to make replica Durable Objects with a read-only copy of the database.  These replica objects boot by fetching the latest snapshot and replaying the log from cold storage to whatever bookmark primary database’s leader last committed. This basically gave us point-in-time replicas, since without continuous updates, the replicas never updated until the Durable Object restarted.

Second, we registered the replica leader with the primary’s leader so that the primary leader sends the replicas every entry written to the WAL at the same time that it sends the WAL entries to the durability followers.  Each of the WAL entries is marked with a bookmark that uniquely identifies the WAL entry in the sequence of WAL entries.  We’ll use the bookmark later.

Note that since these writes are sent to the replicas before a quorum of durability followers have confirmed them, the writes are actually unconfirmed writes, and the replica leader must be careful to keep the writes hidden from the replica Durable Object until they are confirmed.  The replica leader in the Storage Relay Service does this by implementing enough of SQLite’s WAL-index protocol, so that the unconfirmed writes coming from the primary leader look to SQLite as though it’s just another SQLite client doing unconfirmed writes.  SQLite knows to ignore the writes until they are confirmed in the log.  The upshot of this is that the replica leader can write WAL entries to the SQLite WAL immediately, and then “commit” them when the primary leader tells the replica that the entries have been confirmed by durability followers.

One neat thing about this approach is that writes are sent from the primary to the replica as quickly as they are generated by the primary, helping to minimize lag between replicas.  In theory, if the write query was proxied through a replica to the primary, the response back to the replica will arrive at almost the same time as the message that updates the replica.  In such a case, it looks like there’s no replica lag at all!

In practice, we find that replication is really fast.  Internally, we measure confirm lag, defined as the time from when a primary confirms a change to when the replica confirms a change.  The table below shows the confirm lag for two D1 databases whose primaries are in different regions.

Replica Region

Database A

(Primary region: ENAM)

Database B
(Primary region: WNAM)

ENAM

N/A

30 ms

WNAM

45 ms

N/A

WEUR

55 ms

75 ms

EEUR

67 ms

75 ms

Confirm lag for 2 replicated databases.  N/A means that we have no data for this combination.  The region abbreviations are the same ones used for Durable Object location hints.

The table shows that confirm lag is correlated with the network round-trip time between the data centers hosting the primary databases and their replicas.  This is clearly visible in the difference between the confirm lag for the European replicas of the two databases.  As airline route planners know, EEUR is appreciably further away from ENAM than WNAM, but from WNAM, both European regions (WEUR and EEUR) are about equally as far away.  We see that in our replication numbers.

The exact placement of the D1 database in the region matters too.  Regions like ENAM and WNAM are quite large in themselves.  Database A’s placement in ENAM happens to be further away from most data centers in WNAM compared to database B’s placement in WNAM relative to the ENAM data centers.  As such, database B sees slightly lower confirm lag.

Try as we might, we can’t beat the speed of light!

Third, we updated the Durable Object routing system to be aware of Durable Object replicas.  When read replication is enabled on a Durable Object, two things happen.  First, we create a set of replicas according to a replication policy.  The current replication policy that D1 uses is simple: a static set of replicas in every region that D1 supports.  Second, we turn on a routing policy for the Durable Object.  The current policy that D1 uses is also simple: route to the Durable Object replica in the region close to where the user request is.  With this step, we have updateable read-only replicas, and can route requests to them!

Fourth, we updated D1’s Durable Object code to handle write queries on replicas. D1 uses SQLite to figure out whether a request is a write query or a read query.  This means that the determination of whether something is a read or write query happens after the request is routed.  Read replicas will have to handle write requests!  We solve this by instantiating each replica D1 Durable Object with a reference to its primary.  If the D1 Durable Object determines that the query is a write query, it forwards the request to the primary for the primary to handle. This happens transparently, keeping the user code simple.

As of this fourth step, we can handle read and write queries at every copy of the D1 Durable Object, whether it’s a primary or not.  Unfortunately, as outlined above, if a user’s requests get routed to different read replicas, they may see different views of the database, leading to a very weak consistency model.  So the last step is to implement the Sessions API across the D1 Worker and D1 Durable Object.  Recall that every WAL entry is marked with a bookmark.  These bookmarks uniquely identify a point in (logical) time in the database.  Our bookmarks are strictly monotonically increasing; every write to a database makes a new bookmark with a value greater than any other bookmark for that database.

Using bookmarks, we implement the Sessions API with the following algorithm split across the D1 binding implementation, the D1 Worker, and D1 Durable Object.

First up in the D1 binding, we have code that creates the D1DatabaseSession object and code within the D1DatabaseSession object to keep track of the latest bookmark.

// D1Binding is the binding code running within the user's Worker
// that provides the existing D1 Workers API and the new withSession method.
class D1Binding {
  // Injected by the runtime to the D1 Binding.
  d1Service: D1ServiceBinding

  function withSession(initialBookmark) {
    return D1DatabaseSession(this.d1Service, this.databaseId, initialBookmark);
  }
}

// D1DatabaseSession holds metadata about the session, most importantly the
// latest bookmark we know about for this session.
class D1DatabaseSession {
  constructor(d1Service, databaseId, initialBookmark) {
    this.d1Service = d1Service;
    this.databaseId = databaseId;
    this.bookmark = initialBookmark;
  }

  async exec(query) {
    // The exec method in the binding sends the query to the D1 Worker
    // and waits for the the response, updating the bookmark as
    // necessary so that future calls to exec use the updated bookmark.
    var resp = await this.d1Service.handleUserQuery(databaseId, query, bookmark);
    if (isNewerBookmark(this.bookmark, resp.bookmark)) {
      this.bookmark = resp.bookmark;
    }
    return resp;
  }

  // batch and other SQL APIs are implemented similarly.
}

The binding code calls into the D1 stateless Worker (d1Service in the snippet above), which figures out which Durable Object to use, and proxies the request to the Durable Object.

class D1Worker {
  async handleUserQuery(databaseId, query) {
    var doId = /* look up Durable Object for databaseId */;
    return await this.D1_DO.get(doId).handleWorkerQuery(query, bookmark)
  }
}

Finally, we reach the Durable Objects layer, which figures out how to actually handle the request.

class D1DurableObject {
  async handleWorkerQuery(queries, bookmark) {
    var bookmark = bookmark ?? "first-primary";
    var results = {};

    if (this.isPrimaryDatabase()) {
      // The primary always has the latest data so we can run the
      // query without checking the bookmark.
      var result = /* execute query directly */;
      bookmark = getCurrentBookmark();
      results = result;
    } else {
      // This is running on a replica.
      if (bookmark === "first-primary" || isWriteQuery(query)) {
        // The primary must handle this request, so we'll proxy the
        // request to the primary.
        var resp = await this.primary.handleWorkerQuery(query, bookmark);
        bookmark = resp.bookmark;
        results = resp.results;
      } else {
        // The replica can handle this request, but only after the
        // database is up-to-date with the bookmark.
        if (bookmark !== "first-unconstrained") {
          await waitForBookmark(bookmark);
        }
        var result = /* execute query locally */;
        bookmark = getCurrentBookmark();
        results = result;
      }
    }
    return { results: results, bookmark: bookmark };
  }
}

The D1 Durable Object first figures out if this instance can handle the query, or if the query needs to be sent to the primary.  If the Durable Object can execute the query, it ensures that we execute the query with a bookmark at least as up-to-date as the bookmark requested by the binding.

The upshot is that the three pieces of code work together to ensure that all of the queries in the session see the database in a sequentially consistent order, because each new query will be blocked until it has seen the results of previous queries within the same session.

Conclusion

D1’s new read replication feature is a significant step towards making globally distributed databases easier to use without sacrificing consistency. With automatically provisioned replicas in every region, your applications can now serve read queries faster while maintaining strong sequential consistency across requests, and keeping your application Worker code simple.

We’re excited for developers to explore this feature and see how it improves the performance of your applications. The public beta is just the beginning—we’re actively refining and expanding D1’s capabilities, including evolving replica placement policies, and your feedback will help shape what’s next.

Note that the Sessions API is only available through the D1 Worker Binding for now, and support for the HTTP REST API will follow soon.

Try out D1 read replication today by clicking the “Deploy to Cloudflare” button, check out documentation and examples, and let us know what you build in the D1 Discord channel!

Pools across the sea: how Hyperdrive speeds up access to databases and why we’re making it free

Post Syndicated from Andrew Repp original https://blog.cloudflare.com/how-hyperdrive-speeds-up-database-access/

Free as in beer

In acknowledgement of its pivotal role in building distributed applications that rely on regional databases, we’re making Hyperdrive available on the free plan of Cloudflare Workers!

Hyperdrive enables you to build performant, global apps on Workers with your existing SQL databases. Tell it your database connection string, bring your existing drivers, and Hyperdrive will make connecting to your database faster. No major refactors or convoluted configuration required.

Over the past year, Hyperdrive has become a key service for teams that want to build their applications on Workers and connect to SQL databases. This includes our own engineering teams, with Hyperdrive serving as the tool of choice to connect from Workers to our own Postgres clusters for many of the control-plane actions of our billing, D1, R2, and Workers KV teams (just to name a few). 

This has highlighted for us that Hyperdrive is a fundamental building block, and it solves a common class of problems for which there isn’t a great alternative. We want to make it possible for everyone building on Workers to connect to their database of choice with the best performance possible, using the drivers and frameworks they already know and love.

Performance is a feature

To illustrate how much Hyperdrive can improve your application’s performance, let’s write the world’s simplest benchmark. This is obviously not production code, but is meant to be reflective of a common application you’d bring to the Workers platform. We’re going to use a simple table, a very popular OSS driver (postgres.js), and run a standard OLTP workload from a Worker. We’re going to keep our origin database in London, and query it from Chicago (those locations will come back up later, so keep them in mind).

// This is the test table we're using
// CREATE TABLE IF NOT EXISTS test_data(userId bigint, userText text, isActive bool);

import postgres from 'postgres';

let direct_conn = '<direct connection string here!>';
let hyperdrive_conn = env.HYPERDRIVE.connectionString;

async function measureLatency(connString: string) {
	let beginTime = Date.now();
	let sql = postgres(connString);

	await sql`INSERT INTO test_data VALUES (${999}, 'lorem_ipsum', ${true})`;
	await sql`SELECT userId, userText, isActive FROM test_data WHERE userId = ${999}`;

	let latency = Date.now() - beginTime;
	ctx.waitUntil(sql.end());
	return latency;
}

let directLatency = await measureLatency(direct_conn);
let hyperdriveLatency = await measureLatency(hyperdrive_conn);

The code above

  1. Takes a standard database connection string, and uses it to create a database connection.

  2. Loads a user record into the database.

  3. Queries all records for that user.

  4. Measures how long this takes to do with a direct connection, and with Hyperdrive.

When connecting directly to the origin database, this set of queries takes an average of 1200 ms. With absolutely no other changes, just swapping out the connection string for env.HYPERDRIVE.connectionString, this number is cut down to 500 ms (an almost 60% reduction). If you enable Hyperdrive’s caching, so that the SELECT query is served from cache, this takes only 320 ms. With this one-line change, Hyperdrive will reduce the latency of this Worker by almost 75%! In addition to this speedup, you also get secure auth and transport, as well as a connection pool to help protect your database from being overwhelmed when your usage scales up. See it for yourself using our demo application.


A demo application comparing latencies between Hyperdrive and direct-to-database connections.

Traditional SQL databases are familiar and powerful, but they are designed to be colocated with long-running compute. They were not conceived in the era of modern serverless applications, and have connection models that don’t take the constraints of such an environment into account. Instead, they require highly stateful connections that do not play well with Workers’ global and stateless model. Hyperdrive solves this problem by maintaining database connections across Cloudflare’s network ready to be used at a moment’s notice, caching your queries for fast access, and eliminating round trips to minimize network latency.

With this announcement, many developers are going to be taking a look at Hyperdrive for the first time over the coming weeks and months. To help people dive in and try it out, we think it’s time to talk about how Hyperdrive actually works.

Staying warm in the pool

Let’s talk a bit about database connection poolers, how they work, and what problems they already solve. They are hardly a new technology, after all. 

The point of any connection pooler, Hyperdrive or others, is to minimize the overhead of establishing and coordinating database connections. Every new database connection requires additional memory and CPU time from the database server, and this can only scale just so well as the number of concurrent connections climbs. So the question becomes, how should database connections be shared across clients? 

There are three commonly-used approaches for doing so. These are:

  • Session mode: whenever a client connects, it is assigned a connection of its own until it disconnects. This dramatically reduces the available concurrency, in exchange for much simpler implementation and a broader selection of supported features

  • Transaction mode: when a client is ready to send a query or open a transaction, it is assigned a connection on which to do so. This connection will be returned to the pool when the query or transaction concludes. Subsequent queries during the same client session may (or may not) be assigned a different connection.

  • Statement mode: Like transaction mode, but a connection is given out and returned for each statement. Multi-statement transactions are disallowed.

When building Hyperdrive, we had to decide which of these modes we wanted to use. Each of the approaches implies some fairly serious tradeoffs, so what’s the right choice? For a service intended to make using a database from Workers as pleasant as possible we went with the choice that balances features and performance, and designed Hyperdrive as a transaction-mode pooler. This best serves the goals of supporting a large number of short-lived clients (and therefore very high concurrency), while still supporting the transactional semantics that cause so many people to reach for an RDBMS in the first place.

In terms of this part of its design, Hyperdrive takes its cues from many pre-existing popular connection poolers, and manages operations to allow our users to focus on designing their full-stack applications. There is a configured limit to the number of connections the pool will give out, limits to how long a connection will be held idle until it is allowed to drop and return resources to the database, bookkeeping around prepared statements being shared across pooled connections, and other traditional concerns of the management of these resources to help ensure the origin database is able to run smoothly. These are all described in our documentation.

Round and round we go

Ok, so why build Hyperdrive then? Other poolers that solve these problems already exist — couldn’t developers using Workers just run one of those and call it a day? It turns out that connecting to regional poolers from Workers has the same major downside as connecting to regional databases: network latency and round trips.

Establishing a connection, whether to a database or a pool, requires many exchanges between the client and server. While this is true for all fully-fledged client-server databases (e.g. MySQL, MongoDB), we are going to focus on the PostgreSQL connection protocol flow in this post. As we work through all of the steps involved, what we most want to keep track of is how many round trips it takes to accomplish. Note that we’re mostly concerned about having to wait around while these happen, so “half” round trips such as in the first diagram are not counted. This is because we can send off the message and then proceed without waiting.

The first step to establishing a connection between Postgres client and server is very familiar ground to anyone who’s worked much with networks: a TCP startup handshake. Postgres uses TCP for its underlying transport, and so we must have that connection before anything else can happen on top of it.


With our transport layer in place, the next step is to encrypt the connection. The TLS Handshake involves some back-and-forth in its own right, though this has been reduced to just one round trip for TLS 1.3. Below is the simplest and fastest version of this exchange, but there are certainly scenarios where it can be much more complex.


After the underlying transport is established and secured, the application-level traffic can actually start! However, we’re not quite ready for queries, the client still needs to authenticate to a specific user and database. Again, there are multiple supported approaches that offer varying levels of speed and security. To make this comparison as fair as possible, we’re again going to consider the version that offers the fastest startup (password-based authentication).


So, for those keeping score, establishing a new connection to your database takes a bare minimum of 5 round trips, and can very quickly climb from there. 

While the latency of any given network round trip is going to vary based on so many factors that “it depends” is the only meaningful measurement available, some quick benchmarking during the writing of this post shows ~125 ms from Chicago to London. Now multiply that number by 5 round trips and the problem becomes evident: 625 ms to start up a connection is not viable in a distributed serverless environment. So how does Hyperdrive solve it? What if I told you the trick is that we do it all twice? To understand Hyperdrive’s secret sauce, we need to dive into Hyperdrive’s architecture.


Impersonating a database server

The rest of this post is a deep dive into answering the question of how Hyperdrive does what it does. To give the clearest picture, we’re going to talk about some internal subsystems by name. To help keep everything straight, let’s start with a short glossary that you can refer back to if needed. These descriptions may not make sense yet, but they will by the end of the article.

Hyperdrive subsystem name

Brief description

Client

Lives on the same server as your Worker, talks directly to your database driver. This caches query results and sends queries to Endpoint if needed.

Endpoint

Lives in the data center nearest to your origin database, talks to your origin database. This caches query results and houses a pool of connections to your origin database.

Edge Validator

Sends a request to a Cloudflare data center to validate that Hyperdrive can connect to your origin database at time of creation.

Placement

Builds on top of Edge Validator to connect to your origin database from all eligible data centers, to identify which have the fastest connections.

The first subsystem we want to dig into is named Client. Client’s first job is to pretend to be a database server. When a user’s Worker wants to connect to their database via Hyperdrive, they use a special connection string that the Worker runtime generates on the fly. This tells the Worker to reach out to a Hyperdrive process running on the same Cloudflare server, and direct all traffic to and from the database client to it.

import postgres from "postgres";

// Connect to Hyperdrive
const sql = postgres(env.HYPERDRIVE.connectionString);

// sql will now talk over an RPC channel to Hyperdrive, instead of via TCP to Postgres

Once this connection is established, the database driver will perform the usual handshake expected of it, with our Client playing the role of a database server and sending the appropriate responses. All of this happens on the same Cloudflare server running the Worker, and we observe that the p90 for all this is 4 ms (p50 is 2 ms). Quite a bit better than 625 ms, but how does that help? The query still needs to get to the database, right?

Client’s second main job is to inspect the queries sent from a Worker, and decide whether they can be served from Cloudflare’s cache. We’ll talk more about that later on. Assuming that there are no cached query results available, Client will need to reach out to our second important subsystem, which we call Endpoint.

In for the long haul

Before we dig into the role Endpoint plays, it’s worth talking more about how the Client→Endpoint connection works, because it’s a key piece of our solution. We have already talked a lot about the price of network round trips, and how a Worker might be quite far away from the origin database, so how does Hyperdrive handle the long trip from the Client running alongside their Worker to the Endpoint running near their database without expensive round trips?

This is accomplished with a very handy bit of Cloudflare’s networking infrastructure. When Client gets a cache miss, it will submit a request to our networking platform for a connection to whichever data center Endpoint is running on. This platform keeps a pool of ready TCP connections between all of Cloudflare’s data centers, such that we don’t need to do any preliminary handshakes to begin sending application-level traffic. You might say we put a connection pooler in our connection pooler.

Over this TCP connection, we send an initialization message that includes all of the buffered query messages the Worker has sent to Client (the mental model would be something like a SYN and a payload all bundled together). Endpoint will do its job processing this query, and respond by streaming the response back to Client, leaving the streaming channel open for any followup queries until Client disconnects. This approach allows us to send queries around the world with zero wasted round trips.

Impersonating a database client

Endpoint has a couple different jobs it has to do. Its first job is to pretend to be a database client, and to do the client half of the handshake shown above. Second, it must also do the same query processing that Client does with query messages. Finally, Endpoint will make the same determination on when it needs to reach out to the origin database to get uncached query results.

When Endpoint needs to query the origin database, it will attempt to take a connection out of a limited-size pool of database connections that it keeps. If there is an unused connection available, it is handed out from the pool and used to ferry the query to the origin database, and the results back to Endpoint. Once Endpoint has these results, the connection is immediately returned to the pool so that another Client can use it. These warm connections are usable in a matter of microseconds, which is obviously a dramatic improvement over the round trips from one region to another that a cold startup handshake would require.

If there are no currently unused connections sitting in the pool, it may start up a new one (assuming the pool has not already given out as many connections as it is allowed to). This set of handshakes looks exactly the same as the one Client does, but it happens across the network between a Cloudflare data center and wherever the origin database happens to be. These are the same 5 round trips as our original example, but instead of a full Chicago→London path on every single trip, perhaps it’s Virginia→London, or even London→London. Latency here will depend on which data center Endpoint is being housed in.

Distributed choreography

Earlier, we mentioned that Hyperdrive is a transaction-mode pooler. This means that when a driver is ready to send a query or open a transaction it must get a connection from the pool to use. The core challenge for a transaction-mode pooler is in aligning the state of the driver with the state of the connection checked out from the pool. For example, if the driver thinks it’s in a transaction, but the database doesn’t, then you might get errors or even corrupted results.

Hyperdrive achieves this by ensuring all connections are in the same state when they’re checked out of the pool: idle and ready for a query. Where Hyperdrive differs from other transaction-mode poolers is that it does this dance of matching up the states of two different connections across machines, such that there’s no need to share state between Client and Endpoint! Hyperdrive can terminate the incoming connection in Client on the same machine running the Worker, and pool the connections to the origin database wherever makes the most sense.

The job of a transaction-mode pooler is a hard one. Database connections are fundamentally stateful and keeping track of that state is important to maintain our guise when impersonating either a database client or a server. As an example, one of the trickier pieces of state to manage are prepared statements. When a user creates a new prepared statement, the prepared statement is only created on whichever database connection happened to be checked out at that time. Once the user finishes the transaction or query they are processing, the connection holding that statement is returned to the pool. From the user’s perspective they’re still connected using the same database connection, so a new query or transaction can reasonably expect to use that previously prepared statement. If a different connection is handed out for the next query and the query wants to make use of this resource, the pooler has to do something about it. We went into some depth on this topic in a previous blog post when we released this feature, but in sum, the process looks like this:


Hyperdrive implements this by keeping track of what statements have been prepared by a given client, as well as what statements have been prepared on each origin connection in the pool. When a query comes in expecting to re-use a particular prepared statement (#8 above), Hyperdrive checks if it’s been prepared on the checked-out origin connection. If it hasn’t, Hyperdrive will replay the wire-protocol message sequence to prepare it on the newly-checked-out origin connection (#10 above) before sending the query over it. Many little corrections like this are necessary to keep the client’s connection to Hyperdrive and Hyperdrive’s connection to the origin database lined up so that both sides see what they expect.

Better, faster, smarter, closer

This “split connection” approach is the founding innovation of Hyperdrive, and one of the most vital aspects of it is how it affects starting up new connections. While the same 5+ round trips must always happen on startup, the actual time spent on the round trips can be dramatically reduced by conducting them over the smallest possible distances. This impact of distance can be so big that there is still a huge latency reduction even though the startup round trips must now happen twice (once each between the Worker and Client, and Endpoint and your origin database). So how do we decide where to run everything, to lean into that advantage as much as possible?

The placement of Client has not really changed since the original design of Hyperdrive. Sharing a server with the Worker sending the queries means that the Worker runtime can connect directly to Hyperdrive with no network hop needed. While there is always room for microoptimizations, it’s hard to do much better than that from an architecture perspective.  By far the bigger piece of the latency puzzle is where to run Endpoint.

Hyperdrive keeps a list of data centers that are eligible to house Endpoints, requiring that they have sufficient capacity and the best routes available for pooled connections to use. The key challenge to overcome here is that a database connection string does not tell you where in the world a database actually is. The reality is that reliably going from a hostname to a precise (enough) geographic location is a hard problem, even leaving aside the additional complexity of doing so within a private network. So how do we pick from that list of eligible data centers?

For much of the time since its launch, Hyperdrive solved this with a regional pool approach. When a Worker connected to Hyperdrive, the location of the Worker was used to infer what region the end user was connecting from (e.g. ENAM, WEUR, APAC, etc. — see a rough breakdown here). Data centers to house Endpoints for any given Hyperdrive were deterministically selected from that region’s list of eligible options using rendezvous hashing, resulting in one pool of connections per region.

This approach worked well enough, but it had some severe shortcomings. The first and most obvious is that there’s no guarantee that the data center selected for a given region is actually closer to the origin database than the user making the request. This means that, while you’re getting the benefit of the excellent routing available on Cloudflare’s network, you may be going significantly out of your way to do so. The second downside is that, in the scenario where a new connection must be created, the round trips to do so may be happening over a significantly larger distance than is necessary if the origin database is in a different region than the Endpoint housing the regional connection pool. This increases latency and reduces throughput for the query that needs to instantiate the connection.

The final key downside here is an unfortunate interaction with Smart Placement, a feature of Cloudflare Workers that analyzes the duration of your Worker requests to identify the data center to run your Worker in. With regional Endpoints, the best Smart Placement can possibly do is to put your requests close to the Endpoint for whichever region the origin database is in. Again, there may be other data centers that are closer, but Smart Placement has no way to do better than where the Endpoint is because all Hyperdrive queries must route through it.


We recently shipped some improvements to this system that significantly enhanced performance. The new system discards the concept of regional pools entirely, in favor of a single global Endpoint for each Hyperdrive that is in the eligible data center as close as possible to the origin database.

The way we solved locating the origin database such that we can accomplish this was ultimately very straightforward. We already had a subsystem to confirm, at the time of creation, that Hyperdrive could connect to an origin database using the provided information. We call this subsystem our Edge Validator.

It’s bad user experience to allow someone to create a Hyperdrive, and then find out when they go to use it that they mistyped their password or something. Now they’re stuck trying to debug with extra layers in the way, with a Hyperdrive that can’t possibly work. Instead, whenever a Hyperdrive is created, the Edge Validator will send a request to an arbitrary data center to use its instance of Hyperdrive to connect to the origin database. If this connection fails, the creation of the Hyperdrive will also fail, giving immediate feedback to the user at the time it is most helpful.

With our new subsystem, affectionately called Placement, we now have a solution to the geolocation problem. After Edge Validator has confirmed that the provided information works and the Hyperdrive is created, an extra step is run in the background. Placement will perform the exact same connection routine, except instead of being done once from an arbitrary data center, it is run a handful of times from every single data center that is eligible to house Endpoints. The latency of establishing these connections is collected, and the average is sent back to a central instance of Placement. The data centers that can connect to the origin database the fastest are, by definition, where we want to run Endpoint for this Hyperdrive. The list of these is saved, and at runtime is used to select the Endpoint best suited to housing the pool of connections to the origin database.

Given that the secret sauce of Hyperdrive is in managing and minimizing the latency of establishing these connections, moving Endpoints right next to their origin databases proved to be pretty impactful.


Pictured: query latency as measured from Endpoint to origin databases. The backfill of Placement to existing customers was done in stages on 02/22 and 02/25.

Serverless drivers exist, though?

While we went in a different direction, it’s worth acknowledging that other teams have solved this same problem with a very different approach. Custom database drivers, usually called “serverless drivers”, have made several optimization efforts to reduce both the number of round trips and how quickly they can be conducted, while still connecting directly from your client to your database in the traditional way. While these drivers are impressive, we chose not to go this route for a couple of reasons.

First off, a big part of the appeal of using Postgres is its vibrant ecosystem. Odds are good you’ve used Postgres before, and it can probably help solve whichever problem you’re tackling with your newest project. This familiarity and shared knowledge across projects is an absolute superpower. We wanted to lean into this advantage by supporting the most popular drivers already in this ecosystem, instead of fragmenting it by adding a competing one.

Second, Hyperdrive also functions as a cache for individual queries (a bit of trivia: its name while still in Alpha was actually sql-query-cache). Doing this as effectively as possible for distributed users requires some clever positioning of where exactly the query results should be cached. One of the unique advantages of running a distributed service on Cloudflare’s network is that we have a lot of flexibility on where to run things, and can confidently surmount challenges like those. If we’re going to be playing three-card monte with where things are happening anyway, it makes the most sense to favor that route for solving the other problems we’re trying to tackle too.

Pick your favorite cache pun

As we’ve talked about in the past, Hyperdrive buffers protocol messages until it has enough information to know whether a query can be served from cache. In a post about how Hyperdrive works it would be a shame to skip talking about how exactly we cache query results, so let’s close by diving into that.

First and foremost, Hyperdrive uses Cloudflare’s cache, because when you have technology like that already available to you, it’d be silly not to use it. This has some implications for our architecture that are worth exploring.

The cache exists in each of Cloudflare’s data centers, and by default these are separate instances. That means that a Client operating close to the user has one, and an Endpoint operating close to the origin database has one. However, historically we weren’t able to take full advantage of that, because the logic for interacting with cache was tightly bound to the logic for managing the pool of connections.

Part of our recent architecture refactoring effort, where we switched to global Endpoints, was to split up this logic such that we can take advantage of Client’s cache too. This was necessary because, with Endpoint moving to a single location for each Hyperdrive, users from other regions would otherwise have gotten cache hits served from almost as far away as the origin.

With the new architecture, the role of Client during active query handling transitioned from that of a “dumb pipe” to more like what Endpoint had always been doing. It now buffers protocol messages, and serves results from cache if possible. In those scenarios, Hyperdrive’s traffic never leaves the data center that the Worker is running in, reducing query latencies from 20-70 ms to an average of around 4 ms. As a side benefit, it also substantially reduces the network bandwidth Hyperdrive uses to serve these queries. A win-win!

In the scenarios where query results can’t be served from the cache in Client’s data center, all is still not lost. Endpoint may also have cached results for this query, because it can field traffic from many different Clients around the world. If so, it will provide these results back to Client, along with how much time is remaining before they expire, such that Client can both return them and store them correctly into its own cache. Likewise, if Endpoint does need to go to the origin database for results, they will be stored into both Client and Endpoint caches. This ensures that followup queries from that same Client data center will get the happy path with single-digit ms response times, and also reduce load on the origin database from any other Client’s queries. This functions similarly to how Cloudflare’s Tiered Cache works, with Endpoint’s cache functioning as a final layer of shielding for the origin database.

Come on in, the water’s fine!

With this announcement of a Free Plan for Hyperdrive, and newly armed with the knowledge of how it works under the hood, we hope you’ll enjoy building your next project with it! You can get started with a single Wrangler command (or using the dashboard):

wrangler hyperdrive create postgres-hyperdrive 
--connection-string="postgres://user:[email protected]:5432/defaultdb"

We’ve also included a Deploy to Cloudflare button below to let you get started with a sample Worker app using Hyperdrive, just bring your existing Postgres database! If you have any questions or ideas for future improvements, please feel free to visit our Discord channel!

Over 700 million events/second: How we make sense of too much data

Post Syndicated from Constantin Pan original https://blog.cloudflare.com/how-we-make-sense-of-too-much-data/

Cloudflare’s network provides an enormous array of services to our customers. We collect and deliver associated data to customers in the form of event logs and aggregated analytics. As of December 2024, our data pipeline is ingesting up to 706M events per second generated by Cloudflare’s services, and that represents 100x growth since our 2018 data pipeline blog post

At peak, we are moving 107 GiB/s of compressed data, either pushing it directly to customers or subjecting it to additional queueing and batching.

All of these data streams power things like Logs, Analytics, and billing, as well as other products, such as training machine learning models for bot detection. This blog post is focused on techniques we use to efficiently and accurately deal with the high volume of data we ingest for our Analytics products. A previous blog post provides a deeper dive into the data pipeline for Logs. 

The pipeline can be roughly described by the following diagram.


The data pipeline has multiple stages, and each can and will naturally break or slow down because of hardware failures or misconfiguration. And when that happens, there is just too much data to be able to buffer it all for very long. Eventually some will get dropped, causing gaps in analytics and a degraded product experience unless proper mitigations are in place.

Dropping data to retain information

How does one retain valuable information from more than half a billion events per second, when some must be dropped? Drop it in a controlled way, by downsampling.

Here is a visual analogy showing the difference between uncontrolled data loss and downsampling. In both cases the same number of pixels were delivered. One is a higher resolution view of just a small portion of a popular painting, while the other shows the full painting, albeit blurry and highly pixelated.


As we noted above, any point in the pipeline can fail, so we want the ability to downsample at any point as needed. Some services proactively downsample data at the source before it even hits Logfwdr. This makes the information extracted from that data a little bit blurry, but much more useful than what otherwise would be delivered: random chunks of the original with gaps in between, or even nothing at all. The amount of “blur” is outside our control (we make our best effort to deliver full data), but there is a robust way to estimate it, as discussed in the next section.

Logfwdr can decide to downsample data sitting in the buffer when it overflows. Logfwdr handles many data streams at once, so we need to prioritize them by assigning each data stream a weight and then applying max-min fairness to better utilize the buffer. It allows each data stream to store as much as it needs, as long as the whole buffer is not saturated. Once it is saturated, streams divide it fairly according to their weighted size.

In our implementation (Go), each data stream is driven by a goroutine, and they cooperate via channels. They consult a single tracker object every time they allocate and deallocate memory. The tracker uses a max-heap to always know who the heaviest participant is and what the total usage is. Whenever the total usage goes over the limit, the tracker repeatedly sends the “please shed some load” signal to the heaviest participant, until the usage is again under the limit.

The effect of this is that healthy streams, which buffer a tiny amount, allocate whatever they need without losses. But any lagging streams split the remaining memory allowance fairly.

We downsample more or less uniformly, by always taking some of the least downsampled batches from the buffer (using min-heap to find those) and merging them together upon downsampling.


Merging keeps the batches roughly the same size and their number under control.

Downsampling is cheap, but since data in the buffer is compressed, it causes recompression, which is the single most expensive thing we do to the data. But using extra CPU time is the last thing you want to do when the system is under heavy load! We compensate for the recompression costs by starting to downsample the fresh data as well (before it gets compressed for the first time) whenever the stream is in the “shed the load” state.

We called this approach “bottomless buffers”, because you can squeeze effectively infinite amounts of data in there, and it will just automatically be thinned out. Bottomless buffers resemble reservoir sampling, where the buffer is the reservoir and the population comes as the input stream. But there are some differences. First is that in our pipeline the input stream of data never ends, while reservoir sampling assumes it ends to finalize the sample. Secondly, the resulting sample also never ends.

Let’s look at the next stage in the pipeline: Logreceiver. It sits in front of a distributed queue. The purpose of logreceiver is to partition each stream of data by a key that makes it easier for Logpush, Analytics inserters, or some other process to consume.

Logreceiver proactively performs adaptive sampling of analytics. This improves the accuracy of analytics for small customers (receiving on the order of 10 events per day), while more aggressively downsampling large customers (millions of events per second). Logreceiver then pushes the same data at multiple resolutions (100%, 10%, 1%, etc.) into different topics in the distributed queue. This allows it to keep pushing something rather than nothing when the queue is overloaded, by just skipping writing the high-resolution samples of data.

The same goes for Inserters: they can skip reading or writing high-resolution data. The Analytics APIs can skip reading high resolution data. The analytical database might be unable to read high resolution data because of overload or degraded cluster state or because there is just too much to read (very wide time range or very large customer). Adaptively dropping to lower resolutions allows the APIs to return some results in all of those cases.

Extracting value from downsampled data

Okay, we have some downsampled data in the analytical database. It looks like the original data, but with some rows missing. How do we make sense of it? How do we know if the results can be trusted?

Let’s look at the math.

Since the amount of sampling can vary over time and between nodes in the distributed system, we need to store this information along with the data. With each event $x_i$ we store its sample interval, which is the reciprocal to its inclusion probability $\pi_i = \frac{1}{\text{sample interval}}$. For example, if we sample 1 in every 1,000 events, each of the events included in the resulting sample will have its $\pi_i = 0.001$, so the sample interval will be 1,000. When we further downsample that batch of data, the inclusion probabilities (and the sample intervals) multiply together: a 1 in 1,000 sample from a 1 in 1,000 sample is a 1 in 1,000,000 sample of the original population. The sample interval of an event can also be interpreted roughly as the number of original events that this event represents, so in the literature it is known as weight $w_i = \frac{1}{\pi_i}$.

We rely on the Horvitz-Thompson estimator (HT, paper) in order to derive analytics about $x_i$. It gives two estimates: the analytical estimate (e.g. the population total or size) and the estimate of the variance of that estimate. The latter enables us to figure out how accurate the results are by building confidence intervals. They define ranges that cover the true value with a given probability (confidence level). A typical confidence level is 0.95, at which a confidence interval (a, b) tells that you can be 95% sure the true SUM or COUNT is between a and b.

So far, we know how to use the HT estimator for doing SUM, COUNT, and AVG.

Given a sample of size $n$, consisting of values $x_i$ and their inclusion probabilities $\pi_i$, the HT estimator for the population total (i.e. SUM) would be

$$\widehat{T}=\sum_{i=1}^n{\frac{x_i}{\pi_i}}=\sum_{i=1}^n{x_i w_i}.$$

The variance of $\widehat{T}$ is:

$$\widehat{V}(\widehat{T}) = \sum_{i=1}^n{x_i^2 \frac{1 – \pi_i}{\pi_i^2}} + \sum_{i \neq j}^n{x_i x_j \frac{\pi_{ij} – \pi_i \pi_j}{\pi_{ij} \pi_i \pi_j}},$$

where $\pi_{ij}$ is the probability of both $i$-th and $j$-th events being sampled together.

We use Poisson sampling, where each event is subjected to an independent Bernoulli trial (“coin toss”) which determines whether the event becomes part of the sample. Since each trial is independent, we can equate $\pi_{ij} = \pi_i \pi_j$, which when plugged in the variance estimator above turns the right-hand sum to zero:

$$\widehat{V}(\widehat{T}) = \sum_{i=1}^n{x_i^2 \frac{1 – \pi_i}{\pi_i^2}} + \sum_{i \neq j}^n{x_i x_j \frac{0}{\pi_{ij} \pi_i \pi_j}},$$

thus

$$\widehat{V}(\widehat{T}) = \sum_{i=1}^n{x_i^2 \frac{1 – \pi_i}{\pi_i^2}} = \sum_{i=1}^n{x_i^2 w_i (w_i-1)}.$$

For COUNT we use the same estimator, but plug in $x_i = 1$. This gives us:

$$\begin{align}
\widehat{C} &= \sum_{i=1}^n{\frac{1}{\pi_i}} = \sum_{i=1}^n{w_i},\\
\widehat{V}(\widehat{C}) &= \sum_{i=1}^n{\frac{1 – \pi_i}{\pi_i^2}} = \sum_{i=1}^n{w_i (w_i-1)}.
\end{align}$$

For AVG we would use

$$\begin{align}
\widehat{\mu} &= \frac{\widehat{T}}{N},\\
\widehat{V}(\widehat{\mu}) &= \frac{\widehat{V}(\widehat{T})}{N^2},
\end{align}$$

if we could, but the original population size $N$ is not known, it is not stored anywhere, and it is not even possible to store because of custom filtering at query time. Plugging $\widehat{C}$ instead of $N$ only partially works. It gives a valid estimator for the mean itself, but not for its variance, so the constructed confidence intervals are unusable.

In all cases the corresponding pair of estimates are used as the $\mu$ and $\sigma^2$ of the normal distribution (because of the central limit theorem), and then the bounds for the confidence interval (of confidence level ) are:

$$\Big( \mu – \Phi^{-1}\big(\frac{1 + \alpha}{2}\big) \cdot \sigma, \quad \mu + \Phi^{-1}\big(\frac{1 + \alpha}{2}\big) \cdot \sigma\Big).$$

We do not know the N, but there is a workaround: simultaneous confidence intervals. Construct confidence intervals for SUM and COUNT independently, and then combine them into a confidence interval for AVG. This is known as the Bonferroni method. It requires generating wider (half the “inconfidence”) intervals for SUM and COUNT. Here is a simplified visual representation, but the actual estimator will have to take into account the possibility of the orange area going below zero.


In SQL, the estimators and confidence intervals look like this:

WITH sum(x * _sample_interval)                              AS t,
     sum(x * x * _sample_interval * (_sample_interval - 1)) AS vt,
     sum(_sample_interval)                                  AS c,
     sum(_sample_interval * (_sample_interval - 1))         AS vc,
     -- ClickHouse does not expose the erf⁻¹ function, so we precompute some magic numbers,
     -- (only for 95% confidence, will be different otherwise):
     --   1.959963984540054 = Φ⁻¹((1+0.950)/2) = √2 * erf⁻¹(0.950)
     --   2.241402727604945 = Φ⁻¹((1+0.975)/2) = √2 * erf⁻¹(0.975)
     1.959963984540054 * sqrt(vt) AS err950_t,
     1.959963984540054 * sqrt(vc) AS err950_c,
     2.241402727604945 * sqrt(vt) AS err975_t,
     2.241402727604945 * sqrt(vc) AS err975_c
SELECT t - err950_t AS lo_total,
       t            AS est_total,
       t + err950_t AS hi_total,
       c - err950_c AS lo_count,
       c            AS est_count,
       c + err950_c AS hi_count,
       (t - err975_t) / (c + err975_c) AS lo_average,
       t / c                           AS est_average,
       (t + err975_t) / (c - err975_c) AS hi_average
FROM ...

Construct a confidence interval for each timeslot on the timeseries, and you get a confidence band, clearly showing the accuracy of the analytics. The figure below shows an example of such a band in shading around the line.


Sampling is easy to screw up

We started using confidence bands on our internal dashboards, and after a while noticed something scary: a systematic error! For one particular website the “total bytes served” estimate was higher than the true control value obtained from rollups, and the confidence bands were way off. See the figure below, where the true value (blue line) is outside the yellow confidence band at all times.


We checked the stored data for corruption, it was fine. We checked the math in the queries, it was fine. It was only after reading through the source code for all of the systems responsible for sampling that we found a candidate for the root cause.

We used simple random sampling everywhere, basically “tossing a coin” for each event, but in Logreceiver sampling was done differently. Instead of sampling randomly it would perform systematic sampling by picking events at equal intervals starting from the first one in the batch.


Why would that be a problem?

There are two reasons. The first is that we can no longer claim $\pi_{ij} = \pi_i \pi_j$, so the simplified variance estimator stops working and confidence intervals cannot be trusted. But even worse, the estimator for the total becomes biased. To understand why exactly, we wrote a short repro code in Python:

import itertools

def take_every(src, period):
    for i, x in enumerate(src):
    if i % period == 0:
        yield x

pattern = [10, 1, 1, 1, 1, 1]
sample_interval = 10 # bad if it has common factors with len(pattern)
true_mean = sum(pattern) / len(pattern)

orig = itertools.cycle(pattern)
sample_size = 10000
sample = itertools.islice(take_every(orig, sample_interval), sample_size)

sample_mean = sum(sample) / sample_size

print(f"{true_mean=} {sample_mean=}")

After playing with different values for pattern and sample_interval in the code above, we realized where the bias was coming from.

Imagine a person opening a huge generated HTML page with many small/cached resources, such as icons. The first response will be big, immediately followed by a burst of small responses. If the website is not visited that much, responses will tend to end up all together at the start of a batch in Logfwdr. Logreceiver does not cut batches, only concatenates them. The first response remains first, so it always gets picked and skews the estimate up.


We checked the hypothesis against the raw unsampled data that we happened to have because that particular website was also using one of the Logs products. We took all events in a given time range, and grouped them by cutting at gaps of at least one minute. In each group, we ranked all events by time and looked at the variable of interest (response size in bytes), and put it on a scatter plot against the rank inside the group.


A clear pattern! The first response is much more likely to be larger than average.

We fixed the issue by making Logreceiver shuffle the data before sampling. As we rolled out the fix, the estimation and the true value converged.


Now, after battle testing it for a while, we are confident the HT estimator is implemented properly and we are using the correct sampling process.

Using Cloudflare’s analytics APIs to query sampled data

We already power most of our analytics datasets with sampled data. For example, the Workers Analytics Engine exposes the sample interval in SQL, allowing our customers to build their own dashboards with confidence bands. In the GraphQL API, all of the data nodes that have “Adaptive” in their name are based on sampled data, and the sample interval is exposed as a field there as well, though it is not possible to build confidence intervals from that alone. We are working on exposing confidence intervals in the GraphQL API, and as an experiment have added them to the count and edgeResponseBytes (sum) fields on the httpRequestsAdaptiveGroups nodes. This is available under confidence(level: X).

Here is a sample GraphQL query:

query HTTPRequestsWithConfidence(
  $accountTag: string
  $zoneTag: string
  $datetimeStart: string
  $datetimeEnd: string
) {
  viewer {
    zones(filter: { zoneTag: $zoneTag }) {
      httpRequestsAdaptiveGroups(
        filter: {
          datetime_geq: $datetimeStart
          datetime_leq: $datetimeEnd
      }
      limit: 100
    ) {
      confidence(level: 0.95) {
        level
        count {
          estimate
          lower
          upper
          sampleSize
        }
        sum {
          edgeResponseBytes {
            estimate
            lower
            upper
            sampleSize
          }
        }
      }
    }
  }
}

The query above asks for the estimates and the 95% confidence intervals for SUM(edgeResponseBytes) and COUNT. The results will also show the sample size, which is good to know, as we rely on the central limit theorem to build the confidence intervals, thus small samples don’t work very well.

Here is the response from this query:

{
  "data": {
    "viewer": {
      "zones": [
        {
          "httpRequestsAdaptiveGroups": [
            {
              "confidence": {
                "level": 0.95,
                "count": {
                  "estimate": 96947,
                  "lower": "96874.24",
                  "upper": "97019.76",
                  "sampleSize": 96294
                },
                "sum": {
                  "edgeResponseBytes": {
                    "estimate": 495797559,
                    "lower": "495262898.54",
                    "upper": "496332219.46",
                    "sampleSize": 96294
                  }
                }
              }
            }
          ]
        }
      ]
    }
  },
  "errors": null
}

The response shows the estimated count is 96947, and we are 95% confident that the true count lies in the range 96874.24 to 97019.76. Similarly, the estimate and range for the sum of response bytes are provided.

The estimates are based on a sample size of 96294 rows, which is plenty of samples to calculate good confidence intervals.

Conclusion

We have discussed what kept our data pipeline scalable and resilient despite doubling in size every 1.5 years, how the math works, and how it is easy to mess up. We are constantly working on better ways to keep the data pipeline, and the products based on it, useful to our customers. If you are interested in doing things like that and want to help us build a better Internet, check out our careers page.

Elephants in tunnels: how Hyperdrive connects to databases inside your VPC networks

Post Syndicated from Andrew Repp original https://blog.cloudflare.com/elephants-in-tunnels-how-hyperdrive-connects-to-databases-inside-your-vpc-networks

With September’s announcement of Hyperdrive’s ability to send database traffic from Workers over Cloudflare Tunnels, we wanted to dive into the details of what it took to make this happen.

Hyper-who?

Accessing your data from anywhere in Region Earth can be hard. Traditional databases are powerful, familiar, and feature-rich, but your users can be thousands of miles away from your database. This can cause slower connection startup times, slower queries, and connection exhaustion as everything takes longer to accomplish.

Cloudflare Workers is an incredibly lightweight runtime, which enables our customers to deploy their applications globally by default and renders the cold start problem almost irrelevant. The trade-off for these light, ephemeral execution contexts is the lack of persistence for things like database connections. Database connections are also notoriously expensive to spin up, with many round trips required between client and server before any query or result bytes can be exchanged.

Hyperdrive is designed to make the centralized databases you already have feel like they’re global while keeping connections to those databases hot. We use our global network to get faster routes to your database, keep connection pools primed, and cache your most frequently run queries as close to users as possible.

Why a Tunnel?

For something as sensitive as your database, exposing access to the public Internet can be uncomfortable. It is common to instead host your database on a private network, and allowlist known-safe IP addresses or configure GRE tunnels to permit traffic to it. This is complex, toilsome, and error-prone. 

On Cloudflare’s Developer Platform, we strive for simplicity and ease-of-use. We cannot expect all of our customers to be experts in configuring networking solutions, and so we went in search of a simpler solution. Being your own customer is rarely a bad choice, and it so happens that Cloudflare offers an excellent option for this scenario: Tunnels.

Cloudflare Tunnel is a Zero Trust product that creates a secure connection between your private network and Cloudflare. Exposing services within your private network can be as simple as running a cloudflared binary, or deploying a Docker container running the cloudflared image we distribute.


A custom handler and generic streams

Integrating with Tunnels to support sending Postgres directly through them was a bit of a new challenge for us. Most of the time, when we use Tunnels internally (more on that later!), we rely on the excellent job cloudflared does of handling all of the mechanics, and we just treat them as pipes. That wouldn’t work for Hyperdrive, though, so we had to dig into how Tunnels actually ingress traffic to build a solution.

Hyperdrive handles Postgres traffic using an entirely custom implementation of the Postgres message protocol. This is necessary, because we sometimes have to alter the specific type or content of messages sent from client to server, or vice versa. Handling individual bytes gives us the flexibility to implement whatever logic any new feature might need.

An additional, perhaps less obvious, benefit of handling Postgres message traffic as just bytes is that we are not bound to the transport layer choices of some ORM or library. One of the nuances of running services in Cloudflare is that we may want to egress traffic over different services or protocols, for a variety of different reasons. In this case, being able to egress traffic via a Tunnel would be pretty challenging if we were stuck with whatever raw TCP socket a library had established for us.

The way we accomplish this relies on a mainstay of Rust: traits (which are how Rust lets developers apply logic across generic functions and types). In the Rust ecosystem, there are two traits that define the behavior Hyperdrive wants out of its transport layers: AsyncRead and AsyncWrite. There are a couple of others we also need, but we’re going to focus on just these two. These traits enable us to code our entire custom handler against a generic stream of data, without the handler needing to know anything about the underlying protocol used to implement the stream. So, we can pass around a WebSocket connection as a generic I/O stream, wherever it might be needed.

As an example, the code to create a generic TCP stream and send a Postgres startup message across it might look like this:

/// Send a startup message to a Postgres server, in the role of a PG client.
/// https://www.postgresql.org/docs/current/protocol-message-formats.html#PROTOCOL-MESSAGE-FORMATS-STARTUPMESSAGE
pub async fn send_startup<S>(stream: &mut S, user_name: &str, db_name: &str, app_name: &str) -> Result<(), ConnectionError>
where
    S: AsyncWrite + Unpin,
{
    let protocol_number = 196608 as i32;
    let user_str = &b"user\0"[..];
    let user_bytes = user_name.as_bytes();
    let db_str = &b"database\0"[..];
    let db_bytes = db_name.as_bytes();
    let app_str = &b"application_name\0"[..];
    let app_bytes = app_name.as_bytes();
    let len = 4 + 4
        + user_str.len() + user_bytes.len() + 1
        + db_str.len() + db_bytes.len() + 1
        + app_str.len() + app_bytes.len() + 1 + 1;

    // Construct a BytesMut of our startup message, then send it
    let mut startup_message = BytesMut::with_capacity(len as usize);
    startup_message.put_i32(len as i32);
    startup_message.put_i32(protocol_number);
    startup_message.put(user_str);
    startup_message.put_slice(user_bytes);
    startup_message.put_u8(0);
    startup_message.put(db_str);
    startup_message.put_slice(db_bytes);
    startup_message.put_u8(0);
    startup_message.put(app_str);
    startup_message.put_slice(app_bytes);
    startup_message.put_u8(0);
    startup_message.put_u8(0);

    match stream.write_all(&startup_message).await {
        Ok(_) => Ok(()),
        Err(err) => {
            error!("Error writing startup to server: {}", err.to_string());
            ConnectionError::InternalError
        }
    }
}

/// Connect to a TCP socket
let stream = match TcpStream::connect(("localhost", 5432)).await {
    Ok(s) => s,
    Err(err) => {
        error!("Error connecting to address: {}", err.to_string());
        return ConnectionError::InternalError;
    }
};
let _ = send_startup(&mut stream, "db_user", "my_db").await;

With this approach, if we wanted to encrypt the stream using TLS before we write to it (upgrading our existing TcpStream connection in-place, to an SslStream), we would only have to change the code we use to create the stream, while generating and sending the traffic would remain unchanged. This is because SslStream also implements AsyncWrite!

/// We're handwaving the SSL setup here. You're welcome.
let conn_config = new_tls_client_config()?;

/// Encrypt the TcpStream, returning an SslStream
let ssl_stream = match tokio_boring::connect(conn_config, domain, stream).await {
    Ok(s) => s,
    Err(err) => {
        error!("Error during websocket TLS handshake: {}", err.to_string());
        return ConnectionError::InternalError;
    }
};
let _ = send_startup(&mut ssl_stream, "db_user", "my_db").await;

Whence WebSocket

WebSocket is an application layer protocol that enables bidirectional communication between a client and server. Typically, to establish a WebSocket connection, a client initiates an HTTP request and indicates they wish to upgrade the connection to WebSocket via the “Upgrade” header. Then, once the client and server complete the handshake, both parties can send messages over the connection until one of them terminates it.

Now, it turns out that the way Cloudflare Tunnels work under the hood is that both ends of the tunnel want to speak WebSocket, and rely on a translation layer to convert all traffic to or from WebSocket. The cloudflared daemon you spin up within your private network handles this for us! For Hyperdrive, however, we did not have a suitable translation layer to send Postgres messages across WebSocket, and had to write one.

One of the (many) fantastic things about Rust traits is that the contract they present is very clear. To be AsyncRead, you just need to implement poll_read. To be AsyncWrite, you need to implement only three functions (poll_write, poll_flush, and poll_shutdown). Further, there is excellent support for WebSocket in Rust built on top of the tungstenite-rs library.

Thus, building our custom WebSocket stream such that it can share the same machinery as all our other generic streams just means translating the existing WebSocket support into these poll functions. There are some existing OSS projects that do this, but for multiple reasons we could not use the existing options. The primary reason is that Hyperdrive operates across multiple threads (thanks to the tokio runtime), and so we rely on our connections to also handle Send, Sync, and Unpin. None of the available solutions had all five traits handled. It turns out that most of them went with the paradigm of Sink and Stream, which provide a solid base from which to translate to AsyncRead and AsyncWrite. In fact some of the functions overlap, and can be passed through almost unchanged. For example, poll_flush and poll_shutdown have 1-to-1 analogs, and require almost no engineering effort to convert from Sink to AsyncWrite.

/// We use this struct to implement the traits we need on top of a WebSocketStream
pub struct HyperSocket<S>
where
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
    inner: WebSocketStream<S>,
    read_state: Option<ReadState>,
    write_err: Option<Error>,
}

impl<S> AsyncWrite for HyperSocket<S>
where
    S: AsyncRead + AsyncWrite + Send + Sync + Unpin,
{
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match ready!(Pin::new(&mut self.inner).poll_flush(cx)) {
            Ok(_) => Poll::Ready(Ok(())),
            Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
        }
    }

    fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        match ready!(Pin::new(&mut self.inner).poll_close(cx)) {
            Ok(_) => Poll::Ready(Ok(())),
            Err(err) => Poll::Ready(Err(Error::new(ErrorKind::Other, err))),
        }
    }
}

With that translation done, we can use an existing WebSocket library to upgrade our SslStream connection to a Cloudflare Tunnel, and wrap the result in our AsyncRead/AsyncWrite implementation. The result can then be used anywhere that our other transport streams would work, without any changes needed to the rest of our codebase! 

That would look something like this:

let websocket = match tokio_tungstenite::client_async(request, ssl_stream).await {
    Ok(ws) => Ok(ws),
    Err(err) => {
        error!("Error during websocket conn setup: {}", err.to_string());
        return ConnectionError::InternalError;
    }
};
let websocket_stream = HyperSocket::new(websocket));
let _ = send_startup(&mut websocket_stream, "db_user", "my_db").await;

Access granted

An observant reader might have noticed that in the code example above we snuck in a variable named request that we passed in when upgrading from an SslStream to a WebSocketStream. This is for multiple reasons. The first reason is that Tunnels are assigned a hostname and use this hostname for routing. The second and more interesting reason is that (as mentioned above) when negotiating an upgrade from HTTP to WebSocket, a request must be sent to the server hosting the ingress side of the Tunnel to perform the upgrade. This is pretty universal, but we also add in an extra piece here.

At Cloudflare, we believe that secure defaults and defense in depth are the correct ways to build a better Internet. This is why traffic across Tunnels is encrypted, for example. However, that does not necessarily prevent unwanted traffic from being sent into your Tunnel, and therefore egressing out to your database. While Postgres offers a robust set of access control options for protecting your database, wouldn’t it be best if unwanted traffic never got into your private network in the first place? 

To that end, all Tunnels set up for use with Hyperdrive should have a Zero Trust Access Application configured to protect them. These applications should use a Service Token to authorize connections. When setting up a new Hyperdrive, you have the option to provide the token’s ID and Secret, which will be encrypted and stored alongside the rest of your configuration. These will be presented as part of the WebSocket upgrade request to authorize the connection, allowing your database traffic through while preventing unwanted access.

This can be done within the request’s headers, and might look something like this:

let ws_url = format!("wss://{}", host);
let mut request = match ws_url.into_client_request() {
    Ok(req) => req,
    Err(err) => {
        error!(
            "Hostname {} could not be parsed into a valid request URL: {}", 
            host,
            err.to_string()
        );
        return ConnectionError::InternalError;
    }
};
request.headers_mut().insert(
    "CF-Access-Client-Id",
    http::header::HeaderValue::from_str(&client_id).unwrap(),
);
request.headers_mut().insert(
    "CF-Access-Client-Secret",
    http::header::HeaderValue::from_str(&client_secret).unwrap(),
);

Building for customer zero

If you’ve been reading the blog for a long time, some of this might sound a bit familiar.  This isn’t the first time that we’ve sent Postgres traffic across a tunnel, it’s something most of us do from our laptops regularly.  This works very well for interactive use cases with low traffic volume and a high tolerance for latency, but historically most of our products have not been able to employ the same approach.

Cloudflare operates many data centers around the world, and most services run in every one of those data centers. There are some tasks, however, that make the most sense to run in a more centralized fashion. These include tasks such as managing control plane operations, or storing configuration state.  Nearly every Cloudflare product houses its control plane information in Postgres clusters run centrally in a handful of our data centers, and we use a variety of approaches for accessing that centralized data from elsewhere in our network. For example, many services currently use a push-based model to publish updates to Quicksilver, and work through the complexities implied by such a model. This has been a recurring challenge for any team looking to build a new product.

Hyperdrive’s entire reason for being is to make it easy to access such central databases from our global network. When we began exploring Tunnel integrations as a feature, many internal teams spoke up immediately and strongly suggested they’d be interested in using it themselves. This was an excellent opportunity for Cloudflare to scratch its own itch, while also getting a lot of traffic on a new feature before releasing it directly to the public. As always, being “customer zero” means that we get fast feedback, more reliability over time, stronger connections between teams, and an overall better suite of products. We jumped at the chance.

As we rolled out early versions of Tunnel integration, we worked closely with internal teams to get them access to it, and fixed any rough spots they encountered. We’re pleased to share that this first batch of teams have found great success building new or refactored products on Hyperdrive over Tunnels. For example: if you’ve already tried out Workers Builds, or recently submitted an abuse report, you’re among our first users!  At the time of this writing, we have several more internal teams working to onboard, and we on the Hyperdrive team are very excited to see all the different ways in which fast and simple connections from Workers to a centralized database can help Cloudflare just as much as they’ve been helping our external customers.

Outro

Cloudflare is on a mission to make the Internet faster, safer, and more reliable. Hyperdrive was built to make connecting to centralized databases from the Workers runtime as quick and consistent as possible, and this latest development is designed to help all those who want to use Hyperdrive without directly exposing resources within their virtual private clouds (VPCs) on the public web.

To this end, we chose to build a solution around our suite of industry-leading Zero Trust tools, and were delighted to find how simple it was to implement in our runtime given the power and extensibility of the Rust trait system. 

Without waiting for the ink to dry, multiple teams within Cloudflare have adopted this new feature to quickly and easily solve what have historically been complex challenges, and are happily operating it in production today.

And now, if you haven’t already, try setting up Hyperdrive across a Tunnel, and let us know what you think in the Hyperdrive Discord channel!

Supporting Postgres Named Prepared Statements in Hyperdrive

Post Syndicated from Andrew Repp original https://blog.cloudflare.com/postgres-named-prepared-statements-supported-hyperdrive


Hyperdrive (Cloudflare’s globally distributed SQL connection pooler and cache) recently added support for Postgres protocol-level named prepared statements across pooled connections. Named prepared statements allow Postgres to cache query execution plans, providing potentially substantial performance improvements. Further, many popular drivers in the ecosystem use these by default, meaning that not having them is a bit of a footgun for developers. We are very excited that Hyperdrive’s users will now have access to better performance and a more seamless development experience, without needing to make any significant changes to their applications!

While we’re not the first connection pooler to add this support (PgBouncer got to it in October 2023 in version 1.21, for example), there were some unique challenges in how we implemented it. To that end, we wanted to do a deep dive on what it took for us to deliver this.

Hyper-what?

One of the classic problems of building on the web is that your users are everywhere, but your database tends to be in one spot.  Combine that with pesky limitations like network routing, or the speed of light, and you can often run into situations where your users feel the pain of having your database so far away. This can look like slower queries, slower startup times, and connection exhaustion as everything takes longer to accomplish.

Hyperdrive is designed to make the centralized databases you already have feel like they’re global. We use our global network to get faster routes to your database, keep connection pools primed, and cache your most frequently run queries as close to users as possible.

Postgres Message Protocol

To understand exactly what the challenge with prepared statements is, it’s first necessary to dig in a bit to the Postgres Message Protocol. Specifically, we are going to take a look at the protocol for an “extended” query, which uses different message types and is a bit more complex than a “simple” query, but which is more powerful and thus more widely used.

A query using Hyperdrive might be coded something like this, but a lot goes on under the hood in order for Postgres to reliably return your response.

import postgres from "postgres";

// with Hyperdrive, we don't have to disable prepared statements anymore!
// const sql = postgres(env.HYPERDRIVE.connectionString, {prepare: false});

// make a connection, with the default postgres.js settings (prepare is set to true)
const sql = postgres(env.HYPERDRIVE.connectionString);

// This sends the query, and while it looks like a single action it contains several 
// messages implied within it
let [{ a, b, c, id }] = await sql`SELECT a, b, c, id FROM hyper_test WHERE id = ${target_id}`;

To prepare a statement, a Postgres client begins by sending a Parse message. This includes the query string, the number of parameters to be interpolated, and the statement’s name. The name is a key piece of this puzzle. If it is empty, then Postgres uses a special “unnamed” prepared statement slot that gets overwritten on each new Parse. These are relatively easy to support, as most drivers will keep the entirety of a message sequence for unnamed statements together, and will not try to get too aggressive about reusing the prepared statement because it is overwritten so often.

If the statement has a name, however, then it is kept prepared for the remainder of the Postgres session (unless it is explicitly removed with DEALLOCATE). This is convenient because parsing a query string and preparing the statement costs bytes sent on the wire and CPU cycles to process, so reusing a statement is quite a nice optimization.

Once done with Parse, there are a few remaining steps to (the simplest form of) an extended query:

  • A Bind message, which provides the specific values to be passed for the parameters in the statement (if any).
  • An Execute message, which tells the Postgres server to actually perform the data retrieval and processing.
  • And finally a Sync message, which causes the server to close the implicit transaction, return results, and provides a synchronization point for error handling.

While that is the core pattern for accomplishing an extended protocol query, there are many more complexities possible (named Portal, ErrorResponse, etc.).

We will briefly mention one other complexity we often encounter in this protocol, which is Describe messages. Many drivers leverage Postgres’ built-in types to help with deserialization of the results into structs or classes. This is accomplished by sending a Parse-Describe-Flush/Sync sequence, which will send a statement to be prepared, and will expect back information about the types and data the query will return. This complicates bookkeeping around named prepared statements, as now there are two separate queries, with two separate kinds of responses, that must be kept track of. We won’t go into much depth on the tradeoffs of an additional round-trip in exchange for advanced information about the results’ format, but suffice it to say that it must be handled explicitly in order for the overall system to gracefully support prepared statements.

So the basic query from our code above looks like this from a message perspective:

A more complete description and the full structure of each message type are well described in the Postgres documentation.

So, what’s so hard about that?

Buffering Messages

The first challenge that Hyperdrive must solve (that many other connection poolers don’t have) is that it’s also a cache.

The happiest path for a query on Hyperdrive never travels far, and we are quite proud of the low latency of our cache hits. However, this presents a particular challenge in the case of an extended protocol query. A Parse by itself is insufficient as a cache key, both because the parameter values in the Bind messages can alter the expected results, and because it might be followed up with either a Describe or an Execute message which will invoke drastically different responses.

So Hyperdrive cannot simply pass each message to the origin database, as we must buffer them in a message log until we have enough information to reliably distinguish between cache keys. It turns out that receiving a Sync is quite a natural point at which to check whether you have enough information to serve a response. For most scenarios, we buffer until we receive a Sync, and then (assuming the scenario is cacheable) we determine whether we can serve the response from cache or we need to take a connection to the origin database.

Taking a Connection From the Pool

Assuming we aren’t serving a response from cache, for whatever reason, we’ll need to take an origin connection from our pool. One of the key advantages any connection pooler offers is in allowing many client connections to share few database connections, so minimizing how often and for how long these connections are held is crucial to making Hyperdrive performant.

To this end, Hyperdrive operates in what is traditionally called “transaction mode”. This means that a connection taken from the pool for any given transaction is returned once that transaction concludes. This is in contrast to what is often called “session mode”, where once a connection is taken from the pool it is held by the client until the client disconnects.

For Hyperdrive, allowing any client to take any database connection is vital. This is because if we “pin” a client to a given database connection then we have one fewer available for every other possible client. You can run yourself out of database connections very quickly once you start down that path, especially when your clients are many small Workers spread around the world.

The challenge prepared statements present to this scenario is that they exist at the “session” scope, which is to say, at the scope of one connection. If a client prepares a statement on connection A, but tries to reuse it and gets assigned connection B, Postgres will naturally throw an error claiming the statement doesn’t exist in the given session. No results will be returned, the client is unhappy, and all that’s left is to retry with a Parse message included. This causes extra round-trips between client and server, defeating the whole purpose of what is meant to be an optimization.

One of the goals of a connection pooler is to be as transparent to the client and server as possible. There are limitations, as Postgres will let you do some powerful things to session state that cannot be reasonably shared across arbitrary client connections, but to the extent possible the endpoints should not have to know or care about any multiplexing happening between them.

This means that when a client sends a Parse message on its connection, it should expect that the statement will be available for reuse when it wants to send a Bind-Execute-Sync sequence later on. It also means that the server should not get Bind messages for statements that only exist on some other session. Maintaining this illusion is the crux of providing support for this feature.

Putting it all together

So, what does the solution look like? If a client sends Parse-Bind-Execute-Sync with a named prepared statement, then later sends Bind-Execute-Sync to reuse it, how can we make sure that everything happens as expected? The solution, it turns out, needs just a few built-in Rust data structures for efficiently capturing what we need (a HashMap, some LruCaches and a VecDeque), and some straightforward business logic to keep track of when to intervene in the messages being passed back and forth.

Whenever a named Parse comes in, we store it in an in-memory HashMap on the server that handles message processing for that client’s connection. This persists until the client is disconnected. This means that whenever we see anything referencing the statement, we can go retrieve the complete message defining it. We’ll come back to this in a moment.

Once we’ve buffered all the messages we can and gotten to the point where it’s time to return results (let’s say because the client sent a Sync), we need to start applying some logic. For the sake of brevity we’re going to omit talking through error handling here, as it does add some significant complexity but is somewhat out of scope for this discussion.

There are two main questions that determine how we should proceed:

  1. Does our message sequence include a Parse, or are we trying to reuse a pre-existing statement?
  2. Do we have a cache hit or are we serving from the origin database?

This gives us four scenarios to consider:

  1. Parse with cache hit
  2. Parse with cache miss
  3. Reuse with cache hit
  4. Reuse with cache miss

A Parse with a cache hit is the easiest path to address, as we don’t need to do anything special. We use the messages sent as a cache key, and serve the results back to the client. We will still keep the Parse in our HashMap in case we want it later (#2 below), but otherwise we’re good to go.

A Parse with a cache miss is a bit more complicated, as now we need to send these messages to the origin server. We take a connection at random from our pool and do so, passing the results back to the client. With that, we’ve begun to make changes to session state such that all our database connections are no longer identical to each other. To keep track of what we’ve done to muddy up our state, we keep a LruCache on each connection of which statements it already has prepared. In the case where we need to evict from such a cache, we will also DEALLOCATE the statement on the connection to keep things tracked correctly.

Reuse with a cache hit is yet more tricky, but still straightforward enough. In the example below, we are sent a Bind with the same parameters twice (#1 and #9). We must identify that we received a Bind without a preceding Parse, we must go retrieve that Parse (#10), and we must use the information from it to build our cache key. Once all that is accomplished, we can serve our results from cache, needing only to trim out the ParseComplete within the cached results before returning them to the client.

Reuse with a cache miss is the hardest scenario, as it may require us to lie in both directions. In the example below, we cache results for one set of parameters (#8), but are sent a Bind with different parameters (#9). As in the cache hit scenario, we must identify that we were not sent a Parse as part of the current message sequence, retrieve it from our HashMap (#10), and build our cache key to GET from cache and confirm the miss (#11). Once we take a connection from the pool, though, we then need to check if it already has the statement we want prepared. If not, we must take our saved Parse and prepend it to our message log to be sent along to the origin database (#13). Thus, what the server receives looks like a perfectly valid Parse-Bind-Execute-Sync sequence. This is where our VecDeque (mentioned above) comes in, as converting our message log to that structure allowed us to very ergonomically make such changes without needing to rebuild the whole byte sequence. Once we receive the response from the server, all that’s needed is to trim out the initial ParseComplete response from the server, as a well-made client would likely be very confused receiving such a response to a Parse it didn’t send. With that message trimmed out, however, the client is in the position of getting exactly what it asked for, and both sides of the conversation are happy.

Dénouement

Now that we’ve got a working solution, where all parties are functioning well, let’s review! Our solution lets us share database connections across arbitrary clients with no “pinning”, no custom handling on either client or server, and supports reuse of prepared statements to reduce CPU load on re-parsing queries and reduce network traffic on re-sending Parse messages. Engineering always involves tradeoffs, so the cost of this is that we will sometimes still need to sneak in a Parse because a client got assigned a different connection on reuse, and in those scenarios there is a small amount of additional memory overhead because the same statement is prepared on multiple connections.

And now, if you haven’t already, go give Hyperdrive a spin, and let us know what you think in the Hyperdrive Discord channel!

Let’s Architect! Leveraging SQL databases on AWS

Post Syndicated from Luca Mezzalira original https://aws.amazon.com/blogs/architecture/lets-architect-leveraging-sql-databases-on-aws/

SQL databases in Amazon Web Services (AWS), using services like Amazon Relational Database Service (Amazon RDS) and Amazon Aurora, offer software architects scalability, automated management, robust security, and cost-efficiency. This combination simplifies database management, improves performance, enhances security, and allows architects to create efficient and scalable software systems.

In this post, we introduce caching strategies and continue with real case studies that use services like Amazon ElastiCache or Amazon MemoryDB in real workloads where customers share the reasoning behind their approaches. It’s very important to understand the context for leveraging a specific solution or pattern, and these resources answer many commonly asked questions.

Build scalable multi-tenant databases with Amazon Aurora

For software architects and developers, striking the right balance between operational complexity and cost efficiency is a perpetual challenge. Often, provisioning a separate database for each workload is the gold standard, offering unmatched isolation and granular operational controls. However, it’s not always the most cost-effective or operationally manageable approach. Through a real-world success story, we explore how Aurora played a pivotal role in helping VMware Aria Cost, powered by CloudHealth, consolidate a staggering 166 self-managed MySQL databases onto 62 Aurora clusters.

Take me to this re:Invent 2022 video!

A migration process to move a MySQL database from self-managed to fully managed with Amazon Aurora

A migration process to move a MySQL database from self-managed to fully managed with Amazon Aurora

Amazon RDS Blue/Green Deployments, Optimized Writes & Optimized Reads

Amazon RDS Blue/Green Deployments revolutionizes the way you handle database updates, ensuring safety and simplicity, often achieving rapid updates in just a minute, with zero data loss. Meanwhile, Amazon RDS Optimized Writes turbocharges write transaction throughput by as much as double, without any additional extra cost. Amazon RDS Optimized Reads steps in to deliver a significant boost to database performance, processing queries up to 50% faster.

Discover how to leverage these capabilities of Amazon RDS in this one-hour video from re:Invent 2022.

Take me to this re:Invent 2022 video!

Amazon RDS Blue/Green Deployments in action

Amazon RDS Blue/Green Deployments in action

Designing a DR strategy on Amazon RDS for SQL Server

In the world of mission-critical workloads, the importance of a robust disaster recovery (DR) strategy cannot be overstated. It’s the lifeline that ensures databases stay operational, even in the face of unexpected events. Discover the intricacies of crafting a dependable, cross-Region DR strategy tailored to Amazon RDS for SQL Server.

In this AWS Developers session, we uncover the best practices for efficiently managing and monitoring these cross-Region read replicas. From proactive monitoring to fine-tuning, you’ll gain the insights needed to keep your DR strategy finely tuned.

Take me to this AWS Developers video!

How to design a DR strategy using Amazon RDS

How to design a DR strategy using Amazon RDS

Deep dive into Amazon Aurora and its innovations

Aurora represents a paradigm shift in relational databases, boasting an architecture that decouples computational processes from data storage. It introduces advanced features, such as Global Database and low-latency read replicas, redefining the landscape of database management.

This modern database service excels in performance, scalability, and high availability on a large scale, offering compatibility with both MySQL and PostgreSQL open-source editions. Additionally, it provides an array of developer tools tailored for serverless and machine learning-driven applications.

This re:Invent 2022 session is an in-depth exploration of some of Aurora’s most compelling features, including Aurora Serverless v2 and Global Database. We also share the most recent innovations aimed at enhancing performance, scalability, and security while streamlining operational processes.

Take me to this re:Invent 2022 video!

A glance of one of the features of Amazon Aurora Global Database

A glance of one of the features of Amazon Aurora Global Database

See you next time!

Thanks for joining us today to explore leveraging SQL databases! We’ll see you in two weeks when we talk about batch processing workloads.

To find all the blogs from this series, check out the Let’s Architect! list of content on the AWS Architecture Blog.

Amazon MSK backup for Archival, Replay, or Analytics

Post Syndicated from Rohit Yadav original https://aws.amazon.com/blogs/architecture/amazon-msk-backup-for-archival-replay-or-analytics/

Amazon MSK is a fully managed service that helps you build and run applications that use Apache Kafka to process streaming data. Apache Kafka is an open-source platform for building real-time streaming data pipelines and applications. With Amazon MSK, you can use native Apache Kafka APIs to populate data lakes. You can also stream changes to and from databases, and power machine learning and analytics applications.

Amazon MSK simplifies the setup, scaling, and management of clusters running Apache Kafka. MSK manages the provisioning, configuration, and maintenance of resources for a highly available Kafka clusters. It is fully compatible with Apache Kafka and supports familiar community-build tools such as MirrorMaker 2.0, Kafka Connect and Kafka streams.

Introduction

In the past few years, the volume of data that companies must ingest has increased significantly. Information comes from various sources, like transactional databases, system logs, SaaS platforms, mobile, and IoT devices. Businesses want to act as soon as the data arrives. This has resulted in increased adoption of scalable real-time streaming solutions. These solutions scale horizontally to provide the needed throughput to process data in real time, with milliseconds of latency. Customers have adopted Amazon MSK as a top choice of streaming platforms. Amazon MSK gives you the flexibility to retain topic data for longer term (default 7 days). This supports replay, analytics, and machine learning based use cases. When IT and business systems are producing and processing terabytes of data per hour, it can become expensive to store, manage, and retrieve data. This has led to legacy data archival processes moving towards cheaper, reliable, and long-term storage solutions like Amazon Simple Storage Service (S3).

Following are some of the benefits of archiving Amazon MSK topic data to Amazon S3:

  1. Reduced Cost – You only must retain the data in the cluster based on your Recovery Point Objective (RPO). Any historical data can be archived in Amazon S3 and replayed if necessary.
  2. Integration with Enterprise Data Lake – Since your data is available in S3, you can now integrate with other data analytics services like Amazon EMR, AWS Glue, Amazon Athena, to run data aggregation and analytics. For example, you can build reports to visualize month over month changes.
  3. Optimize Machine Learning Workloads – Machine learning applications will be able to train new models and improve predictions using historical streams of data available in Amazon S3. This also enables better integration with Amazon Machine Learning services.
  4. Compliance – Long-term data archival for regulatory and security compliance.
  5. Backloading data to other systems – Ability to rebuild data into other application environments such as pre-prod, testing, and more.

There are many benefits to using Amazon S3 as long-term storage for Amazon MSK topics. Let’s dive deeper into the recommended architecture for this pattern. We will present an architecture to back up Amazon MSK topics to Amazon S3 in real time. In addition, we’ll demonstrate some of the use cases previously mentioned.

Architecture

The diagram following illustrates the architecture for building a real-time archival pipeline to archive Amazon MSK topics to S3. This architecture uses an AWS Lambda function to process records from your Amazon MSK cluster when the cluster is configured as an event source. As a consumer, you don’t need to worry about infrastructure management or scaling with Lambda. You only pay for what you consume, so you don’t pay for over-provisioned infrastructure.

To create an event source mapping, you can add your Amazon MSK cluster in a Lambda function trigger. The Lambda service internally polls for new records or messages from the event source, and then synchronously invokes the target Lambda function. Lambda reads the messages in batches from one or more partitions and provides these to your function as an event payload. The function then processes records, and sends the payload to an Amazon Kinesis Data Firehose delivery stream. We use Kinesis Data Firehose delivery stream because it can natively batch, compress, transform, and encrypt your events before loading to S3.

In this architecture, Kinesis Data Firehose delivers the records received from Lambda in Gzip file to Amazon S3. These files are partitioned in hive style format by Kinesis Data Firehose:

data/year = yyyy/month = MM/day = dd/hour = HH

Figure 1. Archival Architecture

Figure 1. Archival Architecture

Let’s review some of the possible solutions that can be built on this archived data.

Integration with Enterprise Data Lake

The architecture diagram following shows how you can integrate the archived data in Amazon S3 with your Enterprise Data Lake. Since the data files are prefixed in hive style format, you can partition and store the Data Catalog in AWS Glue. With partitioning in place, you can perform optimizations like partition pruning, which enables predicate pushdown for improved performance of your analytics queries. You can also use AWS Data Analytics services like Amazon EMR and AWS Glue for batch analytics. Amazon Athena can be used to run serverless SQL-like interactive queries on visualization and data.

Data currently gets stored in JSON files. Following are some of the services/tools that can be integrated with your archive for reporting, analytics, visualization, and machine learning requirements.

Figure 2. Analytics Architecture

Figure 2. Analytics Architecture

Cloning data into other application environments

There are use cases where you would want to use this data to clone other application environments using this archive.

These clusters could be used for testing or debugging purposes. You could decide to use only a subset of your data from the archive. Let’s say you want to debug an issue beyond the configured retention period, but not replicate all the data to your testing environment. With archived data in S3, you can build downstream jobs to filter data that can be loaded into a new Amazon MSK cluster. The following diagram highlights this pattern:

Figure 3. Replay Architecture

Figure 3. Replay Architecture

Ready for a Test Drive

To help you get started, we would like to introduce an AWS Solution: AWS Streaming Data Solution for Amazon MSK (scroll down and see Option 3 tab). There is a single-click AWS CloudFormation template, which can assist you in quickly provisioning resources. This will get your real-time archival pipeline for Amazon MSK up and running quickly. This solution shortens your development time by removing or reducing the need for you to:

  • Model and provision resources using AWS CloudFormation
  • Set up Amazon CloudWatch alarms, dashboards, and logging
  • Manually implement streaming data best practices in AWS

This solution is data and logic agnostic, enabling you to start with boilerplate code and start customizing quickly. After deployment, use this solution’s monitoring capabilities to transition easily to production.

Conclusion

In this post, we explained the architecture to build a scalable, highly available real-time archival of Amazon MSK topics to long term storage in Amazon S3. The architecture was built using Amazon MSK, AWS Lambda, Amazon Kinesis Data Firehose, and Amazon S3. The architecture also illustrates how you can integrate your Amazon MSK streaming data in S3 with your Enterprise Data Lake.

Summarize devices that are not reachable

Post Syndicated from Aigars Kadiķis original https://blog.zabbix.com/summarize-devices-that-are-not-reachable/13219/

In this lab, we will list all devices which are not reachable by a monitoring tool. This is good when we want to improve the overall monitoring experience and decrease the size queue (metrics which has not been arrived at the instance).

Tools required for the job: Access to a database server or a Windows computer with PowerShell

To summarize devices that are not reachable at the moment we can use a database query. Tested and works on 4.0, 5.0, on MySQL and PostgreSQL:

SELECT hosts.host,
       interface.ip,
       interface.dns,
       interface.useip,
       CASE interface.type
           WHEN 1 THEN 'ZBX'
           WHEN 2 THEN 'SNMP'
           WHEN 3 THEN 'IPMI'
           WHEN 4 THEN 'JMX'
       END AS "type",
       hosts.error
FROM hosts
JOIN interface ON interface.hostid=hosts.hostid
WHERE hosts.available=2
  AND interface.main=1
  AND hosts.status=0;

A very similar (but not exactly the same) outcome can be obtained via Windows PowerShell by contacting Zabbix API. Try this snippet:

$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Content-Type", "application/json")
$url = 'http://192.168.1.101/api_jsonrpc.php'
$user = 'api'
$password = 'zabbix'

# authorization
$key = Invoke-RestMethod $url -Method 'POST' -Headers $headers -Body "
{
    `"jsonrpc`": `"2.0`",
    `"method`": `"user.login`",
    `"params`": {
        `"user`": `"$user`",
        `"password`": `"$password`"
    },
    `"id`": 1
}
" | foreach { $_.result }
echo $key

# filter out unreachable Agent, SNMP, JMX, IPMI hosts
Invoke-RestMethod $url -Method 'POST' -Headers $headers -Body "
{
    `"jsonrpc`": `"2.0`",
    `"method`": `"host.get`",
    `"params`": {
        `"output`": [`"interfaces`",`"host`",`"proxy_hostid`",`"disable_until`",`"lastaccess`",`"errors_from`",`"error`"],
        `"selectInterfaces`": `"extend`",
        `"filter`": {`"available`": `"2`",`"status`":`"0`"}
    },
    `"auth`": `"$key`",
    `"id`": 1
}
" | foreach { $_.result }  | foreach { $_.interfaces } | Out-GridView

# log out
Invoke-RestMethod $url -Method 'POST' -Headers $headers -Body "
{
    `"jsonrpc`": `"2.0`",
    `"method`": `"user.logout`",
    `"params`": [],
    `"id`": 1,
    `"auth`": `"$key`"
}
"

Set a valid credential (URL, username, password) on the top of the code before executing it.

The benefit of PowerShell here is that we can use some on-the-fly filtering:

What is the exact meaning of the field ‘type’ we can understand by looking on the previous database query:

       CASE interface.type
           WHEN 1 THEN 'ZBX'
           WHEN 2 THEN 'SNMP'
           WHEN 3 THEN 'IPMI'
           WHEN 4 THEN 'JMX'
       END AS "type",

On Windows PowerShell, it is possible to download the unreachable hosts directly to CSV file. To do that, in the code above, we need to change:

Out-GridView

to

Export-Csv c:\temp\unavailable.hosts.csv

Alright, this was the knowledge bit today. Let’s keep Zabbixing!