Streaming
Stream tokens from LLM completions in Kotlin
Blazen’s Kotlin binding surfaces streaming chat completions through a sink-callback interface. The most ergonomic way to consume one is to wrap it in a kotlinx.coroutines.flow.Flow<StreamChunk> via callbackFlow: iterate with collect, cancel by cancelling the surrounding coroutine, and read the terminal finishReason / usage after the flow completes.
The streaming primitives
Two pieces drive every streaming completion:
- completeStreaming(model, request, sink): a top-level suspend fun from dev.zorpx.blazen.uniffi.* that starts the stream and returns once the model has finished generating.
- CompletionStreamSink: a generated callback interface the framework calls into for each chunk plus the terminal onDone / onError:
public interface CompletionStreamSink {
    suspend fun onChunk(chunk: StreamChunk)
    suspend fun onDone(finishReason: String, usage: TokenUsage)
    suspend fun onError(err: BlazenException)
}
StreamChunk carries contentDelta: String (the incremental text), toolCalls: List<ToolCall> (a snapshot, not append-only), and isFinal: Boolean (a UI hint — the authoritative completion signal is onDone).
On a successful run the sink receives zero or more onChunk callbacks followed by exactly one onDone. On failure the framework calls onError instead — onDone is only invoked on the happy path, so a missing onDone always means an error fired.
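The difference between the append-only contentDelta and the snapshot-style toolCalls can be sketched with stand-in types. FakeChunk and FakeToolCall below are hypothetical and only mirror the fields described above; the sketch also assumes that an empty toolCalls list means "no update", which the binding may or may not guarantee.

```kotlin
// Stand-in types for illustration only; the real StreamChunk and ToolCall
// come from dev.zorpx.blazen.uniffi and may carry more fields.
data class FakeToolCall(val id: String, val name: String)

data class FakeChunk(
    val contentDelta: String,
    val toolCalls: List<FakeToolCall>,
    val isFinal: Boolean,
)

data class Accumulated(val text: String, val toolCalls: List<FakeToolCall>)

// Appends every text delta, but replaces the tool-call list with the most
// recent non-empty snapshot instead of concatenating snapshots.
// Assumption: an empty list means "no update" rather than "calls removed".
fun accumulate(chunks: List<FakeChunk>): Accumulated {
    val text = StringBuilder()
    var latestCalls: List<FakeToolCall> = emptyList()
    for (chunk in chunks) {
        text.append(chunk.contentDelta)
        if (chunk.toolCalls.isNotEmpty()) latestCalls = chunk.toolCalls
    }
    return Accumulated(text.toString(), latestCalls)
}

fun main() {
    val search = FakeToolCall(id = "call_1", name = "search")
    val result = accumulate(
        listOf(
            FakeChunk("Look", emptyList(), isFinal = false),
            FakeChunk("ing up", listOf(search), isFinal = false),
            FakeChunk("...", listOf(search), isFinal = true),
        ),
    )
    println(result.text)           // Looking up...
    println(result.toolCalls.size) // 1
}
```

Concatenating the snapshots instead would report the same tool call twice here.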
Wrapping the sink in a Flow
callbackFlow from kotlinx.coroutines.flow adapts the sink-callback interface into a structured-concurrency Flow<StreamChunk>:
import dev.zorpx.blazen.uniffi.BlazenException
import dev.zorpx.blazen.uniffi.ChatMessage
import dev.zorpx.blazen.uniffi.CompletionModel
import dev.zorpx.blazen.uniffi.CompletionRequest
import dev.zorpx.blazen.uniffi.CompletionStreamSink
import dev.zorpx.blazen.uniffi.StreamChunk
import dev.zorpx.blazen.uniffi.TokenUsage
import dev.zorpx.blazen.uniffi.completeStreaming
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

fun CompletionModel.streamChunks(request: CompletionRequest): Flow<StreamChunk> = callbackFlow {
    val sink = object : CompletionStreamSink {
        override suspend fun onChunk(chunk: StreamChunk) {
            send(chunk)
        }

        override suspend fun onDone(finishReason: String, usage: TokenUsage) {
            close()
        }

        override suspend fun onError(err: BlazenException) {
            close(err)
        }
    }
    completeStreaming(this@streamChunks, request, sink)
    awaitClose { /* nothing to clean up; sink references are dropped on cancel */ }
}
Now consumers just collect the flow:
import dev.zorpx.blazen.uniffi.newOpenaiCompletionModel
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {
    val model = newOpenaiCompletionModel(
        apiKey = System.getenv("OPENAI_API_KEY") ?: "",
        model = "gpt-4o-mini",
        baseUrl = null,
    )
    model.use {
        val request = CompletionRequest(
            messages = listOf(
                ChatMessage(
                    role = "system", content = "You are a helpful assistant.",
                    mediaParts = emptyList(), toolCalls = emptyList(),
                    toolCallId = null, name = null,
                ),
                ChatMessage(
                    role = "user", content = "Tell me a short joke.",
                    mediaParts = emptyList(), toolCalls = emptyList(),
                    toolCallId = null, name = null,
                ),
            ),
            tools = emptyList(),
            temperature = 0.7,
            maxTokens = null,
            topP = null,
            model = null,
            responseFormatJson = null,
            system = null,
        )
        model.streamChunks(request).collect { chunk ->
            print(chunk.contentDelta)
        }
        println()
    }
}
Pass the empty string for apiKey to fall back to the provider’s well-known environment variable (OPENAI_API_KEY, ANTHROPIC_API_KEY, …). Every provider factory in dev.zorpx.blazen.uniffi follows the same shape.
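When the caller wants the whole reply rather than live output, the same flow can be folded into a single string. The sketch below is a minimal illustration that uses a plain Flow<String> of made-up deltas in place of a real model call; joinDeltas is a hypothetical helper name, not part of the binding.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.fold
import kotlinx.coroutines.runBlocking

// Joins every text delta into one string. With the real binding the input
// would be something like model.streamChunks(request).map { it.contentDelta }.
suspend fun Flow<String>.joinDeltas(): String =
    fold(StringBuilder()) { acc, delta -> acc.append(delta) }.toString()

fun main() = runBlocking {
    // Made-up deltas standing in for a streamed completion.
    val deltas = flowOf("Why did ", "the chicken ", "cross the road?")
    println(deltas.joinDeltas())
}
```

fold is a terminal operator, so it suspends until the stream completes and rethrows any failure the flow closes with.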
Capturing finishReason and usage
callbackFlow does not let you smuggle extra fields out through the flow itself, so park the terminal metadata on a CompletableDeferred that you complete from onDone:
import kotlinx.coroutines.CompletableDeferred

data class StreamSummary(val finishReason: String, val usage: TokenUsage)

fun CompletionModel.streamChunks(
    request: CompletionRequest,
    summary: CompletableDeferred<StreamSummary>,
): Flow<StreamChunk> = callbackFlow {
    val sink = object : CompletionStreamSink {
        override suspend fun onChunk(chunk: StreamChunk) { send(chunk) }

        override suspend fun onDone(finishReason: String, usage: TokenUsage) {
            summary.complete(StreamSummary(finishReason, usage))
            close()
        }

        override suspend fun onError(err: BlazenException) {
            summary.completeExceptionally(err)
            close(err)
        }
    }
    completeStreaming(this@streamChunks, request, sink)
    awaitClose { }
}

// caller:
val summary = CompletableDeferred<StreamSummary>()
model.streamChunks(request, summary).collect { print(it.contentDelta) }
val done = summary.await()
println("\n[done: ${done.finishReason}, tokens out=${done.usage.completionTokens}]")
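The ordering in this pattern (complete the deferred, then close the channel) can be exercised without a provider. The following is a sketch under stated assumptions: FakeSink, fakeCompleteStreaming, and FakeSummary are hypothetical stand-ins for the generated Blazen types, and the token count is just the number of deltas.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for CompletionStreamSink (error path omitted).
interface FakeSink {
    suspend fun onChunk(delta: String)
    suspend fun onDone(finishReason: String, completionTokens: Int)
}

// Hypothetical stand-in for completeStreaming: replays fixed deltas,
// then reports completion exactly once.
suspend fun fakeCompleteStreaming(deltas: List<String>, sink: FakeSink) {
    for (delta in deltas) sink.onChunk(delta)
    sink.onDone("stop", deltas.size)
}

data class FakeSummary(val finishReason: String, val completionTokens: Int)

fun fakeStream(
    deltas: List<String>,
    summary: CompletableDeferred<FakeSummary>,
): Flow<String> = callbackFlow {
    val sink = object : FakeSink {
        override suspend fun onChunk(delta: String) { send(delta) }

        override suspend fun onDone(finishReason: String, completionTokens: Int) {
            // Complete the deferred first so it is ready once the flow ends.
            summary.complete(FakeSummary(finishReason, completionTokens))
            close()
        }
    }
    fakeCompleteStreaming(deltas, sink)
    awaitClose { }
}

fun main() = runBlocking {
    val summary = CompletableDeferred<FakeSummary>()
    val text = fakeStream(listOf("Hi", " there"), summary).toList().joinToString("")
    val done = summary.await()
    println("$text [${done.finishReason}, ${done.completionTokens} chunks]")
}
```

Because the deferred is completed before close(), await() returns immediately once collection has finished normally.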
Cancelling early
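Stopping collection early (with an operator such as take(n), by cancelling the surrounding coroutine, or by throwing inside collect) cancels the flow. That cancels the callbackFlow producer, which is suspended inside completeStreaming, so the cancellation propagates into the native call; the awaitClose block has nothing extra to release. The native runtime observes the cancellation and shuts down the underlying Tokio task cooperatively: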
import kotlinx.coroutines.flow.take

model.streamChunks(request).take(20).collect { chunk ->
    print(chunk.contentDelta)
}
// after 20 chunks the flow is cancelled, the stream is torn down,
// and the underlying provider connection is closed.
The same applies to cancelling the enclosing coroutine — every suspend point in the collector honors structured-concurrency cancellation, so job.cancel() from any other context terminates the stream cleanly.
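The teardown path can be observed with a self-contained sketch. endlessStream below is a hypothetical stand-in for completeStreaming that never finishes on its own; the finally block plays the role of whatever cleanup the native side performs when its call is cancelled.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for completeStreaming: produces deltas until cancelled.
suspend fun endlessStream(onChunk: suspend (String) -> Unit) {
    var i = 0
    while (true) {
        onChunk("tok$i")
        i++
        delay(1) // suspension point where cancellation is observed
    }
}

fun endlessChunks(tornDown: AtomicBoolean): Flow<String> = callbackFlow {
    try {
        endlessStream { send(it) }
    } finally {
        // Runs when the collector stops early and the producer is cancelled.
        tornDown.set(true)
    }
    awaitClose { }
}

fun main() = runBlocking {
    val tornDown = AtomicBoolean(false)
    val firstThree = endlessChunks(tornDown).take(3).toList()
    println(firstThree)     // [tok0, tok1, tok2]
    println(tornDown.get()) // true
}
```

The collector only returns after the producer coroutine has finished unwinding, which is why the flag is already set when it is read.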
Error handling
Provider failures and sink-side errors surface as a BlazenException thrown out of collect. Catching it directly is the canonical pattern:
try {
    model.streamChunks(request).collect { chunk ->
        print(chunk.contentDelta)
    }
} catch (e: BlazenException) {
    when (e) {
        is BlazenException.RateLimit -> println("rate limited: ${e.message}")
        is BlazenException.Cancelled -> println("user cancelled")
        is BlazenException.Provider -> println("provider failure: ${e.message}")
        else -> println("stream failed: ${e.message}")
    }
}
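BlazenException is a sealed class, so a when over its variants can be checked for exhaustiveness. The else branch in the example opts out of that check; drop it and list every variant if you want the compiler to flag missing branches.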
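How the compiler enforces this can be shown with a stand-in hierarchy. FakeStreamError below is hypothetical: it borrows three variant names from the example, while the real generated BlazenException may have more variants and different constructors.

```kotlin
// Hypothetical stand-in hierarchy; the real BlazenException variants are
// generated by the binding and may differ.
sealed class FakeStreamError(message: String) : Exception(message) {
    class RateLimit(message: String) : FakeStreamError(message)
    class Cancelled(message: String) : FakeStreamError(message)
    class Provider(message: String) : FakeStreamError(message)
}

// No else branch: used as an expression, this when must cover every subclass,
// so adding a new variant makes this function fail to compile until handled.
fun describe(e: FakeStreamError): String = when (e) {
    is FakeStreamError.RateLimit -> "rate limited: ${e.message}"
    is FakeStreamError.Cancelled -> "user cancelled"
    is FakeStreamError.Provider -> "provider failure: ${e.message}"
}

fun main() {
    println(describe(FakeStreamError.RateLimit("retry in 5s"))) // rate limited: retry in 5s
    println(describe(FakeStreamError.Cancelled("stopped")))     // user cancelled
}
```

The trade-off is that an exhaustive when has to be revisited whenever the binding adds a variant, whereas an else branch keeps compiling and silently absorbs it.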
Stream events vs workflow events
The streaming API described here is for LLM completions: each StreamChunk is an incremental delta from the model’s output. It is separate from a workflow’s event stream, which is the routing fabric between steps — see Events for that.
Inside a workflow step you can still call model.streamChunks(request) (the extension above) and forward each delta wherever you want — into a custom event you emit downstream, into a UI publisher, or into any Flow your own code produces.
Inside a workflow step
A streaming step that turns model output into a StopEvent typically buffers the deltas and emits the full text at the end:
class StreamingGreetHandler(
    private val model: CompletionModel,
) : StepHandler {
    override suspend fun invoke(event: Event): StepOutput {
        val request = CompletionRequest(
            messages = listOf(
                ChatMessage(
                    role = "user", content = "Greet the user.",
                    mediaParts = emptyList(), toolCalls = emptyList(),
                    toolCallId = null, name = null,
                ),
            ),
            tools = emptyList(), temperature = null, maxTokens = null, topP = null,
            model = null, responseFormatJson = null, system = null,
        )
        // Buffer every delta so the full text can be emitted once the stream ends.
        val buffer = StringBuilder()
        model.streamChunks(request).collect { chunk ->
            buffer.append(chunk.contentDelta)
        }
        // Minimal escaping so quotes, backslashes, and newlines in the model output
        // do not break the payload; prefer a JSON library for full coverage.
        val escaped = buffer.toString()
            .replace("\\", "\\\\")
            .replace("\"", "\\\"")
            .replace("\n", "\\n")
        val payload = """{"result":"$escaped"}"""
        return StepOutput.Single(Event(eventType = "blazen::StopEvent", dataJson = payload))
    }
}
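Model output routinely contains quotes and line breaks, so the text has to be encoded as a JSON string before it is embedded in dataJson. The helper below is a minimal hand-rolled sketch; jsonString is a hypothetical name, and a JSON library would be the more robust choice in production code.

```kotlin
// Minimal JSON string encoder covering the escapes JSON requires:
// quote, backslash, and control characters below U+0020.
fun jsonString(text: String): String = buildString {
    append('"')
    for (ch in text) {
        when (ch) {
            '"' -> append("\\\"")
            '\\' -> append("\\\\")
            '\n' -> append("\\n")
            '\r' -> append("\\r")
            '\t' -> append("\\t")
            else -> if (ch < ' ') append("\\u%04x".format(ch.code)) else append(ch)
        }
    }
    append('"')
}

fun main() {
    val modelOutput = "She said \"hi\"\nand left."
    // jsonString already adds the surrounding quotes.
    val payload = """{"result":${jsonString(modelOutput)}}"""
    println(payload) // {"result":"She said \"hi\"\nand left."}
}
```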
For a step that wants to publish each delta as it arrives — e.g. into a MutableSharedFlow driving a UI — pass that flow through the handler’s constructor and emit each chunk inside the collect block. The handler can return StepOutput.None and let the surrounding code track completion via the flow itself.
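A rough sketch of that forwarding pattern, using a plain Flow<String> of made-up deltas in place of a model call. forwardDeltas is a hypothetical helper, and the replay setting exists only so this demo can read the values back without a live subscriber.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Forwards each text delta to a shared flow that a UI layer could observe.
// In a real handler the input would be something like
// model.streamChunks(request).map { it.contentDelta }.
suspend fun forwardDeltas(deltas: Flow<String>, ui: MutableSharedFlow<String>) {
    deltas.collect { delta -> ui.emit(delta) }
}

fun main() = runBlocking {
    // replay keeps recent deltas so they can be inspected after the fact.
    val ui = MutableSharedFlow<String>(replay = 16)
    forwardDeltas(flowOf("Hel", "lo", "!"), ui)
    println(ui.replayCache.joinToString("")) // Hello!
}
```

With replay = 0 and no active subscriber, emitted deltas are simply dropped, so a real UI should subscribe before the step starts streaming.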
See also
- LLM: non-streaming completions, embeddings, and the full provider factory list.
- Agent: the agent loop drives streaming completions internally and surfaces the same BlazenException variants.
- Multimodal: attach images / audio / video to a streaming request.