Human-in-the-Loop

Build workflows that pause for human input in Python

Side-Effect Steps

A step can return None and use ctx.send_event() to manually route events:

class ReviewComplete(Event):
    pass

@step
async def review(ctx: Context, ev: Event):
    ctx.set("needs_approval", True)
    approved = simulate_human_review(ev)
    ctx.set("approved", approved)
    ctx.send_event(ReviewComplete())
    return None

Both ctx.send_event() and ctx.set() are synchronous.

Pause and Resume

handler = await wf.run(data="some input")
snapshot = handler.pause()

# Later...
handler = await Workflow.resume(snapshot)
result = await handler.result()

:::caution[ctx.session and pause/resume] If you store live in-process objects in ctx.session (DB connections, file handles, sockets), they are deliberately excluded from snapshots. The workflow’s session_pause_policy governs what happens at pause time:

  • pickle_or_error (default) — attempt to pickle each live ref into the snapshot; raise a clear error if any entry can’t be serialised.
  • warn_drop — drop live refs from the snapshot and emit a warning. For ephemeral runs.
  • hard_error — refuse to pause if any live refs are in flight.

The practical rule: put anything that must survive pause() / resume() in ctx.state, and everything else in ctx.session. :::