Streaming

Stream events from running workflows in Python

Stream vs Routing Events

Routing events flow between steps inside a workflow and determine which step runs next. Stream events are published outward for external consumers and do not affect step routing.
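
To make the distinction concrete, here is a toy dispatcher written in plain asyncio. It is only a sketch of the idea and does not use the workflow library: the event classes, the `routes` table, and `run_toy_workflow` are all hypothetical names invented for this illustration.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Start:       # routing event: triggers the first step
    text: str


@dataclass
class Cleaned:     # routing event: triggers the second step
    text: str


@dataclass
class Stop:        # routing event: ends the run
    result: str


@dataclass
class Progress:    # stream event: only observed from outside
    message: str


async def run_toy_workflow(start: Start, stream: list) -> str:
    """Dispatch routing events to steps until a Stop event appears."""

    async def clean(ev: Start) -> Cleaned:
        stream.append(Progress("cleaning"))   # published outward
        return Cleaned(ev.text.strip())       # routed to the next step

    async def shout(ev: Cleaned) -> Stop:
        stream.append(Progress("shouting"))   # published outward
        return Stop(ev.text.upper())          # ends the loop below

    # Routing table: the type of the event a step returns selects the next step.
    routes = {Start: clean, Cleaned: shout}

    event = start
    while not isinstance(event, Stop):
        event = await routes[type(event)](event)
    return event.result


stream_log: list = []
final = asyncio.run(run_toy_workflow(Start("  hello "), stream_log))
messages = [p.message for p in stream_log]
print(final)
print(messages)
```

In this sketch, `Progress` never appears in the routing table, so appending one cannot change which step runs next. That is the property the stream/routing split is meant to guarantee.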

Publishing Stream Events

Define a custom event by subclassing Event, then use ctx.write_event_to_stream() inside any step to push it to the external stream:

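# Custom stream event; the fields are whatever external consumers need.
class ProgressEvent(Event):
    step: int
    message: str

@step
async def process(ctx: Context, ev: Event):
    for i in range(3):
        # Publish progress outward; this does not route to another step.
        ctx.write_event_to_stream(ProgressEvent(step=i, message=f"Processing step {i}"))
    # The returned StopEvent carries the workflow's final result.
    return StopEvent(result={"done": True})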

ctx.write_event_to_stream() is synchronous — do not await it.
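
One way a stream write can be synchronous inside an async step is a non-blocking enqueue onto an unbounded buffer. The sketch below assumes that design; `ToyContext` is a hypothetical stand-in built on `asyncio.Queue`, not the library's actual `Context` or its internals.

```python
import asyncio


class ToyContext:
    """Hypothetical stand-in for a workflow context (assumption, not the real class)."""

    def __init__(self) -> None:
        # Unbounded queue: put_nowait() on it never raises QueueFull.
        self._stream: asyncio.Queue = asyncio.Queue()

    def write_event_to_stream(self, event) -> None:
        # Enqueue immediately and return None, so there is nothing to await.
        self._stream.put_nowait(event)

    def buffered(self) -> int:
        # Number of events waiting to be consumed.
        return self._stream.qsize()


async def demo() -> int:
    ctx = ToyContext()
    for i in range(3):
        ctx.write_event_to_stream({"step": i})  # plain call, no await
    return ctx.buffered()


buffered_count = asyncio.run(demo())
print(buffered_count)
```

In this toy model the method returns `None`, so writing `await ctx.write_event_to_stream(...)` would raise a `TypeError` at the `await`.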

Consuming Stream Events

After starting a workflow, iterate over handler.stream_events() to receive stream events as they arrive:

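# Start the workflow; the handler exposes its event stream and final result.
handler = await wf.run()

# Receive stream events as the workflow publishes them.
async for event in handler.stream_events():
    # Only ProgressEvent defines `step`; other event types may also be streamed.
    if isinstance(event, ProgressEvent):
        print(f"[{event.event_type}] step={event.step}")

# Wait for the workflow's final result.
result = await handler.result()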

handler.stream_events() returns an async iterator. It completes once the workflow finishes and all buffered events have been yielded.
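
The completion behavior described above can be modeled with a queue and an end-of-stream sentinel. This is a minimal sketch under that assumption, in plain asyncio; `ToyHandler`, `_DONE`, and `toy_workflow` are hypothetical names and do not reflect how the library implements its handler.

```python
import asyncio

_DONE = object()  # sentinel marking the end of the stream (toy convention)


class ToyHandler:
    """Hypothetical handler: runs a workflow coroutine and exposes its stream."""

    def __init__(self, workflow) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        # Run the workflow in the background so events can be consumed live.
        self._task = asyncio.create_task(self._run(workflow))

    async def _run(self, workflow):
        try:
            # The workflow receives a synchronous `emit` callable.
            return await workflow(self._queue.put_nowait)
        finally:
            # Enqueued last, so every buffered event is yielded before it.
            self._queue.put_nowait(_DONE)

    async def stream_events(self):
        # Async generator: yields events until the sentinel arrives.
        while True:
            event = await self._queue.get()
            if event is _DONE:
                return
            yield event

    async def result(self):
        return await self._task


async def toy_workflow(emit):
    for i in range(3):
        emit(f"progress {i}")
        await asyncio.sleep(0)  # yield control, as a real step would
    return {"done": True}


async def main():
    handler = ToyHandler(toy_workflow)
    seen = [event async for event in handler.stream_events()]
    return seen, await handler.result()


seen, result = asyncio.run(main())
print(seen)
print(result)
```

Because the queue is first-in, first-out and the sentinel is enqueued only after the workflow returns, the `async for` loop ends exactly when the workflow has finished and the buffer is drained.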