Distributed Workflows
Run sub-workflows across machines with blazen-peer
Blazen workflows normally run in a single process. The blazen-peer crate extends this to multiple machines: a parent workflow on machine A can delegate a sub-workflow to machine B over gRPC, get the result back, and lazily dereference any session refs that stayed on the remote peer.
When to use distributed workflows
- Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating workflow lives elsewhere.
- Cost optimization. Route GPU-intensive steps (embedding generation, image diffusion) to machines with the right hardware instead of paying for GPU on every node.
- Hardware-specific steps. Some steps require specialized hardware (TPUs, large-memory instances, local SSDs). Run those steps on the machine that has the hardware.
- Regulatory compliance. Data residency requirements may mandate that certain processing happens in a specific region or on specific infrastructure.
How it works
- The parent builds a SubWorkflowRequest containing a workflow name, an ordered list of step IDs, and a JSON input.
- The request is sent over a tonic gRPC channel (HTTP/2, optional mTLS) to the peer.
- The peer resolves each step ID against its local step registry, assembles a Workflow, and runs it.
- The peer returns a SubWorkflowResponse with the terminal result, exported state values, and RemoteRefDescriptor handles for any session refs that could not be serialized inline.
- The parent can dereference remote refs lazily over the same channel, and release them when done.
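The resolve-and-run step on the peer can be pictured with a small standalone model. This is only a sketch under stated assumptions: it does not use the blazen crates, it simplifies a workflow to a linear chain of functions over strings, and `StepFn`, `ResolveError`, `resolve_steps`, `run_pipeline`, and `demo_registry` are hypothetical names introduced for illustration.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a registered step: takes an input string, returns an output string.
type StepFn = fn(String) -> String;

/// Hypothetical error type for this sketch; it is not the real PeerError.
#[derive(Debug, PartialEq)]
enum ResolveError {
    UnknownStep(String),
}

/// Resolve every requested step ID in order, failing on the first one
/// that is missing from the local registry.
fn resolve_steps(
    registry: &HashMap<String, StepFn>,
    step_ids: &[&str],
) -> Result<Vec<StepFn>, ResolveError> {
    step_ids
        .iter()
        .map(|id| {
            registry
                .get(*id)
                .copied()
                .ok_or_else(|| ResolveError::UnknownStep((*id).to_string()))
        })
        .collect()
}

/// Run the resolved steps in order, feeding each output into the next step.
fn run_pipeline(steps: &[StepFn], input: String) -> String {
    steps.iter().fold(input, |acc, step| step(acc))
}

fn analyze(input: String) -> String {
    format!("analyzed({input})")
}

fn summarize(input: String) -> String {
    format!("summarized({input})")
}

/// Build a toy registry holding the two step IDs used in this guide.
fn demo_registry() -> HashMap<String, StepFn> {
    let mut registry: HashMap<String, StepFn> = HashMap::new();
    registry.insert("my_app::analyze".to_string(), analyze as StepFn);
    registry.insert("my_app::summarize".to_string(), summarize as StepFn);
    registry
}
```

In this model, a request naming a step ID that the peer never registered fails before any step runs, which is the situation the UnknownStep error describes.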
Setup (Rust)
1. Register steps on both sides
Each machine registers the steps it can execute in the global step registry:
use blazen_core::register_step_builder;
// On the peer (machine B) -- register the steps it will run.
register_step_builder("my_app::analyze", my_analyze_step_builder);
register_step_builder("my_app::summarize", my_summarize_step_builder);
2. Start the peer server
use blazen_peer::BlazenPeerServer;
let server = BlazenPeerServer::new("node-b");
server.serve("0.0.0.0:50051".parse()?).await?;
The server uses an internal SessionRefRegistry by default. To share a registry with in-process workflows running on the same machine, call .with_session_refs(arc_registry).
3. Connect the client and invoke
use blazen_peer::BlazenPeerClient;
use blazen_peer::SubWorkflowRequest;
let mut client = BlazenPeerClient::connect("http://peer-b:50051", "node-a").await?;
let input = serde_json::json!({ "document": "..." });
let request = SubWorkflowRequest::new(
    "analyze-pipeline",
    vec!["my_app::analyze".to_string(), "my_app::summarize".to_string()],
    &input,
    Some(60), // timeout in seconds
)?;
let response = client.invoke_sub_workflow(request).await?;
if let Some(err) = &response.error {
    eprintln!("remote workflow failed: {err}");
} else {
    let result = response.result_value()?;
    println!("result: {result:?}");
}
Python example (planned API)
Python bindings for blazen-peer are not yet wired. The planned API will expose PeerClient and integrate with Workflow.run_remote:
from blazen import Workflow, step, Event, StopEvent, Context
from blazen.peer import PeerClient
# Connect to a remote peer.
client = await PeerClient.connect("http://peer-b:50051", node_id="node-a")
# Define a local workflow that delegates to the peer.
@step
async def orchestrate(ctx: Context, ev: Event):
    result = await client.invoke_sub_workflow(
        workflow_name="analyze-pipeline",
        step_ids=["my_app::analyze", "my_app::summarize"],
        input={"document": ev.document},
        timeout_secs=60,
    )
    return StopEvent(result=result)
wf = Workflow("distributed-example", [orchestrate])
handler = await wf.run(document="...")
result = await handler.result()
Until the bindings land, you can call the Rust API directly from a Rust workflow step and expose the result to Python via the existing StopEvent.result bridge.
Node.js example (planned API)
Node.js bindings are also not yet available. The planned API follows the same pattern:
import { Workflow } from "blazen";
import { PeerClient } from "blazen/peer";
const client = await PeerClient.connect("http://peer-b:50051", "node-a");
const wf = new Workflow("distributed-example");
wf.addStep("orchestrate", ["blazen::StartEvent"], async (event, ctx) => {
  const result = await client.invokeSubWorkflow({
    workflowName: "analyze-pipeline",
    stepIds: ["my_app::analyze", "my_app::summarize"],
    input: { document: event.document },
    timeoutSecs: 60,
  });
  return { type: "blazen::StopEvent", result };
});
const result = await wf.run({ document: "..." });
console.log(result.data);
Until the Node bindings land, use the Rust core directly via a native addon or call the gRPC endpoint with any Node gRPC client library (@grpc/grpc-js).
Session refs across machines
When a sub-workflow on the peer creates a session ref — for example, a model weight cache or a GPU-resident tensor — the value stays on the peer. The parent receives a RemoteRefDescriptor containing:
| Field | Type | Description |
|---|---|---|
| origin_node_id | String | Stable identifier of the node that owns the value. |
| type_tag | String | Type tag from SessionRefSerializable::blazen_type_tag. Used by the parent’s deserializer to rehydrate the bytes. |
| created_at_epoch_ms | u64 | Wall-clock creation time on the origin node. Useful for tracing and TTL bookkeeping. |
Lazy dereference
The parent can fetch the underlying bytes at any time by calling deref_session_ref with the ref’s RegistryKey (the UUID from the SubWorkflowResponse.remote_refs map):
use blazen_core::session_ref::RegistryKey;
for (uuid, descriptor) in &response.remote_refs {
    let key = RegistryKey(*uuid);
    let bytes = client.deref_session_ref(key).await?;
    // Deserialize `bytes` using the deserializer keyed by `descriptor.type_tag`.
}
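One way to organize the "deserializer keyed by type_tag" step is a lookup table from tag to decoding function. The sketch below is standalone and makes several assumptions: the tags `my_app::Text` and `my_app::Count`, the `Decoded` enum, and the `decoders` and `rehydrate` helpers are all hypothetical, and the byte layouts are hand-rolled for illustration rather than the encoding a real peer would use.

```rust
use std::collections::HashMap;

/// Hypothetical decoded value; a real application would use its own types.
#[derive(Debug, PartialEq)]
enum Decoded {
    Text(String),
    Count(u32),
}

/// A decoder turns raw bytes into a value or a human-readable error.
type Decoder = fn(&[u8]) -> Result<Decoded, String>;

/// Decode the bytes as UTF-8 text.
fn decode_text(bytes: &[u8]) -> Result<Decoded, String> {
    String::from_utf8(bytes.to_vec())
        .map(Decoded::Text)
        .map_err(|e| e.to_string())
}

/// Decode exactly four bytes as a little-endian u32.
fn decode_count(bytes: &[u8]) -> Result<Decoded, String> {
    if bytes.len() != 4 {
        return Err(format!("expected exactly 4 bytes, got {}", bytes.len()));
    }
    Ok(Decoded::Count(u32::from_le_bytes([
        bytes[0], bytes[1], bytes[2], bytes[3],
    ])))
}

/// Build the tag-to-decoder table (the tags here are made up for the example).
fn decoders() -> HashMap<&'static str, Decoder> {
    let mut map: HashMap<&'static str, Decoder> = HashMap::new();
    map.insert("my_app::Text", decode_text as Decoder);
    map.insert("my_app::Count", decode_count as Decoder);
    map
}

/// Look up the decoder for `type_tag` and apply it to `bytes`.
fn rehydrate(
    map: &HashMap<&'static str, Decoder>,
    type_tag: &str,
    bytes: &[u8],
) -> Result<Decoded, String> {
    let decoder = map
        .get(type_tag)
        .ok_or_else(|| format!("no decoder registered for type tag `{type_tag}`"))?;
    decoder(bytes)
}
```

With a table like this, the loop above would pass `descriptor.type_tag` and the fetched bytes to `rehydrate`, and an unrecognized tag surfaces as an error instead of a silent misinterpretation of the bytes.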
Release
When the parent no longer needs a remote ref, it should release it so the peer can free the memory:
let was_released = client.release_session_ref(key).await?;
release_session_ref returns true if the ref was found and dropped, and false if it was already gone (expired by its lifetime policy or released by another caller).
RefLifetime interaction
The RefLifetime policy on each session ref controls when it is automatically purged on the peer:
- UntilContextDrop (default): purged when the sub-workflow finishes. The ref must be serialized into the response or it is lost.
- UntilParentFinish: survives the sub-workflow. Available for lazy DerefSessionRef until the parent explicitly releases it. This is the recommended policy for distributed workflows.
- UntilExplicitDrop: never purged automatically. The parent must call ReleaseSessionRef.
Set the lifetime when inserting a ref in your step:
use blazen_core::session_ref::RefLifetime;
ctx.session_refs()
    .insert_with_lifetime(my_value, RefLifetime::UntilParentFinish)
    .await;
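To make the effect of each policy concrete, here is a toy model of what happens to refs on the peer when a sub-workflow finishes. It is a sketch, not the real SessionRefRegistry: `ToyRegistry`, its `u64` keys, and its synchronous methods are assumptions made for the example, and the local `RefLifetime` enum only mirrors the three variant names described above.

```rust
use std::collections::HashMap;

/// Local mirror of the three policy names; not the blazen_core type.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq)]
enum RefLifetime {
    UntilContextDrop,
    UntilParentFinish,
    UntilExplicitDrop,
}

/// Toy registry mapping a key to its lifetime policy and stored bytes.
#[derive(Default)]
struct ToyRegistry {
    refs: HashMap<u64, (RefLifetime, Vec<u8>)>,
}

impl ToyRegistry {
    /// Store a value under `key` with the given lifetime policy.
    fn insert_with_lifetime(&mut self, key: u64, value: Vec<u8>, lifetime: RefLifetime) {
        self.refs.insert(key, (lifetime, value));
    }

    /// Purge every ref whose policy ends with the sub-workflow's context.
    fn on_sub_workflow_finished(&mut self) {
        self.refs
            .retain(|_, (lifetime, _)| *lifetime != RefLifetime::UntilContextDrop);
    }

    /// Explicit release: true if the ref existed and was dropped, false otherwise.
    fn release(&mut self, key: u64) -> bool {
        self.refs.remove(&key).is_some()
    }

    /// Whether a ref is still available for a later dereference.
    fn contains(&self, key: u64) -> bool {
        self.refs.contains_key(&key)
    }
}
```

In this model only the UntilContextDrop entry disappears when the sub-workflow ends, which is why a parent that plans to dereference lazily needs one of the other two policies.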
mTLS configuration for production
In production, always enable mutual TLS between peers. The blazen_peer::tls module provides helpers that read PEM files and produce tonic TLS configs:
use std::path::Path;
use blazen_peer::tls::{load_server_tls, load_client_tls};
// Server side
let server_tls = load_server_tls(
    Path::new("/certs/server.crt"),
    Path::new("/certs/server.key"),
    Path::new("/certs/ca.crt"),
)?;
// Client side
let client_tls = load_client_tls(
    Path::new("/certs/client.crt"),
    Path::new("/certs/client.key"),
    Path::new("/certs/ca.crt"),
)?;
Both sides must present certificates signed by the same CA. In Kubernetes, use cert-manager with a shared Issuer to automate certificate issuance and rotation for each peer pod.
When mTLS is not practical (development, internal networks), peers can authenticate with a shared secret token by setting the BLAZEN_PEER_TOKEN environment variable on both sides. See blazen_peer::auth::resolve_peer_token.
Error handling
The PeerError enum covers all failure modes:
| Variant | Meaning |
|---|---|
| PeerError::UnknownStep | The peer does not have a requested step ID registered. |
| PeerError::EnvelopeVersion | The client sent an envelope version newer than the server supports. |
| PeerError::Workflow | The remote workflow ran but produced an error. |
| PeerError::Transport | Network-level failure (connection refused, timeout, TLS handshake failure). |
| PeerError::Encode | Postcard serialization or deserialization failed. |
| PeerError::Tls | TLS configuration error (bad PEM, missing key file). |
Additionally, the SubWorkflowResponse.error field carries workflow-level errors as a string when the sub-workflow itself fails (as opposed to transport or protocol errors).
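A caller usually wants to react differently to these failure modes. The sketch below shows one possible triage, assuming a local `ToyPeerError` enum that mirrors only the variant names in the table: the payload shapes are invented, and the choice of which errors to retry is a judgment call for illustration, not documented blazen-peer behavior.

```rust
/// Local mirror of the variant names; payload shapes are assumptions.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum ToyPeerError {
    UnknownStep(String),
    EnvelopeVersion { sent: u32, supported: u32 },
    Workflow(String),
    Transport(String),
    Encode(String),
    Tls(String),
}

/// Hypothetical follow-up actions a caller might take.
#[derive(Debug, PartialEq)]
enum Action {
    /// Likely transient: try again, ideally with backoff.
    Retry,
    /// Deployment or setup problem: retrying unchanged will not help.
    FixConfiguration,
    /// The request itself failed: surface the error to the caller.
    ReportFailure,
}

/// Map each failure mode to a follow-up action.
fn classify(err: &ToyPeerError) -> Action {
    match err {
        ToyPeerError::Transport(_) => Action::Retry,
        ToyPeerError::UnknownStep(_)
        | ToyPeerError::EnvelopeVersion { .. }
        | ToyPeerError::Tls(_) => Action::FixConfiguration,
        ToyPeerError::Workflow(_) | ToyPeerError::Encode(_) => Action::ReportFailure,
    }
}
```

Keeping the match exhaustive, with no wildcard arm, means the compiler flags this function if a new variant is ever added to the enum.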
Envelope versioning
Every wire payload carries an envelope_version field (currently 1). Adding optional fields at the end of a struct is forward-compatible and does not require a version bump. Renaming, reordering, or removing fields is a breaking change that requires incrementing the ENVELOPE_VERSION constant. The server rejects payloads with a version newer than it supports (FAILED_PRECONDITION) but accepts all older versions.
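The acceptance rule reduces to a single comparison. The following standalone sketch illustrates it; `VersionCheck` and `check_envelope_version` are hypothetical names, and only the ENVELOPE_VERSION constant and its current value of 1 come from the text above.

```rust
/// Current envelope version, as stated in this guide.
const ENVELOPE_VERSION: u32 = 1;

/// Outcome of the version check in this sketch.
#[derive(Debug, PartialEq)]
enum VersionCheck {
    Accepted,
    /// In the real server this case maps to a FAILED_PRECONDITION status.
    RejectedTooNew { received: u32, supported: u32 },
}

/// Accept any payload at or below the supported version; reject newer ones.
fn check_envelope_version(received: u32, supported: u32) -> VersionCheck {
    if received > supported {
        VersionCheck::RejectedTooNew { received, supported }
    } else {
        VersionCheck::Accepted
    }
}
```

Passing `supported` as a parameter rather than reading the constant directly keeps the rule easy to exercise for future versions.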