Context
Pass state between workflow steps in Ruby
What context means in the Ruby binding
Unlike the Python binding (which exposes a per-run Context object with ctx.set / ctx.get storage tiers), the Ruby binding does not surface a context object to step blocks. Step blocks receive only a Blazen::Workflow::Event and return a Blazen::Workflow::StepOutput. Workflow-level state flows entirely through the JSON payloads attached to events.
This is intentional: the cabi callback ABI hands the step block exactly one argument (the incoming event) plus an output slot, with no side channel for shared state. Anything you want a downstream step to see needs to ride along inside the event payload, or live in Ruby state your block closes over.
State via event payloads
The most natural pattern for threading data through a workflow is to keep enriching the event payload as it moves between steps:
workflow = Blazen.workflow('enrich') do |b|
  b.step('load_user', accepts: ['blazen::StartEvent'], emits: ['UserLoaded']) do |evt|
    # The start input arrives wrapped under the "data" key.
    user_id = evt.data['data']['user_id']
    user = { id: user_id, name: 'Alice', tier: 'pro' }
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'UserLoaded',
        data: { user: user, request_count: 1 },
      ),
    )
  end

  b.step('load_orders', accepts: ['UserLoaded'], emits: ['OrdersLoaded']) do |evt|
    payload = evt.data
    orders = [{ id: 'o1', total: 12.50 }, { id: 'o2', total: 99.00 }]
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'OrdersLoaded',
        # Merge with string keys so the parsed "request_count" entry is replaced
        # rather than joined by a second :request_count key.
        data: payload.merge('orders' => orders, 'request_count' => payload['request_count'] + 1),
      ),
    )
  end

  b.step('summarize', accepts: ['OrdersLoaded'], emits: ['blazen::StopEvent']) do |evt|
    payload = evt.data
    total = payload['orders'].sum { |o| o['total'] }
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: {
          result: {
            user_name: payload['user']['name'],
            order_total: total,
            steps_run: payload['request_count'],
          },
        },
      ),
    )
  end
end

result = workflow.run_blocking({ user_id: 'u_42' })
puts result.event_data.inspect
# => {"result"=>{"user_name"=>"Alice", "order_total"=>111.5, "steps_run"=>2}}
Every value that needs to survive a step boundary is in the JSON payload. Hash keys round-trip as strings (Ruby JSON conversion), so always read them back with string keys (payload['user'], not payload[:user]).
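The string-key behaviour can be reproduced with plain Ruby, without running a workflow. The sketch below assumes the step boundary behaves like a JSON.generate / JSON.parse round trip; round_trip is an illustrative helper, not a Blazen API.

```ruby
require 'json'

# Simulates a payload crossing a step boundary: serialise to a JSON string,
# then parse it back. (Illustrative helper, not part of Blazen.)
def round_trip(payload)
  JSON.parse(JSON.generate(payload))
end

received = round_trip({ user: { id: 'u_42', tier: 'pro' }, request_count: 1 })

p received['user']['tier']  # => "pro"
p received[:user]           # => nil

# Merging with a symbol key adds a second entry beside the string-keyed one.
p received.merge(request_count: 2).keys
# => ["user", "request_count", :request_count]

# Merging with a string key replaces the existing entry.
p received.merge('request_count' => 2)['request_count']  # => 2
```

When a step extends a payload it received, using string keys in the merge keeps a single entry per field.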
State via closures
When state should not be on the wire — a database handle, a counter, a logger, an in-process cache — close over it from the surrounding Ruby scope. Step blocks are regular Ruby Proc objects; they capture their enclosing binding the same way any other block does.
require 'logger'
require 'sqlite3'

logger = Logger.new($stdout)
db = SQLite3::Database.new(':memory:')
db.execute('CREATE TABLE seen(id TEXT)')

workflow = Blazen.workflow('with_closure') do |b|
  b.step('record', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |evt|
    user_id = evt.data['data']['user_id']
    logger.info("processing #{user_id}")
    db.execute('INSERT INTO seen VALUES (?)', user_id)
    seen_count = db.execute('SELECT COUNT(*) FROM seen').first.first
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: { result: { user_id: user_id, seen_count: seen_count } },
      ),
    )
  end
end

workflow.run_blocking({ user_id: 'u_42' })
workflow.run_blocking({ user_id: 'u_99' })
# logger emits a line for each run; db row count grows across calls.
Two things to watch for with closures:
- Thread safety. The cabi may dispatch step blocks on a Tokio worker thread. If multiple workflows reuse the same closure (or if you fan out and run handler-A and handler-B concurrently), wrap mutable state in a Mutex or use a thread-safe data structure.
- Cross-process state. Anything in a closure is in-process only. If you need state that survives process restarts (or coordinates across multiple Ruby processes), persist through a real backing store (Redis, SQLite, Postgres, etc.) and pass references through the event payload.
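As a minimal sketch of the thread-safety point, the counter below guards its state with a Mutex so that concurrent callers cannot lose updates. SafeCounter is an illustrative class, not part of Blazen, and the plain Ruby threads stand in for step blocks that happen to run concurrently.

```ruby
# A small thread-safe counter that a step block could close over.
# (Illustrative class, not part of Blazen.)
class SafeCounter
  def initialize
    @mutex = Mutex.new
    @count = 0
  end

  # Increments under the lock and returns the new value.
  def increment
    @mutex.synchronize { @count += 1 }
  end

  # Reads the current value under the same lock.
  def value
    @mutex.synchronize { @count }
  end
end

counter = SafeCounter.new

# Stand-in for several step blocks running at the same time.
threads = 8.times.map do
  Thread.new { 1_000.times { counter.increment } }
end
threads.each(&:join)

puts counter.value  # => 8000
```

Inside a workflow, the step block would call counter.increment exactly as the threads do here; the lock lives in the object, so every closure that shares it is covered.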
JSON-only constraint
Because event payloads cross an FFI boundary as JSON strings, only JSON-serialisable values survive a step boundary:
- strings, integers, floats, booleans, nil
- Array and Hash of the above
- Anything you can pass to JSON.dump (and read back with JSON.parse)
Symbol keys are stringified on the way out and arrive as strings ({:user=>"a"} becomes {"user"=>"a"} after a round trip). Binary blobs need to be base64-encoded by hand before being placed on the payload — there is no automatic binary tier the way ctx.set provides on the Python side. For large blobs, store the bytes in an external system (Redis, S3, a temp file) and pass the handle / URL on the wire.
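Manual base64 encoding might look like the following sketch. It uses core Ruby's pack('m0') / unpack1('m0'), which produce and consume the same strict Base64 text as the standard Base64 module; encode_blob, decode_blob, and the thumbnail_b64 key are illustrative names, and the JSON round trip stands in for the step boundary.

```ruby
require 'json'

# Encode raw bytes as Base64 text so they can sit inside a JSON payload.
# pack('m0') is the core-Ruby equivalent of Base64.strict_encode64.
def encode_blob(bytes)
  [bytes].pack('m0')
end

# Decode back to a binary (ASCII-8BIT) string on the receiving side.
def decode_blob(text)
  text.unpack1('m0')
end

raw = "\x89PNG\r\n\x1A\n".b  # arbitrary bytes that are not valid UTF-8
payload = { 'thumbnail_b64' => encode_blob(raw) }

# Simulate the step boundary: serialise to JSON, then parse in the next step.
received = JSON.parse(JSON.generate(payload))

restored = decode_blob(received['thumbnail_b64'])
puts restored == raw  # => true
```

Base64 inflates the data by roughly a third, which is one more reason to keep large blobs out of the payload and pass a handle instead.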
Run-level state in the start payload
The simplest place to put workflow-wide configuration is the start payload itself:
workflow.run_blocking({
  config: { max_retries: 3, timeout_ms: 30_000 },
  inputs: ['a', 'b', 'c'],
})
Every step that accepts blazen::StartEvent receives this under evt.data['data'] (because Blazen wraps the start input under the "data" key). Steps further downstream see only what earlier steps put on their emitted events, so forward whichever sub-trees are needed later into each emitted event.
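Forwarding can be kept explicit with a small helper that picks the needed keys out of the start envelope and adds the step's own fields. This is a sketch in plain Ruby: forward_from_start is a hypothetical helper, not a Blazen API, and start_data mimics what evt.data would hold in the first step.

```ruby
# Builds the payload for the next event by carrying forward selected keys from
# the start envelope and merging in step-specific fields.
# (Hypothetical helper, not part of Blazen.)
def forward_from_start(start_data, keys, extra = {})
  start_data.fetch('data').slice(*keys).merge(extra)
end

# Shape of evt.data in the first step: the start input sits under "data".
start_data = {
  'data' => {
    'config' => { 'max_retries' => 3, 'timeout_ms' => 30_000 },
    'inputs' => %w[a b c],
  },
}

next_payload = forward_from_start(start_data, ['config'], 'processed' => 0)

p next_payload.keys                      # => ["config", "processed"]
p next_payload['config']['max_retries']  # => 3
```

The returned hash would be passed as the data: argument of the emitted event, so later steps read config directly from their own evt.data.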
Timeouts
The builder exposes two engine-enforced budgets:
workflow = Blazen.workflow('budgeted') do |b|
  b.step_timeout_ms(5_000)  # per-step cap
  b.timeout_ms(30_000)      # whole-workflow cap

  b.step('work', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |evt|
    # ...
  end
end
A run that overruns the workflow timeout fails with a Blazen::TimeoutError. The Rust runtime tears down the active step and returns immediately — these are enforced budgets, not advisory hints.
Run IDs and telemetry
Run IDs are surfaced via the native telemetry exporters (Langfuse, OTLP, Prometheus), not through a Ruby-side accessor on Event or WorkflowResult. Wire up an exporter through Blazen::Telemetry.init_otlp(...) (or the matching Langfuse / Prometheus initialisers) if you need to correlate run IDs across systems. The total_input_tokens, total_output_tokens, and total_cost_usd fields on WorkflowResult are available unconditionally for post-hoc aggregation.
See also
- Quickstart — the minimum end-to-end workflow.
- Events — how event payloads route between steps.
- Human-in-the-Loop — structuring workflows that pause for external input.