Context

Pass state between workflow steps in Ruby

What context means in the Ruby binding

Unlike the Python binding (which exposes a per-run Context object with ctx.set / ctx.get storage tiers), the Ruby binding does not surface a context object to step blocks. Step blocks receive only a Blazen::Workflow::Event and return a Blazen::Workflow::StepOutput. Workflow-level state flows entirely through the JSON payloads attached to events.

This is intentional: the cabi callback ABI hands the step block exactly one argument (the incoming event) plus an output slot, with no side channel for shared state. Anything you want a downstream step to see needs to ride along inside the event payload, or live in Ruby state your block closes over.

State via event payloads

The most natural pattern for threading data through a workflow is to keep enriching the event payload as it moves between steps:

workflow = Blazen.workflow('enrich') do |b|
  b.step('load_user', accepts: ['blazen::StartEvent'], emits: ['UserLoaded']) do |evt|
    user_id = evt.data['data']['user_id']
    user = { id: user_id, name: 'Alice', tier: 'pro' }
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'UserLoaded',
        data: { user: user, request_count: 1 },
      ),
    )
  end

  b.step('load_orders', accepts: ['UserLoaded'], emits: ['OrdersLoaded']) do |evt|
    payload = evt.data
    orders = [{ id: 'o1', total: 12.50 }, { id: 'o2', total: 99.00 }]
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'OrdersLoaded',
        data: payload.merge('orders' => orders, 'request_count' => payload['request_count'] + 1),
      ),
    )
  end

  b.step('summarize', accepts: ['OrdersLoaded'], emits: ['blazen::StopEvent']) do |evt|
    payload = evt.data
    total = payload['orders'].sum { |o| o['total'] }
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: {
          result: {
            user_name: payload['user']['name'],
            order_total: total,
            steps_run: payload['request_count'],
          },
        },
      ),
    )
  end
end

result = workflow.run_blocking({ user_id: 'u_42' })
puts result.event_data.inspect
# => {"result"=>{"user_name"=>"Alice", "order_total"=>111.5, "steps_run"=>2}}

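Every value that needs to survive a step boundary is in the JSON payload. Hash keys round-trip as strings (Ruby JSON conversion), so always read them back with string keys (payload['user'], not payload[:user]). Use string keys when merging into a received payload as well, so that a new symbol key does not end up alongside the string key it was meant to replace.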
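The key conversion can be reproduced without Blazen by round-tripping a hash through Ruby's standard json library. This is a minimal sketch that assumes the binding's conversion behaves like JSON.generate followed by JSON.parse; the payload contents and variable names are illustrative only.

```ruby
require 'json'

# Illustrative payload built with symbol keys, as a step block might write it.
outgoing = { user: { id: 'u_42', tier: 'pro' }, request_count: 1 }

# Assumption: crossing a step boundary behaves like generate + parse.
incoming = JSON.parse(JSON.generate(outgoing))

by_string = incoming['user']['tier'] # string keys find the value
by_symbol = incoming[:user]          # nil: the symbol key no longer exists
```

Reading with a symbol key fails silently (it returns nil rather than raising), which is why string keys are worth using consistently on the receiving side.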

State via closures

When state should not be on the wire — a database handle, a counter, a logger, an in-process cache — close over it from the surrounding Ruby scope. Step blocks are regular Ruby Proc objects; they capture their enclosing binding the same way any other block does.

require 'logger'
require 'sqlite3'

logger = Logger.new($stdout)
db = SQLite3::Database.new(':memory:')
db.execute('CREATE TABLE seen(id TEXT)')

workflow = Blazen.workflow('with_closure') do |b|
  b.step('record', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |evt|
    user_id = evt.data['data']['user_id']
    logger.info("processing #{user_id}")
    db.execute('INSERT INTO seen VALUES (?)', [user_id]) # bind parameters go in an array
    seen_count = db.execute('SELECT COUNT(*) FROM seen').first.first

    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: { result: { user_id: user_id, seen_count: seen_count } },
      ),
    )
  end
end

workflow.run_blocking({ user_id: 'u_42' })
workflow.run_blocking({ user_id: 'u_99' })
# logger emits a line for each run; db row count grows across calls.

Two things to watch for with closures:

  1. Thread safety. The cabi may dispatch step blocks on a Tokio worker thread. If several workflow runs share the same closure, or a fan-out runs two steps concurrently, wrap mutable state in a Mutex or use a thread-safe data structure.
  2. Cross-process state. Anything in a closure is in-process only. If you need state that survives process restarts (or coordinates across multiple Ruby processes), persist through a real backing store — Redis, SQLite, Postgres, etc. — and pass references through the event payload.
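The thread-safety point can be sketched in plain Ruby. The lambda below stands in for a step block body, and ordinary Ruby threads stand in for concurrent dispatch; both are assumptions made for illustration, not a description of how the runtime schedules blocks.

```ruby
# Shared state captured by closure; the Mutex serialises access to it.
lock = Mutex.new
seen = Hash.new(0)

# Stand-in for the body of a step block: any thread may call it.
record = lambda do |user_id|
  lock.synchronize { seen[user_id] += 1 }
end

# Simulate concurrent dispatch with plain Ruby threads.
threads = 8.times.map do
  Thread.new { 100.times { record.call('u_42') } }
end
threads.each(&:join)

total = seen['u_42'] # 8 threads x 100 calls each
```

Keeping the lock next to the state it guards, and closing over both, means every step block that touches the counter goes through the same synchronize call.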

JSON-only constraint

Because event payloads cross an FFI boundary as JSON strings, only JSON-serialisable values survive a step boundary:

  • strings, integers, floats, booleans, nil
  • Array and Hash of the above
  • Anything that comes back unchanged from a JSON.dump / JSON.parse round trip

Symbol keys are stringified on the way out and arrive as strings ({:user=>"a"} becomes {"user"=>"a"} after a round trip). Binary blobs need to be base64-encoded by hand before being placed on the payload — there is no automatic binary tier the way ctx.set provides on the Python side. For large blobs, store the bytes in an external system (Redis, S3, a temp file) and pass the handle / URL on the wire.
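Hand-encoding a small blob might look like the sketch below. It uses String#unpack1 and Array#pack with the 'm0' directive (strict base64, no line breaks) so that no extra library is needed. The 'blob_b64' key is a hypothetical name, and the JSON round trip is an assumed stand-in for the step boundary.

```ruby
require 'json'

# Arbitrary binary data, including bytes that are not valid UTF-8.
raw = "\x89PNG\r\n\x00\xFF".b

# 'm0' produces strict base64 with no line breaks.
encoded = [raw].pack('m0')

# Assumption: the step boundary behaves like a JSON round trip.
wire = JSON.generate({ 'blob_b64' => encoded })
received = JSON.parse(wire)

# The receiving step decodes the string back into the original bytes.
decoded = received['blob_b64'].unpack1('m0')
```

Base64 inflates the data by roughly a third, which is one more reason to pass a handle instead of the bytes once blobs get large.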

Run-level state in the start payload

The simplest place to put workflow-wide configuration is the start payload itself:

workflow.run_blocking({
  config: { max_retries: 3, timeout_ms: 30_000 },
  inputs: ['a', 'b', 'c'],
})

Every step that accepts blazen::StartEvent receives this under evt.data['data'] (Blazen wraps the start input under the "data" key). Later steps see it only if it is forwarded, so copy whichever sub-trees each step needs into the emitted event; downstream steps can then read them without needing the original start envelope.
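Forwarding can be factored into a small helper. The sketch below works on plain hashes; forward_from_start is a hypothetical name and not part of the Blazen API, and the sample hash only mimics the shape of evt.data described above.

```ruby
# Hypothetical helper: copy selected start-input keys into the next payload.
def forward_from_start(event_data, keys, extra = {})
  start_input = event_data.fetch('data') # start input is wrapped under "data"
  start_input.slice(*keys).merge(extra)
end

# Assumed shape of evt.data in a step that accepts blazen::StartEvent.
start_event_data = {
  'data' => {
    'config' => { 'max_retries' => 3, 'timeout_ms' => 30_000 },
    'inputs' => %w[a b c],
  },
}

# Forward only the config, and add a field of this step's own.
next_payload = forward_from_start(start_event_data, ['config'], 'processed' => 0)
```

The returned hash would then be passed as the data: argument when building the emitted event, so the config travels with the payload while the unforwarded inputs stay behind.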

Timeouts

The builder exposes two engine-enforced budgets:

workflow = Blazen.workflow('budgeted') do |b|
  b.step_timeout_ms(5_000)    # per-step cap
  b.timeout_ms(30_000)        # whole-workflow cap
  b.step('work', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |evt|
    # ...
  end
end

A run that overruns the workflow timeout fails with a Blazen::TimeoutError. The Rust runtime tears down the active step and returns immediately — these are enforced budgets, not advisory hints.

Run IDs and telemetry

Run IDs are surfaced via the native telemetry exporters (Langfuse, OTLP, Prometheus), not through a Ruby-side accessor on Event or WorkflowResult. Wire up an exporter through Blazen::Telemetry.init_otlp(...) (or the matching Langfuse / Prometheus initialisers) if you need to correlate run IDs across systems. The total_input_tokens, total_output_tokens, and total_cost_usd fields on WorkflowResult are available unconditionally for post-hoc aggregation.

See also

  • Quickstart — the minimum end-to-end workflow.
  • Events — how event payloads route between steps.
  • Human-in-the-Loop — structuring workflows that pause for external input.