Distributed Workflows
Run sub-workflows across machines from Kotlin with blazen-peer
Distributed Workflows
Blazen workflows normally run in a single JVM. The blazen-peer gRPC layer extends this across machines: a parent process on machine A can delegate a named sub-workflow to machine B over HTTP/2, wait for the terminal StopEvent, and surface its payload as a regular WorkflowResult. The Kotlin binding exposes this through two UniFFI handles, PeerServer and PeerClient, both of which expose idiomatic suspend fun entry points plus blocking counterparts for non-coroutine call sites.
When to use distributed workflows
- Privacy boundaries. Keep sensitive data on one machine and run the steps that touch it there, while the orchestrating workflow runs elsewhere.
- Cost optimization. Route GPU-intensive steps (embedding generation, image diffusion) to nodes with the right hardware instead of paying for GPU on every JVM.
- Hardware-specific steps. Some steps require specialized hardware (TPUs, large-memory instances, local SSDs). Run those steps on the node that has the hardware.
- Regulatory compliance. Data residency rules may mandate that certain processing happens in a specific region or on specific infrastructure.
How it works
- The parent JVM calls
PeerClient.runRemoteWorkflowwith a workflow name, an ordered list of step IDs, and a JSON-encoded input. - The request travels over an HTTP/2 channel held inside the
PeerClient. Multiple concurrent calls on the same client are safe and share the connection. - The remote
PeerServerresolves each step ID against the process-wide step registry of its JVM, assembles aWorkflow, and runs it to completion. - The peer returns the terminal
StopEventpayload, which the Kotlin binding lifts back into a regularWorkflowResultwhoseevent.dataJsoncarries the remote result.
Server setup
A peer server is a regular AutoCloseable Kotlin object. The nodeId constructor argument is a stable identifier (typically the hostname or a process-startup UUID) that the server stamps onto outgoing trace metadata.
import dev.zorpx.blazen.uniffi.PeerServer
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
PeerServer("worker-42").use { server ->
// Blocks the calling coroutine until the listener errors or is cancelled.
server.serve("0.0.0.0:7800")
}
}
Step builders must be registered on the server JVM before serve is called — runRemoteWorkflow resolves step IDs against the server’s process-wide step registry. Use whatever wiring your application already does to seed steps for in-process workflows; the peer dispatch path picks them up from the same registry.
serve consumes the underlying gRPC server, so calling it twice on the same PeerServer raises BlazenException.Validation. Construct a fresh PeerServer if you need to restart the listener.
Blocking variant
For non-coroutine contexts (a main that wants to block until shutdown, a Spring boot lifecycle hook, a thread pool worker), call serveBlocking instead:
import dev.zorpx.blazen.uniffi.PeerServer
fun main() {
val server = PeerServer("worker-42")
Runtime.getRuntime().addShutdownHook(Thread { server.close() })
server.serveBlocking("0.0.0.0:7800")
}
serveBlocking runs on the shared Tokio runtime owned by the native library and returns only when the listener exits.
Client setup
PeerClient.connect is a blocking factory — it drives the TCP / HTTP/2 handshake on the shared Tokio runtime so the client is ready to issue RPCs as soon as it returns:
import dev.zorpx.blazen.uniffi.PeerClient
val client = PeerClient.connect(
address = "http://worker-42.cluster.local:7800",
clientNodeId = "orchestrator-1",
)
clientNodeId shows up in trace logs on both sides. client.nodeId() reads it back. The handle is shareable across coroutines; reuse one PeerClient per remote endpoint rather than reconnecting per call.
Invoking a remote sub-workflow
runRemoteWorkflow is a suspend fun that returns a WorkflowResult. The terminal StopEvent payload from the remote workflow is exposed as result.event.dataJson:
import dev.zorpx.blazen.uniffi.PeerClient
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
PeerClient.connect("http://worker-42.cluster.local:7800", "orchestrator-1").use { client ->
val result = client.runRemoteWorkflow(
workflowName = "analyze-pipeline",
stepIds = listOf("my_app::analyze", "my_app::summarize"),
inputJson = """{"document":"...the document text..."}""",
timeoutSecs = 60uL,
)
println("remote result: ${result.event.dataJson}")
}
}
Passing timeoutSecs = null defers to the server’s default deadline. stepIds must be an exact, ordered list of identifiers the remote JVM has registered — sending an unknown step raises BlazenException.Peer with kind = "UnknownStep".
The returned WorkflowResult.totalInputTokens, totalOutputTokens, and totalCostUsd are reported as zero for remote runs: per-run LLM usage is not propagated across the peer wire format. Query the remote node’s telemetry endpoint directly if you need those numbers.
Blocking variant
For Java interop or non-coroutine call sites, runRemoteWorkflowBlocking has the identical signature minus the suspend modifier:
val result = client.runRemoteWorkflowBlocking(
workflowName = "analyze-pipeline",
stepIds = listOf("my_app::analyze", "my_app::summarize"),
inputJson = """{"document":"..."}""",
timeoutSecs = 60uL,
)
Internally it parks the calling thread on the shared Tokio runtime; do not call it from inside a coroutine or you risk thread-starving the dispatcher.
Session refs across machines
The Kotlin UniFFI surface currently exposes fire-and-forget remote execution: the remote workflow runs end-to-end and returns its terminal StopEvent payload as JSON. Cross-machine session-ref proxies (RemoteRefDescriptor, lazy derefSessionRef, explicit releaseSessionRef) are not part of the Kotlin binding today — workflows that need to ship large values back across the network should serialize them into the StopEvent payload directly.
If your sub-workflow on the peer produces a value too large to inline, run an additional step on the same peer that persists it (Redis, S3, a shared volume) and return the reference URL in the StopEvent payload. The parent JVM can then fetch the value via its normal client.
Securing the channel
The Kotlin binding accepts any URI tonic understands as the address argument to PeerClient.connect. In a development setting an http:// URI is fine; for production cross-region traffic, terminate TLS in front of the peer (an Envoy or nginx sidecar) and connect with an https:// URI:
val client = PeerClient.connect(
address = "https://worker-42.cluster.local:7800",
clientNodeId = "orchestrator-1",
)
Direct in-process mTLS configuration (loading client / server PEM files into the gRPC channel from Kotlin) is not exposed in today’s binding. For a Kubernetes deployment the recommended pattern is to run a cert-manager-issued sidecar in front of each peer and have the Kotlin client talk to it over https://.
Error handling
Every failure in runRemoteWorkflow raises a subclass of the BlazenException sealed class. Match on it with a when expression:
import dev.zorpx.blazen.uniffi.BlazenException
try {
val result = client.runRemoteWorkflow(
workflowName = "analyze-pipeline",
stepIds = listOf("my_app::analyze"),
inputJson = """{"document":"..."}""",
timeoutSecs = 30uL,
)
println(result.event.dataJson)
} catch (e: BlazenException.Peer) {
when (e.kind) {
"Transport" -> System.err.println("network failure talking to peer: ${e.message}")
"UnknownStep" -> System.err.println("peer does not have the requested step: ${e.message}")
"EnvelopeVersion" -> System.err.println("peer is running an older protocol: ${e.message}")
"Tls" -> System.err.println("TLS handshake failed: ${e.message}")
"Encode" -> System.err.println("payload (de)serialization failed: ${e.message}")
else -> System.err.println("peer error (${e.kind}): ${e.message}")
}
} catch (e: BlazenException.Workflow) {
System.err.println("remote workflow returned an error: ${e.message}")
} catch (e: BlazenException.Validation) {
System.err.println("local validation failed (bad address, double serve): ${e.message}")
}
BlazenException.Peer.kind is one of "Encode", "Transport", "EnvelopeVersion", "Workflow", "Tls", or "UnknownStep". The "Workflow" kind covers protocol-level workflow envelope failures; a workflow that ran but returned a step error surfaces as a separate BlazenException.Workflow carrying the remote error message.
Cancellation works the same as for any other Blazen suspend fun: cancelling the surrounding coroutine unwinds the in-flight RPC and surfaces as BlazenException.Cancelled for adapters that want a uniform error type.
Envelope versioning
Every peer-to-peer wire payload carries a numeric envelope version that the server uses to reject clients newer than itself. The Kotlin binding does not expose the constant directly; the only situation in which it surfaces is as BlazenException.Peer with kind = "EnvelopeVersion" from runRemoteWorkflow. Treat that error the way you would treat a protocol mismatch: log the peer address, alert your operators, and back off — adding optional fields to the wire format is forward-compatible, but renamed or removed fields require both peers to upgrade in lockstep.
Lifecycle
Both PeerServer and PeerClient implement AutoCloseable. Wrap them in use { } blocks or try { ... } finally { it.close() } so the underlying native handle is released deterministically. The cleaner finaliser is a safety net, not a substitute — JVM finaliser timing is non-deterministic, and you do not want gRPC channels and open sockets lingering in the background while the GC schedules them.
PeerClient.connect("http://worker-42:7800", "orchestrator-1").use { client ->
repeat(3) { i ->
val result = client.runRemoteWorkflow(
workflowName = "analyze-pipeline",
stepIds = listOf("my_app::analyze"),
inputJson = """{"chunk":$i}""",
timeoutSecs = 30uL,
)
println("chunk $i -> ${result.event.dataJson}")
}
}
Next steps
- Define the steps your peer should expose — see Events and Quickstart.
- Front the peer with a TLS-terminating sidecar for production traffic.
- For very large remote results, write the value to shared storage on the peer and return the reference URL in the
StopEventpayload.