Streaming
Stream events from running workflows in Python
Stream vs Routing Events
Routing events flow between steps inside a workflow. Stream events are published outward for external consumers — they do not affect step routing.
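To make the distinction concrete, here is a minimal standard-library sketch of the idea. It is not the library's implementation: the `ROUTES` table, the `Loaded`, `Finished`, and `Progress` classes, and the plain list used as the stream are all hypothetical stand-ins. The returned event's type alone picks the next step, while stream events are only collected for an outside observer.

```python
from dataclasses import dataclass


@dataclass
class Loaded:
    """Toy routing event: its type selects the next step."""
    rows: int


@dataclass
class Finished:
    """Toy routing event: ends the run."""
    total: int


@dataclass
class Progress:
    """Toy stream event: visible to external consumers only."""
    message: str


def load(event, stream):
    stream.append(Progress("loading"))  # published outward
    return Loaded(rows=3)               # routed to the next step


def summarize(event, stream):
    stream.append(Progress("summarizing"))
    return Finished(total=event.rows * 2)


# Hypothetical routing table: only the returned event's type is consulted.
ROUTES = {type(None): load, Loaded: summarize}


def run_toy_workflow():
    stream, event = [], None
    while not isinstance(event, Finished):
        event = ROUTES[type(event)](event, stream)
    return event, stream


final, stream = run_toy_workflow()
```

Removing the `stream.append(...)` calls would leave `final` unchanged, which is the sense in which stream events do not affect routing.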
Publishing Stream Events
Define a custom event by subclassing Event, then use ctx.write_event_to_stream() inside any step to push it to the external stream:
class ProgressEvent(Event):
    step: int
    message: str

@step
async def process(ctx: Context, ev: Event):
    for i in range(3):
        # Published to external consumers; has no effect on step routing.
        ctx.write_event_to_stream(ProgressEvent(step=i, message=f"Processing step {i}"))
    return StopEvent(result={"done": True})
ctx.write_event_to_stream() is synchronous — do not await it.
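One way to picture why no `await` is needed is to model the write as a non-blocking enqueue onto an unbounded buffer. The sketch below uses only `asyncio` from the standard library. `ToyContext` and its internal queue are hypothetical stand-ins, not the library's actual `Context`.

```python
import asyncio


class ToyContext:
    """Hypothetical stand-in for a workflow context; not the real Context class."""

    def __init__(self) -> None:
        # Unbounded queue, so put_nowait() never raises QueueFull.
        self._stream: asyncio.Queue = asyncio.Queue()

    def write_event_to_stream(self, event: object) -> None:
        # A plain (non-async) method: it enqueues and returns immediately.
        self._stream.put_nowait(event)


async def toy_step() -> int:
    ctx = ToyContext()
    for i in range(3):
        # No await: the call returns None, which is not awaitable.
        ctx.write_event_to_stream({"step": i})
    # Number of events buffered for a consumer to read later.
    return ctx._stream.qsize()


buffered = asyncio.run(toy_step())
```

In this toy model, writing `await ctx.write_event_to_stream(...)` would raise a `TypeError`, because the method returns `None` rather than a coroutine.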
Consuming Stream Events
After starting a workflow, iterate over handler.stream_events() to receive stream events as they arrive:
handler = await wf.run()
async for event in handler.stream_events():
    print(f"[{event.event_type}] step={event.step}")
result = await handler.result()
handler.stream_events() returns an async iterator. It completes once the workflow finishes and all buffered events have been yielded.
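The completion behavior described above can be sketched with an async generator that drains a queue until an end-of-stream marker arrives. This is an illustration built on the standard library only. `ToyHandler`, the `_DONE` sentinel, and the string events are hypothetical, and the real handler may be implemented quite differently.

```python
import asyncio

_DONE = object()  # hypothetical sentinel marking the end of the stream


class ToyHandler:
    """Hypothetical stand-in for a run handler; not the library's class."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        # Start the toy workflow in the background (needs a running loop).
        self._task = asyncio.create_task(self._run())

    async def _run(self) -> dict:
        for i in range(3):
            self._queue.put_nowait(f"progress {i}")
            await asyncio.sleep(0)  # yield control, as a real step might
        self._queue.put_nowait(_DONE)  # workflow finished
        return {"done": True}

    async def stream_events(self):
        # Async generator: yields buffered events until the sentinel arrives.
        while True:
            event = await self._queue.get()
            if event is _DONE:
                return
            yield event

    async def result(self) -> dict:
        return await self._task


async def main():
    handler = ToyHandler()
    seen = [event async for event in handler.stream_events()]
    return seen, await handler.result()


seen, result = asyncio.run(main())
```

Because the sentinel is enqueued after the last event, the loop only ends once every buffered event has been yielded, so no event is dropped when the workflow finishes first.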