Streaming

Stream tokens from LLM completions in Swift

Blazen’s Swift binding surfaces streaming chat completions through AsyncThrowingStream<StreamEvent, Error>. Iterate with for try await, break out of the loop to cancel, and read the terminal .done signal in the same loop you use for content chunks.

The StreamEvent enum

StreamEvent is the element type of every streaming completion. It carries either an incremental chunk or the terminal completion signal:

public enum StreamEvent: Sendable, Equatable {
    case chunk(StreamChunk)
    case done(finishReason: String, usage: TokenUsage)
}

On a successful run the stream emits zero or more .chunk events followed by exactly one .done. On failure the iterator throws instead. Because .done is only emitted on the happy path, a loop that never sees .done either threw or was exited early by the consumer.
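The contract above (zero or more chunks, then a single terminal event) can be folded into a small collecting helper. The following is a minimal, self-contained sketch: DemoEvent, Collected, collect, and demoStream are hypothetical stand-ins that only mirror the shape of StreamEvent, and the stream is produced locally so the snippet runs without a provider.

```swift
// Hypothetical stand-in mirroring the shape of StreamEvent; not Blazen's actual type.
enum DemoEvent: Sendable, Equatable {
    case chunk(String?)                  // stands in for StreamChunk.contentDelta
    case done(finishReason: String)
}

struct Collected: Equatable {
    var text = ""
    var finishReason: String?            // stays nil if the loop never saw .done
}

/// Drains a stream, concatenating deltas and recording the terminal signal.
/// Any error thrown by the stream propagates to the caller unchanged.
func collect(_ stream: AsyncThrowingStream<DemoEvent, Error>) async throws -> Collected {
    var result = Collected()
    for try await event in stream {
        switch event {
        case .chunk(let delta):
            if let delta { result.text += delta }
        case .done(let reason):
            result.finishReason = reason
        }
    }
    return result
}

/// Builds a canned stream: two chunks followed by the terminal event.
func demoStream() -> AsyncThrowingStream<DemoEvent, Error> {
    AsyncThrowingStream { continuation in
        continuation.yield(.chunk("Hello, "))
        continuation.yield(.chunk("world"))
        continuation.yield(.done(finishReason: "stop"))
        continuation.finish()
    }
}
```

A caller can treat a nil finishReason after a normal return as "the consumer stopped early", since a provider failure would have thrown out of collect instead.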

Consuming a stream

CompletionModel.completeStream(_:) returns the async sequence directly:

import BlazenSwift

Blazen.initialize()
defer { Blazen.shutdown() }

let model = try Providers.openAI(apiKey: "", model: "gpt-4o-mini")
let request = CompletionRequest(messages: [
    .system("You are a helpful assistant."),
    .user("Tell me a short joke."),
])

for try await event in model.completeStream(request) {
    switch event {
    case .chunk(let chunk):
        if let delta = chunk.contentDelta { print(delta, terminator: "") }
    case .done(let reason, let usage):
        print("\n[done: \(reason), tokens in=\(usage.inputTokens) out=\(usage.outputTokens)]")
    }
}

Pass the empty string for apiKey to fall back to the provider’s well-known environment variable (OPENAI_API_KEY, ANTHROPIC_API_KEY, …). Every provider factory in Providers accepts the same shape.

Cancelling early

Breaking out of the for try await loop terminates the iterator, which fires the stream’s onTermination hook, which in turn cancels the underlying Tokio task. The native runtime observes the cancellation and shuts down cooperatively:

for try await event in model.completeStream(request) {
    guard case .chunk(let chunk) = event,
          let delta = chunk.contentDelta else { continue }
    print(delta, terminator: "")
    if delta.contains("STOP") { break }   // tears the stream down
}

The same applies to cancelling the enclosing Task — every await in the loop honors structured-concurrency cancellation, so task.cancel() from any other context terminates the stream cleanly.
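The Task-cancellation path can be illustrated with the standard library alone. This sketch does not use Blazen: Flag, tickingStream, and consumeThenCancel are hypothetical names, and the producer is an ordinary Swift Task standing in for the native task that a real stream would cancel from its onTermination hook.

```swift
import Foundation

// Small thread-safe flag so the test can observe that onTermination ran.
final class Flag: @unchecked Sendable {
    private let lock = NSLock()
    private var value = false
    func set() { lock.lock(); value = true; lock.unlock() }
    var isSet: Bool { lock.lock(); defer { lock.unlock() }; return value }
}

/// Emits an increasing counter until the stream is terminated.
func tickingStream(onCancel flag: Flag) -> AsyncThrowingStream<Int, Error> {
    AsyncThrowingStream { continuation in
        let producer = Task {
            var n = 0
            while !Task.isCancelled {
                continuation.yield(n)
                n += 1
                try? await Task.sleep(nanoseconds: 10_000_000)
            }
            continuation.finish()
        }
        // Runs when the consumer goes away; stops the producer.
        continuation.onTermination = { _ in
            flag.set()
            producer.cancel()
        }
    }
}

/// Starts a consumer, cancels it from outside, and reports whether the hook fired.
func consumeThenCancel() async -> Bool {
    let flag = Flag()
    let consumer = Task {
        for try await _ in tickingStream(onCancel: flag) { }
    }
    try? await Task.sleep(nanoseconds: 50_000_000)
    consumer.cancel()                 // cancellation reaches the pending next()
    _ = await consumer.result         // wait for the consumer to wind down
    return flag.isSet
}
```

The design point is that the consumer never signals the producer directly; the stream's termination hook is the only teardown path, which is the same shape the section describes for Blazen's streams.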

Error handling

Provider failures and sink-side errors surface as errors thrown by the iterator. The framework’s wrap(_:) helper folds any Error thrown on the Swift side (e.g. a CancellationError) into the equivalent BlazenError variant:

do {
    for try await event in model.completeStream(request) {
        if case .chunk(let chunk) = event,
           let delta = chunk.contentDelta {
            print(delta, terminator: "")
        }
    }
} catch let error as BlazenError {
    switch error {
    case .RateLimit: print("rate limited: \(error.message)")
    case .Cancelled: print("user cancelled")
    case .Provider:  print("provider failure: \(error.message)")
    default:         print("stream failed: \(error.message)")
    }
}

BlazenError.message is a non-optional String (not String?), so it is available on every variant regardless of which kind of failure crossed the FFI boundary.

Stream events vs workflow events

The streaming API described here is for LLM completions: each StreamChunk is an incremental delta from the model’s output. It is separate from a workflow’s event stream, which is the routing fabric between steps — see Events for that.

Inside a workflow step you can still call model.completeStream(_:) and forward each delta wherever you want — into a custom event you emit downstream, into a UI publisher, or into any AsyncStream your own code produces.

Inside a workflow step

A streaming step that turns model output into a StopEvent typically buffers the deltas and emits the full text at the end:

final class StreamingGreetHandler: StepHandler, @unchecked Sendable {
    let model: CompletionModel
    init(model: CompletionModel) { self.model = model }

    func invoke(event: Event) async throws -> StepOutput {
        let request = CompletionRequest(messages: [
            .user("Greet the user."),
        ])

        var buffer = ""
        for try await streamEvent in model.completeStream(request) {
            if case .chunk(let chunk) = streamEvent,
               let delta = chunk.contentDelta {
                buffer += delta
            }
        }

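        // Encode with JSONEncoder (requires `import Foundation`) so the text is escaped as valid JSON.
        // String.debugDescription emits Swift escapes such as \u{1B}, which JSON parsers reject.
        let payload = try JSONEncoder().encode(["result": buffer])
        let json = String(decoding: payload, as: UTF8.self)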
        return .single(event: Event(eventType: "StopEvent", dataJson: json))
    }
}

For a step that wants to publish each delta as it arrives — e.g. into a SwiftUI view or a WebSocket — wire a sink (a Continuation, an AsyncStream you own, or an actor) through the handler’s init and forward the chunks as they land. The handler can return .none and let the surrounding code track completion via the sink itself.
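The sink approach described above might look like the following sketch. It is deliberately decoupled from Blazen: DeltaForwarder and forwardingDemo are hypothetical names, the deltas parameter stands in for the content deltas you would pull out of completeStream(_:), and AsyncStream.makeStream assumes Swift 5.9 or later.

```swift
// Hypothetical forwarder; in a real handler the sink would be passed through init
// and `deltas` would come from the chunks of model.completeStream(_:).
struct DeltaForwarder: Sendable {
    let sink: AsyncStream<String>.Continuation

    /// Pushes every delta into the sink and closes it when the source ends or throws.
    func forward(_ deltas: AsyncThrowingStream<String, Error>) async throws {
        defer { sink.finish() }
        for try await delta in deltas {
            sink.yield(delta)
        }
    }
}

/// Wires a canned source through the forwarder and returns what the sink side received.
func forwardingDemo() async throws -> [String] {
    // `ui` is what a view model or WebSocket writer would iterate.
    let (ui, sink) = AsyncStream<String>.makeStream()

    let source = AsyncThrowingStream<String, Error> { continuation in
        continuation.yield("Hi")
        continuation.yield(" there")
        continuation.finish()
    }

    try await DeltaForwarder(sink: sink).forward(source)

    // The sink buffers by default, so the deltas are still available here.
    var received: [String] = []
    for await delta in ui { received.append(delta) }
    return received
}
```

Finishing the sink in a defer means downstream consumers see the stream end whether the source completed or threw, which lets the surrounding code track completion through the sink alone.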

See also

  • LLM — non-streaming completions, embeddings, and the full Providers factory list.
  • Agent — the agent loop drives streaming completions internally and surfaces the same BlazenError variants.
  • Multimodal — attach images / audio / video to a streaming request.