
Data Anywhere with Pipelines, Event Notifications, and Workflows

Data is fundamental to any real-world application: the database storing your user data and inventory, the analytics tracking sales events and/or error rates, the object storage with your web assets and/or the Parquet files driving your data science team, and the vector database enabling semantic search or AI-powered recommendations for your users.

When we first announced Workers back in 2017, and then Workers KV, Cloudflare R2, and D1, it was obvious that the next big challenge to solve for developers would be in making it easier to ingest, store, and query the data needed to build scalable, full-stack applications.

To that end, as part of our quest to make building stateful, distributed-by-default applications even easier, we’re launching our new Event Notifications service; a preview of our upcoming streaming ingestion product, Pipelines; and a sneak peek into our take on durable execution, Workflows.

Event-based architectures

When you’re writing data — whether that’s new data, changing existing data, or deleting old data — you often want to trigger other, asynchronous work to run in response. That could be processing user-driven uploads, updating search indexes as the underlying data changes, or removing associated rows in your SQL database when content is removed.

To make these event-driven workflows far easier to build, we’re launching the first step towards a wider Event Notifications platform across Cloudflare, starting with notifications support in R2.

You can read more in the deep-dive on Event Notifications for R2, but in a nutshell: you can configure changes to content in any R2 bucket to write directly to a Queue, allowing you to reliably consume those events in a Worker or to pull from compute in a legacy cloud.
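As a rough illustration of the consuming side, the sketch below shows a Worker-style queue handler that reads R2 notifications from a batch. The payload shape (`R2EventMessage` with `action`, `bucket`, and `object.key`), the action values, and the helper name `collectChangedKeys` are assumptions made for illustration. The local `Message` and `MessageBatch` interfaces stand in for the Workers runtime types so the snippet is self-contained. Check the Event Notifications documentation for the actual payload.

```typescript
// Assumed shape of an R2 event notification body (illustrative only).
interface R2EventMessage {
  action: string; // assumed values such as "PutObject" or "DeleteObject"
  bucket: string;
  object: { key: string };
}

// Minimal stand-ins for the Workers Queues runtime types.
interface Message<T> {
  body: T;
  ack(): void;
  retry(): void;
}
interface MessageBatch<T> {
  messages: Message<T>[];
}

// Hypothetical helper: collect the keys of newly written objects and
// acknowledge every message once it has been looked at.
function collectChangedKeys(batch: MessageBatch<R2EventMessage>): string[] {
  const keys: string[] = [];
  for (const msg of batch.messages) {
    if (msg.body.action === "PutObject") {
      keys.push(msg.body.object.key);
    }
    // Acknowledging tells the Queue this message does not need redelivery.
    msg.ack();
  }
  return keys;
}

// In a Worker, this object would be the module's default export.
const worker = {
  async queue(batch: MessageBatch<R2EventMessage>): Promise<void> {
    const keys = collectChangedKeys(batch);
    console.log(`objects written in this batch: ${keys.join(", ")}`);
  },
};
```

Keeping the batch logic in a plain function, separate from the `queue` handler, makes it easy to exercise with hand-built messages before wiring it to a real Queue.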

Event Notifications for R2 are just the beginning, though. There are many kinds of events you might want to trigger as a developer — these are just some of the event types we’re planning to support:

Consuming event notifications from a single Worker is just one approach, though. As you start to consume events, you may want to trigger multi-step workflows that execute reliably, resume from errors or exceptions, and ensure that previous steps aren’t duplicated or repeated unnecessarily. An event notification framework turns out to be just the thing needed to drive a workflow engine that executes durably.

Making it even easier to ingest data

When we launched Cloudflare R2, our object storage service, we knew that supporting the de facto-standard S3 API was critical in order to allow developers to bring the tooling and services they already had over to R2. But the S3 API is designed to be simple: at its core, it provides APIs for upload, download, multipart and metadata operations, and many tools don’t support the S3 API.

What if you want to batch clickstream data from your web services so that it’s efficient (and cost-effective) to query by your analytics team? Or partition data by customer ID, merchant ID, or locale within a structured data format like JSON?

Well, we want to help solve this problem too, and so we’re announcing Pipelines, an upcoming streaming ingestion service designed to ingest data at scale, aggregate it, and write it directly to R2, without you having to manage infrastructure, partitions, runners, or worry about durability.

With Pipelines, creating a globally scalable ingestion endpoint that can ingest tens of thousands of events per second doesn’t require any code:

$ wrangler pipelines create clickstream-ingest-prod --batch-size="1MB" --batch-timeout-secs=120 --batch-on-json-key=".merchantId" --destination-bucket="prod-cs-data"

✅ Successfully created new pipeline "clickstream-ingest-prod"
📥 Created endpoints:
➡ WebSocket: wss://
➡ Kafka: (topic: clickstream-ingest-prod)

As you can see here, we’re already thinking about how to make Pipelines protocol-agnostic: write from an HTTP client, stream events over a WebSocket, and/or redirect your existing Kafka producer (and stop having to manage and scale Kafka) directly to Pipelines.
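To make the batching flags in the command above more concrete, here is a conceptual sketch of grouping events by a JSON key and flushing a group once it reaches a size threshold. This is not how Pipelines is implemented: `KeyedBatcher`, its size accounting via serialized string length, and the `onFlush` callback are all hypothetical names chosen for illustration. Time-based flushing (the `--batch-timeout-secs` flag) is left out to keep the example short.

```typescript
type ClickEvent = Record<string, unknown>;
type FlushHandler = (key: string, events: ClickEvent[]) => void;

interface Batch {
  events: ClickEvent[];
  bytes: number;
}

// Hypothetical batcher: one open batch per distinct value of `keyField`.
class KeyedBatcher {
  private groups = new Map<string, Batch>();
  private keyField: string;
  private maxBytes: number;
  private onFlush: FlushHandler;

  constructor(keyField: string, maxBytes: number, onFlush: FlushHandler) {
    this.keyField = keyField;
    this.maxBytes = maxBytes;
    this.onFlush = onFlush;
  }

  add(event: ClickEvent): void {
    const key = String(event[this.keyField] ?? "unknown");
    const group = this.groups.get(key) ?? { events: [], bytes: 0 };
    group.events.push(event);
    // Approximate the event size by the length of its JSON serialization.
    group.bytes += JSON.stringify(event).length;
    this.groups.set(key, group);
    if (group.bytes >= this.maxBytes) {
      this.flush(key);
    }
  }

  // Hand a group's events to the sink and start a fresh batch for that key.
  flush(key: string): void {
    const group = this.groups.get(key);
    if (!group || group.events.length === 0) return;
    this.groups.delete(key);
    this.onFlush(key, group.events);
  }

  flushAll(): void {
    for (const key of Array.from(this.groups.keys())) {
      this.flush(key);
    }
  }
}

// Usage: group by merchantId and log each flushed batch.
const batcher = new KeyedBatcher("merchantId", 1024, (key, events) => {
  console.log(`flush ${key}: ${events.length} event(s)`);
});
batcher.add({ merchantId: "m1", path: "/cart" });
batcher.flushAll();
```

Grouping before writing means each flushed batch contains events for a single key, which is what makes the resulting objects cheap to query per merchant.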

But that’s just the beginning of our vision here. Scalable ingestion and simple batching are one thing, but what if you have more complex needs? Well, we have a massively scalable compute platform (Cloudflare Workers) that can help address this too.

The code below is just an initial exploration of how we’re thinking about an API for running transforms over streaming data. If you’re aware of projects like Apache Beam or Flink, this programming model might even look familiar:

export default {
   // Pipeline handler is invoked when batch criteria are met
   async pipeline(stream: StreamPipeline, env: Env, ctx: ExecutionContext): Promise<StreamPipeline> {
      // ...
      return stream
         // Type: transform(label: string, transformFunc: TransformFunction): Promise<StreamPipeline>
         // Each transform has a label that is used in metrics to provide
         // per-transform observability and debugging
         .transform("human readable label", (events: Array<StreamEvent>) => {
            // Assumed: a per-event mapping over the batch
            return events.map((e) => ...)
         })
         .transform("another transform", (events: Array<StreamEvent>) => {
            // Assumed: a per-event filter over the batch
            return events.filter((e) => ...)
         })
         // Sink step; the method name `writeToR2` is an assumption for this
         // exploratory API
         .writeToR2({
            format: "json",
            bucket: "MY_BUCKET_NAME",
            prefix: somePrefix,
            batchSize: "10MB"
         })
   }
}
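Because the API above is explicitly exploratory, the following is only a local stand-in that mimics the chaining model so the idea can be tried out: each labeled transform takes the current array of events and returns a new one. `LocalPipeline`, its `run` method, the per-label counts, and the fields on `StreamEvent` are hypothetical and are not part of any Cloudflare API.

```typescript
interface StreamEvent {
  merchantId: string;
  path: string;
  status: number;
}

type TransformFunction = (events: StreamEvent[]) => StreamEvent[];

// Hypothetical local model of a chain of labeled transforms.
class LocalPipeline {
  private steps: Array<{ label: string; fn: TransformFunction }> = [];

  transform(label: string, fn: TransformFunction): LocalPipeline {
    this.steps.push({ label, fn });
    return this; // returning `this` is what makes the calls chainable
  }

  // Apply every transform in order and record how many events each one
  // emitted, a stand-in for the per-transform metrics keyed by label.
  run(events: StreamEvent[]): { output: StreamEvent[]; counts: Record<string, number> } {
    const counts: Record<string, number> = {};
    let current = events;
    for (const step of this.steps) {
      current = step.fn(current);
      counts[step.label] = current.length;
    }
    return { output: current, counts };
  }
}

const pipeline = new LocalPipeline()
  .transform("normalize paths", (events) =>
    events.map((e) => ({ ...e, path: e.path.toLowerCase() }))
  )
  .transform("drop server errors", (events) =>
    events.filter((e) => e.status < 500)
  );

const result = pipeline.run([
  { merchantId: "m1", path: "/Cart", status: 200 },
  { merchantId: "m2", path: "/Checkout", status: 503 },
]);
console.log(result.output, result.counts);
```

Keeping each transform a pure function over an array of events is what lets a runtime batch, retry, and measure them independently.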


We’ll be bringing Pipelines into open beta later in 2024, and it will initially launch with support for HTTP ingestion and R2 as a destination (sink), but we’re already thinking bigger.

We’ll be sharing more as Pipelines gets closer to release. In the meantime, you can register your interest and share your use-case, and we’ll reach out when Pipelines reaches open beta.

Durable Execution

If the term “Durable Execution” is new to you, don’t worry: the term comes from the desire to run applications that can resume execution from where they left off, even if the underlying host or compute fails (that’s where the “durable” part comes from).

As we’ve continued to build out our data and AI platforms, we’ve been acutely aware that developers need ways to create reliable, repeatable workflows that operate over that data, turn unstructured data into structured data, trigger on fresh data (or periodically), and automatically retry, restart, and export metrics for each step along the way. The industry calls this Durable Execution: we’re just calling it Workflows.

What makes Workflows different from other takes on Durable Execution is that we provide the underlying compute as part of the platform. You don’t have to bring-your-own compute, or worry about scaling it or provisioning it in the right locations. Workflows runs on top of Cloudflare Workers – you write the workflow, and we take care of the rest.

Here’s an early example of writing a Workflow that generates text embeddings using Workers AI and stores them (ready to query) in Vectorize as new content is written to (or updated within) R2.

Step-by-step, it looks like this:

Transforming this series of steps into real code, here’s what this would look like with Workflows:

import { Ai } from "@cloudflare/ai";
import { Workflow } from "cloudflare:workers";

export interface Env {
  R2: R2Bucket;
  AI: any;
  VECTOR_INDEX: VectorizeIndex;
}

export default class extends Workflow {
  async run(event: Event) {
    const ai = new Ai(this.env.AI);

    // List of keys to fetch from our incoming event notification
    // (assumed: the notification batch exposes its entries as `event.messages`)
    const keysToFetch = event.messages.map((val) => {
      return val.object.key;
    });

    // The return value of each step is stored (the "durable" part
    // of "durable execution")
    // This ensures that state can be persisted between steps, reducing
    // the need to recompute results ($$, time) should subsequent
    // steps fail.
    // (assumed: steps are started with `this.ctx.run(label, fn, options)`)
    const inputs = await this.ctx.run(
      // Each step has a user-defined label
      // Metrics are emitted as each step runs (to success or failure)
      // with this label attached and available within per-Workflow
      // analytics in near-real-time.
      "read objects from R2",
      async () => {
        const objects = [];

        for (const key of keysToFetch) {
          const object = await this.env.R2.get(key);
          objects.push(await object.text());
        }

        return objects;
      }
    );

    // Persist the output of this step.
    const embeddings = await this.ctx.run(
      "generate embeddings",
      async () => {
        const { data } = await ai.run("@cf/baai/bge-small-en-v1.5", {
          text: inputs,
        });

        if (data.length) {
          return data;
        } else {
          // Uncaught exceptions trigger an automatic retry of the step
          // Retries and timeouts have sane defaults and can be overridden
          // per step
          throw new Error("Failed to generate embeddings");
        }
      },
      {
        retries: {
          limit: 5,
          delayMs: 1000,
          backoff: "exponential",
        },
      }
    );

    await this.ctx.run("insert vectors", async () => {
      const vectors = [];

      keysToFetch.forEach((key, index) => {
        vectors.push({
          id: crypto.randomUUID(),
          // Our embeddings from the previous step: one array of numbers
          // per input text
          values: embeddings[index],
          // The path to each R2 object to map back to during
          // vector search
          metadata: { r2Path: key },
        });
      });

      return this.env.VECTOR_INDEX.upsert(vectors);
    });
  }
}
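The comments in the example describe two behaviours: each step’s return value is persisted, and a failing step is retried with backoff. The sketch below imitates both with an in-memory map so the mechanics are visible. It is a toy model rather than how Workflows is built: `StepRunner`, `backoffDelay`, and the injectable `sleep` are hypothetical, the option names simply mirror the `retries` object in the example, and a real implementation would persist results to durable storage rather than a `Map`.

```typescript
interface RetryOptions {
  limit: number; // maximum number of retries after the first attempt
  delayMs: number; // base delay between attempts
  backoff: "constant" | "exponential";
}

// Delay before retry number `attempt` (1-based): the base delay doubles
// on each retry when backoff is exponential.
function backoffDelay(opts: RetryOptions, attempt: number): number {
  return opts.backoff === "exponential"
    ? opts.delayMs * Math.pow(2, attempt - 1)
    : opts.delayMs;
}

class StepRunner {
  private results = new Map<string, unknown>();
  private sleep: (ms: number) => Promise<void>;

  // `sleep` is injectable so the waiting can be replaced in tests.
  constructor(sleep?: (ms: number) => Promise<void>) {
    this.sleep =
      sleep ?? ((ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
  }

  async run<T>(
    label: string,
    fn: () => Promise<T>,
    retries: RetryOptions = { limit: 0, delayMs: 0, backoff: "constant" }
  ): Promise<T> {
    // A completed step is never re-executed: return its stored result.
    if (this.results.has(label)) {
      return this.results.get(label) as T;
    }
    for (let attempt = 0; ; attempt++) {
      try {
        const value = await fn();
        this.results.set(label, value);
        return value;
      } catch (err) {
        // Give up once the retry budget is spent; otherwise wait and retry.
        if (attempt >= retries.limit) throw err;
        await this.sleep(backoffDelay(retries, attempt + 1));
      }
    }
  }
}
```

If a later step throws and the whole workflow is re-run against the same `StepRunner`, earlier labels return their stored values immediately, which is the saving in time and cost that the comments in the example refer to.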

This is just one example of what a Workflow can do. The ability to durably execute an application, modeled as a series of steps, applies to a wide number of domains. You can apply this model of execution to a number of use-cases, including:

Each of these use cases models tasks that you want to run to completion, that minimize redundant retries by persisting intermediate state, and (importantly) whose success and failure are easy to observe.

We’ll be sharing more about Workflows during the second quarter of 2024 as we work towards an open (public!) beta. This includes how we’re thinking about idempotency and interactions with our storage, per-instance observability and metrics, local development, and templates to bootstrap common workflows.

Putting it together

We’ve often thought of Cloudflare’s own network as one massively scalable parallel data processing cluster: data centers in 310+ cities, with the ability to run compute close to users and/or close to data, keep it within the bounds of regulatory or compliance requirements, and most importantly, use our massive scale to enable our customers to scale as well.

Recapping, a fully-fledged data platform needs to enable three things:

  • Ingesting data: getting data into the platform (in the right format, from the right sources)
  • Storing data: securely, reliably, and durably.
  • Querying data: understanding and extracting insights from the data, and/or transforming it for use by other tools.

When we launched R2 we tackled the second part, but knew that we’d need to follow up with the first and third parts in order to make it easier for developers to get data in and make use of it.

If we look at how we can build a system that helps us solve each of these three parts together with Pipelines, Event Notifications, R2, and Workflows, we end up with an architecture that resembles this:

Specifically, we have Pipelines (1) scaling out to ingest data, batch it, filter it, and then durably store it in R2 (2) in a format that’s ready and optimized for querying. Workflows, ClickHouse, Databricks, or the query engine of your choice can then query (3) that data as soon as it’s ready, with “ready” being automatically triggered by an Event Notification as soon as the data is ingested and written to R2.

There’s no need to poll, no need to batch after the fact, no need to have your query engine slow down on data that wasn’t pre-aggregated or filtered, and no need to manage and scale infrastructure in order to keep up with load or data jurisdiction requirements. Create a Pipeline, write your data directly to R2, and query directly from it.

If you’re also looking at this and wondering about the costs of moving this data around, then we’re holding to one important principle: zero egress fees across all of our data products. Just as we set the stage for this with our R2 object storage, we intend to apply this to every data product we’re building, Pipelines included.

Start Building

We’ve shared a lot of what we’re building so that developers have an opportunity to provide feedback (including via our Developer Discord), share use-cases, and think about how to build their next application on Cloudflare.

Source: Cloudflare
