Distributed Workflows
Run sub-workflows across machines with blazen-peer from Node.js
Blazen workflows normally run inside a single Node.js process. The blazen-peer transport extends that to multiple machines: a parent workflow on machine A can delegate a sub-workflow to machine B over gRPC, receive the terminal result back, and lazily dereference any session refs that stayed on the remote peer.
The Node binding ships both the native gRPC transport (BlazenPeerServer / BlazenPeerClient) and an HTTP/JSON variant (HttpPeerClient) for environments where binding to a tonic client is awkward (serverless, browsers via a proxy, wasi-style hosts).
When to use distributed workflows
- Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating workflow lives elsewhere.
- Cost optimization. Route GPU-intensive steps (embedding generation, image diffusion) to machines with the right hardware instead of paying for GPU on every node.
- Hardware-specific steps. Some steps require specialized hardware (TPUs, large-memory instances, local SSDs). Run those steps on the machine that has the hardware.
- Regulatory compliance. Data residency requirements may mandate that certain processing happens in a specific region or on specific infrastructure.
How it works
- The parent builds a JsSubWorkflowRequest with a workflow name, an ordered list of step IDs, and a JSON input.
- The request is sent over a tonic gRPC channel (HTTP/2, optional mTLS) to the peer.
- The peer resolves each step ID against its local step registry, assembles a workflow, and runs it.
- The peer returns a JsSubWorkflowResponse with the terminal result, exported state values, and JsPeerRemoteRefDescriptor handles for any session refs that could not be serialized inline.
- The parent can dereference remote refs lazily over the same channel, and release them when done.
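The request built in the first step can be sketched as a plain object. The field names (workflowName, stepIds, input, timeoutSecs) are the ones used in the examples on this page. The helper `buildSubWorkflowRequest` and its validation rules are hypothetical and not part of blazen.

```javascript
// Hypothetical helper: assembles the plain object passed to invokeSubWorkflow.
// The validation rules are illustrative assumptions, not library behavior.
function buildSubWorkflowRequest(workflowName, stepIds, input, timeoutSecs = 60) {
  if (typeof workflowName !== "string" || workflowName.length === 0) {
    throw new TypeError("workflowName must be a non-empty string");
  }
  if (!Array.isArray(stepIds) || stepIds.length === 0) {
    throw new TypeError("stepIds must be a non-empty, ordered array of step IDs");
  }
  // Round-trip through JSON so cyclic structures or BigInt values fail here
  // rather than on the wire.
  const jsonInput = JSON.parse(JSON.stringify(input));
  return { workflowName, stepIds: [...stepIds], input: jsonInput, timeoutSecs };
}

const request = buildSubWorkflowRequest(
  "analyze-pipeline",
  ["my_app::analyze", "my_app::summarize"],
  { document: "..." },
);
console.log(request);
```

Copying `stepIds` keeps the request stable if the caller later mutates its own array, which matters because the order of step IDs is part of the request.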
Server setup
The peer process needs to register the steps it will execute, then bind a gRPC server. Step registration happens through the regular Workflow.addStep calls inside the Node process that hosts the peer — the registry is populated by side effect of constructing workflows in that process.
import { Workflow, BlazenPeerServer } from "blazen";
// Register the steps this peer can run. Building the workflow is enough --
// the addStep calls populate the global step registry used by the peer.
const local = new Workflow("analyze-pipeline");
local.addStep("my_app::analyze", ["blazen::StartEvent"], async (event, ctx) => {
  // ... extract entities, compute embeddings, etc.
  return { type: "AnalyzedEvent", entities: [] };
});
local.addStep("my_app::summarize", ["AnalyzedEvent"], async (event, ctx) => {
  return { type: "blazen::StopEvent", result: { summary: "..." } };
});
// Bind the gRPC server. The node ID is stamped onto every RemoteRefDescriptor
// this server hands out, so make it stable across restarts of the same node.
const server = BlazenPeerServer.create("node-b");
await server.serve("0.0.0.0:7443");
BlazenPeerServer.serve(addr) consumes the server — a second call on the same instance throws. The promise resolves only when the gRPC stack shuts down; in a long-running peer process you typically await it at the top level.
Client setup
import { BlazenPeerClient } from "blazen";
const client = await BlazenPeerClient.connect("http://peer-b:7443", "node-a");
const response = await client.invokeSubWorkflow({
  workflowName: "analyze-pipeline",
  stepIds: ["my_app::analyze", "my_app::summarize"],
  input: { document: "..." },
  timeoutSecs: 60,
});
if (response.error) {
  console.error("remote workflow failed:", response.error);
} else {
  console.log("result:", response.result);
  console.log("exported state:", response.stateJson);
}
BlazenPeerClient.connect(endpoint, nodeId) returns a connected client. nodeId identifies this caller in the peer’s trace logs. The endpoint must be a valid gRPC URI: http://... for plaintext, or https://... when mTLS is enabled.
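Because a malformed endpoint would otherwise only surface when the connection is attempted, a small pre-flight check can help. The sketch below uses the standard `URL` class; `assertPeerEndpoint` is a hypothetical helper, not something blazen provides.

```javascript
// Hypothetical pre-flight check, not part of blazen: rejects endpoints that
// are not http:// or https:// URLs before they reach BlazenPeerClient.connect.
function assertPeerEndpoint(endpoint) {
  let url;
  try {
    url = new URL(endpoint);
  } catch {
    throw new TypeError(`invalid peer endpoint: ${endpoint}`);
  }
  if (url.protocol !== "http:" && url.protocol !== "https:") {
    throw new TypeError(`peer endpoint must use http:// or https://, got ${url.protocol}`);
  }
  // True when the endpoint uses the https scheme.
  return url.protocol === "https:";
}

console.log(assertPeerEndpoint("http://peer-b:7443"));  // false
console.log(assertPeerEndpoint("https://peer-b:7443")); // true
```

A bare `peer-b:7443` is rejected as well, since the URL parser reads `peer-b:` as the scheme.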
Invoking from inside a workflow step
The typical pattern is to delegate part of a parent workflow to a peer:
import { Workflow, BlazenPeerClient } from "blazen";
const client = await BlazenPeerClient.connect("http://peer-b:7443", "node-a");
const wf = new Workflow("distributed-example");
wf.addStep("orchestrate", ["blazen::StartEvent"], async (event, ctx) => {
  const response = await client.invokeSubWorkflow({
    workflowName: "analyze-pipeline",
    stepIds: ["my_app::analyze", "my_app::summarize"],
    input: { document: event.document },
    timeoutSecs: 60,
  });
  if (response.error) {
    throw new Error(`peer error: ${response.error}`);
  }
  return { type: "blazen::StopEvent", result: response.result };
});
const result = await wf.run({ document: "..." });
console.log(result.data);
HTTP/JSON peer client
For hosts that cannot link the native gRPC client (wasi, restricted serverless runtimes, code running behind an HTTP-only proxy), use HttpPeerClient. It exposes the same three methods as BlazenPeerClient (invokeSubWorkflow, derefSessionRef, and releaseSessionRef) but speaks pure HTTP/JSON.
import { HttpPeerClient } from "blazen";
const client = HttpPeerClient.newHttp("https://peer.example.com", "node-a");
const response = await client.invokeSubWorkflow({
  workflowName: "analyze-pipeline",
  stepIds: ["my_app::analyze"],
  input: { document: "..." },
  timeoutSecs: 30,
});
HttpPeerClient.newHttp is synchronous (no connection is opened until the first request). The nodeId is sent on every request as the X-Blazen-Peer-Node-Id header. The peer must be running a compatible HTTP shim; the gRPC server alone is not enough.
Session refs across machines
When a sub-workflow on the peer creates a session ref — a model-weight cache, a GPU-resident tensor, a fetched document body — the value stays on the peer. The parent receives a JsPeerRemoteRefDescriptor keyed by the registry UUID:
| Field | Type | Description |
|---|---|---|
| originNodeId | string | Stable identifier of the node that owns the value. |
| typeTag | string | Type tag from the originating SessionRefSerializable. Used by the parent to pick a deserializer for the dereffed bytes. |
| createdAtEpochMs | number | Wall-clock creation time on the origin node. Useful for tracing and TTL bookkeeping. |
for (const [refUuid, descriptor] of Object.entries(response.remoteRefs)) {
  console.log(
    `ref ${refUuid} from ${descriptor.originNodeId}`,
    `type=${descriptor.typeTag}`,
    `createdAt=${new Date(descriptor.createdAtEpochMs).toISOString()}`,
  );
}
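When a parent talks to more than one peer, it can help to know which refs belong to which node before dereferencing or releasing them. The sketch below groups ref UUIDs by originNodeId, assuming remoteRefs is an object mapping each UUID to a descriptor with the fields in the table above. The helper name and the sample data are made up for illustration.

```javascript
// Hypothetical helper: groups remote ref UUIDs by the node that owns them,
// assuming remoteRefs maps refUuid -> { originNodeId, typeTag, createdAtEpochMs }.
function groupRefsByOrigin(remoteRefs) {
  const byNode = new Map();
  for (const [refUuid, descriptor] of Object.entries(remoteRefs ?? {})) {
    const uuids = byNode.get(descriptor.originNodeId) ?? [];
    uuids.push(refUuid);
    byNode.set(descriptor.originNodeId, uuids);
  }
  return byNode;
}

// Made-up sample data; real UUIDs and type tags come from the peer.
const sampleRefs = {
  "ref-1": { originNodeId: "node-b", typeTag: "example::Tensor", createdAtEpochMs: 1700000000000 },
  "ref-2": { originNodeId: "node-b", typeTag: "example::Doc", createdAtEpochMs: 1700000000500 },
  "ref-3": { originNodeId: "node-c", typeTag: "example::Doc", createdAtEpochMs: 1700000001000 },
};
const grouped = groupRefsByOrigin(sampleRefs);
console.log(grouped.get("node-b")); // [ 'ref-1', 'ref-2' ]
```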
Lazy dereference
Fetch the underlying bytes on demand by calling derefSessionRef with the ref’s UUID. The response carries the raw bytes as a Buffer:
import { peerEnvelopeVersion } from "blazen";
for (const refUuid of Object.keys(response.remoteRefs)) {
  const deref = await client.derefSessionRef({
    envelopeVersion: peerEnvelopeVersion(),
    refUuid,
  });
  // `deref.payload` is a Buffer; decode it with whatever serializer
  // matches the descriptor's typeTag.
  const decoded = JSON.parse(deref.payload.toString("utf8"));
  console.log("dereffed:", decoded);
}
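The example above assumes every payload is JSON. One way to pick a decoder per typeTag is a small lookup table, sketched below. The tag names `example::json` and `example::text` are invented for illustration; real tags come from whatever SessionRefSerializable implementation the peer uses.

```javascript
// Hypothetical decoder table keyed by typeTag. The tag names are made up.
const decoders = new Map([
  ["example::json", (payload) => JSON.parse(payload.toString("utf8"))],
  ["example::text", (payload) => payload.toString("utf8")],
]);

// Looks up the decoder for a descriptor and applies it to the raw bytes.
function decodeRemoteRef(descriptor, payload) {
  const decode = decoders.get(descriptor.typeTag);
  if (!decode) {
    throw new Error(`no decoder registered for typeTag ${descriptor.typeTag}`);
  }
  return decode(payload);
}

const decodedExample = decodeRemoteRef(
  { typeTag: "example::json" },
  Buffer.from('{"entities":[]}', "utf8"),
);
console.log(decodedExample); // { entities: [] }
```

Failing loudly on an unknown tag avoids silently treating binary payloads as text.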
Release
When the parent no longer needs a remote ref, release it so the peer can free the memory:
const release = await client.releaseSessionRef({
  envelopeVersion: peerEnvelopeVersion(),
  refUuid,
});
console.log("was released?", release.released);
release.released is true if the ref was found and dropped, false if it was already gone (expired by lifetime policy or released by another caller).
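To make sure a ref is released even when the code that consumes it throws, the deref and release calls can be paired in a try/finally wrapper. This is a minimal sketch: `withRemoteRef` is a hypothetical helper, `client` is any object exposing derefSessionRef and releaseSessionRef as shown above, and `envelopeVersion` would normally come from peerEnvelopeVersion().

```javascript
// Hypothetical helper: dereferences a remote ref, hands the bytes to `use`,
// and always attempts to release the ref afterwards, even if `use` throws.
async function withRemoteRef(client, envelopeVersion, refUuid, use) {
  try {
    const deref = await client.derefSessionRef({ envelopeVersion, refUuid });
    return await use(deref.payload);
  } finally {
    // A failed release is logged rather than thrown so it cannot mask an
    // error raised by the deref call or by `use`.
    await client
      .releaseSessionRef({ envelopeVersion, refUuid })
      .catch((err) => console.warn(`release of ${refUuid} failed:`, err));
  }
}
```

A call might look like `await withRemoteRef(client, peerEnvelopeVersion(), refUuid, (payload) => JSON.parse(payload.toString("utf8")))`.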
RefLifetime interaction
The RefLifetime policy on each session ref controls when the peer purges it automatically:
- UntilContextDrop (default): purged when the sub-workflow finishes. The ref must be serialized into the response or it is lost.
- UntilParentFinish: survives the sub-workflow and is available for lazy derefSessionRef until the parent explicitly releases it. Recommended for distributed workflows.
- UntilExplicitDrop: never purged automatically. The parent must call releaseSessionRef or the bytes leak on the peer.
The lifetime is set on the peer side when the step inserts the ref; the parent receives it as-is.
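Since refs that outlive the sub-workflow stay on the peer until they are released, a parent may want to release everything a response handed back once it is finished. The sketch below does that in bulk. `releaseAllRemoteRefs` is a hypothetical helper, and it assumes `client.releaseSessionRef` returns a promise resolving to an object with a `released` boolean, as in the release example above.

```javascript
// Hypothetical cleanup helper: releases every ref listed in a response and
// reports which UUIDs were released, already gone, or failed to release.
async function releaseAllRemoteRefs(client, envelopeVersion, remoteRefs) {
  const uuids = Object.keys(remoteRefs ?? {});
  const settled = await Promise.allSettled(
    uuids.map((refUuid) => client.releaseSessionRef({ envelopeVersion, refUuid })),
  );
  const summary = { released: [], alreadyGone: [], failed: [] };
  settled.forEach((outcome, i) => {
    if (outcome.status === "rejected") summary.failed.push(uuids[i]);
    else if (outcome.value.released) summary.released.push(uuids[i]);
    else summary.alreadyGone.push(uuids[i]);
  });
  return summary;
}
```

Using `Promise.allSettled` means one failed release does not prevent the others from being attempted; the `failed` list can be retried or logged.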
mTLS configuration
In production, always enable mutual TLS between peers. The Node binding exposes PEM-file validators that load the same rustls configuration the server and client use natively:
import { loadServerTls, loadClientTls } from "blazen";
// Validate on startup that the PEM files exist, parse, and chain correctly.
// Throws a PeerTlsError if anything is malformed.
loadServerTls("/certs/server.crt", "/certs/server.key", "/certs/ca.crt");
loadClientTls("/certs/client.crt", "/certs/client.key", "/certs/ca.crt");
loadServerTls and loadClientTls return true on success. They are validators only — the actual TLS configuration is wired into the gRPC stack by the native peer crate when the server binds and the client connects. The recommended pattern is to call them at process startup so misconfigured certs fail fast rather than surfacing during the first RPC.
Both sides must present certificates signed by the same CA. In Kubernetes, use cert-manager with a shared Issuer to automate certificate issuance and rotation for each peer pod.
Auth-token fallback
When mTLS is not practical (development, internal networks, ad-hoc benchmarking), peers can authenticate with a shared secret instead. Set the same token on both sides in the env var reported by peerTokenEnv():
import { peerTokenEnv, resolvePeerToken } from "blazen";
console.log("set this env var on both peers:", peerTokenEnv());
const token = resolvePeerToken();
if (!token) {
  console.warn("peer auth token not set -- peers will reject calls");
}
resolvePeerToken() returns the current token from the process environment, or null when it is unset or empty.
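If a missing token should stop the process rather than produce a warning, the check can be turned into a startup guard. In this sketch `requirePeerToken` is a hypothetical helper; its `resolveToken` argument stands in for resolvePeerToken and `envVarName` for the value of peerTokenEnv(). The stub token and the variable name `EXAMPLE_PEER_TOKEN_VAR` are placeholders.

```javascript
// Hypothetical startup guard. `resolveToken` is expected to return the token
// string, or null when the variable is unset or empty.
function requirePeerToken(resolveToken, envVarName) {
  const token = resolveToken();
  if (!token) {
    throw new Error(`peer auth token missing: set ${envVarName} on both peers`);
  }
  return token;
}

// Demo with a stub resolver; in a real process, pass resolvePeerToken and
// peerTokenEnv() from blazen instead.
const demoToken = requirePeerToken(() => "s3cret", "EXAMPLE_PEER_TOKEN_VAR");
console.log(demoToken.length); // 6
```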
Error handling
The native bindings throw typed subclasses of BlazenError for each peer failure mode. Catch the base class for a unified handler, or narrow on the concrete type:
import {
  BlazenPeerClient,
  PeerUnknownStepError,
  PeerEnvelopeVersionError,
  PeerWorkflowError,
  PeerTransportError,
  PeerEncodeError,
  PeerTlsError,
} from "blazen";
// Connect as in the client setup section so this snippet stands alone.
const client = await BlazenPeerClient.connect("http://peer-b:7443", "node-a");
try {
  await client.invokeSubWorkflow({
    workflowName: "analyze-pipeline",
    stepIds: ["my_app::analyze"],
    input: {},
  });
} catch (err) {
  if (err instanceof PeerUnknownStepError) {
    // The peer does not have one of the requested step IDs registered.
  } else if (err instanceof PeerEnvelopeVersionError) {
    // Client and server speak incompatible envelope versions.
  } else if (err instanceof PeerWorkflowError) {
    // The remote workflow ran but reported an error.
  } else if (err instanceof PeerTransportError) {
    // Network-level failure (connection refused, timeout, TLS handshake).
  } else if (err instanceof PeerEncodeError) {
    // Postcard serialization or deserialization failed.
  } else if (err instanceof PeerTlsError) {
    // TLS configuration error (bad PEM, missing key file).
  } else {
    // Not a peer error: let it propagate.
    throw err;
  }
}
In addition to thrown errors, the JsSubWorkflowResponse.error field carries workflow-level failures as a string when the sub-workflow itself fails (as opposed to transport or protocol errors). Always check response.error before reading response.result or response.stateJson.
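One way to make the "check response.error first" rule hard to forget is to funnel every response through a single function that throws on failure. `unwrapSubWorkflowResponse` below is a hypothetical helper; it relies only on the response fields used on this page (error, result, stateJson, remoteRefs).

```javascript
// Hypothetical helper: converts the `error` string on a sub-workflow response
// into a thrown Error so callers cannot read `result` by accident.
function unwrapSubWorkflowResponse(response) {
  if (response.error) {
    throw new Error(`remote workflow failed: ${response.error}`);
  }
  return {
    result: response.result,
    stateJson: response.stateJson,
    // Default to an empty object so callers can iterate unconditionally.
    remoteRefs: response.remoteRefs ?? {},
  };
}

const ok = unwrapSubWorkflowResponse({ result: { summary: "..." }, stateJson: {} });
console.log(ok.result.summary); // ...
```

With this in place, a step can write `const { result } = unwrapSubWorkflowResponse(await client.invokeSubWorkflow(request))` and handle both thrown peer errors and workflow-level failures in one catch block.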
Envelope versioning
Every wire payload carries an envelopeVersion field. Read the version this build speaks with peerEnvelopeVersion():
import { peerEnvelopeVersion } from "blazen";
console.log("speaking envelope version", peerEnvelopeVersion());
Adding optional fields to a payload struct is forward-compatible and does not require a version bump. Renaming, reordering, or removing fields is a breaking change that increments the version. The server rejects payloads with a version newer than it supports (surfaced as a PeerEnvelopeVersionError on the client) but accepts all older versions, so rolling upgrades work as long as each server is upgraded before the clients that call it.
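The acceptance rule can be stated as a one-line predicate. This is a sketch of the rule as described here, not the library's implementation, and it assumes envelope versions are integers that only increase.

```javascript
// Sketch of the acceptance rule: a server accepts any payload whose envelope
// version is no newer than the version the server itself speaks.
function serverAccepts(serverVersion, payloadVersion) {
  return payloadVersion <= serverVersion;
}

console.log(serverAccepts(2, 1)); // true  (older client, newer server)
console.log(serverAccepts(2, 2)); // true  (same version)
console.log(serverAccepts(1, 2)); // false (newer client, older server)
```

The third case is the one that surfaces as a PeerEnvelopeVersionError on the client.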