Cloudflare Queues let a developer decouple their Workers into event-driven services. Producer Workers write events to a Queue, and consumer Workers are invoked to take actions on the events. For example, you can use a Queue to decouple an e-commerce website from a service which sends purchase confirmation emails to users. During 2024’s Birthday Week, we announced that Cloudflare Queues is now Generally Available, with significant performance improvements that enable larger workloads. To accomplish this, we switched to a new architecture for Queues that enabled the following improvements:
Median latency for sending messages has dropped from ~200ms to ~60ms
Maximum throughput for each Queue has increased over 10x, from 400 to 5000 messages per second
Maximum Consumer concurrency for each Queue has increased from 20 to 250 concurrent invocations
Median latency drops from ~200ms to ~60ms as Queues are migrated to the new architecture
In this blog post, we’ll share details about how we built Queues using Durable Objects and the Cloudflare Developer Platform, and how we migrated from an initial Beta architecture to a geographically-distributed, horizontally-scalable architecture for General Availability.
v1 Beta architecture
When initially designing Cloudflare Queues, we decided to build something simple that we could get into users’ hands quickly. First, we considered leveraging an off-the-shelf messaging system such as Kafka or Pulsar. However, we decided that it would be too challenging to operate these systems at scale with the large number of isolated tenants that we wanted to support.
Instead of investing in new infrastructure, we decided to build on top of one of Cloudflare’s existing developer platform building blocks: Durable Objects. Durable Objects are a simple, yet powerful building block for coordination and storage in a distributed system. In our initial v1 architecture, each Queue was implemented using a single Durable Object. As shown below, clients would send messages to a Worker running in their region, which would be forwarded to the single Durable Object hosted in the WNAM (Western North America) region. We used a single Durable Object for simplicity, and hosted it in WNAM for proximity to our centralized configuration API service.
One of a Queue’s main responsibilities is to accept and store incoming messages. Sending a message to a v1 Queue used the following flow:
A client sends a POST request containing the message body to the Queues API at
/accounts/:accountID/queues/:queueID/messages
The request is handled by an instance of the Queue Broker Worker in a Cloudflare data center running near the client.
The Worker performs authentication, and then uses Durable Objects
idFromName
API to route the request to the Queue Durable Object for the givenqueueID
The Queue Durable Object persists the message to storage before returning a success back to the client.
Durable Objects handled most of the heavy-lifting here: we did not need to set up any new servers, storage, or service discovery infrastructure. To route requests, we simply provided a queueID
and the platform handled the rest. To store messages, we used the Durable Object storage API to put
each message, and the platform handled reliably storing the data redundantly.
Consuming messages
The other main responsibility of a Queue is to deliver messages to a Consumer. Delivering messages in a v1 Queue used the following process:
Each Queue Durable Object maintained an alarm that was always set when there were undelivered messages in storage. The alarm guaranteed that the Durable Object would reliably wake up to deliver any messages in storage, even in the presence of failures. The alarm time was configured to fire after the user’s selected max wait time, if only a partial batch of messages was available. Whenever one or more full batches were available in storage, the alarm was scheduled to fire immediately.
The alarm would wake the Durable Object, which continually looked for batches of messages in storage to deliver.
Each batch of messages was sent to a “Dispatcher Worker” that used Workers for Platforms dynamic dispatch to pass the messages to the
queue()
function defined in a user’s Consumer Worker
This v1 architecture let us flesh out the initial version of the Queues Beta product and onboard users quickly. Using Durable Objects allowed us to focus on building application logic, instead of complex low-level systems challenges such as global routing and guaranteed durability for storage. Using a separate Durable Object for each Queue allowed us to host an essentially unlimited number of Queues, and provided isolation between them.
However, using only one Durable Object per queue had some significant limitations:
Latency: we created all of our v1 Queue Durable Objects in Western North America. Messages sent from distant regions incurred significant latency when traversing the globe.
Throughput: A single Durable Object is not scalable: it is single-threaded and has a fixed capacity for how many requests per second it can process. This is where the previous 400 messages per second limit came from.
Consumer Concurrency: Due to concurrent subrequest limits, a single Durable Object was limited in how many concurrent subrequests it could make to our Dispatcher Worker. This limited the number of
queue()
handler invocations that it could run simultaneously.
To solve these issues, we created a new v2 architecture that horizontally scales across multiple Durable Objects to implement each single high-performance Queue.
v2 Architecture
In the new v2 architecture for Queues, each Queue is implemented using multiple Durable Objects, instead of just one. Instead of a single region, we place Storage Shard Durable Objects in all available regions to enable lower latency. Within each region, we create multiple Storage Shards and load balance incoming requests amongst them. Just like that, we’ve multiplied message throughput.
Sending a message to a v2 Queue uses the following flow:
A client sends a POST request containing the message body to the Queues API at
/accounts/:accountID/queues/:queueID/messages
The request is handled by an instance of the Queue Broker Worker running in a Cloudflare data center near the client.
The Worker:
Performs authentication
Reads from Workers KV to obtain a Shard Map that lists available storage shards for the given
region
andqueueID
Picks one of the region’s Storage Shards at random, and uses Durable Objects
idFromName
API to route the request to the chosen shard
The Storage Shard persists the message to storage before returning a success back to the client.
In this v2 architecture, messages are stored in the closest available Durable Object storage cluster near the user, greatly reducing latency since messages don’t need to be shipped all the way to WNAM. Using multiple shards within each region removes the bottleneck of a single Durable Object, and allows us to scale each Queue horizontally to accept even more messages per second. Workers KV acts as a fast metadata store: our Worker can quickly look up the shard map to perform load balancing across shards.
To improve the Consumer side of v2 Queues, we used a similar “scale out” approach. A single Durable Object can only perform a limited number of concurrent subrequests. In v1 Queues, this limited the number of concurrent subrequests we could make to our Dispatcher Worker. To work around this, we created a new Consumer Shard Durable Object class that we can scale horizontally, enabling us to execute many more concurrent instances of our users’ queue()
handlers.
Consumer Durable Objects in v2 Queues use the following approach:
Each Consumer maintains an alarm that guarantees it will wake up to process any pending messages. v2 Consumers are notified by the Queue’s Coordinator (introduced below) when there are messages ready for consumption. Upon notification, the Consumer sets an alarm to go off immediately.
The Consumer looks at the shard map, which contains information about the storage shards that exist for the Queue, including the number of available messages on each shard.
The Consumer picks a random storage shard with available messages, and asks for a batch.
The Consumer sends the batch to the Dispatcher Worker, just like for v1 Queues.
After processing the messages, the Consumer sends another request to the Storage Shard to either “acknowledge” or “retry” the messages.
This scale-out approach enabled us to work around the subrequest limits of a single Durable Object, and increase the maximum supported concurrency level of a Queue from 20 to 250.
The Coordinator and “Control Plane”
So far, we have primarily discussed the “Data Plane” of a v2 Queue: how messages are load balanced amongst Storage Shards, and how Consumer Shards read and deliver messages. The other main piece of a v2 Queue is the “Control Plane”, which handles creating and managing all the individual Durable Objects in the system. In our v2 architecture, each Queue has a single Coordinator Durable Object that acts as the brain of the Queue. Requests to create a Queue, or change its settings, are sent to the Queue’s Coordinator.
The Coordinator maintains a Shard Map for the Queue, which includes metadata about all the Durable Objects in the Queue (including their region, number of available messages, current estimated load, etc.). The Coordinator periodically writes a fresh copy of the Shard Map into Workers KV, as pictured in step 1 of the diagram. Placing the shard map into Workers KV ensures that it is globally cached and available for our Worker to read quickly, so that it can pick a shard to accept the message.
Every shard in the system periodically sends a heartbeat to the Coordinator as shown in steps 2 and 3 of the diagram. Both Storage Shards and Consumer Shards send heartbeats, including information like the number of messages stored locally, and the current load (requests per second) that the shard is handling. The Coordinator uses this information to perform autoscaling. When it detects that the shards in a particular region are overloaded, it creates additional shards in the region, and adds them to the shard map in Workers KV. Our Worker sees the updated shard map and naturally load balances messages across the freshly added shards. Similarly, the Coordinator looks at the backlog of available messages in the Queue, and decides to add more Consumer shards to increase Consumer throughput when the backlog is growing. Consumer Shards pull messages from Storage Shards for processing as shown in step 4 of the diagram.
Switching to a new scalable architecture allowed us to meet our performance goals and take Queues to GA. As a recap, this new architecture delivered these significant improvements:
P50 latency for writing to a Queue has dropped from ~200ms to ~60ms.
Maximum throughput for a Queue has increased from 400 to 5000 messages per second.
Maximum consumer concurrency has increased from 20 to 250 invocations.
What’s next for Queues
We plan on leveraging the performance improvements in the new beta version of Durable Objects which use SQLite to continue to improve throughput/latency in Queues.
We will soon be adding message management features to Queues so that you can take actions to purge messages in a queue, pause consumption of messages, or “redrive”/move messages from one queue to another (for example messages that have been sent to a Dead Letter Queue could be “redriven” or moved back to the original queue).
Work to make Queues the “event hub” for the Cloudflare Developer Platform:
Create a low-friction way for events emitted from other Cloudflare services with event schemas to be sent to Queues.
Build multi-Consumer support for Queues so that Queues are no longer limited to one Consumer per queue.
To start using Queues, head over to our Getting Started guide.
Do distributed systems like Cloudflare Queues and Durable Objects interest you? Would you like to help build them at Cloudflare? We’re Hiring!
Source:: CloudFlare