All posts by Catarina Pires Mota

DO it again: how we used Durable Objects to add WebSockets support and authentication to AI Gateway

Post Syndicated from Catarina Pires Mota original https://blog.cloudflare.com/do-it-again

In October 2024, we talked about storing billions of logs from your AI application using AI Gateway, and how we used Cloudflare’s Developer Platform to do this. 

With AI Gateway already processing over 3 billion logs and experiencing rapid growth, the number of connections to the platform continues to increase steadily. To help developers manage this scale more effectively, we wanted to offer an alternative to implementing HTTP/2 keep-alive to maintain persistent HTTP(S) connections, thereby avoiding the overhead of repeated handshakes and TLS negotiations with each new HTTP connection to AI Gateway. We understand that implementing HTTP/2 can present challenges, particularly because many libraries and tools do not support it by default, whereas most modern programming languages have well-established WebSocket libraries available.

With this in mind, we used Cloudflare’s Developer Platform and Durable Objects (yes, again!) to build a WebSockets API that establishes a single, persistent connection, enabling continuous communication. 

Through this API, all AI providers supported by AI Gateway can be accessed via WebSocket, allowing you to maintain a single TCP connection between your client or server application and the AI Gateway. The best part? Even if your chosen provider doesn’t support WebSockets, we handle it for you, managing the requests to your preferred AI provider.


By connecting via WebSocket to AI Gateway, we make the requests to the inference service for you using the provider’s supported protocols (HTTPS, WebSocket, etc.), and you can keep the connection open to execute as many inference requests as you would like. 

To make your connection to AI Gateway more secure, we are also introducing authentication for AI Gateway. The new WebSockets API will require authentication. All you need to do is create a Cloudflare API token with the permission “AI Gateway: Run” and send that in the cf-aig-authorization header.


In the flow diagram above:

1️⃣ When Authenticated Gateway is enabled and a valid token is included, requests will pass successfully.

2️⃣ If Authenticated Gateway is enabled, but a request does not contain the required cf-aig-authorization header with a valid token, the request will fail. This ensures only verified requests pass through the gateway. 

3️⃣ When Authenticated Gateway is disabled, the cf-aig-authorization header is bypassed entirely, and any token — whether valid or invalid — is ignored.
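The three cases can be summed up in a few lines of code. This is only a minimal sketch of the decision logic described above, not AI Gateway's implementation: the function name `authorizeRequest`, its parameters, and the stand-in validator are all hypothetical.

```javascript
// Hypothetical model of the three cases; names and shapes are illustrative only.
function authorizeRequest({ authenticatedGateway, token, isValidToken }) {
  // Case 3: Authenticated Gateway is disabled, so the header is ignored entirely.
  if (!authenticatedGateway) {
    return { allowed: true, reason: "authentication disabled, header ignored" };
  }
  // Case 2: Authenticated Gateway is enabled but the token is missing or invalid.
  if (!token || !isValidToken(token)) {
    return { allowed: false, reason: "missing or invalid cf-aig-authorization token" };
  }
  // Case 1: Authenticated Gateway is enabled and the token is valid.
  return { allowed: true, reason: "valid token" };
}

// Stand-in validator (assumption): a real check would verify the Cloudflare API token.
const acceptOnlyDemoToken = (token) => token === "demo-token";

console.log(authorizeRequest({ authenticatedGateway: true, token: "demo-token", isValidToken: acceptOnlyDemoToken }));
console.log(authorizeRequest({ authenticatedGateway: true, token: undefined, isValidToken: acceptOnlyDemoToken }));
console.log(authorizeRequest({ authenticatedGateway: false, token: "anything", isValidToken: acceptOnlyDemoToken }));
```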

How we built it

We recently used Durable Objects (DOs) to scale our logging solution for AI Gateway, so using WebSockets within the same DOs was a natural fit.

When a new WebSocket connection is received by our Cloudflare Workers, we implement authentication in two ways to support the diverse capabilities of WebSocket clients. The primary method involves validating a Cloudflare API token through the cf-aig-authorization header, ensuring the token is valid for the connecting account and gateway. 

However, due to limitations in browser WebSocket implementations, we also support authentication via the “sec-websocket-protocol” header. Browser WebSocket clients don’t allow custom headers in their standard API, which makes it hard to attach authentication tokens to requests. While we don’t recommend that you store API keys in a browser, we added this method to give all WebSocket clients more flexibility.

// Built-in WebSocket client in browsers
const socket = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/", [
   // Backticks (a template literal) are needed so ${AI_GATEWAY_TOKEN} is interpolated
   `cf-aig-authorization.${AI_GATEWAY_TOKEN}`
]);


// ws npm package
import WebSocket from "ws";
const ws = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/",{
   headers: {
       "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
   },
});
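On the receiving side, a token can therefore arrive in either of two places. The sketch below shows one way a server could read it from both, assuming the `cf-aig-authorization.<token>` subprotocol format used in the browser example above. The function name `extractGatewayToken` is hypothetical, and this is not AI Gateway's actual code.

```javascript
const SUBPROTOCOL_PREFIX = "cf-aig-authorization.";

// `headers` is anything with a get(name) method, such as a Headers object.
// A Map with lowercase keys is used below to keep the sketch self-contained.
function extractGatewayToken(headers) {
  // Primary method: the dedicated header, with an optional "Bearer " prefix.
  const direct = headers.get("cf-aig-authorization");
  if (direct) {
    return direct.replace(/^Bearer\s+/i, "").trim();
  }
  // Browser method: sec-websocket-protocol carries a comma-separated list of subprotocols.
  const protocols = headers.get("sec-websocket-protocol");
  if (protocols) {
    for (const entry of protocols.split(",")) {
      const protocol = entry.trim();
      if (protocol.startsWith(SUBPROTOCOL_PREFIX)) {
        return protocol.slice(SUBPROTOCOL_PREFIX.length);
      }
    }
  }
  return null; // no token supplied
}

console.log(extractGatewayToken(new Map([["cf-aig-authorization", "Bearer abc123"]])));
console.log(extractGatewayToken(new Map([["sec-websocket-protocol", "cf-aig-authorization.abc123"]])));
```

Extracting the token is only the first step; it still has to be validated for the connecting account and gateway.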

After this initial verification step, we upgrade the connection to the Durable Object, meaning that it will now handle all the messages for the connection. Before the new connection is fully accepted, we generate a random UUID, so this connection is identifiable among all the messages received by the Durable Object. During an open connection, any AI Gateway settings passed via headers — such as cf-aig-skip-cache (which bypasses caching when set to true) — are stored and applied to all requests in the session. However, these headers can still be overridden on a per-request basis, just like with the Universal Endpoint today.
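To illustrate the "session settings with per-request overrides" behavior, here is a minimal sketch. The helper `createSession` and its shape are assumptions made for illustration; only the `cf-aig-skip-cache` header and the override rule come from the description above.

```javascript
// Hypothetical helper: remembers cf-aig-* settings from the connection headers
// and lets each request override them.
function createSession(connectionHeaders) {
  const sessionSettings = {};
  for (const [name, value] of Object.entries(connectionHeaders)) {
    const key = name.toLowerCase();
    // Assumption: the authorization header is a credential, not a gateway setting.
    if (key.startsWith("cf-aig-") && key !== "cf-aig-authorization") {
      sessionSettings[key] = value;
    }
  }
  return {
    // Effective settings for one request: session defaults, then per-request overrides.
    settingsFor(requestHeaders = {}) {
      const overrides = {};
      for (const [name, value] of Object.entries(requestHeaders)) {
        const key = name.toLowerCase();
        if (key.startsWith("cf-aig-")) overrides[key] = value;
      }
      return { ...sessionSettings, ...overrides };
    },
  };
}

const session = createSession({
  "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
  "cf-aig-skip-cache": "true",
});
console.log(session.settingsFor());                                 // session default applies
console.log(session.settingsFor({ "cf-aig-skip-cache": "false" })); // overridden for this request
```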

How it works

Once the connection is established, the Durable Object begins listening for incoming messages. From this point on, users can send messages in the AI Gateway universal format via WebSocket, simplifying the transition of your application from an existing HTTP setup to WebSockets-based communication.

import WebSocket from "ws";
const ws = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/",{
   headers: {
       "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
   },
});

// Wait for the connection to open: ws throws if send() is called while still connecting
ws.on("open", function open() {
   ws.send(JSON.stringify({
      type: "universal.create",
      request: {
         "eventId": "my-request",
         "provider": "workers-ai",
         "endpoint": "@cf/meta/llama-3.1-8b-instruct",
         "headers": {
            "Authorization": "Bearer WORKERS_AI_TOKEN",
            "Content-Type": "application/json"
         },
         "query": {
            "prompt": "tell me a joke"
         }
      }
   }));
});

ws.on("message", function incoming(message) {
   console.log(message.toString())
});

When a new message reaches the Durable Object, it’s processed using the same code that powers the HTTP Universal Endpoint, enabling seamless code reuse across Workers and Durable Objects — one of the key benefits of building on Cloudflare.

For non-streaming requests, the response is wrapped in a JSON envelope, allowing us to include additional information beyond the AI inference itself, such as the AI Gateway log ID for that request.

Here’s an example response for the request above:

{
  "type":"universal.created",
  "metadata":{
     "cacheStatus":"MISS",
     "eventId":"my-request",
     "logId":"01JC3R94FRD97JBCBX3S0ZAXKW",
     "step":"0",
     "contentType":"application/json"
  },
  "response":{
     "result":{
        "response":"Why was the math book sad? Because it had too many problems. Would you like to hear another one?"
     },
     "success":true,
     "errors":[],
     "messages":[]
  }
}

For streaming requests, AI Gateway sends an initial message with request metadata telling the developer the stream is starting.

{
  "type":"universal.created",
  "metadata":{
     "cacheStatus":"MISS",
     "eventId":"my-request",
     "logId":"01JC40RB3NGBE5XFRZGBN07572",
     "step":"0",
     "contentType":"text/event-stream"
  }
}

After this initial message, all streaming chunks are relayed in real-time to the WebSocket connection as they arrive from the inference provider. Note that only the eventId field is included in the metadata for these streaming chunks (more info on what this new field is below).

{
  "type":"universal.stream",
  "metadata":{
     "eventId":"my-request"
  },
  "response":{
     "response":"would"
  }
}

This approach serves two purposes: first, all request metadata is already provided in the initial message. Second, it addresses the concurrency challenge of handling multiple streaming requests simultaneously.

Handling asynchronous events

With WebSocket connections, client and server can send messages asynchronously at any time. This means the client doesn’t need to wait for a server response before sending another message. But what happens if a client sends multiple streaming inference requests immediately after the WebSocket connection opens?

In this case, the server streams all the inference responses simultaneously to the client. Since everything occurs asynchronously, the client has no built-in way to identify which response corresponds to each request.

To address this, we introduced a new field in the Universal format called eventId, which allows AI Gateway to include a client-defined ID with each message, even in a streaming WebSocket environment.

So, to fully answer the question above: the server streams both responses in parallel chunks, and the client can accurately identify which request each message belongs to based on the eventId.

Once all chunks for a request have been streamed, AI Gateway sends a final message to signal the request’s completion. For added flexibility, this message includes all the metadata again, even though it was also provided at the start of the streaming process.

{
  "type":"universal.done",
  "metadata":{
     "cacheStatus":"MISS",
     "eventId":"my-request",
     "logId":"01JC40RB3NGBE5XFRZGBN07572",
     "step":"0",
     "contentType":"text/event-stream"
  }
}
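On the client, the three message types and the eventId field are enough to reassemble interleaved streams. The following is a minimal sketch under a few assumptions: the helper `createDemultiplexer` is hypothetical, and each streamed chunk is assumed to carry its text in `response.response`, as in the Workers AI example above (other providers may use a different chunk shape).

```javascript
// Hypothetical client-side helper: groups incoming messages by eventId.
function createDemultiplexer(onComplete) {
  const pending = new Map(); // eventId -> array of text chunks received so far

  return function handleMessage(raw) {
    const message = JSON.parse(raw);
    const eventId = message.metadata?.eventId;

    switch (message.type) {
      case "universal.created":
        if (message.response !== undefined) {
          // Non-streaming request: the whole response arrives in one envelope.
          onComplete(eventId, { metadata: message.metadata, response: message.response });
        } else {
          // Streaming request: start collecting chunks for this eventId.
          pending.set(eventId, []);
        }
        break;
      case "universal.stream": {
        const chunks = pending.get(eventId);
        if (chunks) chunks.push(message.response.response);
        break;
      }
      case "universal.done": {
        const chunks = pending.get(eventId);
        if (chunks) {
          pending.delete(eventId);
          onComplete(eventId, { metadata: message.metadata, text: chunks.join("") });
        }
        break;
      }
    }
  };
}

// Usage with the ws package would look like:
//   const handle = createDemultiplexer((id, result) => console.log(id, result));
//   ws.on("message", (message) => handle(message.toString()));
```

Keeping one buffer per eventId means two streams can interleave freely without their chunks being mixed up.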

Try it out today

AI Gateway’s real-time WebSocket API is now in beta and open to everyone!

To try it out, copy your gateway universal endpoint URL, and replace the “https://” with “wss://”, like this:

wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/
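If you build the URL in code, the swap is a one-liner. A small sketch (the helper name `toWebSocketUrl` is made up for this example):

```javascript
// Hypothetical helper: turns a Universal Endpoint URL into its WebSocket equivalent.
function toWebSocketUrl(universalEndpoint) {
  if (!universalEndpoint.startsWith("https://")) {
    throw new Error("expected an https:// Universal Endpoint URL");
  }
  return "wss://" + universalEndpoint.slice("https://".length);
}

console.log(toWebSocketUrl("https://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/"));
```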


Then open a WebSocket connection using your Universal Endpoint, and make sure it is authenticated with a Cloudflare token that has the AI Gateway Run permission.

Here’s example code using the ws npm package:

import WebSocket from "ws";
const ws = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/", {
   headers: {
       "cf-aig-authorization": "Bearer AI_GATEWAY_TOKEN",
   },
});

ws.on("open", function open() {
   console.log("Connected to server.");
   ws.send(JSON.stringify({
      type: "universal.create",
      request: {
         "provider": "workers-ai",
         "endpoint": "@cf/meta/llama-3.1-8b-instruct",
         "headers": {
            "Authorization": "Bearer WORKERS_AI_TOKEN",
            "Content-Type": "application/json"
         },
         "query": {
            "stream": true,
            "prompt": "tell me a joke"
         }
      }
   }));
});


ws.on("message", function incoming(message) {
   console.log(message.toString())
});

Here’s example code using the built-in browser WebSocket client:

const socket = new WebSocket("wss://gateway.ai.cloudflare.com/v1/my-account-id/my-gateway/", [
   // Backticks (a template literal) are needed so ${AI_GATEWAY_TOKEN} is interpolated
   `cf-aig-authorization.${AI_GATEWAY_TOKEN}`
]);

socket.addEventListener("open", (event) => {
   console.log("Connected to server.");
   socket.send(JSON.stringify({
      type: "universal.create",
      request: {
         "provider": "workers-ai",
         "endpoint": "@cf/meta/llama-3.1-8b-instruct",
         "headers": {
            "Authorization": "Bearer WORKERS_AI_TOKEN",
            "Content-Type": "application/json"
         },
         "query": {
            "stream": true,
            "prompt": "tell me a joke"
         }
      }
   }));
});

socket.addEventListener("message", (event) => {
  console.log(event.data);
});

And we will DO it again

In Q1 2025, we plan to support WebSocket-to-WebSocket connections (using DOs), allowing you to connect to OpenAI’s new real-time API directly through our platform. In the meantime, you can deploy this Worker in your account to proxy the requests yourself.

If you have any questions, reach out on our Discord channel. We’re also hiring for AI Gateway: check out Cloudflare Jobs in Lisbon!

Billions and billions (of logs): scaling AI Gateway with the Cloudflare Developer Platform

Post Syndicated from Catarina Pires Mota original https://blog.cloudflare.com/billions-and-billions-of-logs-scaling-ai-gateway-with-the-cloudflare

With the rapid advancements occurring in the AI space, developers face significant challenges in keeping up with the ever-changing landscape. New models and providers are continuously emerging, and understandably, developers want to experiment and test these options to find the best fit for their use cases. This creates the need for a streamlined approach to managing multiple models and providers, as well as a centralized platform to efficiently monitor usage, implement controls, and gather data for optimization.

AI Gateway is specifically designed to address these pain points. Since its launch in September 2023, AI Gateway has empowered developers and organizations by successfully proxying over 2 billion requests in just one year, as we highlighted during September’s Birthday Week. With AI Gateway, developers can easily store, analyze, and optimize their AI inference requests and responses in real time.

With our initial architecture, AI Gateway faced a significant challenge: the logs, those critical trails of data interactions between applications and AI models, could only be retained for 30 minutes. This limitation was not just a minor inconvenience; it posed a substantial barrier for developers and businesses needing to analyze long-term patterns, ensure compliance, or simply debug over more extended periods.

In this post, we’ll explore the technical challenges and strategic decisions behind extending our log storage capabilities from 30 minutes to being able to store billions of logs indefinitely. We’ll discuss the challenges of scale, the intricacies of data management, and how we’ve engineered a system that not only meets the demands of today, but is also scalable for the future of AI development.

Background

AI Gateway is built on Cloudflare Workers, a serverless platform that runs on the Cloudflare network, allowing developers to write small JavaScript functions that can execute at the point of need, near the user, on Cloudflare’s vast network of data centers, without worrying about platform scalability.


Our customers use multiple providers and models and are always looking to optimize the way they do inference. And, of course, to evaluate their prompts, performance, and cost, and to troubleshoot what’s going on, AI Gateway’s customers need to store requests and responses. New requests show up within 15 seconds, and customers can check a request’s cost, duration, and number of tokens, and provide their feedback (thumbs up or down).


This scales in a way where an account can have multiple gateways and each gateway has its own settings. In our first implementation, a backend worker was responsible for storing Real Time Logs and other background tasks. However, in the rapidly evolving domain of artificial intelligence, where real-time data is as precious as the insights it provides, managing log data efficiently becomes paramount. We recognized that to truly empower our users, we needed to offer a solution where logs weren’t just transient records but could be stored permanently. Permanent log storage means developers can now track the performance, security, and operational insights of their AI applications over time, enabling not only immediate troubleshooting but also longitudinal studies of AI behavior, usage trends, and system health.


The diagram above describes our old architecture, which could only store 30 minutes of data.

Tracing the path of a request through the AI Gateway, as depicted in the sequence above:

  1. A developer sends a new inference request, which is first received by our Gateway Worker.

  2. The Gateway Worker then performs several checks: it looks for cached results, enforces rate limits, and verifies any other configurations set by the user for their gateway. Provided all conditions are met, it forwards the request to the selected inference provider (in this diagram, OpenAI).

  3. The inference provider processes the request and sends back the response.

  4. Simultaneously, as the response is relayed back to the developer, the request and response details are also dispatched to our Backend Worker. This worker’s role is to manage and store the log of this transaction.

The challenge: Store two billion logs

First step: real-time logs

Initially, the AI Gateway project stored both request metadata and the actual request bodies in a D1 database. This approach facilitated rapid development in the project’s infancy. However, as customer engagement grew, the D1 database began to fill at an accelerating rate, eventually retaining logs for only 30 minutes at a time.

To mitigate this, we first optimized the database schema, which extended the log retention to one hour. However, we soon encountered diminishing returns due to the sheer volume of byte data from the request bodies. Post-launch, it became clear that a more scalable solution was necessary. We decided to migrate the request bodies to R2 storage, significantly alleviating the data load on D1. This adjustment allowed us to incrementally extend log retention to 24 hours.

Consequently, D1 functioned primarily as a log index, enabling users to search and filter logs efficiently. When users needed to view details or download a log, these actions were seamlessly proxied through to R2.

This dual-system approach provided us with the breathing room to contemplate and develop more sophisticated storage solutions for the future.
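The split between a small searchable index and a separate store for the heavy request bodies can be sketched as follows. This is a toy model only: plain in-memory structures stand in for D1 and R2, and the class name `LogStore` and the field names are assumptions made for illustration.

```javascript
// Toy model of the index/body split. An array stands in for the D1 index
// and a Map stands in for R2 object storage.
class LogStore {
  constructor() {
    this.index = [];         // small, searchable metadata rows
    this.bodies = new Map(); // large request/response bodies, keyed by log ID
  }

  insert(log) {
    const { id, request, response, ...metadata } = log;
    this.index.push({ id, ...metadata });                      // metadata goes to the index
    this.bodies.set(id, JSON.stringify({ request, response })); // bodies go to object storage
  }

  // Searching and filtering only touches the index.
  search(predicate) {
    return this.index.filter(predicate);
  }

  // Viewing or downloading a log fetches the body from object storage.
  getDetails(id) {
    const body = this.bodies.get(id);
    return body === undefined ? null : JSON.parse(body);
  }
}

const store = new LogStore();
store.insert({ id: "log-1", provider: "openai", durationMs: 420, request: { prompt: "hi" }, response: { text: "hello" } });
store.insert({ id: "log-2", provider: "workers-ai", durationMs: 95, request: { prompt: "joke" }, response: { text: "..." } });
console.log(store.search((row) => row.durationMs < 100));
console.log(store.getDetails("log-1"));
```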

Second step: persistent logs and Durable Object transactional storage

As our traffic surged, we encountered a growing number of requests from customers wanting to access and compare older logs.

Upon learning that the Durable Objects team was seeking beta testers for their new Durable Objects with SQLite, we eagerly signed up.

Originally, we considered Durable Objects as the ideal solution for expanding our log storage capacity, which required us to shard the logs by a unique string. Initially, this string was the account ID, but during a mid-development load test, we hit a cap at 10 million logs per Durable Object. This limitation meant that each account could only support up to this number of logs.

Given our commitment to the DO migration, we saw an opportunity rather than a constraint. To overcome the 10 million log limit per account, we refined our approach to shard by both account ID and gateway name. This adjustment effectively raised the storage ceiling from 10 million logs per account to 10 million per gateway. With the default setting allowing each account up to 10 gateways, the potential storage for each account skyrocketed to 100 million logs.

This strategic pivot not only enabled us to store a significantly larger number of logs, but also enhanced our flexibility in gateway management. Now, when a gateway is deleted, we can simply remove the corresponding Durable Object.

Additionally, this sharding method isolates high-volume request scenarios. If one customer’s heavy usage slows down log insertion, it only impacts their specific Durable Object, thereby preserving performance for other customers.
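The sharding change and the resulting capacity arithmetic can be written down in a few lines. This is a sketch under stated assumptions: the function name `shardName` and the `/` separator are invented for illustration, while the 10 million cap and the default of 10 gateways come from the text above.

```javascript
const MAX_LOGS_PER_DURABLE_OBJECT = 10_000_000; // cap observed in the load test
const DEFAULT_GATEWAYS_PER_ACCOUNT = 10;

// Hypothetical shard name: one Durable Object per (account, gateway) pair.
// In a Worker, a name like this would typically be passed to the Durable Object
// namespace's idFromName() to address the object.
function shardName(accountId, gatewayName) {
  return `${accountId}/${gatewayName}`;
}

// Sharding by account alone: every gateway of an account shares one object.
const capacityPerAccountBefore = MAX_LOGS_PER_DURABLE_OBJECT;
// Sharding by account and gateway: each gateway gets its own object.
const capacityPerAccountAfter = MAX_LOGS_PER_DURABLE_OBJECT * DEFAULT_GATEWAYS_PER_ACCOUNT;

console.log(shardName("my-account-id", "my-gateway"));
console.log(capacityPerAccountBefore, capacityPerAccountAfter);
```

Because two gateways never map to the same name, heavy traffic on one gateway does not slow down inserts for another.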


Taking a glance at the revised architecture diagram, we replaced the Backend Worker with our newly integrated Durable Object. The rest of the request flow remains unchanged, including the concurrent response to the user and the interaction with the Durable Object, which occurs in the fourth step.

Leveraging Cloudflare’s network, our Gateway Worker operates near the user’s location, which in turn positions the user’s Durable Object close by. This proximity significantly enhances the speed of log insertion and query operations.

Third step: managing thousands of Durable Objects

As the number of users and requests on AI Gateway grows, managing each unique Durable Object (DO) becomes increasingly complex. New customers join continuously, and we needed an efficient method to track each DO, ensure users stay within their 10-gateway limit, and manage the storage capacity for free users.

To address these challenges, we introduced another layer of control with a new Durable Object we’ve named the Account Manager. The primary function of the Account Manager is straightforward yet crucial: it keeps user activities in check.

Here’s how it works: before any Gateway commits a new log to permanent storage, it consults the Account Manager. This check determines whether the gateway is allowed to insert the log based on the user’s current usage and entitlements. The Account Manager uses its own SQLite database to verify the total number of rows a user has and their service level. If all checks pass, it signals the Gateway that the log can be inserted. It was paramount to guarantee that this entire validation process occurred in the background, ensuring that the user experience remains seamless and uninterrupted.

The Account Manager stays updated by periodically receiving data from each Gateway’s Durable Object. Specifically, after every 1000 inference requests, the Gateway sends an update on its total rows to the Account Manager, which then updates its local records. This system ensures that the Account Manager has the most current data when making its decisions.

Additionally, the Account Manager is responsible for monitoring customer entitlements. It tracks whether an account is on a free or paid plan, how many gateways a user is permitted to create, and the log storage capacity allocated to each gateway. 

Through these mechanisms, the Account Manager not only helps in maintaining system integrity but also ensures fair usage across all users of AI Gateway.
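The check-before-insert and report-every-1000-requests interplay can be modeled roughly as below. This is a simplified, in-memory sketch and not the production Durable Objects: the class names, method names, and entitlement fields are assumptions, and the real system runs these checks in the background and persists its counts in SQLite.

```javascript
const REPORT_EVERY = 1000; // gateways report their total rows after every 1000 requests

// Hypothetical stand-in for the Account Manager Durable Object.
class AccountManager {
  constructor(entitlements) {
    this.entitlements = entitlements; // { maxGateways, maxLogsPerGateway }
    this.rowsByGateway = new Map();   // last reported row count per gateway
  }

  reportRows(gateway, totalRows) {
    this.rowsByGateway.set(gateway, totalRows);
  }

  canInsert(gateway) {
    const isNewGateway = !this.rowsByGateway.has(gateway);
    if (isNewGateway && this.rowsByGateway.size >= this.entitlements.maxGateways) {
      return false; // account already has its maximum number of gateways
    }
    const rows = this.rowsByGateway.get(gateway) ?? 0;
    return rows < this.entitlements.maxLogsPerGateway;
  }
}

// Hypothetical stand-in for a gateway's log Durable Object.
class GatewayLogs {
  constructor(name, manager) {
    this.name = name;
    this.manager = manager;
    this.rows = 0;
    this.sinceLastReport = 0;
  }

  insert(log) {
    if (!this.manager.canInsert(this.name)) return false;
    this.rows += 1; // a real implementation would write `log` to storage here
    this.sinceLastReport += 1;
    if (this.sinceLastReport >= REPORT_EVERY) {
      this.manager.reportRows(this.name, this.rows);
      this.sinceLastReport = 0;
    }
    return true;
  }
}

const manager = new AccountManager({ maxGateways: 10, maxLogsPerGateway: 2000 });
const gateway = new GatewayLogs("my-gateway", manager);
let accepted = 0;
for (let i = 0; i < 2500; i++) {
  if (gateway.insert({ n: i })) accepted += 1;
}
console.log(accepted);
```

One consequence of periodic reporting in this sketch is that the manager's count can lag the true count by up to 999 rows, so a limit that is not a multiple of 1000 can be overshot slightly before inserts are refused.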

AI evaluations and Durable Objects sharding

As we continue to develop evaluations toward full automation and, in the future, the use of Large Language Models (LLMs), we are taking the first step towards this goal by launching the open beta phase of comprehensive AI evaluations, centered on Human-in-the-Loop feedback.

This feature empowers users to create bespoke datasets from their application logs, enabling them to score and evaluate the performance, speed, and cost-effectiveness of their models, with a primary focus on LLMs and automated scoring. Analyzing the performance of LLMs in this way provides developers with objective, data-driven insights to refine their models.

To do this, developers require a reliable logging mechanism that persists logs from multiple gateways, storing up to 100 million logs in total (10 million logs per gateway, across 10 gateways). This represents a significant volume of data, as each request made through the AI Gateway generates a log entry, with some log entries potentially exceeding 50 MB in size.

This necessity leads us to work on the expansion of log storage capabilities. Since log storage is limited to 10 million logs per gateway, in future iterations, we aim to scale this capacity by implementing sharded Durable Objects (DO), allowing multiple Durable Objects per gateway to handle and store logs. This scaling strategy will enable us to store significantly larger volumes of logs, providing richer data for evaluations (using LLMs as a judge or from user input), all through AI Gateway.


Coming Soon

We are working on improving our existing Universal Endpoint. The next step is an enhanced solution that builds on existing fallback mechanisms to offer greater resilience, flexibility, and intelligence in request management.

Currently, when a provider encounters an error or is unavailable, our system falls back to an alternative provider to ensure continuity. The improved Universal Endpoint takes this a step further by introducing automatic retry capabilities, allowing failed requests to be reattempted before fallback is triggered. This significantly improves reliability by handling transient errors and increasing the likelihood of successful request fulfillment. It will look something like this:

curl --location 'https://aig.example.com/' \
--header 'CF-AIG-TOKEN: Bearer XXXX' \
--header 'Content-Type: application/json' \
--data-raw '[
    {
        "id": "0001",
        "provider": "openai",
        "endpoint": "chat/completions",
        "headers": {
            "Authorization": "Bearer XXXX",
            "Content-Type": "application/json"
        },
        "query": {
            "model": "gpt-3.5-turbo",
            "messages": [
                {
                    "role": "user",
                    "content": "generate a prompt to create cloudflare random images"
                }
            ]
        },
        "option": {
            "retry": 2,
            "delay": 200,
            "onComplete": {
                "provider": "workers-ai",
                "endpoint": "@cf/stabilityai/stable-diffusion-xl-base-1.0",
                "headers": {
                    "Authorization": "Bearer XXXX",
                    "Content-Type": "application/json"
                },
                "query": {
                    "messages": [
                        {
                            "role": "user",
                            "content": "<prompt-response id='\''0001'\'' />"
                        }
                    ]
                }
            }
        }
    },
    {
        "provider": "workers-ai",
        "endpoint": "@cf/stabilityai/stable-diffusion-xl-base-1.0",
        "headers": {
            "Authorization": "Bearer XXXXXX",
            "Content-Type": "application/json"
        },
        "query": {
            "messages": [
                {
                    "role": "user",
                    "content": "create an image of a missing cat"
                }
            ]
        }
    }
]'

The request to the improved Universal Endpoint system demonstrates how it handles multiple providers with integrated retry mechanisms and fallback logic. In this example, the first request is sent to a provider like OpenAI, asking it to generate a text-to-image prompt. The “retry” option ensures that transient issues don’t result in immediate failure.

The system’s ability to seamlessly switch between providers while applying retry strategies ensures higher reliability and robustness in managing requests. By leveraging fallback logic, the improved Universal Endpoint can dynamically adapt to provider failures, ensuring that tasks are completed successfully even in complex, multi-step workflows.
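The retry-then-fallback behavior can be approximated client-side in a few lines. This is a sketch of the general technique, not the upcoming Universal Endpoint itself: the function `runWithRetryAndFallback` and the `send` callback are hypothetical, `delay` is assumed to be in milliseconds, and the `onComplete` chaining from the request above is not modeled.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Tries each step in order. A step is retried up to `option.retry` extra times,
// waiting `option.delay` ms between attempts, before falling back to the next step.
async function runWithRetryAndFallback(steps, send) {
  let lastError;
  for (const step of steps) {
    const retries = step.option?.retry ?? 0;
    const delay = step.option?.delay ?? 0;
    for (let attempt = 0; attempt <= retries; attempt++) {
      try {
        return await send(step); // first success wins
      } catch (error) {
        lastError = error;
        if (attempt < retries) await sleep(delay);
      }
    }
  }
  throw lastError ?? new Error("no providers configured");
}

// Usage with a fake `send` (assumption) in which the first provider is always down.
const steps = [
  { provider: "openai", option: { retry: 2, delay: 200 } },
  { provider: "workers-ai" },
];
const fakeSend = async (step) => {
  if (step.provider === "openai") throw new Error("provider unavailable");
  return `response from ${step.provider}`;
};
runWithRetryAndFallback(steps, fakeSend).then((result) => console.log(result));
```

With `retry: 2`, the first provider is attempted three times in total before the fallback provider is tried.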

In addition to retry logic, we will have the ability to inspect requests and responses and make dynamic decisions based on the content of the result. This enables developers to create conditional workflows where the system can adapt its behavior depending on the nature of the response, creating a highly flexible and intelligent decision-making process.

If you haven’t yet used AI Gateway, check out our developer documentation on how to get started. If you have any questions, reach out on our Discord channel.