Context

Cancellation, lifecycle, and sharing state across Kotlin workflows

Blazen’s Kotlin binding leans on kotlinx.coroutines for cancellation and lifecycle. There is no Context parameter that gets threaded through every step the way the Python binding does — instead, the surrounding CoroutineScope carries cancellation through every suspend point, and per-run state lives on your own handler types.

Process lifecycle

Touching Blazen.version (or any UniFFI factory) forces JNA to resolve and load libblazen_uniffi.{so,dylib,dll} from the classpath. There is no explicit init() to call — the library is loaded lazily on first use:

import dev.zorpx.blazen.Blazen

fun main() {
    println("Blazen ${Blazen.version}")
}

Blazen.version returns the semantic version of the underlying libblazen_uniffi artefact — useful for diagnosing version skew between the Kotlin wrapper and the bundled native lib.
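To fail fast on version skew, one option is to compare Blazen.version against the release your code was built against. The sketch below assumes Blazen.version is a plain semantic-version string such as "0.4.2". The helper names majorMinor and isCompatible are hypothetical. Matching on major.minor is one conservative policy of our own choosing, not something the binding enforces.

```kotlin
// Hypothetical helper: extracts (major, minor) from a semantic-version string.
fun majorMinor(version: String): Pair<Int, Int>? {
    // Drop any pre-release ("-rc.1") or build ("+abc") suffix before splitting.
    val core = version.substringBefore('-').substringBefore('+')
    val parts = core.split('.')
    if (parts.size < 2) return null
    val major = parts[0].toIntOrNull() ?: return null
    val minor = parts[1].toIntOrNull() ?: return null
    return major to minor
}

// Hypothetical policy: treat two versions as compatible when major and minor match.
fun isCompatible(expected: String, actual: String): Boolean {
    val want = majorMinor(expected) ?: return false
    val have = majorMinor(actual) ?: return false
    return want == have
}
```

At startup you might write check(isCompatible(PINNED_VERSION, Blazen.version)) { "native lib ${Blazen.version} does not match the wrapper" }. Here PINNED_VERSION is a placeholder for whatever version your build pins.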

For long-running services that want a clean shutdown of any installed telemetry exporters, call shutdownTelemetry():

import dev.zorpx.blazen.uniffi.shutdownTelemetry

Runtime.getRuntime().addShutdownHook(Thread { shutdownTelemetry() })

shutdownTelemetry() is always safe to call, even when no exporter was initialised.

Cancellation through coroutines

Every suspend fun in the binding honors Kotlin’s structured-concurrency cancellation. When the surrounding coroutine is cancelled, the suspended await unwinds by throwing — usually kotlinx.coroutines.CancellationException, which the framework surfaces as BlazenException.Cancelled for adapters that need a uniform error type:

import dev.zorpx.blazen.uniffi.BlazenException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withTimeoutOrNull

// `workflow` is a Workflow built with WorkflowBuilder(...).build() (see Timeouts below).
suspend fun runWithTimeout() = coroutineScope {
    val job = launch {
        try {
            val result = workflow.run("""{"name":"Blazen"}""")
            println(result.event.dataJson)
        } catch (e: CancellationException) {
            println("cancelled")
            throw e   // re-throw so the cancellation propagates
        } catch (e: BlazenException) {
            println("workflow failed: ${e.message}")
        }
    }

    // Wait up to 30 seconds; if the run is still going, cancel it and wait for it to unwind.
    if (withTimeoutOrNull(30_000) { job.join() } == null) {
        job.cancelAndJoin()
    }
}

The same applies to streaming completions and the agent loop — cancelling the enclosing coroutine signals the underlying Tokio task to shut down cooperatively. See Streaming for the Flow-level details and Agent for the agent loop.
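Cancellation arrives as an exception at the suspension point, so cleanup placed in a finally block still runs when a run is cut short. The sketch below shows that behaviour with plain kotlinx.coroutines. slowCall is a hypothetical stand-in for a Blazen suspend call such as workflow.run. It assumes the real call unwinds the same way, as the cancellation paragraph above describes.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical stand-in for a Blazen suspend call such as workflow.run(...).
suspend fun slowCall(events: MutableList<String>): String {
    events += "started"
    try {
        delay(10_000)        // suspension point: cancellation is delivered here as an exception
        events += "finished" // never reached when the call is cancelled
        return "done"
    } finally {
        events += "cleanup"  // runs on normal completion and on cancellation alike
    }
}

// Returns null if the budget expires; the timeout cancels slowCall at its delay().
suspend fun callWithBudget(budgetMs: Long, events: MutableList<String>): String? =
    withTimeoutOrNull(budgetMs) { slowCall(events) }
```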

Per-run state in handlers

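Because the Kotlin binding does not pass a Context object into StepHandler.invoke(event), state that a step needs lives on your handler instance. The simplest pattern is a stored property. The engine reuses the same instance for every event it dispatches to that step, so a property like the counter below keeps its value across runs of the same Workflow: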

import dev.zorpx.blazen.uniffi.Event
import dev.zorpx.blazen.uniffi.StepHandler
import dev.zorpx.blazen.uniffi.StepOutput
import java.util.concurrent.atomic.AtomicInteger

class CounterHandler : StepHandler {
    private val counter = AtomicInteger(0)

    override suspend fun invoke(event: Event): StepOutput {
        val next = counter.incrementAndGet()
        val payload = """{"count":$next}"""
        return StepOutput.Single(Event(eventType = "CountedEvent", dataJson = payload))
    }
}

AtomicInteger is the idiomatic JVM choice when multiple handler invocations may race. For richer shared state, guard it with a Mutex from kotlinx.coroutines.sync, or confine it to a single coroutine that receives updates through an actor-style Channel.
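Here is a minimal sketch of the actor-style alternative using plain kotlinx.coroutines. One coroutine owns the count, and every other coroutine reaches it through a Channel, so no lock is needed. CounterMsg, counterActor, and countConcurrently are hypothetical names, not Blazen APIs. A handler could hold the returned SendChannel the same way CounterHandler holds its AtomicInteger.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Messages the actor understands (hypothetical, not part of Blazen).
sealed interface CounterMsg {
    object Increment : CounterMsg
    class Get(val reply: CompletableDeferred<Int>) : CounterMsg
}

// Starts a coroutine that owns `count`; other coroutines can only reach it by sending messages.
fun CoroutineScope.counterActor(): SendChannel<CounterMsg> {
    val inbox = Channel<CounterMsg>(Channel.UNLIMITED)
    launch {
        var count = 0 // confined to this coroutine, so no lock is needed
        for (msg in inbox) { // messages are handled one at a time, in FIFO order
            when (msg) {
                is CounterMsg.Increment -> count++
                is CounterMsg.Get -> msg.reply.complete(count)
            }
        }
    }
    return inbox
}

// Usage: many concurrent senders, one owner of the state.
suspend fun countConcurrently(senders: Int): Int = coroutineScope {
    val counter = counterActor()
    coroutineScope {
        repeat(senders) {
            launch(Dispatchers.Default) { counter.send(CounterMsg.Increment) }
        }
    } // returns once every Increment has been sent
    val reply = CompletableDeferred<Int>()
    counter.send(CounterMsg.Get(reply)) // queued behind all the increments
    val total = reply.await()
    counter.close() // ends the actor's loop so the enclosing scope can complete
    total
}
```

Compared with a Mutex, the actor serialises every access through one queue. That makes multi-field invariants easy to keep, at the cost of a coroutine you must shut down (note the close() call).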

Sharing state across steps

When two handlers need to communicate, hand both of them a reference to the same shared object at construction time:

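import dev.zorpx.blazen.uniffi.Event
import dev.zorpx.blazen.uniffi.StepHandler
import dev.zorpx.blazen.uniffi.StepOutput
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.jsonObject
import kotlinx.serialization.json.jsonPrimitive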

class SharedState {
    private val mutex = Mutex()
    private var name: String = ""

    suspend fun setName(value: String) = mutex.withLock { name = value }
    suspend fun currentName(): String = mutex.withLock { name }
}

class ParseHandler(private val shared: SharedState) : StepHandler {
    override suspend fun invoke(event: Event): StepOutput {
        val parsed = Json.parseToJsonElement(event.dataJson).jsonObject
        val data = parsed["data"]?.jsonObject ?: return StepOutput.None
        val name = data["name"]?.jsonPrimitive?.content ?: "world"
        shared.setName(name)
        return StepOutput.Single(Event(eventType = "GreetEvent", dataJson = "{}"))
    }
}

class GreetHandler(private val shared: SharedState) : StepHandler {
    override suspend fun invoke(event: Event): StepOutput {
        val name = shared.currentName()
        val json = """{"result":"Hello, $name!"}"""
        return StepOutput.Single(Event(eventType = "blazen::StopEvent", dataJson = json))
    }
}

val shared = SharedState()
val builder = WorkflowBuilder("greeter")
builder.step(
    name = "parse",
    accepts = listOf("blazen::StartEvent"),
    emits = listOf("GreetEvent"),
    handler = ParseHandler(shared),
)
builder.step(
    name = "greet",
    accepts = listOf("GreetEvent"),
    emits = listOf("blazen::StopEvent"),
    handler = GreetHandler(shared),
)

Each handler instance is registered once and reused across every event the engine dispatches to it. Because handler references are pinned for the lifetime of the workflow, captured state survives every step invocation in a run, and it also carries over into later runs of the same Workflow.

Timeouts

WorkflowBuilder exposes both per-step and overall-run timeouts in milliseconds:

val builder = WorkflowBuilder("timed")
builder.step(
    name = "compute",
    accepts = listOf("blazen::StartEvent"),
    emits = listOf("blazen::StopEvent"),
    handler = ComputeHandler(),
)
builder.stepTimeoutMs(5_000u)   // 5 seconds per step
builder.timeoutMs(30_000u)      // 30 seconds total
val workflow = builder.build()

When a step exceeds its budget, the engine raises BlazenException.Timeout. When the overall workflow budget fires, the pending suspend call (for example workflow.run) resumes by throwing the same exception. Both setters take a ULong, so use the u suffix on the literal.
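If your configuration holds budgets as kotlin.time.Duration values, a small conversion helper keeps the ULong plumbing in one place. toTimeoutMs is a hypothetical extension, not part of the binding. It rejects negative and infinite durations because neither maps to a meaningful millisecond budget.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

// Hypothetical helper: converts a Duration into the ULong millisecond count the timeout setters take.
fun Duration.toTimeoutMs(): ULong {
    require(!isNegative()) { "timeout must not be negative: $this" }
    require(isFinite()) { "timeout must be finite: $this" }
    return inWholeMilliseconds.toULong()
}

// Budgets matching the snippet above: 5 seconds per step, 30 seconds per run.
val stepBudgetMs: ULong = 5.seconds.toTimeoutMs()
val runBudgetMs: ULong = 30.seconds.toTimeoutMs()
```

You would then call builder.stepTimeoutMs(stepBudgetMs) and builder.timeoutMs(runBudgetMs).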

Closing native handles

Every UniFFI handle — Workflow, WorkflowBuilder, CompletionModel, EmbeddingModel, Agent, Pipeline, CheckpointStore — implements AutoCloseable. Use Kotlin’s use { } block to close them deterministically:

WorkflowBuilder("greeter").use { builder ->
    // GreetHandler from "Sharing state across steps" takes a SharedState.
    builder.step(name = "greet", accepts = listOf("blazen::StartEvent"),
                 emits = listOf("blazen::StopEvent"), handler = GreetHandler(SharedState()))
    builder.build().use { workflow ->
        val result = workflow.run("""{"name":"Blazen"}""")
        println(result.event.dataJson)
    }
}

A JVM cleaner is attached as a safety net, but JVM finalisation timing is not deterministic — call close() explicitly (or use use { }) to release the native handle promptly.
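The ordering guarantees of use { } are what make the nested form above safe. The inner handle closes before the outer one, and both close even when the body throws. The sketch below demonstrates this with FakeHandle, a hypothetical AutoCloseable that only logs, standing in for a real Workflow or WorkflowBuilder.

```kotlin
// Hypothetical stand-in for a native handle; it only records when close() runs.
class FakeHandle(private val name: String, private val log: MutableList<String>) : AutoCloseable {
    override fun close() {
        log += "close $name"
    }
}

// Same nesting shape as the builder/workflow example above.
fun runNested(log: MutableList<String>, fail: Boolean) {
    FakeHandle("builder", log).use {
        FakeHandle("workflow", log).use {
            log += "run"
            check(!fail) { "run failed" } // throws IllegalStateException when fail is true
        }
    }
}
```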

Errors

Every failure that crosses the FFI boundary is a BlazenException. It is declared as a sealed class, so a when over its variants is exhaustive. Every variant carries message: String, and some carry extra fields:

import dev.zorpx.blazen.uniffi.BlazenException

try {
    val result = workflow.run("""{"name":"Blazen"}""")
    println(result.event.dataJson)
} catch (e: BlazenException) {
    when (e) {
        is BlazenException.Auth         -> println("bad credentials: ${e.message}")
        is BlazenException.RateLimit    -> println("rate limited: ${e.message} (retry after ${e.retryAfterMs}ms)")
        is BlazenException.Timeout      -> println("timed out after ${e.elapsedMs}ms: ${e.message}")
        is BlazenException.Validation   -> println("bad input: ${e.message}")
        is BlazenException.ContentPolicy -> println("blocked: ${e.message}")
        is BlazenException.Unsupported  -> println("unsupported: ${e.message}")
        is BlazenException.Compute      -> println("compute failed: ${e.message}")
        is BlazenException.Media        -> println("media error: ${e.message}")
        is BlazenException.Provider     -> println("provider ${e.provider} (${e.kind}, status=${e.status}): ${e.message}")
        is BlazenException.Workflow     -> println("workflow error: ${e.message}")
        is BlazenException.Tool         -> println("tool error: ${e.message}")
        is BlazenException.Peer         -> println("peer ${e.kind}: ${e.message}")
        is BlazenException.Persist      -> println("persist error: ${e.message}")
        is BlazenException.Prompt       -> println("prompt ${e.kind}: ${e.message}")
        is BlazenException.Memory       -> println("memory ${e.kind}: ${e.message}")
        is BlazenException.Cache        -> println("cache ${e.kind}: ${e.message}")
        is BlazenException.Cancelled    -> println("cancelled")
        is BlazenException.Internal     -> println("internal: ${e.message}")
    }
}

Variants with structured payloads:

  • Provider(kind, message, provider, status, endpoint, requestId, detail, retryAfterMs): provider HTTP failures. kind names the backend (“OpenAIHttp”, “AnthropicHttp”, “LlamaCppModelLoad”, …).
  • RateLimit(message, retryAfterMs): retryAfterMs is set when the provider returned a Retry-After hint.
  • Timeout(message, elapsedMs): milliseconds spent before the timeout fired.
  • Peer(kind, message): distributed peer-to-peer failures. kind is one of "Encode", "Transport", "EnvelopeVersion", "Workflow", "Tls", "UnknownStep".
  • Prompt(kind, message), Memory(kind, message), Cache(kind, message): subsystem-level errors with a kind discriminator.

BlazenException.Cancelled has no payload — e.message returns the empty string.
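For RateLimit, retryAfterMs can drive a simple retry loop. The sketch below is a generic helper, retrying, which is hypothetical and not part of Blazen. It rethrows CancellationException immediately so a cancelled coroutine is never retried, and it gives up once maxAttempts is reached.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay

// Hypothetical helper: retries `block` while `retryDelayMs` returns a delay for the failure.
suspend fun <T> retrying(
    maxAttempts: Int,
    retryDelayMs: (Exception) -> Long?,
    block: suspend () -> T,
): T {
    require(maxAttempts >= 1) { "maxAttempts must be at least 1" }
    repeat(maxAttempts - 1) {
        try {
            return block()
        } catch (e: CancellationException) {
            throw e // never retry a cancelled coroutine
        } catch (e: Exception) {
            val waitMs = retryDelayMs(e) ?: throw e // no delay hint: treat as not retryable
            delay(waitMs)
        }
    }
    return block() // last attempt: any failure propagates to the caller
}
```

With Blazen you might pass retryDelayMs = { e -> (e as? BlazenException.RateLimit)?.retryAfterMs?.toLong() }, assuming retryAfterMs is a nullable integer type. Any other exception, or a RateLimit without a hint, then fails on the first attempt.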

See also

  • Events — declare event types and emit them from handlers.
  • Streaming — coroutine cancellation for streaming completions.
  • Agent — the agent loop honors the same coroutine cancellation rules.