Streaming
Channel-based streaming completions in Go
Channel-based streaming
The Go binding exposes streaming completions through a single function:
func Stream(ctx context.Context, model *CompletionModel, req CompletionRequest) <-chan StreamEvent
Stream returns immediately with a receive-only channel of blazen.StreamEvent. The channel delivers zero or more *StreamChunkEvent values as the model emits tokens, then exactly one terminal event — either *StreamDoneEvent (success) or *StreamErrorEvent (failure) — after which the channel is closed.
import (
"context"
"fmt"
"os"
blazen "github.com/zachhandley/Blazen/bindings/go"
)
func main() {
blazen.Init()
defer blazen.Shutdown()
model, err := blazen.NewOpenAICompletion(os.Getenv("OPENAI_API_KEY"), "gpt-4o", "")
if err != nil {
panic(err)
}
defer model.Close()
req := blazen.CompletionRequest{
Messages: []blazen.ChatMessage{
{Role: "user", Content: "Write a haiku about Go's channels."},
},
}
for ev := range blazen.Stream(context.Background(), model, req) {
switch e := ev.(type) {
case *blazen.StreamChunkEvent:
fmt.Print(e.Chunk.ContentDelta)
case *blazen.StreamDoneEvent:
fmt.Printf("\n[done: %s; tokens=%d]\n",
e.FinishReason, e.Usage.TotalTokens)
case *blazen.StreamErrorEvent:
fmt.Fprintf(os.Stderr, "\nstream error: %v\n", e.Err)
}
}
}
The event sum type
StreamEvent is a sealed interface. Switch on the concrete pointer type to inspect the payload:
type StreamChunkEvent struct {
Chunk StreamChunk
}
type StreamDoneEvent struct {
FinishReason string // "stop", "tool_calls", "length", or ""
Usage TokenUsage // aggregated final usage
}
type StreamErrorEvent struct {
Err error // already wrapped as a typed Blazen error
}
The StreamChunk payload carries the incremental delta:
type StreamChunk struct {
ContentDelta string // text delta since the previous chunk
ToolCalls []ToolCall // tool-call snapshot for this chunk
IsFinal bool // UI hint marking the last content-bearing chunk
}
IsFinal is a UI convenience — the authoritative completion signal is the terminal *StreamDoneEvent or *StreamErrorEvent. ToolCalls is a snapshot, not a delta — replace your local state on each delivery rather than appending.
Tool calls in a stream
When the model emits tool calls instead of text, each StreamChunkEvent carries the cumulative tool-call snapshot for that point in the stream. Consume them by replacing your local state on each chunk:
var toolCalls []blazen.ToolCall
for ev := range blazen.Stream(ctx, model, req) {
switch e := ev.(type) {
case *blazen.StreamChunkEvent:
if len(e.Chunk.ToolCalls) > 0 {
toolCalls = e.Chunk.ToolCalls
}
fmt.Print(e.Chunk.ContentDelta)
case *blazen.StreamDoneEvent:
if e.FinishReason == "tool_calls" {
// Dispatch toolCalls -- this is the final snapshot.
}
case *blazen.StreamErrorEvent:
return e.Err
}
}
Cancellation
Stream accepts a context.Context. When the context fires, the channel receives a *StreamErrorEvent carrying ctx.Err() and is closed:
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
for ev := range blazen.Stream(ctx, model, req) {
// ... if ctx fires, you'll get exactly one StreamErrorEvent
// with errors.Is(e.Err, context.DeadlineExceeded) == true.
}
The underlying Rust stream observes the channel closure on its next chunk delivery and tears down cooperatively, but cancellation propagation is best-effort — the provider request keeps running until it finishes naturally. See the Context guide for the full story.
Always drain the channel
A buffered consumer that stops reading before the terminal event will prevent the background goroutine that drives the FFI call from exiting, and will pin the model handle in memory. The channel buffer is small (16 events), so a stalled consumer also stalls the Rust producer once the buffer fills.
The idiomatic pattern is for ev := range stream { ... }, which naturally drains until the terminal event closes the channel. If you need to bail out early, cancel the context — that delivers a terminal *StreamErrorEvent and closes the channel from the Go side.
Lifetime
The model handle must be alive for the duration of the stream. Do not call model.Close() until you have drained the channel:
defer model.Close() // CORRECT: runs after the loop returns.
for ev := range blazen.Stream(ctx, model, req) {
// ... consume events ...
}
Closing the model mid-stream is undefined behavior on the FFI side — the finalizer will eventually clean up, but you may see a *ValidationError carrying "completion model has been closed" on subsequent chunks.