Distributed Workflows
Run sub-workflows across machines from Ruby with blazen-peer
Blazen workflows normally run in one process. The blazen-peer gRPC layer extends that to multiple machines: a parent program on machine A can ask machine B to run a workflow that machine B has registered, then receive the terminal Blazen::Workflow::WorkflowResult back over the wire. The Ruby binding exposes this through Blazen::Peer.connect (client) and Blazen::Peer.server (server).
When to use distributed workflows
- Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating program lives elsewhere.
- Cost optimization. Route GPU-intensive steps (embeddings, diffusion, transcription) to machines with the right hardware instead of paying for GPU on every node.
- Hardware-specific steps. Some steps require TPUs, large-memory instances, or local SSDs. Run those steps on the machine that has the hardware.
- Regulatory compliance. Data residency rules may mandate that processing happens in a specific region or on specific infrastructure.
How it works
- The parent calls client.run_remote_workflow(workflow_name, step_ids:, input_json:, timeout_secs:).
- The cabi serialises the request and sends it over a tonic gRPC channel (HTTP/2) to the peer.
- The peer looks up workflow_name in its local registry, assembles the named steps, and runs them on its own embedded Tokio runtime.
- The peer returns the terminal event (typically a blazen::StopEvent) plus rolled-up token / cost totals.
- The parent receives a Blazen::Workflow::WorkflowResult, the same wrapper class that workflow.run returns locally.
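The round trip above can be pictured with a toy, in-process model. This is only a sketch of the flow (encode, look up by name, run, return a terminal event); it involves no gRPC, and every name in it (ToyResult, TOY_REGISTRY, toy_peer_dispatch) is hypothetical and not part of blazen-peer.

```ruby
require 'json'

# Toy stand-in for WorkflowResult. NOT the real class.
ToyResult = Struct.new(:event_type, :event_data, :total_input_tokens, :total_output_tokens)

# Toy stand-in for the peer's local registry: workflow name -> callable.
TOY_REGISTRY = {
  'analyze-pipeline' => ->(input) { { 'result' => input.fetch('document').split.length } }
}.freeze

# "Peer side": look the workflow up by name, decode the JSON payload, run the
# workflow, and wrap its output as a terminal event with zeroed token totals.
def toy_peer_dispatch(workflow_name, input_json)
  workflow = TOY_REGISTRY.fetch(workflow_name) do
    raise KeyError, "workflow not registered: #{workflow_name}"
  end
  output = workflow.call(JSON.parse(input_json))
  ToyResult.new('blazen::StopEvent', output, 0, 0)
end

# "Parent side": encode the input, dispatch, and read the terminal event.
result = toy_peer_dispatch('analyze-pipeline', JSON.generate({ document: 'The quick brown fox.' }))
puts result.event_type
puts result.event_data.inspect
```

In the real system the dispatch call crosses the network, and the lookup happens on the peer's machine against workflows registered there.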
The Ruby Async-aware variants (run_remote_workflow, serve) integrate with Fiber.scheduler, so they yield instead of blocking when called inside an Async { ... } block.
Feature availability
Distributed peer support is gated on the distributed cabi feature. The prebuilt gem ships with it enabled, but if you build the native library yourself you must opt in. Before constructing a server or client, check:
require 'blazen'
unless Blazen::Peer.available?
  abort 'blazen was built without the distributed feature -- peer surface unavailable'
end
If the symbols are missing, every Blazen::Peer.* constructor raises Blazen::UnsupportedError.
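For programs that should degrade gracefully instead of aborting, the check can be wrapped in a predicate. The sketch below uses a hypothetical helper name (blazen_peer_available?) and assumes that a missing gem or a build lacking the Blazen::Peer constant should both count as "unavailable".

```ruby
# Hypothetical helper: true only when the gem loads AND reports peer support.
def blazen_peer_available?
  require 'blazen'
  Blazen::Peer.available? ? true : false
rescue LoadError, NameError
  # LoadError: the gem is not installed.
  # NameError: Blazen::Peer (or available?) is not defined in this build.
  false
end

if blazen_peer_available?
  puts 'peer surface available'
else
  warn 'peer surface unavailable; staying on local execution'
end
```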
Server setup
Bind a peer server to a socket address and serve until the listener errors or the surrounding fiber is cancelled.
require 'blazen'
require 'async'
Blazen.init
server = Blazen::Peer.server(node_id: 'node-b')
Async do
  # serve composes with Fiber.scheduler -- the calling fiber yields while
  # the gRPC server is accepting connections.
  server.serve('0.0.0.0:50051')
end
For scripts that don’t have a fiber scheduler — or for embedded uses where you want to dedicate a real OS thread to the listener — use the blocking variant:
server = Blazen::Peer.server(node_id: 'node-b')
server.serve_blocking('0.0.0.0:50051') # blocks until the listener errors
The node_id is the stable identifier this server stamps onto trace logs and remote-ref metadata. Typical values are the hostname or a UUID generated at process startup.
Workflows the server can run must be registered on the same process before serve is called — the peer dispatches by workflow_name against the local registry. See Quickstart for how to build and register a workflow.
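One way to dedicate an OS thread to the listener is to call serve_blocking inside Thread.new. The sketch below takes the server as a parameter so it works with any object that responds to serve_blocking; the helper name start_peer_listener and the thread name are illustrative. It also assumes serve_blocking releases Ruby's global interpreter lock while it waits, which this page does not state.

```ruby
# Hypothetical helper: runs the blocking listener on its own thread and
# returns the Thread so the caller can join it or monitor it.
def start_peer_listener(server, bind_addr)
  Thread.new do
    Thread.current.name = 'blazen-peer-listener'
    Thread.current.report_on_exception = true # log listener failures
    server.serve_blocking(bind_addr)          # returns only when the listener errors
  end
end

# Intended usage (requires the blazen gem, so it is left commented out):
#   server   = Blazen::Peer.server(node_id: 'node-b')
#   listener = start_peer_listener(server, '0.0.0.0:50051')
#   listener.join
```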
Client setup
Open a connection and invoke a remote workflow. Blazen::Peer.connect performs the TCP / HTTP/2 handshake synchronously on the cabi Tokio runtime, then hands you a PeerClient wrapper.
require 'blazen'
require 'async'
require 'json'
Blazen.init
client = Blazen::Peer.connect(
  address: 'http://peer-b.local:50051',
  client_node_id: 'node-a',
)
puts "connected as #{client.node_id}"
Async do
  result = client.run_remote_workflow(
    'analyze-pipeline',
    step_ids: ['my_app::analyze', 'my_app::summarize'],
    input_json: JSON.generate({ document: 'The quick brown fox.' }),
    timeout_secs: 60,
  )
  puts result.event.event_type
  puts result.event_data.inspect
  puts "tokens=#{result.total_input_tokens}/#{result.total_output_tokens}"
  puts "cost=$#{result.total_cost_usd}"
end
If you’d rather block the calling thread instead of yielding to a fiber scheduler, swap in run_remote_workflow_blocking:
result = client.run_remote_workflow_blocking(
  'analyze-pipeline',
  step_ids: [], # empty == "use the workflow's declared steps"
  input_json: JSON.generate({ document: '...' }),
  timeout_secs: nil, # nil == defer to the server default
)
Argument semantics
| Keyword | Type | Meaning |
|---|---|---|
| workflow_name (positional) | String | Symbolic name the peer’s registry knows the workflow as. |
| step_ids: | Array<String> | Step IDs to execute. Pass [] to use the workflow’s declared steps. |
| input_json: | String | JSON-encoded entry-step payload. The peer decodes it into a blazen::StartEvent. |
| timeout_secs: | Integer, nil | Wall-clock bound. nil defers to the server default; non-negative integers cap at that many seconds. |
Pass already-encoded JSON in input_json: — JSON.generate(hash) is the canonical way to produce it. The Ruby helper does not run an implicit .to_json on a Hash for you, because the cabi takes a raw JSON string across the FFI.
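Because the encoding step is the caller's job, it can help to centralise it. The sketch below is a hypothetical helper (remote_call_kwargs is not part of the gem) that builds the keyword arguments from the table above: it encodes a Hash with JSON.generate, passes an already-encoded String through after checking that it parses, and rejects a timeout that is neither nil nor a non-negative Integer.

```ruby
require 'json'

# Hypothetical helper: normalises caller input into the keyword arguments
# described in the table above.
def remote_call_kwargs(payload, step_ids: [], timeout_secs: nil)
  unless timeout_secs.nil? || (timeout_secs.is_a?(Integer) && timeout_secs >= 0)
    raise ArgumentError, "timeout_secs must be nil or a non-negative Integer, got #{timeout_secs.inspect}"
  end

  input_json =
    if payload.is_a?(String)
      JSON.parse(payload) # raises JSON::ParserError on malformed input
      payload
    else
      JSON.generate(payload)
    end

  { step_ids: Array(step_ids).map(&:to_s), input_json: input_json, timeout_secs: timeout_secs }
end

kwargs = remote_call_kwargs({ document: 'The quick brown fox.' }, timeout_secs: 60)
puts kwargs[:input_json] # prints {"document":"The quick brown fox."}

# Intended usage (requires a connected client):
#   client.run_remote_workflow('analyze-pipeline', **kwargs)
```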
Inspecting the result
run_remote_workflow returns a Blazen::Workflow::WorkflowResult, the same class your local workflow.run invocations return. Useful accessors:
- result.event: the terminal Blazen::Workflow::Event (typically a blazen::StopEvent).
- result.event_data: the terminal event’s payload, JSON-decoded into Ruby primitives.
- result.total_input_tokens / result.total_output_tokens: rolled-up token usage from every LLM call the remote workflow made.
- result.total_cost_usd: rolled-up USD cost (when the providers involved report pricing).
event_type = result.event.event_type
case event_type
when 'blazen::StopEvent'
  puts result.event_data.fetch('result')
else
  warn "unexpected terminal event: #{event_type}"
end
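The accessors above can be folded into a single log line. The sketch below is a hypothetical helper that works with any object exposing those accessors; the Struct stand-ins exist only so the example runs without the gem. It assumes total_cost_usd may be nil when no provider reports pricing, which this page implies but does not spell out.

```ruby
# Stand-ins used only to make the sketch runnable without blazen.
FakeEvent  = Struct.new(:event_type)
FakeResult = Struct.new(:event, :event_data, :total_input_tokens,
                        :total_output_tokens, :total_cost_usd)

# Hypothetical helper: one-line summary of a workflow result.
def result_log_line(result)
  cost = result.total_cost_usd
  cost_text = cost.nil? ? 'n/a' : format('$%.4f', cost) # nil cost is an assumption
  "event=#{result.event.event_type} " \
    "tokens=#{result.total_input_tokens}/#{result.total_output_tokens} " \
    "cost=#{cost_text}"
end

sample = FakeResult.new(FakeEvent.new('blazen::StopEvent'), { 'result' => 'ok' }, 120, 45, 0.0123)
puts result_log_line(sample) # prints event=blazen::StopEvent tokens=120/45 cost=$0.0123
```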
Error handling
Failures surface as Ruby exceptions decoded from the cabi error-kind enum. The peer-specific subclass is Blazen::PeerError; other relevant subclasses include Blazen::TimeoutError (when timeout_secs: elapses), Blazen::WorkflowError (the remote workflow itself failed), Blazen::CancelledError (the fiber unwound or the connection dropped), and Blazen::UnsupportedError (the cabi was built without the distributed feature).
begin
  result = client.run_remote_workflow(
    'analyze-pipeline',
    step_ids: [],
    input_json: JSON.generate(input),
    timeout_secs: 30,
  )
rescue Blazen::TimeoutError => e
  warn "remote workflow exceeded its 30s budget: #{e.message}"
rescue Blazen::PeerError => e
  warn "peer-level failure (transport, envelope, unknown step): #{e.message}"
rescue Blazen::WorkflowError => e
  warn "remote workflow ran but returned an error: #{e.message}"
rescue Blazen::CancelledError
  warn 'remote workflow was cancelled (caller fiber unwound)'
end
Blazen::PeerError covers transport-level failures (connection refused, TLS handshake, envelope version too new) and registry misses (the remote does not have the requested workflow registered). Workflow-level failures inside the remote steps surface as Blazen::WorkflowError instead.
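Since transport failures can be transient, a caller may want to retry them with backoff. The sketch below is a generic, hypothetical helper (with_retries is not part of the gem); the error classes to retry are passed in, so it runs without blazen. Retrying on Blazen::PeerError is a design choice rather than a recommendation from the library: a registry miss raises the same class and will not be fixed by retrying.

```ruby
# Hypothetical helper: re-runs the block when it raises one of `retry_on`,
# sleeping base_delay, 2*base_delay, 4*base_delay, ... between attempts.
# The final failure is re-raised unchanged.
def with_retries(retry_on:, attempts: 3, base_delay: 0.5)
  tries = 0
  begin
    tries += 1
    yield tries
  rescue *retry_on
    raise if tries >= attempts
    sleep(base_delay * (2**(tries - 1)))
    retry
  end
end

# Intended usage (requires a connected client):
#   result = with_retries(retry_on: [Blazen::PeerError], attempts: 3) do
#     client.run_remote_workflow_blocking(
#       'analyze-pipeline',
#       step_ids: [],
#       input_json: JSON.generate(input),
#       timeout_secs: 30,
#     )
#   end
```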
Driving a remote sub-workflow from a local step
The most common pattern is a local workflow whose step delegates to a remote peer:
require 'blazen'
require 'json'
Blazen.init
client = Blazen::Peer.connect(
  address: 'http://peer-b.local:50051',
  client_node_id: 'node-a',
)
workflow = Blazen.workflow('orchestrate') do |b|
  b.step('delegate', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |evt|
    payload = evt.data['data'] || {}
    response = client.run_remote_workflow_blocking(
      'analyze-pipeline',
      step_ids: ['my_app::analyze', 'my_app::summarize'],
      input_json: JSON.generate(payload),
      timeout_secs: 60,
    )
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: { result: response.event_data.fetch('result', {}) },
      ),
    )
  end
end
local_result = workflow.run_blocking({ document: 'The quick brown fox.' })
puts local_result.event_data.inspect
Use the _blocking variant inside a step block by default. The step callback fires on a cabi Tokio worker thread, which is not running a Fiber.scheduler — so run_remote_workflow (the fiber-yielding variant) would fall back to thread-blocking anyway. Calling run_remote_workflow_blocking makes the intent explicit.
Transport security
The current Ruby surface exposes only the plain gRPC endpoint — the address: argument to Blazen::Peer.connect is passed through to tonic as-is. mTLS PEM configuration is not yet wired through the C ABI, so production deployments that need certificate-based mutual auth should terminate TLS at a sidecar (Envoy, Linkerd, a service mesh) and connect over the resulting plaintext loopback. Internal-network deployments can use the plain endpoint directly.
When TLS configuration lands in the cabi it will surface here as additional keyword args on Blazen::Peer.connect / Blazen::Peer.server. Until then, prefer running both endpoints inside the same VPC or under a service mesh that enforces mTLS at the network layer.
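When TLS is terminated at a sidecar, the client should only ever dial the loopback side of it. A small guard can catch a misconfigured address before a plaintext connection leaves the host. The sketch below is a hypothetical helper built on Ruby's standard uri and ipaddr libraries; it treats the name localhost as loopback by assumption and does no DNS resolution.

```ruby
require 'uri'
require 'ipaddr'

# Hypothetical guard: true when the address points at a loopback host.
def loopback_peer_address?(address)
  host = URI.parse(address).hostname # hostname strips brackets from IPv6 literals
  return false if host.nil?
  return true if host == 'localhost' # assumed to resolve to loopback

  IPAddr.new(host).loopback?
rescue URI::InvalidURIError, IPAddr::InvalidAddressError
  # Unparseable URL, or a DNS name rather than an IP literal.
  false
end

puts loopback_peer_address?('http://127.0.0.1:50051')    # prints true
puts loopback_peer_address?('http://peer-b.local:50051') # prints false
```

A caller could raise at startup when this returns false and the deployment expects a sidecar, rather than discovering the misconfiguration in traffic captures.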
Lifecycle and handle cleanup
Both PeerClient and PeerServer install FFI::AutoPointer finalizers, so a forgotten handle does not leak native memory across a GC cycle. For long-lived deployments, pin the client in a constant or an instance variable so its connection survives across many remote calls rather than reconnecting on every dispatch:
require 'blazen'
require 'json'
class RemoteAnalyzer
  # Connects once, when the class body is evaluated.
  PEER = Blazen::Peer.connect(
    address: ENV.fetch('BLAZEN_PEER_ADDRESS'),
    client_node_id: ENV.fetch('BLAZEN_NODE_ID'),
  )
  def self.analyze(document)
    PEER.run_remote_workflow_blocking(
      'analyze-pipeline',
      step_ids: [],
      input_json: JSON.generate(document: document),
      timeout_secs: 60,
    )
  end
end
The underlying gRPC channel multiplexes many in-flight requests onto a single HTTP/2 connection, so reusing one PeerClient across the process is the right default. Cleanup happens automatically when the wrapper is collected; there is no explicit close to call.
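A constant connects as soon as the class body is evaluated, which can be inconvenient when the peer is not reachable at load time. An alternative is to connect lazily on first use while still sharing one client. The sketch below is a hypothetical wrapper (LazyPeer is not part of the gem); the connection logic is supplied as a block so the class itself has no dependency on blazen.

```ruby
# Hypothetical wrapper: builds the client on first access and reuses it
# afterwards. The Mutex keeps concurrent first calls from connecting twice.
class LazyPeer
  def initialize(&connector)
    @connector = connector
    @mutex = Mutex.new
    @client = nil
  end

  def client
    @mutex.synchronize { @client ||= @connector.call }
  end
end

# Intended usage (requires the blazen gem, so it is left commented out):
#   PEER = LazyPeer.new do
#     Blazen::Peer.connect(
#       address: ENV.fetch('BLAZEN_PEER_ADDRESS'),
#       client_node_id: ENV.fetch('BLAZEN_NODE_ID'),
#     )
#   end
#   PEER.client.run_remote_workflow_blocking('analyze-pipeline', step_ids: [],
#                                            input_json: '{}', timeout_secs: 60)
```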
See also
- Quickstart — building the workflows that the remote peer will register and run.
- Streaming — driving LLM streams from inside steps that may run on the remote side.
- Human-in-the-Loop — pausing workflows for external input, which composes with remote execution.