Streaming

Stream tokens from LLM completions in Ruby

Blazen’s Ruby binding exposes streaming chat completions through Blazen::Streaming.complete (blocking) and Blazen::Streaming.complete_async (fiber-yielding). Both accept either a single block that receives every event, tagged by its kind, or explicit on_chunk: / on_done: / on_error: keyword arguments with one handler per event.

Block-form callback API

The single-block form is the shortest path — the block receives (kind, *args), where kind is one of :chunk, :done, or :error and the remaining arguments depend on the kind:

require 'blazen'

Blazen.init

model = Blazen::Providers.openai(
  api_key: ENV.fetch('OPENAI_API_KEY'),
  model: 'gpt-4o-mini',
)

req = Blazen::Llm.completion_request(
  messages: [
    Blazen::Llm.system('You are a helpful assistant.'),
    Blazen::Llm.user('Tell me a short joke about Ruby blocks.'),
  ],
  max_tokens: 256,
)

Blazen::Streaming.complete(model, req) do |kind, *args|
  case kind
  when :chunk
    chunk = args.first
    print chunk.content_delta
  when :done
    reason, usage = args
    puts "\nFinished: #{reason} (#{usage&.completion_tokens} tokens)"
  when :error
    err = args.first
    warn "Stream failed: #{err.class}: #{err.message}"
  end
end

Blazen::Streaming.complete consumes the CompletionRequest — once you pass a request in, do not reuse the same wrapper for a second call. Build a fresh one each time.

Explicit handlers

For longer or already-named callbacks, pass each handler as a kwarg instead of a single block:

on_chunk = ->(chunk) { $stdout.write(chunk.content_delta.to_s) }
on_done  = ->(reason, usage) do
  puts "\n[done: #{reason}; tokens=#{usage&.total_tokens}]"
end
on_error = ->(err) { warn "stream error: #{err.message}" }

Blazen::Streaming.complete(
  model, req,
  on_chunk: on_chunk,
  on_done:  on_done,
  on_error: on_error,
)

The two forms are equivalent. If you pass both a block and a kwarg, the kwarg wins.
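To see why the two forms are interchangeable, here is a pure-Ruby sketch of how three named handlers collapse into a single (kind, *args) block. The helper dispatch_to and the FakeChunk struct are hypothetical illustrations, not part of Blazen, and no Blazen call is made; the events are simulated by hand.

```ruby
# Hypothetical helper (not part of Blazen): builds one (kind, *args) block
# that forwards each event to the matching named handler.
def dispatch_to(on_chunk:, on_done:, on_error:)
  lambda do |kind, *args|
    case kind
    when :chunk then on_chunk.call(*args)
    when :done  then on_done.call(*args)
    when :error then on_error.call(*args)
    else raise ArgumentError, "unknown event kind: #{kind.inspect}"
    end
  end
end

# Stand-in for Blazen::Streaming::StreamChunk; only content_delta is modelled.
FakeChunk = Struct.new(:content_delta)

log = []
block = dispatch_to(
  on_chunk: ->(chunk) { log << [:chunk, chunk.content_delta] },
  on_done:  ->(reason, usage) { log << [:done, reason, usage] },
  on_error: ->(err) { log << [:error, err.message] },
)

# Simulate the event sequence of a successful stream.
block.call(:chunk, FakeChunk.new('Hello'))
block.call(:chunk, FakeChunk.new(', world'))
block.call(:done, 'stop', nil)
```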

Event kinds

The three event kinds correspond to the three vtable callbacks on the cabi side:

  • :chunk — args = [chunk], where chunk is a Blazen::Streaming::StreamChunk. Each chunk carries:

    • chunk.content_delta — incremental text delta since the previous chunk (String or nil)
    • chunk.final? returns true for the terminal content-bearing chunk (a UI hint, not the authoritative completion signal)
    • chunk.tool_calls — cumulative tool-call snapshot at this point (Array<Blazen::Llm::ToolCall>). Replace your local state on each delivery rather than appending.
  • :done — args = [finish_reason, usage]. Fires exactly once on the happy path. finish_reason is a String ("stop", "tool_calls", "length", or the empty string for providers that don’t report one). usage is a Blazen::Llm::TokenUsage (or nil for providers that don’t report it).

  • :error — args = [err]. Fires at most once on failure. err is an instance of the matching Blazen::Error subclass (Blazen::RateLimitError, Blazen::ProviderError, Blazen::CancelledError, …) — the cabi decodes the typed error before invoking your callback. The stream is torn down after :error fires; you will not see a subsequent :done.

Exactly one of :done or :error fires per stream. If your callback raises, the wrapper logs to STDERR and returns -1 to the cabi, which aborts the stream.
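Because exactly one terminal event fires, a collector can buffer the text and turn the :error callback back into an ordinary exception once the call returns. This is a minimal sketch under that contract; StreamCollector and DeltaChunk are hypothetical names, and the events below are simulated rather than produced by Blazen. The duplicate-terminal check is only a defensive assertion; per the paragraph above, raising inside a real callback aborts the stream.

```ruby
# Hypothetical collector (not part of Blazen) built on the contract above:
# one terminal event per stream, either :done or :error.
class StreamCollector
  attr_reader :text, :finish_reason, :usage, :error

  def initialize
    @text = +''
    @terminal = nil
  end

  # Same (kind, *args) shape as the block form.
  def call(kind, *args)
    case kind
    when :chunk
      @text << args.first.content_delta.to_s   # nil deltas become ''
    when :done, :error
      # Defensive check: a second terminal event would violate the contract.
      raise "second terminal event #{kind.inspect} after #{@terminal.inspect}" if @terminal
      @terminal = kind
      if kind == :done
        @finish_reason, @usage = args
      else
        @error = args.first
      end
    end
  end

  # Lets the collector be passed with & wherever a block is expected.
  def to_proc
    method(:call).to_proc
  end

  # Returns the buffered text, or re-raises the error delivered via :error.
  def result!
    raise @error if @error
    raise 'stream ended without a terminal event' unless @terminal
    @text
  end
end

# Stand-in for StreamChunk.
DeltaChunk = Struct.new(:content_delta)

ok = StreamCollector.new
ok.call(:chunk, DeltaChunk.new('Hi'))
ok.call(:chunk, DeltaChunk.new(nil))
ok.call(:done, 'stop', nil)

failed = StreamCollector.new
failed.call(:error, RuntimeError.new('rate limited'))
```

Assuming the block form yields events the way the examples above suggest, the collector should also work as Blazen::Streaming.complete(model, req, &collector), followed by collector.result!.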

Tool calls during a stream

When the model emits tool calls instead of text, each :chunk carries the cumulative tool-call snapshot. Replace your local state with each new snapshot instead of appending to it:

tool_calls = []
Blazen::Streaming.complete(model, req) do |kind, *args|
  case kind
  when :chunk
    chunk = args.first
    tool_calls = chunk.tool_calls if chunk.tool_calls.any?
    print chunk.content_delta
  when :done
    reason, _usage = args
    if reason == 'tool_calls'
      tool_calls.each do |tc|
        puts "\n[tool call] #{tc.name}(#{tc.arguments_json})"
      end
    end
  when :error
    warn args.first.message
  end
end

chunk.tool_calls is a snapshot, not a delta — the final snapshot is the one that arrives in the chunk immediately before :done.
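The difference between treating snapshots correctly and treating them as deltas is easy to see with simulated chunks. ToolCallStub and ChunkStub are hypothetical stand-ins for Blazen::Llm::ToolCall and StreamChunk, and the progressively filled argument strings are invented for illustration; they are not captured Blazen output.

```ruby
# Stand-ins modelling only the fields used in the example above.
ToolCallStub = Struct.new(:name, :arguments_json)
ChunkStub    = Struct.new(:content_delta, :tool_calls)

# Simulated cumulative snapshots: each chunk repeats every tool call so far.
chunks = [
  ChunkStub.new(nil, [ToolCallStub.new('get_weather', '{"city":')]),
  ChunkStub.new(nil, [ToolCallStub.new('get_weather', '{"city":"Oslo"}')]),
  ChunkStub.new(nil, [ToolCallStub.new('get_weather', '{"city":"Oslo"}'),
                      ToolCallStub.new('get_time', '{}')]),
]

# Correct: replace local state with the latest non-empty snapshot.
replaced = []
chunks.each { |c| replaced = c.tool_calls if c.tool_calls.any? }

# Wrong: appending snapshots as if they were deltas duplicates earlier calls.
appended = []
chunks.each { |c| appended.concat(c.tool_calls) }

puts replaced.map(&:name).inspect  # prints ["get_weather", "get_time"]
puts appended.size                 # prints 4
```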

Async variant

Blazen::Streaming.complete_async does the same work but drives the stream through the cabi's async future surface. When a Fiber.scheduler is active (for example, inside an Async { ... } block from the async gem), the calling fiber yields while the stream is in flight; otherwise it blocks the calling thread, just like complete.

require 'async'

Async do
  Blazen::Streaming.complete_async(model, req) do |kind, *args|
    case kind
    when :chunk then print args.first.content_delta
    when :done  then puts "\n[done: #{args[0]}]"
    when :error then warn args.first.message
    end
  end
end

The sink callbacks themselves fire on cabi Tokio worker threads, not on the calling fiber — the complete_async API only changes how the outer call yields control. If your callbacks need fiber-local state, marshal back to the calling fiber explicitly (via a Queue or an Async::Notification) rather than touching fiber-locals from inside the callback.
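The marshalling pattern can be sketched in plain Ruby, with an ordinary Thread standing in for the cabi worker thread. The callback only pushes into a thread-safe Queue; all fiber-local state (Thread.current[...] is fiber-local in Ruby) is touched only on the calling side. The worker thread and its hand-written events are a hypothetical simulation, not Blazen's native side.

```ruby
events = Queue.new

# The callback never touches fiber-locals; it only enqueues.
callback = lambda do |kind, *args|
  events << [kind, args]
end

caller_thread = Thread.current
callback_threads = []

# Hypothetical stand-in for the native side invoking callbacks on its own thread.
worker = Thread.new do
  callback_threads << Thread.current
  callback.call(:chunk, 'Hel')
  callback.call(:chunk, 'lo')
  callback.call(:done, 'stop', nil)
end

# Back on the calling thread/fiber: drain until a terminal event arrives.
Thread.current[:transcript] = +''   # fiber-local state, only touched here
loop do
  kind, args = events.pop
  case kind
  when :chunk then Thread.current[:transcript] << args.first
  when :done, :error then break
  end
end
worker.join
```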

Cancellation

There is no per-call cancellation handle on the Ruby streaming surface today. The two practical options for ending a stream early:

  1. Raise from a callback. Raising any StandardError inside on_chunk causes the wrapper to return -1 to the cabi, which tears the stream down. The Rust side surfaces a Blazen::InternalError to whichever entry point owns the stream lifecycle.
  2. Cancel the surrounding fiber. Inside Async { ... }, cancelling the parent task unwinds the calling fiber. Cleanup happens on the cabi side when the future drops.
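Option 1 has one subtlety: per the item above, what reaches the caller is a Blazen::InternalError rather than the exception you raised, so it helps to record the reason before raising. The sketch below simulates this with a hypothetical fake_stream driver that stops delivering after a callback raises, mirroring the -1 return described above; StopStreaming, MAX_CHARS, and fake_stream are illustrative names, not Blazen APIs.

```ruby
# Hypothetical marker exception; any StandardError would abort the stream.
class StopStreaming < StandardError; end

MAX_CHARS = 20
received = +''
stopped_on_purpose = false

on_chunk = lambda do |delta|
  received << delta
  if received.length >= MAX_CHARS
    stopped_on_purpose = true   # remember *why* before raising
    raise StopStreaming, "reached #{MAX_CHARS} chars"
  end
end

# Stand-in for the native driver (not Blazen code): once a callback raises,
# it logs and stops delivering chunks, like the wrapper returning -1.
def fake_stream(deltas, on_chunk)
  deltas.each do |d|
    begin
      on_chunk.call(d)
    rescue StandardError => e
      warn "callback raised #{e.class}: #{e.message}"
      return :aborted
    end
  end
  :completed
end

status = fake_stream(['The quick ', 'brown fox ', 'jumps over ', 'the lazy dog'], on_chunk)
```

With the real binding, checking a flag like stopped_on_purpose when the stream ends in an error is one way to tell a deliberate stop from a genuine failure.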

Cancellation propagation into the native provider request is best-effort: the upstream HTTP call keeps running until it finishes naturally or hits its own timeout. If you need a hard wall-clock cap, point Blazen::Providers.openai(..., base_url: ...) at a proxy with an aggressive read timeout, or rely on the workflow-level b.timeout_ms(...) budget.

Streaming inside a workflow step

A step block can call Blazen::Streaming.complete to drive a streaming completion and buffer the tokens before returning a StopEvent:

workflow = Blazen.workflow('streaming_greet') do |b|
  b.step('greet', accepts: ['blazen::StartEvent'], emits: ['blazen::StopEvent']) do |_evt|
    buffer = +''
    Blazen::Streaming.complete(model, req) do |kind, *args|
      buffer << args.first.content_delta.to_s if kind == :chunk
    end
    Blazen::Workflow::StepOutput.single(
      Blazen::Workflow::Event.create(
        event_type: 'blazen::StopEvent',
        data: { result: { content: buffer } },
      ),
    )
  end
end

For a step that wants to publish each delta as it arrives — into a Rack SSE response, an ActionCable channel, or any other downstream sink — forward chunks from the callback into a Queue your handler reads from outside the workflow run.
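As one possible reader on the other side of that Queue, here is a sketch of a Rack-style streaming body that turns queued deltas into Server-Sent Events frames. SseBody and its DONE sentinel are hypothetical; the producer thread stands in for on_chunk / on_done callbacks, and the sketch assumes a Rack server that flushes each string yielded by #each to the client (not every server does this without extra configuration). Push the sentinel from both on_done and on_error so the body always terminates.

```ruby
# Hypothetical Rack-style body: the server calls #each and writes every
# yielded String to the client. Deltas arrive through a Queue.
class SseBody
  DONE = Object.new.freeze   # sentinel pushed from on_done and on_error

  def initialize(queue)
    @queue = queue
  end

  def each
    loop do
      item = @queue.pop
      break if item.equal?(DONE)
      next if item.empty?   # skip empty deltas (nil.to_s on the producer side)
      # SSE data fields cannot contain newlines: emit one data: line per line.
      frame = item.split("\n", -1).map { |line| "data: #{line}\n" }.join
      yield "#{frame}\n"   # blank line ends the event
    end
  end
end

queue = Queue.new

# Stand-in for the stream callbacks pushing chunk.content_delta.to_s.
producer = Thread.new do
  ['Hello', '', "two\nlines"].each { |d| queue << d }
  queue << SseBody::DONE
end

frames = []
SseBody.new(queue).each { |f| frames << f }
producer.join
```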

See also

  • Quickstart — non-streaming complete_blocking and the basic completion request shape.
  • Events — how to wrap streamed content into routing events.
  • Multimodal — attach images / audio to a streaming request the same way.