Streaming
Stream tokens from LLM completions in Ruby
Blazen’s Ruby binding exposes streaming chat completions through Blazen::Streaming.complete (blocking) and Blazen::Streaming.complete_async (fiber-yielding). Both accept either a single block, which receives every event and dispatches on its kind, or explicit on_chunk: / on_done: / on_error: kwargs.
Block-form callback API
The single-block form is the shortest path — the block receives (kind, *args), where kind is one of :chunk, :done, or :error and the remaining arguments depend on the kind:
```ruby
require 'blazen'

Blazen.init

model = Blazen::Providers.openai(
  api_key: ENV.fetch('OPENAI_API_KEY'),
  model: 'gpt-4o-mini',
)

req = Blazen::Llm.completion_request(
  messages: [
    Blazen::Llm.system('You are a helpful assistant.'),
    Blazen::Llm.user('Tell me a short joke about Ruby blocks.'),
  ],
  max_tokens: 256,
)

Blazen::Streaming.complete(model, req) do |kind, *args|
  case kind
  when :chunk
    chunk = args.first
    print chunk.content_delta.to_s # content_delta may be nil
  when :done
    reason, usage = args
    # usage is nil for providers that don't report token counts
    puts "\nFinished: #{reason} (#{usage&.completion_tokens || 'n/a'} tokens)"
  when :error
    err = args.first
    warn "Stream failed: #{err.class}: #{err.message}"
  end
end
```
Blazen::Streaming.complete consumes the CompletionRequest — once you pass a request in, do not reuse the same wrapper for a second call. Build a fresh one each time.
Explicit handlers
For longer or already-named callbacks, pass each handler as a kwarg instead of a single block:
```ruby
# `req` must be a freshly built CompletionRequest; one already passed to
# Blazen::Streaming.complete has been consumed.
on_chunk = ->(chunk) { $stdout.write(chunk.content_delta.to_s) }
on_done = ->(reason, usage) do
  puts "\n[done: #{reason}; tokens=#{usage&.total_tokens}]"
end
on_error = ->(err) { warn "stream error: #{err.message}" }

Blazen::Streaming.complete(
  model, req,
  on_chunk: on_chunk,
  on_done: on_done,
  on_error: on_error,
)
```
The two forms are equivalent. If you pass both a block and a kwarg, the kwarg wins.
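To make the equivalence concrete, here is a minimal sketch of how three named handlers collapse into one (kind, *args) block. The helper dispatch_block and the Chunk struct are hypothetical and not part of Blazen; they only illustrate the routing, not how the binding implements it, and the event sequence at the bottom is simulated rather than produced by a real stream.

```ruby
# Hypothetical helper (not part of Blazen): turns named handlers into a
# single block with the (kind, *args) shape used by the block form.
def dispatch_block(on_chunk: nil, on_done: nil, on_error: nil)
  proc do |kind, *args|
    case kind
    when :chunk then on_chunk&.call(*args)
    when :done  then on_done&.call(*args)
    when :error then on_error&.call(*args)
    else raise ArgumentError, "unknown stream event: #{kind.inspect}"
    end
  end
end

# Hypothetical stand-in for Blazen::Streaming::StreamChunk.
Chunk = Struct.new(:content_delta)

log = []
block = dispatch_block(
  on_chunk: ->(chunk) { log << chunk.content_delta.to_s },
  on_done:  ->(reason, _usage) { log << "[#{reason}]" },
)

# Simulated event sequence standing in for a real stream.
block.call(:chunk, Chunk.new('Hel'))
block.call(:chunk, Chunk.new('lo'))
block.call(:done, 'stop', nil)

puts log.join # prints: Hello[stop]
```

Unset handlers are simply skipped via &., which matches the idea that you only need to supply the callbacks you care about.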
Event kinds
The three event kinds correspond to the three vtable callbacks on the cabi side:
- :chunk (args = [chunk]): chunk is a Blazen::Streaming::StreamChunk. Each chunk carries:
  - chunk.content_delta: the incremental text delta since the previous chunk (String or nil)
  - chunk.final?: true for the terminal content-bearing chunk (a UI hint, not the authoritative completion signal)
  - chunk.tool_calls: the cumulative tool-call snapshot at this point (Array<Blazen::Llm::ToolCall>). Replace your local state on each delivery rather than appending.
- :done (args = [finish_reason, usage]): fires exactly once on the happy path. finish_reason is a String ("stop", "tool_calls", "length", or the empty string for providers that don't report one). usage is a Blazen::Llm::TokenUsage, or nil for providers that don't report it.
- :error (args = [err]): fires at most once, on failure. err is an instance of the matching Blazen::Error subclass (Blazen::RateLimitError, Blazen::ProviderError, Blazen::CancelledError, …); the cabi decodes the typed error before invoking your callback. The stream is torn down after :error fires, so you will not see a subsequent :done.
Exactly one of :done or :error fires per stream. If your callback raises, the wrapper logs to STDERR and returns -1 to the cabi, which aborts the stream.
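The contract above fits in a small consumer object. This is a sketch that assumes only the documented callback shapes: StreamCollector, FakeChunk, and FakeUsage are hypothetical names, with the two structs standing in for Blazen::Streaming::StreamChunk and Blazen::Llm::TokenUsage so the example runs without a provider. It treats :done or :error as the only completion signal (final? is ignored for that purpose), tolerates nil deltas and nil usage, and rejects a second terminal event as a sanity check.

```ruby
# Hypothetical stand-ins for Blazen::Streaming::StreamChunk and
# Blazen::Llm::TokenUsage, so the sketch runs without a provider.
FakeChunk = Struct.new(:content_delta, :tool_calls, :final) do
  def final?
    final
  end
end
FakeUsage = Struct.new(:completion_tokens, :total_tokens)

# Hypothetical consumer that follows the documented event contract.
class StreamCollector
  attr_reader :text, :tool_calls, :finish_reason, :usage, :error

  def initialize
    @text = +''
    @tool_calls = []
    @terminal = nil # set to :done or :error, at most once
  end

  def call(kind, *args)
    case kind
    when :chunk
      chunk = args.first
      @text << chunk.content_delta.to_s # nil deltas add nothing
      # Cumulative snapshot: replace rather than append.
      @tool_calls = chunk.tool_calls if chunk.tool_calls.any?
      # chunk.final? is only a UI hint; completion is signalled by :done.
    when :done
      mark_terminal(:done)
      @finish_reason, @usage = args # usage may be nil
    when :error
      mark_terminal(:error)
      @error = args.first
    end
  end

  # Lets the collector be passed with & as a (kind, *args) block.
  def to_proc
    method(:call).to_proc
  end

  def done?
    @terminal == :done
  end

  private

  # Sanity check for the "exactly one of :done or :error" rule.
  def mark_terminal(kind)
    raise "second terminal event #{kind} after #{@terminal}" if @terminal

    @terminal = kind
  end
end

collector = StreamCollector.new
[
  [:chunk, FakeChunk.new('Hello', [], false)],
  [:chunk, FakeChunk.new(nil, [], false)],
  [:chunk, FakeChunk.new(', world', [], true)],
  [:done, 'stop', FakeUsage.new(3, 12)],
].each { |kind, *args| collector.call(kind, *args) }

puts collector.text # prints: Hello, world
```

Assuming the binding invokes the block with (kind, *args) as documented, the same object should plug in as Blazen::Streaming.complete(model, req, &collector); that wiring is not exercised here. Keep in mind that raising from a real callback aborts the stream, so the terminal-event check is a debugging aid rather than something to rely on in production.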
Tool calls during a stream
When the model emits tool calls instead of text, each :chunk carries the cumulative tool-call snapshot. Replace your local state on every chunk:
```ruby
tool_calls = []

Blazen::Streaming.complete(model, req) do |kind, *args|
  case kind
  when :chunk
    chunk = args.first
    # Cumulative snapshot: replace local state, never append.
    tool_calls = chunk.tool_calls if chunk.tool_calls.any?
    print chunk.content_delta.to_s
  when :done
    reason, _usage = args
    if reason == 'tool_calls'
      tool_calls.each do |tc|
        puts "\n[tool call] #{tc.name}(#{tc.arguments_json})"
      end
    end
  when :error
    warn args.first.message
  end
end
```
chunk.tool_calls is a snapshot, not a delta — the final snapshot is the one that arrives in the chunk immediately before :done.
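The difference between a snapshot and a delta matters as soon as tool calls span several chunks. In this sketch, ToolCall is a hypothetical Struct standing in for Blazen::Llm::ToolCall, and the snapshots are hand-written to mimic a cumulative stream rather than captured from a provider. Appending them reports the first call twice; replacing keeps exactly the final set.

```ruby
# Hypothetical stand-in for Blazen::Llm::ToolCall.
ToolCall = Struct.new(:name, :arguments_json)

# Hand-written cumulative snapshots, as successive :chunk events might carry them.
snapshots = [
  [],
  [ToolCall.new('get_weather', '{"city":"Oslo"}')],
  [ToolCall.new('get_weather', '{"city":"Oslo"}'),
   ToolCall.new('get_time', '{"tz":"Europe/Oslo"}')],
]

appended = []
replaced = []
snapshots.each do |snap|
  appended.concat(snap)        # wrong: treats each snapshot as a delta
  replaced = snap if snap.any? # right: the latest snapshot is the whole state
end

p appended.map(&:name) # prints: ["get_weather", "get_weather", "get_time"]
p replaced.map(&:name) # prints: ["get_weather", "get_time"]
```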
Async variant
Blazen::Streaming.complete_async runs the same stream but goes through the cabi's async future surface instead of a blocking call. When a Fiber.scheduler is active (e.g. inside an Async { ... } block from the async gem), the calling fiber yields while the stream is in flight; otherwise it blocks the calling thread, like complete.
```ruby
require 'async'
require 'blazen'

# Assumes `model` and a fresh `req`, built as in the first example.
Async do
  Blazen::Streaming.complete_async(model, req) do |kind, *args|
    case kind
    when :chunk then print args.first.content_delta.to_s
    when :done  then puts "\n[done: #{args[0]}]"
    when :error then warn args.first.message
    end
  end
end
```
The sink callbacks themselves fire on cabi Tokio worker threads, not on the calling fiber — the complete_async API only changes how the outer call yields control. If your callbacks need fiber-local state, marshal back to the calling fiber explicitly (via a Queue or an Async::Notification) rather than touching fiber-locals from inside the callback.
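Here is a minimal sketch of that hand-off using only the standard library. A plain Thread stands in for the cabi worker thread that fires the sink callbacks (fake_stream is hypothetical, not a Blazen API, and it passes bare strings instead of StreamChunk objects). The callback only pushes onto a Thread::Queue; the calling side drains the queue and is the only place that touches fiber-local state (Thread#[] is fiber-local in Ruby). On Ruby 3.x with a fiber scheduler active, Queue#pop should yield the waiting fiber rather than block the reactor.

```ruby
# Hypothetical stand-in for the native side: fires (kind, *args) callbacks
# from a different thread, the way cabi worker threads do.
def fake_stream(deltas, &callback)
  Thread.new do
    deltas.each { |d| callback.call(:chunk, d) }
    callback.call(:done, 'stop', nil)
  end
end

events = Thread::Queue.new
Thread.current[:received] = [] # fiber-local state owned by the caller

worker = fake_stream(%w[Hel lo]) do |kind, *args|
  # Runs on the worker thread: only enqueue, never touch caller-local state.
  events << [kind, *args]
end

# Caller side: drain until a terminal event, updating local state here.
loop do
  kind, *args = events.pop
  case kind
  when :chunk then Thread.current[:received] << args.first
  when :done, :error then break
  end
end
worker.join

puts Thread.current[:received].join # prints: Hello
```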
Cancellation
There is no per-call cancellation handle on the Ruby streaming surface today. The two practical options for ending a stream early:
- Raise from a callback. Raising any StandardError inside on_chunk causes the wrapper to return -1 to the cabi, which tears the stream down. The Rust side surfaces a Blazen::InternalError to whichever entry point owns the stream lifecycle.
- Cancel the surrounding fiber. Inside Async { ... }, cancelling the parent task unwinds the calling fiber. Cleanup happens on the cabi side when the future drops.
Cancellation propagation into the native provider request is best-effort: the upstream HTTP call keeps running until it finishes naturally or hits its own timeout. If you need a hard wall-clock cap, point Blazen::Providers.openai(..., base_url: ...) at a proxy that applies an aggressive read timeout, or rely on the workflow-level b.timeout_ms(...) budget.
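A sketch of the first option, raising from a callback to stop early once a character budget is exceeded. The budget logic is the part worth reusing. drive_stream is a hypothetical stand-in that mimics the documented wrapper behaviour (a raising callback stops further delivery); it is not Blazen code and does not model which error Blazen reports afterwards. BudgetExceeded is an illustrative exception class.

```ruby
# Illustrative exception used to signal "stop streaming now".
class BudgetExceeded < StandardError; end

# Hypothetical stand-in for the binding: delivers events until a callback
# raises, then stops, mirroring "return -1 to the cabi, which aborts".
def drive_stream(deltas, &callback)
  deltas.each { |d| callback.call(:chunk, d) }
  callback.call(:done, 'stop', nil)
  :completed
rescue StandardError => e
  warn "stream aborted: #{e.message}"
  :aborted
end

max_chars = 12
buffer = +''
result = drive_stream(['Once upon ', 'a time ', 'there was']) do |kind, *args|
  next unless kind == :chunk

  buffer << args.first.to_s
  # Stop as soon as the accumulated text exceeds the budget.
  raise BudgetExceeded, "over #{max_chars} chars" if buffer.length > max_chars
end

puts result # prints: aborted
```

The buffer keeps everything received up to and including the chunk that crossed the limit, so trim it afterwards if the cap must be exact.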
Streaming inside a workflow step
A step block can call Blazen::Streaming.complete to drive a streaming completion and buffer the tokens before returning a StopEvent:
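```ruby
workflow = Blazen.workflow('streaming_greet') do |b|
  b.step('greet', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |_evt|
    # A CompletionRequest is consumed by each call, so build a fresh one per
    # step invocation instead of capturing a shared `req` from outside.
    req = Blazen::Llm.completion_request(
      messages: [Blazen::Llm.user('Tell me a short joke about Ruby blocks.')],
      max_tokens: 256,
    )
    buffer = +''
    failure = nil
    Blazen::Streaming.complete(model, req) do |kind, *args|
      case kind
      when :chunk then buffer << args.first.content_delta.to_s
      when :error then failure = args.first
      end
    end
    # Surface stream failures instead of emitting a partial buffer.
    raise failure if failure

    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: { result: { content: buffer } },
      ),
    )
  end
end
```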
For a step that wants to publish each delta as it arrives — into a Rack SSE response, an ActionCable channel, or any other downstream sink — forward chunks from the callback into a Queue your handler reads from outside the workflow run.
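A sketch of that fan-out for the Server-Sent Events case, assuming the consumer only needs correctly framed SSE text. The producer thread stands in for the step's streaming callback (it pushes deltas, then a :done sentinel), and sse_frames is a hypothetical helper that turns queued deltas into data: lines following the SSE wire format. Wiring the frames into a Rack body or an ActionCable broadcast is left out.

```ruby
# Hypothetical helper: drains a queue of deltas into SSE frames until :done.
def sse_frames(queue)
  Enumerator.new do |out|
    loop do
      item = queue.pop
      break if item == :done
      next if item.empty? # an empty frame would dispatch nothing

      # Each line of the payload becomes its own "data:" field; a blank
      # line terminates the event.
      frame = item.split("\n", -1).map { |line| "data: #{line}\n" }.join
      out << "#{frame}\n"
    end
    # Named closing event; SSE ignores events whose data is empty.
    out << "event: done\ndata: end\n\n"
  end
end

queue = Thread::Queue.new
producer = Thread.new do
  # Stand-in for the on_chunk callback inside a workflow step.
  ['Hi', '', " there\nfriend"].each { |delta| queue << delta }
  queue << :done
end

frames = sse_frames(queue).to_a
producer.join
frames.each { |f| print f }
```

Splitting multi-line deltas matters because a raw newline inside a single data: field would end the field early and corrupt the event.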
See also
- Quickstart: the non-streaming complete_blocking call and the basic completion request shape.
- Events: how to wrap streamed content into routing events.
- Multimodal: attach images / audio to a streaming request the same way.