Tag Archives: Producer

Cloudflare Queues: messages at your speed with consumer concurrency and explicit acknowledgement

Post Syndicated from Charles Burnett original http://blog.cloudflare.com/messages-at-your-speed-with-concurrency-and-explicit-acknowledgement/

Cloudflare Queues: messages at your speed with consumer concurrency and explicit acknowledgement

Cloudflare Queues: messages at your speed with consumer concurrency and explicit acknowledgement

Communicating between systems can be a balancing act that has a major impact on your business. APIs have limits, billing frequently depends on usage, and end-users are always looking for more speed in the services they use. With so many conflicting considerations, it can feel like a challenge to get it just right. Cloudflare Queues is a tool to make this balancing act simple. With our latest features like consumer concurrency and explicit acknowledgment, it’s easier than ever for developers to focus on writing great code, rather than worrying about the fees and rate limits of the systems they work with.

Queues is a messaging service, enabling developers to send and receive messages across systems asynchronously with guaranteed delivery. It integrates directly with Cloudflare Workers, making for easy message production and consumption working with the many products and services we offer.

What’s new in Queues?

Consumer concurrency

Oftentimes, the systems we pull data from can produce information faster than other systems can consume them. This can occur when consumption involves processing information, storing it, or sending and receiving information to a third party system. The result of which is that sometimes, a queue can fall behind where it should be. At Cloudflare, a queue shouldn't be a quagmire. That’s why we’ve introduced Consumer Concurrency.

With Concurrency, we automatically scale up the amount of consumers needed to match the speed of information coming into any given queue. In this way, customers no longer have to worry about an ever-growing backlog of information bogging down their system.

How it works

When setting up a queue, developers can set a Cloudflare Workers script as a target to send messages to. With concurrency enabled, Cloudflare will invoke multiple instances of the selected Worker script to keep the messages in the queue moving effectively. This feature is enabled by default for every queue and set to automatically scale.

Autoscaling considers the following factors when spinning up consumers:  the number of messages in a queue, the rate of new messages, and successful vs. unsuccessful consumption attempts.

If a queue has enough messages in it, concurrency will increase each time a message batch is successfully processed. Concurrency is decreased when message batches encounter errors. Customers can set a max_concurrency value in the Dashboard or via Wrangler, which caps out how many consumers can be automatically created to perform processing for a given queue.

Setting the max_concurrency value manually can be helpful in the following situations where producer data is provided in bursts, the datasource API is rate limited, and datasource API has higher costs with more usage.

Setting a max concurrency value manually allows customers to optimize their workflows for other factors beyond speed.

// in your wrangler.toml file


[[queues.consumers]]
  queue = "my-queue"

//max concurrency can be set to a number between 1 and 10
//this defines the total amount of consumers running simultaneously

max_concurrency = 7

To learn more about concurrency you can check out our developer documentation here.

Concurrency in practice

It’s baseball season in the US, and for many of us that means fantasy baseball is back! This year is the year we finally write a program that uses data and statistics to pick a winning team, as opposed to picking players based on “feelings” and “vibes”. We’re engineers after all, and baseball is a game of rules. If the Oakland A’s can do it, so can we!

So how do we put this together? We’ll need a few things:

  1. A list of potential players
  2. An API to pull historical game statistics from
  3. A queue to send this data to its consumer
  4. A Worker script to crunch the numbers and generate a score

A developer can pull from a baseball reference API into a Workers script, and from that worker pass this information to a queue. Historical data is… historical, so we can pull data into our queue as fast as the baseball API will allow us. For our list of potential players, we pull statistics for each game they’ve played. This includes everything from batting averages, to balls caught, to game day weather. Score!

//get data from a third party API and pass it along to a queue


const response = await fetch("http://example.com/baseball-stats.json");
const gamesPlayedJSON = await response.json();

for (game in gamesPlayedJSON){
//send JSON to your queue defined in your workers environment
env.baseballqueue.send(jsonData)
}

Our producer Workers script then passes these statistics onto the queue. As each game contains quite a bit of data, this results in hundreds of thousands of “game data” messages waiting to be processed in our queue. Without concurrency, we would have to wait for each batch of messages to be processed one at a time, taking minutes if not longer. But, with Consumer Concurrency enabled, we watch as multiple instances of our worker script invoked to process this information in no time!

Our Worker script would then take these statistics, apply a heuristic, and store the player name and a corresponding quality score into a database like a Workers KV store for easy access by your application presenting the data.

Explicit Acknowledgment

In Queues previously, a failure of a single message in a batch would result in the whole batch being resent to the consumer to be reprocessed. This resulted in extra cycles being spent on messages that were processed successfully, in addition to the failed message attempt. This hurts both customers and developers, slowing processing time, increasing complexity, and increasing costs.

With Explicit Acknowledgment, we give developers the precision and flexibility to handle each message individually in their consumer, negating the need to reprocess entire batches of messages. Developers can now tell their queue whether their consumer has properly processed each message, or alternatively if a specific message has failed and needs to be retried.

An acknowledgment of a message means that that message will not be retried if the batch fails. Only messages that were not acknowledged will be retried. Inversely, a message that is explicitly retried, will be sent again from the queue to be reprocessed without impacting the processing of the rest of the messages currently being processed.

How it works

In your consumer, there are 4 new methods you can call to explicitly acknowledge a given message: .ack(), .retry(), .ackAll(), .retryAll().

Both ack() and retry() can be called on individual messages. ack() tells a queue that the message has been processed successfully and that it can be deleted from the queue, whereas retry() tells the queue that this message should be put back on the queue and delivered in another batch.

async queue(batch, env, ctx) {
    for (const msg of batch.messages) {
	try {
//send our data to a 3rd party for processing
await fetch('https://thirdpartyAPI.example.com/stats', {
	method: 'POST',
	body: msg, 
	headers: {
		'Content-type': 'application/json'
}
});
//acknowledge if successful
msg.ack();
// We don't have to re-process this if subsequent messages fail!
}
catch (error) {
	//send message back to queue for a retry if there's an error
      msg.retry();
		console.log("Error processing", msg, error);
}
    }
  }

ackAll() and retryAll() work similarly, but act on the entire batch of messages instead of individual messages.

For more details check out our developer documentation here.

Explicit Acknowledgment in practice

In the course of making our Fantasy Baseball team picker, we notice that data isn’t always sent correctly from the baseball reference API. This results in data not being correctly parsed and rejected from our player heuristics.

Without Explicit Acknowledgment, the entire batch of baseball statistics would need to be retried. Thankfully, we can use Explicit Acknowledgment to avoid that, and tell our queue which messages were parsed successfully and which were not.

import heuristic from "baseball-heuristic";
export default {
  async queue(batch: MessageBatch, env: Env, ctx: ExecutionContext) {
    for (const msg of batch.messages) {
      try {
        // Calculate the score based on the game stats
        heuristic.generateScore(msg)
        // Explicitly acknowledge results 
        msg.ack()
      } catch (err) {
        console.log(err)
        // Retry just this message
        msg.retry()
      } 
    }
  },
};

Higher throughput

Under the hood, we’ve been working on improvements to further increase the amount of messages per second each queue can handle. In the last few months, that number has quadrupled, improving from 100 to over 400 messages per second.

Scalability can be an essential factor when deciding which services to use to power your application. You want a service that can grow with your business. We are always aiming to improve our message throughput and hope to see this number quadruple again over the next year. We want to grow with you.

What’s next?

As our service grows, we want to provide our customers with more ways to interact with our service beyond the traditional Cloudflare Workers workflow. We know our customers’ infrastructure is often complex, spanning across multiple services. With that in mind, our focus will be on enabling easy connection to services both within the Cloudflare ecosystem and beyond.

R2 as a consumer

Today, the only type of consumer you can configure for a queue is a Workers script. While Workers are incredibly powerful, we want to take it a step further and give customers a chance to write directly to other services, starting with R2. Coming soon, customers will be able to select an R2 bucket in the Cloudflare Dashboard for a Queue to write to directly, no code required. This will save valuable developer time by avoiding the initial setup in a Workers script, and any maintenance that is required as services evolve. With R2 as a first party consumer in Queues, customers can simply select their bucket, and let Cloudflare handle the rest.

HTTP pull

We're also working to allow you to consume messages from existing infrastructure you might have outside of Cloudflare. Cloudflare Queues will provide an HTTP API for each queue from which any consumer can pull batches of messages for processing. Customers simply make a request to the API endpoint for their queue, receive data they requested, then send an acknowledgment that they have received the data, so the queue can continue working on the next batch.

Always working to be faster

For the Queues team, speed is always our focus, as we understand our customers don't want bottlenecks in the performance of their applications. With this in mind the team will be continuing to look for ways to increase the velocity through which developers can build best in class applications on our developer platform. Whether it's reducing message processing time, the amount of code you need to manage, or giving developers control over their application pipeline, we will continue to implement solutions to allow you to focus on just the important things, while we handle the rest.

Cloudflare Queues is currently in Open Beta and ready to power your most complex applications.

Check out our getting started guide and build your service with us today!