Distributed Workflows

Run sub-workflows across machines with blazen-peer from Python

Blazen workflows normally run in a single process. The peer bindings extend this to multiple machines: a parent workflow on machine A can delegate a sub-workflow to machine B over gRPC (or HTTP/JSON), get the result back, and lazily dereference any session refs that stayed on the remote peer.

The Python bindings are wired up. Import BlazenPeerServer, BlazenPeerClient, HttpPeerClient, SubWorkflowRequest, SubWorkflowResponse, and the helpers resolve_peer_token, load_server_tls, load_client_tls directly from blazen.

When to use distributed workflows

  • Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating workflow lives elsewhere.
  • Cost optimization. Route GPU-intensive steps (embedding generation, image diffusion) to machines with the right hardware instead of paying for GPU on every node.
  • Hardware-specific steps. Some steps require specialized hardware (TPUs, large-memory instances, local SSDs). Run those steps on the machine that has the hardware.
  • Regulatory compliance. Data residency requirements may mandate that certain processing happens in a specific region or on specific infrastructure.

How it works

  1. The parent builds a SubWorkflowRequest containing a workflow name, an ordered list of step IDs, and a JSON-compatible input dict.
  2. The request is sent over a tonic gRPC channel (HTTP/2, optional mTLS) to the peer, or over plain HTTP/JSON via HttpPeerClient.
  3. The peer resolves each step ID against its local step registry, assembles a Workflow, and runs it.
  4. The peer returns a SubWorkflowResponse carrying the terminal result, exported state values, and PeerRemoteRefDescriptor handles for any session refs that could not be serialized inline.
  5. The parent can dereference remote refs lazily over the same channel, and release them when done.
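The flow above can be modeled in a few lines of plain Python. This is a minimal in-process sketch under stated assumptions, not the Blazen implementation: `STEP_REGISTRY`, `FakeRequest`, `FakeResponse`, and `run_on_peer` are hypothetical stand-ins, the two step bodies are invented, and a direct function call replaces the network hop. It only illustrates how step IDs are resolved against a registry before anything runs, and how the terminal result and exported state travel back together.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

# Hypothetical stand-in for the peer's process-global step registry.
STEP_REGISTRY: Dict[str, Callable[[dict], dict]] = {
    "my_app::analyze": lambda s: {**s, "words": len(s["document"].split())},
    "my_app::summarize": lambda s: {**s, "summary": f"{s['words']} words"},
}


@dataclass
class FakeRequest:
    workflow_name: str
    step_ids: List[str] = field(default_factory=list)
    input: dict = field(default_factory=dict)


@dataclass
class FakeResponse:
    result: Any
    state: dict


def run_on_peer(request: FakeRequest) -> FakeResponse:
    # Step 3: resolve every step ID up front so an unknown ID fails early.
    missing = [sid for sid in request.step_ids if sid not in STEP_REGISTRY]
    if missing:
        raise LookupError(f"unknown step IDs: {missing}")
    state = dict(request.input)
    for sid in request.step_ids:  # run the steps in the requested order
        state = STEP_REGISTRY[sid](state)
    # Step 4: return the terminal result together with the exported state.
    return FakeResponse(result=state.get("summary"), state=state)


# Steps 1-2: build the request; a direct call replaces the network hop.
response = run_on_peer(
    FakeRequest(
        "analyze-pipeline",
        ["my_app::analyze", "my_app::summarize"],
        {"document": "alpha beta gamma"},
    )
)
print(response.result)  # 3 words
```

Session refs (step 5) are left out of this model; they are covered in the sections on dereferencing and releasing.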

Step registration

Steps are looked up by string ID in a process-global registry on the peer. The Python register_step_builder(step_id, builder) binding currently rejects Python callables, so for now any step that runs on a peer must be registered on the Rust side. You can still introspect the registry from Python:

from blazen import lookup_step_builder, registered_step_ids

print(registered_step_ids())     # every step ID the peer can serve
print(lookup_step_builder("my_app::analyze"))   # True / False

Starting the peer server

import asyncio
from blazen import BlazenPeerServer

async def main():
    server = BlazenPeerServer("node-b")
    # Bind and serve forever. Raises PeerError if the bind fails.
    await server.serve("0.0.0.0:7443")

asyncio.run(main())

The server uses an internal session-ref registry by default. To share it with workflows running in the same process, pass a _SessionRegistryHandle via server.with_session_refs(handle). This lets a single Blazen process expose its in-memory refs to both local and remote callers.

Connecting and invoking from the parent

import asyncio
from blazen import BlazenPeerClient, SubWorkflowRequest

async def main():
    client = await BlazenPeerClient.connect("http://node-b.local:7443", "node-a")

    request = SubWorkflowRequest(
        workflow_name="analyze-pipeline",
        step_ids=["my_app::analyze", "my_app::summarize"],
        input={"document": "..."},
        timeout_secs=60,
    )

    response = await client.invoke_sub_workflow(request)

    if response.error is not None:
        print(f"remote workflow failed: {response.error}")
    else:
        print("result:", response.result_value())
        print("state:", response.state_values())

asyncio.run(main())

SubWorkflowRequest keeps step_ids and input optional — an empty step_ids list defers to the peer’s default workflow definition, and an omitted input becomes an empty JSON object. timeout_secs is a wall-clock cap applied on the peer.

response.result_value() decodes the terminal StopEvent.result JSON into a Python value. response.state_values() returns the full exported ctx.state dict.

Calling a peer over HTTP/JSON

When the native gRPC transport is unavailable (in environments without HTTP/2, in tests behind a JSON-only proxy, or when you simply want a pure HTTP/JSON wire format), use HttpPeerClient instead. The method surface is the same, except that deref_session_ref and release_session_ref take typed DerefRequest / ReleaseRequest objects and return typed DerefResponse / ReleaseResponse objects.

import asyncio
from blazen import (
    HttpPeerClient,
    SubWorkflowRequest,
    DerefRequest,
    ReleaseRequest,
)

async def main():
    client = HttpPeerClient.new_http("https://peer.example.com", "node-a")

    request = SubWorkflowRequest("analyze-pipeline", input={"document": "..."})
    response = await client.invoke_sub_workflow(request)

    for ref_uuid, descriptor in response.remote_refs().items():
        deref = await client.deref_session_ref(DerefRequest(ref_uuid))
        # `deref.payload` is the raw bytes the origin node's serializer produced.
        await client.release_session_ref(ReleaseRequest(ref_uuid))

asyncio.run(main())

HttpPeerClient.new_http is synchronous and performs no network I/O — the first call happens on invoke_sub_workflow.

Session refs across machines

When a sub-workflow on the peer creates a session ref — for example, a model weight cache or a GPU-resident tensor — the value stays on the peer. The parent receives a PeerRemoteRefDescriptor keyed by UUID string. Each descriptor exposes:

Property             Type  Description
origin_node_id       str   Stable identifier of the node that owns the value.
type_tag             str   Type tag from SessionRefSerializable::blazen_type_tag. Used by the parent’s deserializer to rehydrate the bytes.
created_at_epoch_ms  int   Wall-clock creation time on the origin node. Useful for tracing and TTL bookkeeping.

Lazy dereference

Fetch the underlying bytes whenever the parent needs them. With BlazenPeerClient the UUID is a plain string argument:

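# Runs inside an async function; `client` and `response` come from the
# BlazenPeerClient invocation example above.
for ref_uuid, descriptor in response.remote_refs().items():
    payload: bytes = await client.deref_session_ref(ref_uuid)
    # Deserialize `payload` using whatever codec matches `descriptor.type_tag`.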
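How the bytes are decoded depends on what the origin node's serializer produced. One way to organize this on the parent is a small table from type tag to decoder. The sketch below assumes two hypothetical tags (`"json"` and `"utf8"`); real tags are defined by the Rust step that created the ref, and `DECODERS` and `decode_payload` are illustrative names, not part of blazen.

```python
import json
from typing import Any, Callable, Dict

# Hypothetical tag -> decoder table; real tags are defined on the Rust side.
DECODERS: Dict[str, Callable[[bytes], Any]] = {
    "json": lambda raw: json.loads(raw.decode("utf-8")),
    "utf8": lambda raw: raw.decode("utf-8"),
}


def decode_payload(type_tag: str, payload: bytes) -> Any:
    """Decode `payload` with the decoder registered for `type_tag`."""
    try:
        decoder = DECODERS[type_tag]
    except KeyError:
        # Fail loudly instead of guessing a codec for an unknown tag.
        raise ValueError(f"no decoder registered for type tag {type_tag!r}") from None
    return decoder(payload)


decoded = decode_payload("json", b'{"score": 0.5}')
print(decoded)  # {'score': 0.5}
```

In the loop above this would be called as `decode_payload(descriptor.type_tag, payload)`.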

Release

When the parent no longer needs a remote ref, release it so the peer can free the memory:

# Runs inside an async function; `ref_uuid` is a key from response.remote_refs().
released = await client.release_session_ref(ref_uuid)
# released is True if the ref was found and dropped, False if it was already gone.

False means the ref was already purged — either by lifetime policy on the peer or by another caller that released it first.
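Because a forgotten release keeps memory pinned on the peer, it can help to tie dereference and release together. The following is a sketch of one way to do that with an async context manager. `borrowed_ref` is a hypothetical helper, and `FakeClient` is an in-memory stand-in that only mimics the two BlazenPeerClient ref methods as described in this section, so the example runs without a peer.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeClient:
    """Hypothetical in-memory stand-in for the two ref methods used above."""

    def __init__(self, refs):
        self._refs = dict(refs)

    async def deref_session_ref(self, ref_uuid: str) -> bytes:
        return self._refs[ref_uuid]

    async def release_session_ref(self, ref_uuid: str) -> bool:
        # True if the ref was present and dropped, False if already gone.
        return self._refs.pop(ref_uuid, None) is not None


@asynccontextmanager
async def borrowed_ref(client, ref_uuid: str):
    """Yield the dereferenced bytes and always release the ref afterwards."""
    try:
        yield await client.deref_session_ref(ref_uuid)
    finally:
        await client.release_session_ref(ref_uuid)


async def demo():
    client = FakeClient({"ref-1": b"weights"})
    async with borrowed_ref(client, "ref-1") as payload:
        seen = payload
    # The context manager already released the ref, so this returns False.
    released_again = await client.release_session_ref("ref-1")
    return seen, released_again


result = asyncio.run(demo())
print(result)  # (b'weights', False)
```

The `finally` clause runs even if the body of the `async with` block raises, so the ref is released on the error path as well.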

RefLifetime interaction

The RefLifetime enum (blazen.RefLifetime) controls when a session ref is automatically purged on the peer:

  • UntilContextDrop — purged when the sub-workflow finishes. The ref must be serialized into the response or it is lost.
  • UntilParentFinish — survives the sub-workflow. Available for lazy deref_session_ref until the parent explicitly releases it. This is the recommended policy for distributed workflows.
  • UntilExplicitDrop — never purged automatically. The parent must call release_session_ref.

Today the lifetime is chosen when the ref is inserted on the Rust side of the peer (step code controls this). From Python you can inspect the policy of a known key via registry.lifetime_of(key) on a SessionRefRegistry handle.
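The three policies can be pictured with a toy registry. This is an illustrative model only: `ToyLifetime` and `ToyRegistry` are hypothetical names, the real registry lives on the Rust side, and the sketch assumes that UntilParentFinish refs the parent never released are purged once the parent finishes.

```python
from enum import Enum, auto


class ToyLifetime(Enum):
    UNTIL_CONTEXT_DROP = auto()
    UNTIL_PARENT_FINISH = auto()
    UNTIL_EXPLICIT_DROP = auto()


class ToyRegistry:
    """Toy model of lifetime-driven purging; not the blazen registry."""

    def __init__(self):
        self._refs = {}  # key -> (value, lifetime)

    def insert(self, key, value, lifetime):
        self._refs[key] = (value, lifetime)

    def _purge(self, lifetime):
        self._refs = {k: v for k, v in self._refs.items() if v[1] is not lifetime}

    def on_sub_workflow_finish(self):
        self._purge(ToyLifetime.UNTIL_CONTEXT_DROP)

    def on_parent_finish(self):
        # Assumption: refs the parent never released are purged here.
        self._purge(ToyLifetime.UNTIL_PARENT_FINISH)

    def release(self, key) -> bool:
        # Explicit release works for any policy and reports whether it hit.
        return self._refs.pop(key, None) is not None

    def keys(self):
        return sorted(self._refs)


registry = ToyRegistry()
registry.insert("a", b"scratch", ToyLifetime.UNTIL_CONTEXT_DROP)
registry.insert("b", b"tensor", ToyLifetime.UNTIL_PARENT_FINISH)
registry.insert("c", b"cache", ToyLifetime.UNTIL_EXPLICIT_DROP)

registry.on_sub_workflow_finish()
after_sub_workflow = registry.keys()  # "a" is gone
registry.on_parent_finish()
after_parent = registry.keys()  # only "c" is left
explicitly_released = registry.release("c")
```

Only the UntilExplicitDrop entry survives both events, which is why that policy depends on the parent calling release_session_ref.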

mTLS configuration for production

In production, always enable mutual TLS between peers. load_server_tls and load_client_tls read PEM files eagerly and surface parse errors as PeerError before any network I/O happens, so they double as a smoke check that your cert material is on disk and well-formed:

from blazen import load_server_tls, load_client_tls

# Server side -- run during startup.
load_server_tls(
    "/certs/server.crt",
    "/certs/server.key",
    "/certs/ca.crt",
)

# Client side -- run before BlazenPeerClient.connect(...).
load_client_tls(
    "/certs/client.crt",
    "/certs/client.key",
    "/certs/ca.crt",
)

Both sides must present certificates signed by the same CA. In Kubernetes, use cert-manager with a shared Issuer to automate certificate issuance and rotation for each peer pod.

Shared-token fallback

When mTLS is not practical (development, internal networks), peers can authenticate with a shared secret token. Set the env var named by PEER_TOKEN_ENV (BLAZEN_PEER_TOKEN) on both sides, and use resolve_peer_token() to read it back:

export BLAZEN_PEER_TOKEN="$(openssl rand -hex 32)"

from blazen import PEER_TOKEN_ENV, resolve_peer_token

token = resolve_peer_token()  # str | None -- None if env var is unset or empty
if token is None:
    raise RuntimeError(f"{PEER_TOKEN_ENV} must be set in production")

Error handling

Every peer RPC raises PeerError (exported as blazen.PeerError) on failure. The exception message carries the underlying blazen_peer::PeerError variant rendered as a string — transport, encode, TLS, unknown-step, envelope-version, and workflow categories all flow through the same exception type.

Workflow-level errors (the sub-workflow ran but produced a failing StopEvent) are returned successfully and surfaced via response.error: str | None. Treat that field as the canonical signal for “remote ran but went sideways”:

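from blazen import PeerError

async def invoke_checked(client, request):
    # `client` and `request` are built as in the earlier examples.
    try:
        response = await client.invoke_sub_workflow(request)
    except PeerError as e:
        # Transport, protocol, TLS, encode, unknown-step, envelope-version, etc.
        print(f"peer RPC failed: {e}")
    else:
        if response.error is not None:
            # The RPC succeeded, but the sub-workflow produced a failing StopEvent.
            print(f"remote workflow failed: {response.error}")
        else:
            print(response.result_value())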

Envelope versioning

Every wire payload carries an envelope_version field, and the current version is exposed as the module-level constant blazen.ENVELOPE_VERSION. Adding optional fields at the end of a struct is forward-compatible and does not require a version bump. Renaming, reordering, or removing fields is a breaking change that requires incrementing ENVELOPE_VERSION. The server rejects payloads with a version newer than it supports and accepts all older versions.
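The acceptance rule in the last sentence amounts to a single comparison. A minimal sketch, assuming a made-up supported version of 2 (the real value is blazen.ENVELOPE_VERSION) and a hypothetical `EnvelopeTooNew` exception standing in for the envelope-version category of PeerError:

```python
# Hypothetical value for illustration; the real one is blazen.ENVELOPE_VERSION.
SUPPORTED_ENVELOPE_VERSION = 2


class EnvelopeTooNew(Exception):
    """Raised when a payload declares a version the server does not know."""


def check_envelope(
    payload_version: int, supported: int = SUPPORTED_ENVELOPE_VERSION
) -> None:
    # Older or equal versions are accepted; only newer ones are rejected.
    if payload_version > supported:
        raise EnvelopeTooNew(
            f"payload envelope v{payload_version} is newer than supported v{supported}"
        )


check_envelope(1)  # older than supported: accepted
check_envelope(2)  # same as supported: accepted
```

Under this rule an older client can keep talking to an upgraded server, while an upgraded client fails fast against an older server.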

You can read the version off any request or response for diagnostics:

from blazen import ENVELOPE_VERSION

print("client envelope:", ENVELOPE_VERSION)
print("request envelope:", request.envelope_version)
print("response envelope:", response.envelope_version)