Streaming
Stream events from running workflows in Python
Stream vs Routing Events
Routing events flow between steps inside a workflow. Stream events are published outward for external consumers — they do not affect step routing.
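To make the split concrete, here is a toy, pure-Python model. It is hypothetical: run_toy, ToyRun, and the (data, emit) step signature are illustrative and are not blazen's API. Each step's return value selects the next step, which is the role a routing event plays. emit() only appends to an outward list that nothing dispatches on, which is the role a stream event plays.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Optional

# Hypothetical toy model, not blazen's API.
# A step takes (data, emit) and returns (next_step_name or None, data).
Step = Callable[[Any, Callable[[Any], None]], tuple]

@dataclass
class ToyRun:
    routed: list = field(default_factory=list)    # steps reached via routing
    streamed: list = field(default_factory=list)  # events published outward
    result: Any = None

def run_toy(steps: dict[str, Step], start: str, data: Any) -> ToyRun:
    run = ToyRun()
    name: Optional[str] = start
    while name is not None:
        run.routed.append(name)
        # Only the return value decides what runs next; emit() just records.
        name, data = steps[name](data, run.streamed.append)
    run.result = data
    return run

def start(data, emit):
    emit(("progress", "started"))
    return "finish", data * 2

def finish(data, emit):
    emit(("progress", "finishing"))
    return None, data

run = run_toy({"start": start, "finish": finish}, "start", 21)
print(run.routed)    # ['start', 'finish']
print(run.streamed)  # [('progress', 'started'), ('progress', 'finishing')]
print(run.result)    # 42
```

Removing every emit() call would leave run.routed and run.result unchanged, which is the property the paragraph above describes.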
Publishing Stream Events
Define a custom event by subclassing Event, then use ctx.write_event_to_stream() inside any step to push it to the external stream:
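```python
from blazen import Context, Event, StopEvent, step

class ProgressEvent(Event):
    step: int
    message: str

@step
async def process(ctx: Context, ev: Event):
    for i in range(3):
        # Publish to the external stream; this does not route to another step
        ctx.write_event_to_stream(ProgressEvent(step=i, message=f"Processing step {i}"))
    return StopEvent(result={"done": True})
```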
ctx.write_event_to_stream() is synchronous — do not await it.
Consuming Stream Events
After starting a workflow, iterate over handler.stream_events() to receive stream events as they arrive:
```python
# wf is a workflow that includes the process step above
async def main(wf) -> None:
    handler = await wf.run()
    async for event in handler.stream_events():
        print(f"[{event.event_type}] step={event.step}")
    result = await handler.result()
```
handler.stream_events() returns an async iterator. It completes once the workflow finishes and all buffered events have been yielded.
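The completion rule above can be modeled with plain asyncio. This sketch is hypothetical (ToyHandler and toy_workflow are illustrative names, not blazen internals): a synchronous write() calls put_nowait on an unbounded queue, and a sentinel enqueued when the run ends lets stream_events() stop only after every buffered event has been yielded.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream

class ToyHandler:
    """Hypothetical stand-in for a workflow handler; not blazen's implementation."""

    def __init__(self, body):
        self._queue: asyncio.Queue = asyncio.Queue()  # unbounded buffer
        self._task = asyncio.create_task(self._run(body))

    async def _run(self, body):
        try:
            # write() is synchronous (put_nowait), so steps never await it
            return await body(self._queue.put_nowait)
        finally:
            self._queue.put_nowait(_DONE)  # lands after every earlier event

    async def stream_events(self):
        while (event := await self._queue.get()) is not _DONE:
            yield event

    async def result(self):
        return await self._task

async def toy_workflow(write):
    for i in range(3):
        write({"step": i, "message": f"Processing step {i}"})
        await asyncio.sleep(0)  # hand control to the consumer
    return {"done": True}

async def main():
    handler = ToyHandler(toy_workflow)
    seen = [event["step"] async for event in handler.stream_events()]
    return seen, await handler.result()

seen, result = asyncio.run(main())
print(seen, result)  # [0, 1, 2] {'done': True}
```

Because the sentinel is enqueued in a finally block, the stream also ends if the workflow raises; the error then surfaces from result().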
Streaming Local Inference
The local-inference providers (MistralRsProvider, LlamaCppProvider) stream tokens incrementally through a callback. Each call to provider.stream(messages, on_chunk, options) returns an awaitable that resolves once the model has finished generating; on_chunk is invoked once per chunk before it resolves.
```python
import asyncio
from blazen import ChatMessage, MistralRsOptions, MistralRsProvider

def on_chunk(chunk: dict) -> None:
    if delta := chunk.get("delta"):  # incremental text
        print(delta, end="", flush=True)
    if reason := chunk.get("finish_reason"):  # set on the final chunk only
        print(f"\n[finish: {reason}]")

async def main() -> None:
    provider = MistralRsProvider(options=MistralRsOptions("mistralai/Mistral-7B-Instruct-v0.3"))
    await provider.stream([ChatMessage.user("Tell me a joke")], on_chunk)

asyncio.run(main())
```
The chunk dict mirrors the typed InferenceChunk class registered in blazen: it carries delta (incremental text), reasoning_delta (incremental reasoning content for thinking models), tool_calls (any tool calls completed in this chunk), and finish_reason (set on the final chunk only).
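Because any of these fields may be absent on a given chunk, a common pattern is to pass a stateful callable as on_chunk and fold the chunks into a final message. ChunkAccumulator below is a hypothetical helper, not part of blazen; it assumes only the dict keys listed above (and that tool_calls, when present, is a list), treating missing keys as empty. The hand-written chunks and the "stop" value are illustrative, not real model output.

```python
from dataclasses import dataclass, field
from typing import Optional

@dataclass
class ChunkAccumulator:
    """Hypothetical helper (not part of blazen): use an instance as on_chunk."""
    text: str = ""
    reasoning: str = ""
    tool_calls: list = field(default_factory=list)
    finish_reason: Optional[str] = None

    def __call__(self, chunk: dict) -> None:
        # Missing or None fields contribute nothing
        self.text += chunk.get("delta") or ""
        self.reasoning += chunk.get("reasoning_delta") or ""
        # Assumes tool_calls, when present, is a list
        self.tool_calls.extend(chunk.get("tool_calls") or [])
        if chunk.get("finish_reason"):
            self.finish_reason = chunk["finish_reason"]

# Offline check with hand-written chunks; with a real provider you would
# instead call: await provider.stream(messages, acc)
acc = ChunkAccumulator()
for chunk in [
    {"reasoning_delta": "The user wants a joke. "},
    {"delta": "Why did "},
    {"delta": "the chicken cross the road?", "finish_reason": "stop"},
]:
    acc(chunk)

print(acc.text)           # Why did the chicken cross the road?
print(acc.finish_reason)  # stop
```

Since missing keys are skipped, the same accumulator also accepts llama.cpp chunks, which carry only delta and finish_reason.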
llama.cpp Backend
LlamaCppProvider.stream() follows the same callback pattern. Chunks mirror the typed LlamaCppInferenceChunk, which exposes delta and finish_reason (no reasoning split, no native tool calls):
```python
import asyncio
from blazen import ChatMessage, LlamaCppOptions, LlamaCppProvider

def on_chunk(chunk: dict) -> None:
    if delta := chunk.get("delta"):
        print(delta, end="", flush=True)

async def main() -> None:
    provider = LlamaCppProvider(
        options=LlamaCppOptions(model_path="/models/llama-3.2-1b-q4_k_m.gguf")
    )
    await provider.stream([ChatMessage.user("Hello!")], on_chunk)

asyncio.run(main())
```
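Since both providers share the stream(messages, on_chunk, options) calling shape, callback code can be unit-tested without loading a model. FakeStreamProvider below is a hypothetical test double, not a blazen class. It emits the llama.cpp-style chunk shape (delta, plus finish_reason on the last chunk); the "stop" value is an assumption, and a plain string stands in for ChatMessage because the fake ignores its messages.

```python
import asyncio
from typing import Callable, Optional

class FakeStreamProvider:
    """Hypothetical test double mimicking provider.stream(); not part of blazen."""

    def __init__(self, pieces: list[str]):
        self._pieces = pieces

    async def stream(
        self,
        messages: list,  # ignored by the fake
        on_chunk: Callable[[dict], None],
        options: Optional[object] = None,
    ) -> None:
        for i, piece in enumerate(self._pieces):
            is_last = i == len(self._pieces) - 1
            # finish_reason is set on the final chunk only ("stop" is assumed)
            on_chunk({"delta": piece, "finish_reason": "stop" if is_last else None})
            await asyncio.sleep(0)  # yield to the event loop between chunks
        # Returning here is when `await provider.stream(...)` resolves

received: list[dict] = []
asyncio.run(FakeStreamProvider(["Hel", "lo!"]).stream(["Hello!"], received.append))
print("".join(c["delta"] for c in received))  # Hello!
```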
Typed Async Iterators
Alongside the callback API, blazen exposes typed async iterators — InferenceChunkStream (mistral.rs) and LlamaCppInferenceChunkStream (llama.cpp) — that yield InferenceChunk / LlamaCppInferenceChunk instances directly. They implement __aiter__ / __anext__ so they consume cleanly in an async for loop:
```python
async def print_stream(stream) -> None:
    # stream: an InferenceChunkStream or LlamaCppInferenceChunkStream
    async for chunk in stream:
        if chunk.delta:
            print(chunk.delta, end="", flush=True)
        if chunk.finish_reason:
            break
```
Iteration terminates with StopAsyncIteration once the underlying engine stream is exhausted.
Use InferenceChunkStream for mistral.rs models and LlamaCppInferenceChunkStream for llama.cpp; the per-chunk fields differ (see InferenceChunk vs LlamaCppInferenceChunk above), but the iteration shape is identical.
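The callback and iterator styles are interchangeable. As a hypothetical sketch (CallbackChunkStream and fake_stream are illustrative names, not blazen's InferenceChunkStream), the adapter below feeds callback chunks into an asyncio.Queue and exposes them through the same __aiter__ / __anext__ protocol, raising StopAsyncIteration once the producer finishes.

```python
import asyncio
from typing import Awaitable, Callable, Optional

_END = object()  # sentinel enqueued when the producer finishes

class CallbackChunkStream:
    """Hypothetical adapter from on_chunk callbacks to an async iterator."""

    def __init__(self, start: Callable[[Callable[[dict], None]], Awaitable[None]]):
        self._start = start  # e.g. lambda cb: provider.stream(messages, cb)
        self._queue: Optional[asyncio.Queue] = None
        self._task: Optional[asyncio.Task] = None
        self._done = False

    def __aiter__(self):
        return self

    async def __anext__(self) -> dict:
        if self._done:
            raise StopAsyncIteration
        if self._task is None:
            # Start the producer lazily, inside the running event loop
            self._queue = asyncio.Queue()
            self._task = asyncio.create_task(self._produce())
        item = await self._queue.get()
        if item is _END:
            self._done = True
            await self._task  # re-raise any error from the producer
            raise StopAsyncIteration
        return item

    async def _produce(self) -> None:
        try:
            await self._start(self._queue.put_nowait)
        finally:
            self._queue.put_nowait(_END)

async def fake_stream(on_chunk):  # stands in for a real provider.stream call
    for piece in ["Hel", "lo"]:
        on_chunk({"delta": piece})
        await asyncio.sleep(0)

async def main():
    return [chunk["delta"] async for chunk in CallbackChunkStream(fake_stream)]

pieces = asyncio.run(main())
print(pieces)  # ['Hel', 'lo']
```

With a real provider, start could be lambda cb: provider.stream(messages, cb), assuming the call returns an awaitable as in the examples above. Unlike the typed iterators, this adapter yields plain dicts rather than InferenceChunk instances.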