Realtime: Stream updates from Inngest functions
Inngest Realtime enables you to stream updates from your functions, power live UIs, and implement bi-directional workflows such as Human-in-the-Loop. Use channels and topics to broadcast updates, stream logs, or await user input.
Pattern: Stream updates from a single function run
Enable users to follow the progress of a long-running task by streaming updates from a dedicated channel. Here's how to trigger a function and subscribe to its updates:
src/inngest/channels.ts
import { realtime, staticSchema } from "inngest";

export const helloChannel = realtime.channel({
  name: ({ uuid }: { uuid: string }) => `hello-world:${uuid}`,
  topics: {
    logs: { schema: staticSchema<{ message: string }>() },
  },
});
app/api/hello-world/route.ts
import crypto from "crypto";
import { inngest } from "@/inngest/client";
import { subscribe } from "inngest/realtime";
import { helloChannel } from "@/inngest/channels";

export async function POST(req: Request) {
  const json = await req.json();
  const { prompt } = json;
  const uuid = crypto.randomUUID();

  await inngest.send({
    name: "hello-world/hello",
    data: { uuid },
  });

  const ch = helloChannel({ uuid });
  const stream = await subscribe({
    app: inngest,
    channel: ch,
    topics: ["logs"],
  });

  return new Response(stream.getEncodedStream(), {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}
Your function can then publish updates to this channel:
src/inngest/functions/hello-world.ts
import { inngest } from "../client";
import { helloChannel } from "../channels";

export const someTask = inngest.createFunction(
  { id: "hello-world", triggers: [{ event: "hello-world/hello" }] },
  async ({ event, step, publish }) => {
    const { uuid } = event.data;
    const ch = helloChannel({ uuid });
    await publish(ch.logs, { message: "Hello, world!" });
  }
);
By creating a channel with a unique identifier, you can stream updates for a specific run to the end user.
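On the client, the response from the route above can be read incrementally as it arrives. The following is a minimal sketch, not part of the Inngest SDK: it assumes the encoded stream carries one JSON-encoded message per line, which is an assumption about the wire format rather than something stated here. JsonLineBuffer and followRun are hypothetical names, and messages are left as unknown because their exact shape is not described in this section.

```typescript
// Hypothetical helper: accumulates decoded text chunks and returns complete
// JSON messages. Assumes newline-delimited JSON (one message per line).
class JsonLineBuffer {
  private pending = "";

  // Feed a decoded text chunk; returns every message completed by it.
  push(chunk: string): unknown[] {
    this.pending += chunk;
    const lines = this.pending.split("\n");
    // The last element is an incomplete line (or "" if the chunk ended on "\n").
    this.pending = lines.pop() ?? "";
    return lines
      .filter((line) => line.trim() !== "")
      .map((line) => JSON.parse(line) as unknown);
  }
}

// Sketch of wiring the buffer to the POST route from this section.
async function followRun(onMessage: (message: unknown) => void): Promise<void> {
  const res = await fetch("/api/hello-world", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ prompt: "hello" }),
  });
  if (!res.body) throw new Error("Response has no body to stream");

  const reader = res.body.getReader();
  const decoder = new TextDecoder();
  const buffer = new JsonLineBuffer();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters intact across chunk boundaries.
    for (const message of buffer.push(decoder.decode(value, { stream: true }))) {
      onMessage(message);
    }
  }
}
```

Buffering matters because network chunks do not have to align with message boundaries: a message split across two reads is only parsed once its closing newline arrives.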
Pattern: Stream updates from multiple function runs
A Realtime channel can be used to stream updates from multiple function runs. Here, we'll define two channels: one global channel and one post-specific channel:
src/inngest/channels.ts
import { realtime, staticSchema } from "inngest";
import { z } from "zod";

export const globalChannel = realtime.channel({
  name: "global",
  topics: {
    logs: { schema: staticSchema<{ message: string }>() },
  },
});

export const postChannel = realtime.channel({
  name: ({ postId }: { postId: string }) => `post:${postId}`,
  topics: {
    updated: {
      schema: z.object({
        id: z.string(),
        likes: z.number(),
      }),
    },
    deleted: {
      schema: z.object({
        id: z.string(),
        reason: z.string(),
      }),
    },
  },
});
Our likePost function will publish updates to both channels. Note that globalChannel is a static channel (string name), so it can be used directly without instantiation. postChannel is parameterized, so we call it with the post ID to get a channel instance:
src/inngest/functions/likePost.ts
import { inngest } from "../client";
import { globalChannel, postChannel } from "../channels";

export const likePost = inngest.createFunction(
  {
    id: "post/like",
    retries: 0,
    triggers: [{ event: "app/post.like" }],
  },
  async ({
    event: {
      data: { postId = "123" },
    },
    step,
    publish,
  }) => {
    if (!postId) {
      await publish(globalChannel.logs, {
        message: "Missing postId when trying to like post",
      });
      throw new Error("Missing postId");
    }

    await publish(globalChannel.logs, {
      message: `Liking post ${postId}`,
    });

    const post = await step.run("update-likes", async () => {
      const fakePost = {
        id: "123",
        likes: Math.floor(Math.random() * 10000),
      };
      const ch = postChannel({ postId: fakePost.id });
      await publish(ch.updated, fakePost);
      return fakePost;
    });

    return post;
  }
);
The globalChannel will be used to stream updates for all posts, and the postChannel will be used to stream updates for specific posts.
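To make that split concrete, here is a toy in-memory model of channel routing. It is not Inngest's implementation and uses none of its APIs; ToyBroker and its methods are hypothetical. It only illustrates that a subscriber to the fixed name "global" sees a log for every like, while a subscriber to "post:123" sees only that post's updates.

```typescript
type Handler = (topic: string, data: unknown) => void;

// Hypothetical stand-in for a realtime backend: routes by exact channel name.
class ToyBroker {
  private subscribers = new Map<string, Handler[]>();

  subscribe(channel: string, handler: Handler): void {
    const handlers = this.subscribers.get(channel) ?? [];
    handlers.push(handler);
    this.subscribers.set(channel, handlers);
  }

  publish(channel: string, topic: string, data: unknown): void {
    for (const handler of this.subscribers.get(channel) ?? []) {
      handler(topic, data);
    }
  }
}

// Mirrors the naming used above: one fixed name and one parameterized name.
const globalName = "global";
const postName = (postId: string) => `post:${postId}`;

const broker = new ToyBroker();
const globalSeen: string[] = [];
const post123Seen: string[] = [];

broker.subscribe(globalName, (topic) => globalSeen.push(topic));
broker.subscribe(postName("123"), (topic) => post123Seen.push(topic));

// Two likes on different posts: each logs globally and updates its own channel.
for (const postId of ["123", "456"]) {
  broker.publish(globalName, "logs", { message: `Liking post ${postId}` });
  broker.publish(postName(postId), "updated", { id: postId, likes: 1 });
}

console.log(globalSeen.length); // 2: one log per like
console.log(post123Seen); // only the "updated" topic for post 123
```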
Human in the loop: Bi-directional workflows
Combine Realtime with waitForEvent() to enable workflows that require user input, such as review or approval steps. Here's how to send a message to the user and wait for their confirmation:
src/inngest/channels.ts
import { realtime } from "inngest";
import { z } from "zod";

export const agenticWorkflowChannel = realtime.channel({
  name: "agentic-workflow",
  topics: {
    messages: {
      schema: z.object({
        message: z.string(),
        confirmationId: z.string(),
      }),
    },
  },
});
src/inngest/functions/agentic-workflow.ts
import crypto from "crypto";
import { inngest } from "../client";
import { agenticWorkflowChannel } from "../channels";

export const agenticWorkflow = inngest.createFunction(
  { id: "agentic-workflow", triggers: [{ event: "agentic-workflow/start" }] },
  async ({ event, step, publish }) => {
    await step.run(/* ... */);

    // Generate the ID inside a step so it stays the same across retries.
    const confirmationId = await step.run("get-confirmation-id", async () =>
      crypto.randomUUID()
    );

    await publish(agenticWorkflowChannel.messages, {
      message: "Confirm to proceed?",
      confirmationId,
    });

    // Resolves to null if no matching event arrives before the timeout.
    const confirmation = await step.waitForEvent("wait-for-confirmation", {
      event: "agentic-workflow/confirmation",
      timeout: "15m",
      if: `async.data.confirmationId == "${confirmationId}"`,
    });

    if (confirmation) {
      // continue workflow
    }
  }
);
The confirmationId links the published message to the reply event, ensuring the correct user response is handled.
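The reply side is not shown above. As a rough sketch, the user's confirmation could be turned into an event whose data.confirmationId satisfies the expression passed to waitForEvent(). The helpers below are hypothetical; only the event name and the confirmationId field come from the example. They build the payload and the expression string without sending anything.

```typescript
// Hypothetical helpers; only the event name and field come from the example above.
interface ConfirmationEvent {
  name: "agentic-workflow/confirmation";
  data: { confirmationId: string };
}

// Builds the reply event for the confirmationId the UI received on the channel.
function buildConfirmationEvent(confirmationId: string): ConfirmationEvent {
  return {
    name: "agentic-workflow/confirmation",
    data: { confirmationId },
  };
}

// The same expression string the workflow passes as `if` to waitForEvent.
function buildMatchExpression(confirmationId: string): string {
  return `async.data.confirmationId == "${confirmationId}"`;
}

// Local check mirroring the expression's intent: IDs must be equal.
function matchesConfirmation(event: ConfirmationEvent, expectedId: string): boolean {
  return event.data.confirmationId === expectedId;
}

const reply = buildConfirmationEvent("abc-123");
console.log(buildMatchExpression("abc-123"));
console.log(matchesConfirmation(reply, "abc-123")); // true
console.log(matchesConfirmation(reply, "other")); // false
```

In a route handler, the object returned by buildConfirmationEvent would be passed to inngest.send(), the same call used to trigger the function in the first pattern. A reply carrying a different confirmationId leaves the workflow waiting until the 15-minute timeout.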