Distributed Workflows

Run sub-workflows across machines with blazen-peer from Swift

Blazen workflows normally run in a single process. The blazen-peer crate extends this to multiple machines: a parent workflow on machine A can delegate a sub-workflow to machine B over gRPC and get the terminal result back. The Swift binding exposes this surface as two classes — PeerServer for the receiving side and PeerClient for the calling side — both vended directly by BlazenSwift.

When to use distributed workflows

  • Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating workflow lives elsewhere.
  • Cost optimization. Route GPU-intensive steps (embedding generation, image diffusion) to machines with the right hardware instead of paying for GPU on every node.
  • Hardware-specific steps. Some steps require specialized hardware (TPUs, large-memory instances, local SSDs). Run those on the machine that has them.
  • Regulatory compliance. Data residency requirements may mandate that certain processing happens in a specific region or on specific infrastructure.

How it works

  1. The parent builds a request containing a workflow name, an ordered list of step IDs, and a JSON input payload.
  2. The request is sent over a tonic gRPC channel (HTTP/2) to the peer.
  3. The peer resolves each step ID against its process-wide step registry, assembles a Workflow, and runs it.
  4. The peer returns a response carrying the terminal StopEvent payload.
  5. The parent receives that payload as a normal WorkflowResult.

The Swift surface is a flattened version of the upstream blazen-peer API: it covers one-shot, request/response remote execution, where the caller awaits the remote workflow’s terminal value and that value crosses the wire as JSON. Lazy session-ref dereferencing across machines is not currently exposed to Swift, so refs created on the peer must be serialized into the response.

Prerequisites

The PeerServer and PeerClient symbols are only present when the native library was built with the distributed Cargo feature. The standard release artifacts that ship with the Swift package include it. If you are building libblazen_uniffi locally, ensure distributed is in the feature list.

Server setup

The server process must register the steps it can execute in the global step registry before calling serve. Step registration currently happens in Rust: define your steps in a Rust crate and link that crate into the same binary that hosts the PeerServer. On the Swift side, the server is just two calls: construct and serve.

import Foundation
import BlazenSwift

@main
struct PeerNode {
    static func main() async throws {
        Blazen.initialize()
        defer { Blazen.shutdown() }

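        // nodeId is stamped onto every remote-ref descriptor this peer emits.
        let server = PeerServer(nodeId: "node-b")
        // Bind the gRPC listener on all interfaces; this runs until the
        // listener errors or the surrounding Task is cancelled.
        try await server.serve(listenAddress: "0.0.0.0:50051")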
    }
}

nodeId is the stable identifier this peer stamps onto every outgoing remote-ref descriptor. Use a hostname, a Kubernetes pod name, or a UUID picked at process startup.
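As a sketch of that guidance, the helper below picks a node ID at startup: an explicit override first, then the machine’s hostname (which inside a Kubernetes pod defaults to the pod name), then a random UUID. The BLAZEN_NODE_ID variable is a hypothetical app-level convention, not something Blazen reads; the helper only produces the string you would pass to PeerServer(nodeId:) or PeerClient.connect(address:clientNodeId:).

```swift
import Foundation

/// Picks a node identifier for PeerServer(nodeId:) or PeerClient.connect(...).
/// BLAZEN_NODE_ID is a hypothetical, app-level override; Blazen does not read it.
func resolveNodeId(
    environment: [String: String] = ProcessInfo.processInfo.environment,
    hostName: String = ProcessInfo.processInfo.hostName
) -> String {
    // 1. An explicit override from your deployment tooling wins.
    if let explicit = environment["BLAZEN_NODE_ID"], !explicit.isEmpty {
        return explicit
    }
    // 2. A real hostname (the pod name inside Kubernetes by default).
    if !hostName.isEmpty && hostName != "localhost" {
        return hostName
    }
    // 3. Otherwise, a fresh UUID; call this once at startup and keep the result.
    return UUID().uuidString
}
```

Call it once, store the result, and reuse it for the lifetime of the process so the ID stays stable, for example `let server = PeerServer(nodeId: resolveNodeId())`.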

serve(listenAddress:) binds the gRPC listener and runs until the listener errors or the surrounding Task is cancelled. The method consumes the underlying server — calling serve a second time on the same PeerServer throws BlazenError.Validation.

Blocking variant

For CLI tools that don’t want to drive Swift Concurrency at the entry point, use serveBlocking(listenAddress:):

let server = PeerServer(nodeId: "node-b")
try server.serveBlocking(listenAddress: "0.0.0.0:50051")

This blocks the current thread on the embedded Tokio runtime until the server exits.

Client setup

Open a connection with the static PeerClient.connect(address:clientNodeId:) constructor. The constructor blocks while it drives the TCP/HTTP/2 handshake on the shared Tokio runtime, so it is synchronous: it is marked throws, not async.

import BlazenSwift

Blazen.initialize()
defer { Blazen.shutdown() }

let client = try PeerClient.connect(
    address: "http://peer-b.local:50051",
    clientNodeId: "node-a"
)

print("connected as: \(client.nodeId())")

  • address is any valid gRPC endpoint URI (http://... for plaintext, https://... once your transport adds TLS).
  • clientNodeId identifies the calling end in trace logs on both sides. Hostnames or process-startup UUIDs are typical.
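Because PeerClient.connect(address:clientNodeId:) blocks, calling it directly from async code occupies one of Swift Concurrency’s cooperative threads for the whole handshake. A common workaround, sketched here under the assumption that an extra hop through Dispatch is acceptable, is to run the blocking call on a GCD queue and bridge the result back with a checked continuation. offloadBlocking is a hypothetical helper, not part of BlazenSwift.

```swift
import Foundation

/// Runs a blocking, throwing call on a GCD worker thread and resumes the
/// awaiting task when it finishes, so the cooperative thread pool that backs
/// Swift Concurrency is not stalled by the blocking work.
func offloadBlocking<T: Sendable>(
    _ body: @escaping @Sendable () throws -> T
) async throws -> T {
    try await withCheckedThrowingContinuation { continuation in
        DispatchQueue.global(qos: .userInitiated).async {
            // Result(catching:) captures either the returned value or the thrown error.
            continuation.resume(with: Result(catching: body))
        }
    }
}
```

In application code this would look like `let client = try await offloadBlocking { try PeerClient.connect(address: "http://peer-b.local:50051", clientNodeId: "node-a") }`; under strict concurrency checking, that also requires PeerClient to be Sendable.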

A single PeerClient holds one multiplexed HTTP/2 channel; concurrent calls from multiple Swift tasks against the same client are safe and share the connection.
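Since one client can serve concurrent calls, fanning a batch out over a task group is a natural fit. The sketch below is generic: call stands in for a closure that wraps client.runRemoteWorkflow(...), which keeps the example compilable without BlazenSwift. fanOut is a hypothetical helper; it returns results in input order and, like any throwing task group, cancels the calls still in flight when one of them throws.

```swift
/// Runs `call` for every input concurrently and returns results in input order.
/// In real code `call` would wrap client.runRemoteWorkflow(...).
func fanOut<Input: Sendable, Output: Sendable>(
    _ inputs: [Input],
    call: @escaping @Sendable (Input) async throws -> Output
) async throws -> [Output] {
    try await withThrowingTaskGroup(of: (Int, Output).self) { group in
        for (index, input) in inputs.enumerated() {
            // Tag each result with its position so order can be restored.
            group.addTask { (index, try await call(input)) }
        }
        var results = [Output?](repeating: nil, count: inputs.count)
        // Results arrive in completion order; an error here cancels the rest.
        for try await (index, output) in group {
            results[index] = output
        }
        return results.compactMap { $0 }
    }
}
```

The group starts every call at once; for large batches you may want to cap the number in flight, keeping in mind that they all share one HTTP/2 connection.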

Invoking a remote workflow

runRemoteWorkflow(workflowName:stepIds:inputJson:timeoutSecs:) sends a request and awaits the terminal result:

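import Foundation

struct AnalyzeInput: Encodable {
    let document: String
}

let input = AnalyzeInput(document: "...long document text...")
// JSONEncoder always emits UTF-8, so this conversion cannot fail
// (unlike String(data:encoding:), which returns an optional).
let inputJson = String(
    decoding: try JSONEncoder().encode(input),
    as: UTF8.self
)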

let result = try await client.runRemoteWorkflow(
    workflowName: "analyze-pipeline",
    stepIds: ["my_app::analyze", "my_app::summarize"],
    inputJson: inputJson,
    timeoutSecs: 60
)

print("remote result: \(result.event.dataJson)")

  • workflowName is the symbolic name the remote peer knows the workflow by.
  • stepIds is the ordered list of step identifiers to execute. Every entry must be registered on the remote peer’s process — sending an unknown id fails with BlazenError.Peer(kind: "UnknownStep", ...).
  • inputJson is a JSON string fed to the workflow’s entry step. Encode your Encodable payload yourself; Blazen does not encode it on your behalf.
  • timeoutSecs bounds the remote workflow’s wall-clock execution. Pass nil to defer to the server’s default deadline.

The returned WorkflowResult carries the terminal StopEvent payload synthesized from the remote response. Per-run token usage and cost are not propagated over the peer wire format and will read as zero — if you need those numbers, query the remote peer’s telemetry directly.
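On the client side you typically encode inputs before the call and decode result.event.dataJson afterwards. The sketch below wraps both directions with Foundation’s JSONEncoder and JSONDecoder. AnalyzeRequest and AnalyzeSummary are hypothetical types, and the decoder assumes the terminal payload is a bare JSON object with a summary field; depending on how the remote step builds its StopEvent, the value may be nested under a wrapper key, so inspect a real dataJson before settling on a type.

```swift
import Foundation

/// Hypothetical payload shapes; the real ones are whatever your remote steps
/// accept and emit.
struct AnalyzeRequest: Encodable { let document: String }
struct AnalyzeSummary: Decodable { let summary: String }

enum PeerJSON {
    /// Encodes a payload for the inputJson parameter.
    static func encode<T: Encodable>(_ value: T) throws -> String {
        let encoder = JSONEncoder()
        encoder.outputFormatting = [.sortedKeys]  // deterministic output for logs and tests
        // JSONEncoder always emits UTF-8, so this conversion cannot fail.
        return String(decoding: try encoder.encode(value), as: UTF8.self)
    }

    /// Decodes a terminal payload string such as result.event.dataJson.
    static func decode<T: Decodable>(_ type: T.Type, from json: String) throws -> T {
        try JSONDecoder().decode(type, from: Data(json.utf8))
    }
}
```

For example, `let summary = try PeerJSON.decode(AnalyzeSummary.self, from: result.event.dataJson)`.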

Blocking variant

If the calling code cannot await — for example, a synchronous batch script — runRemoteWorkflowBlocking(...) returns the same WorkflowResult without an await:

let result = try client.runRemoteWorkflowBlocking(
    workflowName: "analyze-pipeline",
    stepIds: ["my_app::analyze"],
    inputJson: inputJson,
    timeoutSecs: 60
)

Composing into a parent workflow

A common pattern is to wrap the remote call in a local step so the rest of your workflow code treats it as just another step. The step holds the PeerClient and forwards the start payload across the wire:

final class RemoteAnalyzeHandler: StepHandler, @unchecked Sendable {
    let client: PeerClient

    init(client: PeerClient) {
        self.client = client
    }

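    func invoke(event: Event) async throws -> StepOutput {
        // Forward the local StartEvent payload unchanged as the remote input.
        let remote = try await client.runRemoteWorkflow(
            workflowName: "analyze-pipeline",
            stepIds: ["my_app::analyze", "my_app::summarize"],
            inputJson: event.dataJson,
            timeoutSecs: 60
        )
        // Re-emit the remote terminal payload as this workflow's StopEvent.
        return .single(event: Event(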
            eventType: "blazen::StopEvent",
            dataJson: remote.event.dataJson
        ))
    }
}

let client = try PeerClient.connect(
    address: "http://peer-b.local:50051",
    clientNodeId: "node-a"
)

let builder = WorkflowBuilder(name: "distributed-example")
_ = try builder.step(
    name: "remote_analyze",
    accepts: ["blazen::StartEvent"],
    emits: ["blazen::StopEvent"],
    handler: RemoteAnalyzeHandler(client: client)
)
let workflow = try builder.build()

let result = try await workflow.run(["document": "..."])
print(result.event.dataJson)

Error handling

Peer calls throw BlazenError. The variant tells you where the failure happened:

| Variant | Meaning |
| --- | --- |
| .Peer(kind: "Transport", message:) | Network-level failure: connection refused, timeout, TLS handshake failure. |
| .Peer(kind: "UnknownStep", message:) | The remote peer does not have a requested step ID registered. |
| .Peer(kind: "EnvelopeVersion", message:) | The client sent an envelope version newer than the server supports. |
| .Peer(kind: "Encode", message:) | Postcard serialization or deserialization failed on the wire. |
| .Workflow(message:) | The remote workflow ran but produced an error; the message is the remote’s SubWorkflowResponse.error string. |
| .Validation(message:) | The listen address could not be parsed, or serve was called twice on the same PeerServer. |

Pattern-match the kind discriminant on .Peer to react to transport failures differently from protocol or registration errors:

do {
    let result = try await client.runRemoteWorkflow(
        workflowName: "analyze-pipeline",
        stepIds: ["my_app::analyze"],
        inputJson: inputJson,
        timeoutSecs: 30
    )
    handle(result)
} catch let BlazenError.Peer(kind: "Transport", message: msg) {
    // Connection died — fall back to local execution or retry.
    print("transport failure: \(msg)")
} catch let BlazenError.Peer(kind: "UnknownStep", message: msg) {
    // The remote is out of date and missing a step we just deployed.
    print("step not registered on peer: \(msg)")
} catch let BlazenError.Workflow(message: msg) {
    // The remote ran but the workflow itself failed.
    print("remote workflow error: \(msg)")
} catch {
    print("other failure: \(error.localizedDescription)")
}

BlazenError conforms to LocalizedError, so error.localizedDescription always returns the inner message, which makes it convenient for logging.
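The catch comments above mention retrying after a transport failure. A minimal sketch of that, assuming transport errors are usually transient, is a retry loop with exponential backoff whose predicate decides which errors deserve another attempt. withRetry is a hypothetical helper; in real code the predicate would return true only for BlazenError.Peer errors whose kind is "Transport", since UnknownStep, EnvelopeVersion, and Workflow failures will not fix themselves on retry.

```swift
/// Retries `operation` up to `maxAttempts` times while `isRetryable` says the
/// error is transient, doubling the sleep between attempts.
func withRetry<T>(
    maxAttempts: Int = 3,
    initialDelayNanos: UInt64 = 200_000_000,
    isRetryable: (Error) -> Bool,
    operation: () async throws -> T
) async throws -> T {
    precondition(maxAttempts > 0, "maxAttempts must be positive")
    var delay = initialDelayNanos
    var attempt = 1
    while true {
        do {
            return try await operation()
        } catch {
            // Give up on non-transient errors or once the attempt budget is spent.
            guard attempt < maxAttempts, isRetryable(error) else { throw error }
            // Task.sleep throws CancellationError if the surrounding Task is cancelled.
            try await Task.sleep(nanoseconds: delay)
            delay *= 2
            attempt += 1
        }
    }
}
```

Because a transport failure does not necessarily stop a run that already reached the peer, a retry can execute the sub-workflow twice; reserve this pattern for workflows that are safe to repeat.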

Cancellation

runRemoteWorkflow(...) is async throws and participates in Swift’s structured concurrency. Cancelling the surrounding Task cancels the pending runRemoteWorkflow call, which then throws; if you route the call through the framework’s wrap(_:) helper, that error surfaces as a BlazenError. Cancellation only severs the client-side wait: the remote peer keeps running the sub-workflow until the timeoutSecs deadline (or the server’s default deadline, if you passed nil) expires.
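timeoutSecs bounds the run on the peer. If the caller also wants its own, possibly shorter, limit on how long it waits, it can race the call against a timer in a task group. This sketch relies on the behavior described above, that runRemoteWorkflow reacts to Task cancellation; withDeadline and DeadlineExceeded are hypothetical names, and hitting the deadline only abandons the local wait.

```swift
struct DeadlineExceeded: Error {}

/// Races `operation` against a client-side timer; whichever child finishes
/// first wins and the other is cancelled. A throwing task group still waits
/// for cancelled children, so `operation` must respond to cancellation for
/// the deadline to take effect promptly.
func withDeadline<T: Sendable>(
    seconds: Double,
    _ operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    try await withThrowingTaskGroup(of: T.self) { group in
        group.addTask { try await operation() }
        group.addTask {
            try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
            throw DeadlineExceeded()
        }
        // next() returns, or rethrows, the first child to complete.
        guard let first = try await group.next() else { throw DeadlineExceeded() }
        group.cancelAll()  // stop the timer once the operation has won
        return first
    }
}
```

In application code the operation closure would call client.runRemoteWorkflow(...) with a timeoutSecs at least as long as the client-side deadline.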

mTLS and authentication

Mutual-TLS configuration and shared-secret token authentication are configured in the underlying Rust transport (blazen_peer::tls and the BLAZEN_PEER_TOKEN environment variable) and are not currently exposed to Swift through the UniFFI surface. To run encrypted, authenticated peer traffic today, either:

  • Build libblazen_uniffi against a custom transport that wires mTLS on both ends, then load it from your Package.swift, or
  • Terminate TLS at a sidecar proxy in front of each peer (Envoy, Linkerd, etc.) and let PeerServer bind to the proxy’s loopback socket.

The shared-secret token path is the simpler interim option for internal networks: export BLAZEN_PEER_TOKEN with the same value on every peer process before calling PeerServer(nodeId:) or PeerClient.connect(...), and the underlying transport will refuse requests that do not carry it.
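A missing or blank token only shows up later as refused requests, so it can help to check for it at process startup. The sketch below performs only that presence check; reading and enforcing BLAZEN_PEER_TOKEN remains the Rust transport’s job. requirePeerToken and PeerConfigError are hypothetical names.

```swift
import Foundation

enum PeerConfigError: Error, Equatable {
    case missingToken
}

/// Fails fast if BLAZEN_PEER_TOKEN is absent or blank. Call it before
/// PeerServer(nodeId:) or PeerClient.connect(...); it does not validate the
/// token's value, which the underlying transport handles.
func requirePeerToken(
    environment: [String: String] = ProcessInfo.processInfo.environment
) throws {
    let token = environment["BLAZEN_PEER_TOKEN"]?
        .trimmingCharacters(in: .whitespacesAndNewlines) ?? ""
    guard !token.isEmpty else { throw PeerConfigError.missingToken }
}
```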

Envelope versioning

Every wire payload carries an envelope_version field (currently 1) negotiated by the Rust transport. Adding optional fields at the end of an envelope struct is forward-compatible and does not require a version bump. Renaming, reordering, or removing fields increments ENVELOPE_VERSION; a server that receives a newer version than it supports rejects the request with BlazenError.Peer(kind: "EnvelopeVersion", ...). Catch that variant if you need to surface “peer is out of date, please upgrade” diagnostics to operators.
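One way to act on that advice is to bind the kind string in a .Peer catch clause and map it to a message for operators. The sketch below covers the kinds listed in the error table earlier in this guide; the hint wording is illustrative and operatorHint is a hypothetical helper.

```swift
/// Maps the `kind` discriminant of a BlazenError.Peer to an operator-facing hint.
func operatorHint(forPeerKind kind: String) -> String {
    switch kind {
    case "EnvelopeVersion":
        // The server rejected a newer envelope, so the remote side is behind.
        return "Peer is out of date; upgrade the remote peer."
    case "UnknownStep":
        return "Remote peer is missing a requested step; redeploy it with the step registered."
    case "Transport":
        return "Could not reach the peer; check the address, network, and any TLS proxy."
    case "Encode":
        return "Wire payload failed to serialize or deserialize; check that both sides run compatible builds."
    default:
        return "Unrecognized peer error kind \"\(kind)\"."
    }
}
```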

Next steps

  • Add observability with the Telemetry guide — peer calls show up as ordinary spans on both sides.
  • Wrap remote steps inside a larger workflow with the Workflow guide.
  • See the Rust distributed guide for the full upstream surface including session-ref deref, TLS helpers, and the lifetime policies that govern remote-ref retention on the peer.