Streaming

Stream events from running workflows in Python

Stream vs Routing Events

Routing events flow between steps inside a workflow. Stream events are published outward for external consumers — they do not affect step routing.
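One way to picture the difference is a stdlib-only toy, not blazen's engine. Routing is driven by the event a step returns. Stream events go to a side channel that the routing loop never reads. Every class and function name below (Start, Cleaned, Stop, Progress, ROUTES, run) is hypothetical.

```python
import asyncio
from dataclasses import dataclass

# Hypothetical event types for this toy; they are not blazen classes.
@dataclass
class Start:
    text: str

@dataclass
class Cleaned:
    text: str

@dataclass
class Stop:
    result: str

@dataclass
class Progress:  # stream-only event: consumers see it, the router never does
    message: str

stream: list = []  # outward channel read only by external consumers

async def clean(ev: Start) -> Cleaned:
    stream.append(Progress("cleaning"))  # published outward; no routing effect
    return Cleaned(ev.text.strip())      # routing event: decides the next step

async def finish(ev: Cleaned) -> Stop:
    stream.append(Progress("finishing"))
    return Stop(ev.text.upper())

# The type of the event a step returns selects the next step.
ROUTES = {Start: clean, Cleaned: finish}

async def run(ev) -> Stop:
    while not isinstance(ev, Stop):
        ev = await ROUTES[type(ev)](ev)
    return ev

result = asyncio.run(run(Start("  hello ")))
print(result.result, [p.message for p in stream])  # HELLO ['cleaning', 'finishing']
```

Removing the `stream.append` calls would not change which steps run, and that is the property that separates stream events from routing events.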

Publishing Stream Events

Define a custom event by subclassing Event, then use ctx.write_event_to_stream() inside any step to push it to the external stream:

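from blazen import Context, Event, StopEvent, step

class ProgressEvent(Event):
    step: int
    message: str

@step
async def process(ctx: Context, ev: Event):
    for i in range(3):
        # Published to external consumers; this does not route to another step.
        ctx.write_event_to_stream(ProgressEvent(step=i, message=f"Processing step {i}"))
    # Returning StopEvent ends the workflow.
    return StopEvent(result={"done": True})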

ctx.write_event_to_stream() is synchronous — do not await it.

Consuming Stream Events

After starting a workflow, iterate over handler.stream_events() to receive stream events as they arrive:

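# Inside an async function; wf is a workflow that includes the process step above.
handler = await wf.run()

async for event in handler.stream_events():
    print(f"[{event.event_type}] step={event.step}")

# Once the stream is drained, await the workflow's final result.
result = await handler.result()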

handler.stream_events() returns an async iterator. It completes once the workflow finishes and all buffered events have been yielded.
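The publish and consume sides described above can be modeled with a single asyncio.Queue. Publishing is a synchronous put_nowait, and the consumer is an async generator that drains the queue until a sentinel arrives. The sentinel is pushed after the step finishes. This is a hypothetical stdlib sketch (MiniContext, MiniHandler and _DONE are invented names), not how blazen implements its handler. Because the stand-in write_event_to_stream returns None, awaiting it would raise TypeError. That shows why a synchronous method must not be awaited.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncIterator, Awaitable, Callable

@dataclass
class ProgressEvent:  # stand-in for the ProgressEvent defined above
    step: int
    message: str

_DONE = object()  # sentinel pushed once the step has finished

class MiniContext:
    """Hypothetical context: publishing is a plain, synchronous call."""

    def __init__(self, queue: asyncio.Queue) -> None:
        self._queue = queue

    def write_event_to_stream(self, ev: Any) -> None:
        # put_nowait on an unbounded queue never blocks, so no await is needed.
        # `await ctx.write_event_to_stream(...)` would raise TypeError here,
        # because None is not awaitable.
        self._queue.put_nowait(ev)

class MiniHandler:
    """Hypothetical handler: runs one step and exposes its stream."""

    def __init__(self, step_fn: Callable[[MiniContext], Awaitable[Any]]) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._task = asyncio.create_task(self._run(step_fn))

    async def _run(self, step_fn):
        try:
            return await step_fn(MiniContext(self._queue))
        finally:
            self._queue.put_nowait(_DONE)  # ends the stream even if the step fails

    async def stream_events(self) -> AsyncIterator[Any]:
        # Yield everything queued before the sentinel, then stop.
        while (ev := await self._queue.get()) is not _DONE:
            yield ev

    async def result(self) -> Any:
        return await self._task

async def process(ctx: MiniContext) -> dict:
    for i in range(3):
        ctx.write_event_to_stream(ProgressEvent(step=i, message=f"Processing step {i}"))
        await asyncio.sleep(0)  # let the consumer observe events as they arrive
    return {"done": True}

async def main():
    handler = MiniHandler(process)
    steps = [ev.step async for ev in handler.stream_events()]
    return steps, await handler.result()

steps, result = asyncio.run(main())
print(steps, result)  # [0, 1, 2] {'done': True}
```

The sentinel is queued after every published event, so the iterator only finishes once all buffered events have been yielded.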

Streaming Local Inference

The local-inference providers (MistralRsProvider, LlamaCppProvider) stream tokens incrementally through a callback. Awaiting provider.stream(messages, on_chunk, options) returns only once the model has finished generating. Until then, on_chunk is invoked once per chunk. The options argument can be omitted, as in the examples below.

from blazen import ChatMessage, MistralRsOptions, MistralRsProvider

provider = MistralRsProvider(options=MistralRsOptions("mistralai/Mistral-7B-Instruct-v0.3"))

def on_chunk(chunk: dict) -> None:
    if delta := chunk.get("delta"):
        print(delta, end="", flush=True)
    if reason := chunk.get("finish_reason"):
        print(f"\n[finish: {reason}]")

await provider.stream([ChatMessage.user("Tell me a joke")], on_chunk)

The chunk dict mirrors the typed InferenceChunk class exposed by blazen. Its keys are delta (incremental text), reasoning_delta (incremental reasoning content from thinking models), tool_calls (any tool calls completed in this chunk), and finish_reason (set only on the final chunk).
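Each callback receives one dict per chunk, so a small accumulator can rebuild the full response. Pass its bound method as on_chunk. StreamAccumulator is a hypothetical helper, not part of blazen, and it assumes tool_calls is a list when present. Missing keys are read with dict.get, so the same class also handles llama.cpp chunks, which carry only delta and finish_reason.

```python
from dataclasses import dataclass, field
from typing import Any, List, Optional

@dataclass
class StreamAccumulator:
    """Hypothetical helper that rebuilds a reply from on_chunk dicts."""

    text: List[str] = field(default_factory=list)
    reasoning: List[str] = field(default_factory=list)
    tool_calls: List[Any] = field(default_factory=list)
    finish_reason: Optional[str] = None

    def on_chunk(self, chunk: dict) -> None:
        # .get() tolerates keys a backend never sends (llama.cpp chunks have
        # no reasoning_delta or tool_calls).
        if delta := chunk.get("delta"):
            self.text.append(delta)
        if reasoning := chunk.get("reasoning_delta"):
            self.reasoning.append(reasoning)
        if calls := chunk.get("tool_calls"):
            self.tool_calls.extend(calls)  # assumption: a list when present
        if reason := chunk.get("finish_reason"):
            self.finish_reason = reason

    @property
    def full_text(self) -> str:
        return "".join(self.text)

# Simulated chunks for illustration; real ones come from provider.stream(...).
acc = StreamAccumulator()
for chunk in [
    {"delta": "Why did", "reasoning_delta": "pick a pun"},
    {"delta": " the chicken"},
    {"delta": "", "finish_reason": "stop"},
]:
    acc.on_chunk(chunk)
print(acc.full_text, acc.finish_reason)  # Why did the chicken stop
```

With a real provider, create `acc = StreamAccumulator()` and then call `await provider.stream([ChatMessage.user("Tell me a joke")], acc.on_chunk)`. After that, acc.full_text holds the complete reply.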

llama.cpp Backend

LlamaCppProvider.stream() follows the same callback pattern. Chunks mirror the typed LlamaCppInferenceChunk, which exposes delta and finish_reason (no reasoning split, no native tool calls):

from blazen import ChatMessage, LlamaCppOptions, LlamaCppProvider

provider = LlamaCppProvider(
    options=LlamaCppOptions(model_path="/models/llama-3.2-1b-q4_k_m.gguf")
)

def on_chunk(chunk: dict) -> None:
    if delta := chunk.get("delta"):
        print(delta, end="", flush=True)

await provider.stream([ChatMessage.user("Hello!")], on_chunk)

Typed Async Iterators

Alongside the callback API, blazen exposes typed async iterators, InferenceChunkStream (mistral.rs) and LlamaCppInferenceChunkStream (llama.cpp), that yield InferenceChunk and LlamaCppInferenceChunk instances directly. Both implement __aiter__ and __anext__, so they work in an ordinary async for loop:

async for chunk in stream:  # InferenceChunkStream or LlamaCppInferenceChunkStream
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
    if chunk.finish_reason:
        break

Once the underlying engine stream is exhausted, __anext__ raises StopAsyncIteration, which async for handles by ending the loop.
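The sketch below shows how the __aiter__ / __anext__ protocol ends a loop. It uses a hypothetical stand-in that serves pre-built chunks. Chunk and ChunkStream are invented names, not blazen types. async for swallows StopAsyncIteration, so the exception is only visible when __anext__ is called by hand.

```python
import asyncio
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class Chunk:  # hypothetical stand-in for InferenceChunk / LlamaCppInferenceChunk
    delta: str
    finish_reason: Optional[str] = None

class ChunkStream:
    """Hypothetical async iterator over pre-built chunks."""

    def __init__(self, chunks: List[Chunk]) -> None:
        self._chunks = iter(chunks)

    def __aiter__(self) -> "ChunkStream":
        return self

    async def __anext__(self) -> Chunk:
        await asyncio.sleep(0)  # stands in for awaiting the next engine chunk
        try:
            return next(self._chunks)
        except StopIteration:
            # Signals exhaustion; `async for` catches this and exits the loop.
            raise StopAsyncIteration from None

async def collect() -> str:
    parts = []
    async for chunk in ChunkStream([Chunk("Hi"), Chunk(" there", "stop")]):
        parts.append(chunk.delta)
    return "".join(parts)

async def call_directly() -> bool:
    stream = ChunkStream([])
    try:
        await stream.__anext__()
    except StopAsyncIteration:
        return True  # only visible when __anext__ is called by hand
    return False

print(asyncio.run(collect()))        # Hi there
print(asyncio.run(call_directly()))  # True
```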

Use InferenceChunkStream for mistral.rs models and LlamaCppInferenceChunkStream for llama.cpp; the per-chunk fields differ (see InferenceChunk vs LlamaCppInferenceChunk above), but the iteration shape is identical.
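When only the callback API is at hand, you can get the same async for shape by bridging the callback into an asyncio.Queue. The sketch below is hypothetical: iterate_callback_stream is not a blazen function. It uses loop.call_soon_threadsafe in case a native provider invokes the callback from a thread other than the event loop's. That is an assumption, not documented behavior. The call is equally correct when the callback runs on the loop thread.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable

_END = object()  # sentinel queued after the producer finishes

async def iterate_callback_stream(
    start: Callable[[Callable[[dict], None]], Awaitable[Any]],
) -> AsyncIterator[dict]:
    """Hypothetical bridge: turn a callback-driven stream into async iteration.

    `start(on_chunk)` must return an awaitable that finishes when the stream ends.
    """
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue = asyncio.Queue()

    def on_chunk(chunk: dict) -> None:
        # Safe whether the callback runs on the loop thread or another thread.
        loop.call_soon_threadsafe(queue.put_nowait, chunk)

    async def drive() -> None:
        try:
            await start(on_chunk)
        finally:
            # Scheduled after every chunk, so it is always dequeued last.
            loop.call_soon_threadsafe(queue.put_nowait, _END)

    task = asyncio.create_task(drive())
    try:
        while (chunk := await queue.get()) is not _END:
            yield chunk
        await task  # re-raise any error from the producer
    finally:
        if not task.done():
            task.cancel()  # consumer stopped early: stop the producer

async def fake_stream(on_chunk: Callable[[dict], None]) -> None:
    # Simulated provider for this demo only.
    for piece in ["Hel", "lo"]:
        on_chunk({"delta": piece})
        await asyncio.sleep(0)
    on_chunk({"delta": "", "finish_reason": "stop"})

async def main() -> list:
    return [c async for c in iterate_callback_stream(fake_stream)]

chunks = asyncio.run(main())
print([c["delta"] for c in chunks])
```

With a real provider, you might call it as `iterate_callback_stream(lambda cb: provider.stream(messages, cb))`. This assumes provider.stream returns an awaitable, as the `await provider.stream(...)` examples above suggest.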