Distributed Workflows

Run sub-workflows across machines with blazen-peer

Blazen workflows normally run in a single process. The blazen-peer crate extends this to multiple machines: a parent workflow on machine A can delegate a sub-workflow to machine B over gRPC, get the result back, and lazily dereference any session refs that stayed on the remote peer.

When to use distributed workflows

  • Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating workflow lives elsewhere.
  • Cost optimization. Route GPU-intensive steps (embedding generation, image diffusion) to machines with the right hardware instead of provisioning GPUs on every node.
  • Hardware-specific steps. Some steps require specialized hardware (TPUs, large-memory instances, local SSDs). Run those steps on the machine that has the hardware.
  • Regulatory compliance. Data residency requirements may mandate that certain processing happens in a specific region or on specific infrastructure.

How it works

  1. The parent builds a SubWorkflowRequest containing a workflow name, an ordered list of step IDs, and a JSON input.
  2. The request is sent over a tonic gRPC channel (HTTP/2, optional mTLS) to the peer.
  3. The peer resolves each step ID against its local step registry, assembles a Workflow, and runs it.
  4. The peer returns a SubWorkflowResponse with the terminal result, exported state values, and RemoteRefDescriptor handles for any session refs that could not be serialized inline.
  5. The parent can dereference remote refs lazily over the same channel, and release them when done.
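Step 4 amounts to partitioning the peer's session refs. Values that can be serialized travel inline in the response, and the rest stay behind as descriptors. The sketch below models that split with std-only, hypothetical types (LocalRef, Descriptor and split_refs are illustrations, not blazen-peer types). UUIDs are modeled as u128 and serialized values as raw bytes.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a session ref held by the peer.
struct LocalRef {
    type_tag: &'static str,
    /// `Some(bytes)` if the value can be serialized inline, `None` if it
    /// must stay on the peer (e.g. a GPU-resident tensor).
    inline_bytes: Option<Vec<u8>>,
    created_at_epoch_ms: u64,
}

/// Hypothetical mirror of the fields a RemoteRefDescriptor carries.
#[derive(Debug, PartialEq)]
struct Descriptor {
    origin_node_id: String,
    type_tag: String,
    created_at_epoch_ms: u64,
}

/// Split refs into values shipped inline and descriptors for values left on the peer.
fn split_refs(
    node_id: &str,
    refs: HashMap<u128, LocalRef>,
) -> (HashMap<u128, Vec<u8>>, HashMap<u128, Descriptor>) {
    let mut inline = HashMap::new();
    let mut remote = HashMap::new();
    for (id, r) in refs {
        match r.inline_bytes {
            // Serializable: send the bytes back in the response.
            Some(bytes) => {
                inline.insert(id, bytes);
            }
            // Not serializable: keep the value here and hand out a handle.
            None => {
                remote.insert(
                    id,
                    Descriptor {
                        origin_node_id: node_id.to_string(),
                        type_tag: r.type_tag.to_string(),
                        created_at_epoch_ms: r.created_at_epoch_ms,
                    },
                );
            }
        }
    }
    (inline, remote)
}
```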

Setup (Rust)

1. Register steps on both sides

Each machine registers the steps it can execute in the global step registry:

use blazen_core::register_step_builder;

// On the peer (machine B) -- register the steps it will run.
register_step_builder("my_app::analyze", my_analyze_step_builder);
register_step_builder("my_app::summarize", my_summarize_step_builder);
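To make the resolution rule in step 3 concrete, here is a std-only sketch of a process-wide step registry and an ordered pipeline runner. It assumes steps are plain fn(String) -> String functions and that every ID is resolved before any step runs. register and run_pipeline are hypothetical names, and the real register_step_builder stores step builders rather than functions.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, OnceLock};

/// Hypothetical simplified step: takes a payload, returns a payload.
type StepFn = fn(String) -> String;

/// Process-wide registry, lazily initialized on first use.
fn registry() -> &'static Mutex<HashMap<String, StepFn>> {
    static REGISTRY: OnceLock<Mutex<HashMap<String, StepFn>>> = OnceLock::new();
    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Register a step under a stable, namespaced ID such as "my_app::analyze".
fn register(id: &str, step: StepFn) {
    registry().lock().unwrap().insert(id.to_string(), step);
}

/// Resolve every ID first (so an unknown ID fails before any step runs),
/// then run the steps in the order given, feeding each output to the next.
fn run_pipeline(step_ids: &[&str], input: String) -> Result<String, String> {
    let steps: Vec<StepFn> = {
        let map = registry().lock().unwrap();
        step_ids
            .iter()
            .map(|id| map.get(*id).copied().ok_or_else(|| format!("unknown step: {id}")))
            .collect::<Result<_, _>>()?
    }; // lock released here, before any step executes
    Ok(steps.into_iter().fold(input, |acc, step| step(acc)))
}
```

Resolving up front means a typo in one step ID fails the whole request cleanly (the analogue of PeerError::UnknownStep) instead of failing halfway through the pipeline.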

2. Start the peer server

use blazen_peer::BlazenPeerServer;

let server = BlazenPeerServer::new("node-b");
server.serve("0.0.0.0:50051".parse()?).await?;

The server uses an internal SessionRefRegistry by default. To share a registry with in-process workflows running on the same machine, call .with_session_refs(arc_registry).
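Sharing a registry works because both owners hold clones of the same Arc, so an insert by one is visible to the other. The sketch below illustrates that with a hypothetical PeerServerStub and a HashMap-backed stand-in for SessionRefRegistry. Only the builder-method name is borrowed from the text; nothing else here is the blazen-peer API.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for SessionRefRegistry: ref ID -> opaque value.
type SharedRegistry = Arc<Mutex<HashMap<u128, String>>>;

/// Hypothetical server stub that either owns a private registry or adopts a shared one.
struct PeerServerStub {
    session_refs: SharedRegistry,
}

impl PeerServerStub {
    fn new() -> Self {
        // Default: an internal registry nobody else can see.
        Self { session_refs: Arc::new(Mutex::new(HashMap::new())) }
    }

    fn with_session_refs(mut self, registry: SharedRegistry) -> Self {
        // Swap in the caller's registry; both sides now point at the same map.
        self.session_refs = registry;
        self
    }

    fn lookup(&self, id: u128) -> Option<String> {
        self.session_refs.lock().unwrap().get(&id).cloned()
    }
}
```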

3. Connect the client and invoke

use blazen_peer::BlazenPeerClient;
use blazen_peer::SubWorkflowRequest;

let mut client = BlazenPeerClient::connect("http://peer-b:50051", "node-a").await?;

let input = serde_json::json!({ "document": "..." });
let request = SubWorkflowRequest::new(
    "analyze-pipeline",
    vec!["my_app::analyze".to_string(), "my_app::summarize".to_string()],
    &input,
    Some(60), // timeout in seconds
)?;

let response = client.invoke_sub_workflow(request).await?;

if let Some(err) = &response.error {
    eprintln!("remote workflow failed: {err}");
} else {
    let result = response.result_value()?;
    println!("result: {result:?}");
}

Python example (planned API)

Python bindings for blazen-peer are not yet available. The planned API will expose PeerClient and integrate with Workflow.run_remote:

from blazen import Workflow, step, Event, StopEvent, Context
from blazen.peer import PeerClient

# Connect to a remote peer.
client = await PeerClient.connect("http://peer-b:50051", node_id="node-a")

# Define a local workflow that delegates to the peer.
@step
async def orchestrate(ctx: Context, ev: Event):
    result = await client.invoke_sub_workflow(
        workflow_name="analyze-pipeline",
        step_ids=["my_app::analyze", "my_app::summarize"],
        input={"document": ev.document},
        timeout_secs=60,
    )
    return StopEvent(result=result)

wf = Workflow("distributed-example", [orchestrate])
handler = await wf.run(document="...")
result = await handler.result()

Until the bindings land, you can call the Rust API directly from a Rust workflow step and expose the result to Python via the existing StopEvent.result bridge.

Node.js example (planned API)

The planned Node.js bindings follow the same pattern:

import { Workflow } from "blazen";
import { PeerClient } from "blazen/peer";

const client = await PeerClient.connect("http://peer-b:50051", "node-a");

const wf = new Workflow("distributed-example");

wf.addStep("orchestrate", ["blazen::StartEvent"], async (event, ctx) => {
  const result = await client.invokeSubWorkflow({
    workflowName: "analyze-pipeline",
    stepIds: ["my_app::analyze", "my_app::summarize"],
    input: { document: event.document },
    timeoutSecs: 60,
  });

  return { type: "blazen::StopEvent", result };
});

const result = await wf.run({ document: "..." });
console.log(result.data);

Until the Node bindings land, use the Rust core directly via a native addon, or call the gRPC endpoint with a Node gRPC client library such as @grpc/grpc-js.

Session refs across machines

When a sub-workflow on the peer creates a session ref (for example, a model weight cache or a GPU-resident tensor), the value stays on the peer. For each such ref, the parent receives a RemoteRefDescriptor (keyed by the ref’s UUID in SubWorkflowResponse.remote_refs) containing:

| Field | Type | Description |
|---|---|---|
| origin_node_id | String | Stable identifier of the node that owns the value. |
| type_tag | String | Type tag from SessionRefSerializable::blazen_type_tag. Used by the parent’s deserializer to rehydrate the bytes. |
| created_at_epoch_ms | u64 | Wall-clock creation time on the origin node. Useful for tracing and TTL bookkeeping. |
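Because created_at_epoch_ms is wall-clock time on the origin node, a parent doing TTL bookkeeping compares it against its own clock and should tolerate skew between machines. A minimal sketch, assuming a hypothetical local DescriptorView struct that mirrors the three fields above and a TTL chosen by the parent (the peer does not enforce it):

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical local mirror of the RemoteRefDescriptor fields listed above.
struct DescriptorView {
    origin_node_id: String,
    type_tag: String,
    created_at_epoch_ms: u64,
}

impl DescriptorView {
    /// Age in milliseconds relative to `now_epoch_ms`. Saturates at 0 if the
    /// origin's clock is ahead of ours.
    fn age_ms(&self, now_epoch_ms: u64) -> u64 {
        now_epoch_ms.saturating_sub(self.created_at_epoch_ms)
    }

    /// True once the ref is older than a parent-chosen `ttl_ms`.
    fn is_stale(&self, now_epoch_ms: u64, ttl_ms: u64) -> bool {
        self.age_ms(now_epoch_ms) > ttl_ms
    }
}

/// Current local wall-clock time in milliseconds since the Unix epoch.
fn now_epoch_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before 1970")
        .as_millis() as u64
}
```

A stale result is only a hint to release the ref; the peer's RefLifetime policy still decides when it is actually purged.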

Lazy dereference

While the ref is still alive on the peer, the parent can fetch its underlying bytes at any time by calling deref_session_ref with the ref’s RegistryKey (the UUID from the SubWorkflowResponse.remote_refs map):

use blazen_core::session_ref::RegistryKey;

for (uuid, descriptor) in &response.remote_refs {
    let key = RegistryKey(*uuid);
    let bytes = client.deref_session_ref(key).await?;
    // Deserialize `bytes` using the deserializer keyed by `descriptor.type_tag`.
}
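The comment in the loop leaves the deserializer lookup open. One way to fill it is a table from type tag to decoding function that returns a type-erased value. The tags "utf8" and "f32_vec" and both decoders below are hypothetical examples; real tags come from SessionRefSerializable::blazen_type_tag, and the byte layout depends on how the peer serialized the value.

```rust
use std::any::Any;
use std::collections::HashMap;

/// A deserializer turns raw bytes into a type-erased value.
type Deserializer = fn(&[u8]) -> Result<Box<dyn Any>, String>;

/// Hypothetical example: a UTF-8 string ref.
fn decode_utf8(bytes: &[u8]) -> Result<Box<dyn Any>, String> {
    String::from_utf8(bytes.to_vec())
        .map(|s| Box::new(s) as Box<dyn Any>)
        .map_err(|e| e.to_string())
}

/// Hypothetical example: a little-endian f32 vector (e.g. an embedding).
fn decode_f32_vec(bytes: &[u8]) -> Result<Box<dyn Any>, String> {
    if bytes.len() % 4 != 0 {
        return Err(format!("length {} is not a multiple of 4", bytes.len()));
    }
    let v: Vec<f32> = bytes
        .chunks_exact(4)
        .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]]))
        .collect();
    Ok(Box::new(v) as Box<dyn Any>)
}

/// Look up the deserializer for `type_tag` and rehydrate the bytes.
fn rehydrate(
    table: &HashMap<&str, Deserializer>,
    type_tag: &str,
    bytes: &[u8],
) -> Result<Box<dyn Any>, String> {
    let decode = table
        .get(type_tag)
        .ok_or_else(|| format!("no deserializer for type tag {type_tag:?}"))?;
    decode(bytes)
}
```

Callers then downcast the result to the concrete type they expect, e.g. value.downcast_ref::<Vec<f32>>().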

Release

When the parent no longer needs a remote ref, it should release it so the peer can free the memory:

// `key` is a RegistryKey taken from response.remote_refs, as in the loop above.
let was_released = client.release_session_ref(key).await?;

release_session_ref returns true if the ref was found and dropped, and false if it was already gone (expired under its lifetime policy or released by another caller).
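Because a second release of the same key reports false rather than an error, release is idempotent: a parent that is unsure whether an earlier release reached the peer (for example, after a dropped connection) can simply send it again. A minimal model of that contract, using a hypothetical in-memory RefTable keyed by u128 UUIDs:

```rust
use std::collections::HashMap;

/// Hypothetical peer-side ref table keyed by UUID (modeled as u128).
struct RefTable {
    refs: HashMap<u128, Vec<u8>>,
}

impl RefTable {
    /// Drop the ref if present. `true` means this call freed it; `false`
    /// means it was already gone, which callers can treat as success.
    fn release(&mut self, key: u128) -> bool {
        self.refs.remove(&key).is_some()
    }
}
```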

RefLifetime interaction

The RefLifetime policy on each session ref controls when it is automatically purged on the peer:

  • UntilContextDrop (default) — purged when the sub-workflow finishes. The ref must be serialized into the response or it is lost.
  • UntilParentFinish — survives the sub-workflow. Available for lazy DerefSessionRef until the parent explicitly releases it. This is the recommended policy for distributed workflows.
  • UntilExplicitDrop — never purged automatically. The parent must call ReleaseSessionRef.
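Of the three policies, only UntilContextDrop is purged when the sub-workflow finishes, which is why such a ref cannot be dereferenced remotely afterwards. The sketch below models just that purge pass with a hypothetical Lifetime enum mirroring RefLifetime; it does not model purges triggered by the parent or by an explicit release.

```rust
use std::collections::HashMap;

/// Hypothetical mirror of the three RefLifetime policies.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Lifetime {
    UntilContextDrop,
    UntilParentFinish,
    UntilExplicitDrop,
}

/// Peer-side purge pass run when a sub-workflow finishes: context-scoped
/// refs are dropped, everything else stays available for lazy deref.
/// Returns the purged IDs in ascending order.
fn purge_on_sub_workflow_finish(refs: &mut HashMap<u128, Lifetime>) -> Vec<u128> {
    let mut purged: Vec<u128> = refs
        .iter()
        .filter(|(_, l)| **l == Lifetime::UntilContextDrop)
        .map(|(id, _)| *id)
        .collect();
    refs.retain(|_, l| *l != Lifetime::UntilContextDrop);
    purged.sort_unstable();
    purged
}
```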

Set the lifetime when inserting a ref in your step:

use blazen_core::session_ref::RefLifetime;

ctx.session_refs()
    .insert_with_lifetime(my_value, RefLifetime::UntilParentFinish)
    .await;

mTLS configuration for production

In production, always enable mutual TLS between peers. The blazen_peer::tls module provides helpers that read PEM files and produce tonic TLS configs:

use std::path::Path;
use blazen_peer::tls::{load_server_tls, load_client_tls};

// Server side
let server_tls = load_server_tls(
    Path::new("/certs/server.crt"),
    Path::new("/certs/server.key"),
    Path::new("/certs/ca.crt"),
)?;

// Client side
let client_tls = load_client_tls(
    Path::new("/certs/client.crt"),
    Path::new("/certs/client.key"),
    Path::new("/certs/ca.crt"),
)?;

Both sides must present certificates signed by the same CA. In Kubernetes, use cert-manager with a shared Issuer to automate certificate issuance and rotation for each peer pod.
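A wrong mounted certificate path will likely surface as a PeerError::Tls only when the loader runs. A small preflight check can name the offending file before the paths reach load_server_tls or load_client_tls. This is a hypothetical helper: it only checks that each file is readable and contains a PEM header, and it does not validate the certificate chain.

```rust
use std::fs;
use std::path::Path;

/// Confirm each file exists and contains at least one PEM block,
/// returning an error that names the first offending path.
fn check_pem_files(paths: &[&Path]) -> Result<(), String> {
    for path in paths {
        let text = fs::read_to_string(path)
            .map_err(|e| format!("{}: {e}", path.display()))?;
        if !text.contains("-----BEGIN ") {
            return Err(format!("{}: no PEM block found", path.display()));
        }
    }
    Ok(())
}
```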

When mTLS is not practical (development, internal networks), peers can authenticate with a shared secret token by setting the BLAZEN_PEER_TOKEN environment variable on both sides. See blazen_peer::auth::resolve_peer_token.
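When checking a shared token on the server side, compare it in a way that does not stop at the first differing byte, so response timing does not leak how much of a guessed token was right. A std-only sketch: peer_token_from_env and tokens_match are hypothetical helpers, not the signature of blazen_peer::auth::resolve_peer_token, and only the variable name BLAZEN_PEER_TOKEN comes from the text.

```rust
use std::env;

/// Read the shared secret, treating an unset or empty variable as
/// "no token configured".
fn peer_token_from_env() -> Option<String> {
    env::var("BLAZEN_PEER_TOKEN").ok().filter(|t| !t.is_empty())
}

/// Compare tokens without short-circuiting on the first mismatching byte.
/// Length is still compared directly, which reveals only the token length.
fn tokens_match(expected: &str, presented: &str) -> bool {
    let (a, b) = (expected.as_bytes(), presented.as_bytes());
    if a.len() != b.len() {
        return false;
    }
    // OR together the XOR of every byte pair; any difference leaves a nonzero bit.
    a.iter().zip(b).fold(0u8, |acc, (x, y)| acc | (x ^ y)) == 0
}
```

The fold is a best-effort constant-time comparison; in production code, a vetted crate such as subtle (its ConstantTimeEq trait) is the safer choice.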

Error handling

The PeerError enum covers all failure modes:

| Variant | Meaning |
|---|---|
| PeerError::UnknownStep | The peer does not have a requested step ID registered. |
| PeerError::EnvelopeVersion | The client sent an envelope version newer than the server supports. |
| PeerError::Workflow | The remote workflow ran but produced an error. |
| PeerError::Transport | Network-level failure (connection refused, timeout, TLS handshake failure). |
| PeerError::Encode | Postcard serialization or deserialization failed. |
| PeerError::Tls | TLS configuration error (bad PEM, missing key file). |

Additionally, the SubWorkflowResponse.error field carries workflow-level errors as a string when the sub-workflow itself fails (as opposed to transport or protocol errors).
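A caller therefore deals with two layers: the Result returned by the client (PeerError) and the error field on an otherwise successful response. The sketch below folds both into one decision. The variant names mirror the table, but PeerErrorKind, ResponseStub and Next are hypothetical, and treating only Transport as retryable is a judgment call rather than documented behavior. A TLS handshake failure, which the table files under Transport, will not fix itself, so a real client should cap retries.

```rust
/// Hypothetical payload-free mirror of the PeerError variants in the table.
#[derive(Clone, Copy, Debug, PartialEq)]
enum PeerErrorKind {
    UnknownStep,
    EnvelopeVersion,
    Workflow,
    Transport,
    Encode,
    Tls,
}

/// What the parent might do next.
#[derive(Debug, PartialEq)]
enum Next {
    Use(String),
    Retry,
    FixDeployment,
    ReportWorkflowError(String),
}

/// Simplified response: `error` is set when the sub-workflow itself failed.
struct ResponseStub {
    error: Option<String>,
    result: String,
}

fn decide(outcome: Result<ResponseStub, PeerErrorKind>) -> Next {
    match outcome {
        // Transport/protocol layer: the call itself failed.
        Err(PeerErrorKind::Transport) => Next::Retry,
        Err(PeerErrorKind::Workflow) => Next::ReportWorkflowError("remote workflow error".into()),
        // UnknownStep, EnvelopeVersion, Encode, Tls: configuration or version mismatch.
        Err(_) => Next::FixDeployment,
        // Workflow layer: the call succeeded but the sub-workflow failed.
        Ok(ResponseStub { error: Some(msg), .. }) => Next::ReportWorkflowError(msg),
        Ok(ResponseStub { error: None, result }) => Next::Use(result),
    }
}
```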

Envelope versioning

Every wire payload carries an envelope_version field (currently 1). Appending optional fields to the end of a struct is forward-compatible and does not require a version bump. Renaming, reordering, or removing fields is a breaking change and requires incrementing the ENVELOPE_VERSION constant. The server accepts payloads at its own version or any older one, and rejects payloads with a newer version with the gRPC status FAILED_PRECONDITION.
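The server-side admission rule reduces to a single comparison. A minimal sketch, assuming the version is a u32 and using a hypothetical Admit enum in place of the gRPC status the server actually returns:

```rust
/// Version this hypothetical server understands (currently 1 per the text above).
const ENVELOPE_VERSION: u32 = 1;

#[derive(Debug, PartialEq)]
enum Admit {
    Accept,
    /// Corresponds to the FAILED_PRECONDITION rejection described above.
    RejectTooNew { got: u32, supported: u32 },
}

/// Accept the current version and anything older; reject anything newer.
fn admit(envelope_version: u32) -> Admit {
    if envelope_version <= ENVELOPE_VERSION {
        Admit::Accept
    } else {
        Admit::RejectTooNew { got: envelope_version, supported: ENVELOPE_VERSION }
    }
}
```

Rejecting only newer versions lets old clients keep working against upgraded servers, while a newer client talking to an older server gets an explicit error instead of a silently misparsed payload.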