Streaming
Stream events from running workflows in Node.js
Stream vs Routing Events
Routing events flow between steps — they drive the workflow forward. Stream events are published for external observation without affecting the workflow graph.
Publishing Stream Events
Use ctx.writeEventToStream() inside a step to emit events to external consumers:
wf.addStep("process", ["blazen::StartEvent"], async (event, ctx) => {
  for (let i = 0; i < 3; i++) {
    await ctx.writeEventToStream({ type: "Progress", step: i, message: `Processing step ${i}` });
  }
  return { type: "blazen::StopEvent", result: { done: true } };
});
ctx.writeEventToStream() is async — always await it.
Consuming Stream Events
Call wf.runStreaming() instead of wf.run() to receive streamed events via a callback:
const collected = [];
const result = await wf.runStreaming({}, (event) => {
  console.log(`[${event.type}] step=${event.step}`);
  collected.push(event);
});
console.log("Final result:", result.data);
console.log("Streamed events:", collected.length);
wf.runStreaming(input, callback) calls the callback for each streamed event and resolves with the final workflow result.
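As a rough illustration of that contract, the sketch below wraps a runStreaming-style call in a helper that returns the final result together with the streamed events grouped by type. The StreamedEvent and StreamingRunner types, the runAndGroup helper, and the fakeWorkflow stand-in are all hypothetical names introduced here so the example runs without blazen; they are not part of the library's published typings.

```typescript
// Assumed minimal event shape: a `type` tag plus arbitrary payload fields.
interface StreamedEvent {
  type: string;
  [key: string]: unknown;
}

// Assumed structural type for anything exposing runStreaming(input, callback).
interface StreamingRunner<R> {
  runStreaming(input: unknown, onEvent: (event: StreamedEvent) => void): Promise<R>;
}

// Runs the workflow, bucketing each streamed event by its `type`,
// and resolves with both the final result and the buckets.
async function runAndGroup<R>(wf: StreamingRunner<R>, input: unknown) {
  const byType = new Map<string, StreamedEvent[]>();
  const result = await wf.runStreaming(input, (event) => {
    const bucket = byType.get(event.type) ?? [];
    bucket.push(event);
    byType.set(event.type, bucket);
  });
  return { result, byType };
}

// Stand-in runner (not a real workflow): emits three Progress events, then resolves.
const fakeWorkflow: StreamingRunner<{ done: boolean }> = {
  async runStreaming(_input, onEvent) {
    for (let i = 0; i < 3; i++) {
      onEvent({ type: "Progress", step: i });
    }
    return { done: true };
  },
};
```

With a real workflow, passing wf in place of fakeWorkflow should work as long as its runStreaming matches the shape assumed above.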
Handler-Based Event Streaming
When you need control over a running workflow (pause, snapshot, abort) alongside event observation, use runWithHandler() and subscribe via streamEvents():
const handler = await wf.runWithHandler({ message: "hello" });
await handler.streamEvents((event) => {
  console.log(`[${event.type}]`, event);
});
const result = await handler.result();
streamEvents(onEvent) resolves once the underlying broadcast subscription is wired up; the callback continues to fire for each event published by ctx.writeEventToStream() until the workflow finishes. Subscribe before calling result() — events emitted before the subscription is attached are not replayed.
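The no-replay behaviour can be pictured with a small stand-in. The FakeBroadcast class below is a hypothetical model of a broadcast channel, not the handler's actual implementation; it only mirrors the two properties described above: streamEvents() resolves once the listener is attached, and events published earlier are never delivered to it.

```typescript
type ProgressEvent = { type: string; step: number };
type Listener = (event: ProgressEvent) => void;

// Hypothetical broadcast channel with no replay buffer.
class FakeBroadcast {
  private listeners: Listener[] = [];

  // Delivers the event to whoever is subscribed right now; nothing is stored.
  publish(event: ProgressEvent): void {
    for (const listener of this.listeners) {
      listener(event);
    }
  }

  // Resolves as soon as the listener is registered, like the documented streamEvents().
  async streamEvents(onEvent: Listener): Promise<void> {
    this.listeners.push(onEvent);
  }
}

async function demo(): Promise<number[]> {
  const channel = new FakeBroadcast();
  const seen: number[] = [];

  channel.publish({ type: "Progress", step: 0 }); // no subscriber yet, so this event is lost
  await channel.streamEvents((event) => {
    seen.push(event.step);
  });
  channel.publish({ type: "Progress", step: 1 });
  channel.publish({ type: "Progress", step: 2 });

  return seen; // contains steps 1 and 2 only
}
```

In this model the event with step 0 never reaches the callback, which is why attaching the subscription as early as possible matters.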
Typed Local Inference Streams
Local-inference providers expose typed chunk streams in addition to the callback-style stream() method. The wrappers are InferenceChunkStream (mistral.rs) and LlamaCppInferenceChunkStream (llama.cpp), and each yields strongly-typed InferenceChunk / LlamaCppInferenceChunk values you pull with await stream.next().
import type { InferenceChunkStream } from "blazen";

const stream: InferenceChunkStream = await provider.inferStream(messages);
while (true) {
  const chunk = await stream.next();
  if (chunk === null) break;
  if (chunk.delta) process.stdout.write(chunk.delta);
  if (chunk.reasoningDelta) process.stderr.write(chunk.reasoningDelta);
  if (chunk.finishReason) console.log(`\n[done: ${chunk.finishReason}]`);
}
LlamaCppInferenceChunkStream follows the same shape with a narrower LlamaCppInferenceChunk payload (delta, finishReason):
for (let chunk = await stream.next(); chunk !== null; chunk = await stream.next()) {
  process.stdout.write(chunk.delta ?? "");
}
stream.next() resolves to null once generation is exhausted. Engine-level errors reject the promise returned by next(), so wrap the loop in try/catch if you need to recover mid-stream.
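One way to apply that advice is to drain the stream inside a try/catch and keep whatever text arrived before the failure. The sketch below is a minimal illustration under stated assumptions: Chunk and PullStream are hypothetical structural types covering only the delta and finishReason fields mentioned above, and drain and fakeStream are helper names made up for this example, not blazen APIs.

```typescript
// Assumed minimal chunk shape (the fields common to both chunk types above).
interface Chunk {
  delta?: string;
  finishReason?: string;
}

// Assumed pull-style stream: next() resolves to a chunk, or null when exhausted.
interface PullStream {
  next(): Promise<Chunk | null>;
}

interface DrainResult {
  text: string;                     // everything received before completion or failure
  finishReason: string | undefined; // set only if a chunk reported one
  error: unknown;                   // undefined when the stream completed normally
}

// Pulls chunks until null; if next() rejects, returns the partial text plus the error.
async function drain(stream: PullStream): Promise<DrainResult> {
  let text = "";
  let finishReason: string | undefined;
  try {
    for (let chunk = await stream.next(); chunk !== null; chunk = await stream.next()) {
      text += chunk.delta ?? "";
      if (chunk.finishReason) finishReason = chunk.finishReason;
    }
    return { text, finishReason, error: undefined };
  } catch (error) {
    return { text, finishReason, error };
  }
}

// Stand-in stream for trying the helper: yields the given chunks and
// optionally rejects when the chunk at index `failAt` is requested.
function fakeStream(chunks: Chunk[], failAt?: number): PullStream {
  let i = 0;
  return {
    async next() {
      if (failAt !== undefined && i === failAt) {
        throw new Error("engine failure");
      }
      return chunks[i++] ?? null;
    },
  };
}
```

Returning the partial text alongside the error lets the caller decide whether to retry, show the truncated output, or discard it; a real application might instead rethrow after logging.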