Python API Reference
Complete API reference for blazen in Python
Event
The preferred way to define events is by subclassing Event. The event_type is automatically set to the class name.
```python
from blazen import Event

class AnalyzeEvent(Event):
    text: str
    score: float

ev = AnalyzeEvent(text="hello", score=0.9)
ev.event_type  # "AnalyzeEvent"
ev.text        # "hello"
```
You can also construct events inline without a subclass:
```python
Event(event_type: str, **kwargs)
```

```python
ev = Event("AnalyzeEvent", text="hello", score=0.9)
```
| Member | Type | Description |
|---|---|---|
| .event_type | str | The event type string. Auto-set to the class name for subclasses. |
| .to_dict() | -> dict | Serialize the event data to a plain dictionary. |
| .field_name | Any | Attribute access for any keyword argument supplied at construction. |
StartEvent
```python
StartEvent(**kwargs)
```
Built-in event whose event_type is "blazen::StartEvent". All keyword arguments are available as attributes.
StopEvent
```python
StopEvent(result: Any)
```
Built-in event whose event_type is "blazen::StopEvent".
| Member | Type | Description |
|---|---|---|
| .result | Any | The value passed via the result keyword argument. |
StopEvent(result=x) preserves object identity (is) for non-JSON values. You can pass class instances, Pydantic models, DB connections, or lambdas as the result, and await handler.result() returns an event whose .result attribute is the same Python object. Non-JSON values are routed through a per-Context session-ref registry, and __getattr__ on the returned event resolves the stored marker back to the original object transparently.
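The marker-and-registry idea can be pictured with a small stand-alone sketch. It is not Blazen's implementation: the SessionRegistry and ResultEvent classes, the plain-dict registry, and the {"__session_ref__": id} marker shape are all hypothetical, chosen only to show how a JSON-safe marker can stand in for a live object and be resolved lazily through __getattr__.

```python
import json
import uuid
from typing import Any


class SessionRegistry:
    """Hypothetical per-run registry mapping opaque ids to live objects."""

    def __init__(self) -> None:
        self._objects: dict[str, Any] = {}

    def to_marker(self, value: Any) -> Any:
        # JSON-friendly values pass through unchanged; anything else is parked
        # in the registry and replaced by a small marker dict.
        try:
            json.dumps(value)
            return value
        except TypeError:
            ref = str(uuid.uuid4())
            self._objects[ref] = value
            return {"__session_ref__": ref}

    def resolve(self, value: Any) -> Any:
        if isinstance(value, dict) and "__session_ref__" in value:
            return self._objects[value["__session_ref__"]]
        return value


class ResultEvent:
    """Holds a possibly-marked payload and resolves it on attribute access."""

    def __init__(self, registry: SessionRegistry, raw_result: Any) -> None:
        self._registry = registry
        self._raw = {"result": raw_result}

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails, e.g. for `.result`.
        raw = self.__dict__["_raw"]
        if name in raw:
            return self.__dict__["_registry"].resolve(raw[name])
        raise AttributeError(name)
```

Because only the marker travels through the serialized event, the original object never needs to be JSON-encoded, which is why identity survives for values like DB connections.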
step decorator
The @step decorator reads the type hint of the ev parameter to automatically determine which events the step accepts.
```python
from blazen import Context, Event, step

class AnalyzeEvent(Event):
    text: str

@step
async def analyze(ctx: Context, ev: AnalyzeEvent) -> Event | None:
    ...

# Equivalent to @step(accepts=["AnalyzeEvent"])
```
When the annotation is the base Event class or absent, the step defaults to accepting StartEvent:
```python
@step
async def start(ctx: Context, ev: Event) -> Event | None:
    ...

# Equivalent to @step(accepts=["blazen::StartEvent"])
```
Explicit overrides still work:
| Variant | Description |
|---|---|
| @step | Infers accepts from the ev type hint. Defaults to StartEvent when the hint is Event or missing. |
| @step(accepts=["EventType"]) | Explicitly sets accepted event types, overriding type-hint inference. |
| @step(emits=["EventType"]) | Declares the event types this step may produce. |
| @step(max_concurrency=N) | Limits how many instances of this step may run concurrently. 0 means unlimited. |
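The inference rule can be approximated in pure Python with typing.get_type_hints. This is a sketch under stated assumptions: Event and AnalyzeEvent are local stand-ins for the Blazen classes, and infer_accepts is a hypothetical helper, not the decorator's actual code.

```python
from typing import Any, Callable, get_type_hints


class Event:
    """Minimal stand-in for blazen.Event."""


class AnalyzeEvent(Event):
    text: str


START_EVENT = "blazen::StartEvent"


def infer_accepts(fn: Callable[..., Any]) -> list[str]:
    """Return the event types a step accepts, based on its `ev` annotation."""
    hint = get_type_hints(fn).get("ev")
    # A missing hint or the bare base class falls back to StartEvent.
    if hint is None or hint is Event:
        return [START_EVENT]
    # Subclasses use their class name, matching the auto-set event_type.
    return [hint.__name__]


async def analyze(ctx: Any, ev: AnalyzeEvent) -> None: ...
async def start(ctx: Any, ev: Event) -> None: ...
async def untyped(ctx, ev): ...
```

Under this rule, analyze maps to ["AnalyzeEvent"] while both start and untyped map to ["blazen::StartEvent"].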
Step signature
```python
async def name(ctx: Context, ev: MyEvent) -> Event | list[Event] | None
```
Return an Event to emit it, a list[Event] to emit several, or None to emit nothing. Steps can be sync or async.
Workflow
```python
Workflow(name: str, steps: list, timeout: float | None = None)
```
Create a workflow from a name and an ordered list of steps. The optional timeout is in seconds.
| Method | Signature | Description |
|---|---|---|
| run | await wf.run(**kwargs) -> WorkflowHandler | Start the workflow. Keyword arguments become fields on the initial StartEvent. |
WorkflowHandler
Returned by Workflow.run(). Provides control over a running workflow instance.
| Method | Signature | Description |
|---|---|---|
| result | await handler.result() -> Event | Block until the workflow emits a StopEvent and return it. |
| stream_events | handler.stream_events() -> AsyncIterator[Event] | Async iterator yielding events written to the stream. |
```python
handler = await wf.run(prompt="Hello")

# Stream intermediate events while waiting for the result
async for event in handler.stream_events():
    print(event.event_type, event.to_dict())

result = await handler.result()
```
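The stream-then-result pattern behaves much like a queue consumer paired with a future. The sketch below models it with asyncio.Queue, a None sentinel that closes the stream, and an asyncio.Future for the final value; HandlerSketch, finish, and the string "events" are hypothetical and only approximate what WorkflowHandler does.

```python
import asyncio
from typing import Any, AsyncIterator


class HandlerSketch:
    """Hypothetical stand-in: a stream of events plus a final result."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()
        self._result: asyncio.Future = asyncio.get_running_loop().create_future()

    def write_event_to_stream(self, event: Any) -> None:
        self._queue.put_nowait(event)

    def finish(self, stop_event: Any) -> None:
        self._queue.put_nowait(None)  # sentinel: the stream is closed
        self._result.set_result(stop_event)

    async def stream_events(self) -> AsyncIterator[Any]:
        while (event := await self._queue.get()) is not None:
            yield event

    async def result(self) -> Any:
        return await self._result


async def demo() -> tuple[list[str], str]:
    handler = HandlerSketch()

    async def workflow() -> None:
        for i in range(3):
            handler.write_event_to_stream(f"progress-{i}")
            await asyncio.sleep(0)  # let the consumer run
        handler.finish("done")

    task = asyncio.create_task(workflow())
    seen = [ev async for ev in handler.stream_events()]
    await task
    return seen, await handler.result()


seen, final = asyncio.run(demo())
```

Draining the stream first and then awaiting the result, as in the usage above, never deadlocks here because the sentinel is enqueued before the result is set.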
Context
Available as the first parameter of every step function. All methods are synchronous.
Two explicit namespaces are exposed alongside the smart-routing shortcuts on ctx itself:
| Field | Type | Description |
|---|---|---|
| state | StateNamespace | Persistable workflow state. Routes through the same 4-tier dispatch as ctx.set (bytes / JSON / pickle / live-ref). Survives pause() / resume() and checkpoint stores. |
| session | SessionNamespace | Live in-process references. Identity is preserved within a single workflow run. Values are deliberately excluded from snapshots. |
| Method | Signature | Description |
|---|---|---|
| set | ctx.set(key: str, value: StateValue) -> None | Store any Python value via 4-tier dispatch: bytes/bytearray are stored as raw binary; JSON-serializable types (dict, list, str, int, float, bool, None) are stored as JSON; picklable objects (Pydantic models, dataclasses, custom classes) are pickled automatically; unpicklable objects (DB connections, file handles, lambdas) are stored as a live in-process reference (same-process only, excluded from snapshots). |
| get | ctx.get(key: str) -> StateValue \| None | Retrieve a value by key, or None if absent. Returns the original type transparently: JSON values come back as their Python type, bytes come back as bytes, and both pickled and live-reference values round-trip to the original Python object. |
| set_bytes | ctx.set_bytes(key: str, data: bytes) -> None | Convenience alias for storing raw binary data. Equivalent to ctx.set(key, data) when data is bytes. |
| get_bytes | ctx.get_bytes(key: str) -> bytes \| None | Convenience alias for retrieving raw binary data, or None if absent. |
| run_id | ctx.run_id() -> str | Return the UUID of the current workflow run. |
| send_event | ctx.send_event(event: Event) -> None | Route an event to matching steps manually. |
| write_event_to_stream | ctx.write_event_to_stream(event: Event) -> None | Publish an event to the stream visible via WorkflowHandler.stream_events(). |
StateValue = Any — a type alias defined in the .pyi stubs indicating that any Python value is accepted. The first three storage tiers (bytes / JSON / pickle) persist through pause/resume/checkpoint; the fourth tier (live in-process reference) is same-process only.
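The tier order can be approximated in pure Python to predict where a given value will land. The classify helper below is hypothetical (Blazen performs this dispatch natively), and it assumes the JSON tier accepts anything json.dumps accepts, which is slightly broader than the type list above (tuples, for example).

```python
import json
import pickle
from typing import Any


def classify(value: Any) -> str:
    """Return which storage tier a value would land in (sketch only)."""
    # Tier 1: raw binary.
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    # Tier 2: JSON-serializable values.
    try:
        json.dumps(value)
        return "json"
    except (TypeError, ValueError):
        pass
    # Tier 3: anything the pickle module can serialize.
    try:
        pickle.dumps(value)
        return "pickle"
    except Exception:
        # Tier 4: live in-process reference, not persisted across processes.
        return "live_ref"
```

A set, for instance, fails the JSON tier but pickles fine, whereas a lambda or an open connection falls through to the live-reference tier and so will not survive a resume in another process.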
StateNamespace
Namespace for persistable workflow state, accessed via ctx.state. Routes values through the same 4-tier dispatch as Context.set: bytes → JSON → pickle → live-object. The first three tiers survive pause() / resume() and checkpoint stores; the fourth is in-process only.
| Method | Signature | Description |
|---|---|---|
| set | ctx.state.set(key: str, value: StateValue) -> None | Store a value under key using 4-tier dispatch. |
| get | ctx.state.get(key: str) -> StateValue \| None | Retrieve the value under key, deserialized to its original Python type, or None if absent. |
| set_bytes | ctx.state.set_bytes(key: str, data: bytes) -> None | Store raw binary data under key. |
| get_bytes | ctx.state.get_bytes(key: str) -> bytes \| None | Retrieve raw binary data under key, or None if absent. |
Dict protocol
Also supports __setitem__, __getitem__, and __contains__ so it can be used like a dict:
```python
ctx.state.set("counter", 5)
ctx.state["counter"] = 5  # equivalent
count = ctx.state.get("counter")
print("counter" in ctx.state)  # True
```
SessionNamespace
Namespace for live in-process references, accessed via ctx.session. Identity is preserved within a single workflow run; values are deliberately excluded from snapshots, so they are the right place to stash database connections, open file handles, ML model objects, or any other resource that cannot (or should not) be serialized.
| Method | Signature | Description |
|---|---|---|
| set | ctx.session.set(key: str, value: Any) -> None | Store a live reference to value under key. |
| get | ctx.session.get(key: str) -> Any \| None | Retrieve the live reference under key, or None if absent. The returned object is the same Python object that was stored. |
| has | ctx.session.has(key: str) -> bool | Return whether key is currently set. |
| remove | ctx.session.remove(key: str) -> None | Drop the entry under key, if any. |
Dict protocol
Also supports __setitem__, __getitem__, and __contains__:
```python
import sqlite3

conn = sqlite3.connect(":memory:")
ctx.session.set("db", conn)
ctx.session["db"] = conn  # dict-style equivalent
assert ctx.session.get("db") is conn  # same object, always
assert "db" in ctx.session
```
Pause/resume behavior
Session values are not serialized into workflow snapshots. What happens to them at pause time is governed by the workflow’s session_pause_policy (default "pickle_or_error"). The policy exists at the workflow level — see the workflow configuration for details.
BlazenState
Base class for typed workflow state with per-field context storage. Subclass with @dataclass to get automatic per-field serialization where each field is stored individually using its optimal tier.
```python
import sqlite3
from dataclasses import dataclass

from blazen import BlazenState

@dataclass
class MyState(BlazenState):
    input_path: str = ""
    conn: sqlite3.Connection | None = None

    class Meta:
        transient = {"conn"}
        store_by = {}

    def restore(self):
        # Recreate the transient connection after deserialization.
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)
```
Store and retrieve via context:
```python
ctx.set("state", my_state)  # Stores each field individually
state = ctx.get("state")    # Reconstructs object, then calls restore()
```
Meta inner class
| Attribute | Type | Description |
|---|---|---|
| transient | ClassVar[set[str]] | Field names excluded from serialization. These fields are set to None in snapshots and recreated by restore(). |
| store_by | ClassVar[dict[str, FieldStore]] | Custom persistence strategy per field. Fields not listed use the default automatic tier (JSON / bytes / pickle). |
Methods
| Method | Signature | Description |
|---|---|---|
| restore | def restore(self) -> None | Override to recreate transient fields after deserialization. Called automatically by ctx.get() once all serializable fields are populated. Transient fields are None at call time. |
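To make the transient/restore contract concrete, here is a pure-Python sketch of per-field storage built on dataclasses.fields. SketchState, save_fields, load_fields, and the flat "<key>.<field>" layout in a dict are hypothetical; Blazen's real storage format and tier selection are not shown on this page.

```python
import sqlite3
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class SketchState:
    input_path: str = ""
    conn: Optional[sqlite3.Connection] = None

    class Meta:
        transient = {"conn"}

    def restore(self) -> None:
        # Recreate the live connection from the persisted path.
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)


def save_fields(key: str, state: Any, store: dict[str, Any]) -> None:
    """Write each non-transient field under its own key."""
    transient = getattr(state.Meta, "transient", set())
    for f in fields(state):
        if f.name not in transient:
            store[f"{key}.{f.name}"] = getattr(state, f.name)


def load_fields(key: str, cls: type, store: dict[str, Any]) -> Any:
    """Rebuild the object with transient fields at their defaults, then call restore()."""
    transient = getattr(cls.Meta, "transient", set())
    kwargs = {
        f.name: store[f"{key}.{f.name}"]
        for f in fields(cls)
        if f.name not in transient
    }
    obj = cls(**kwargs)  # conn is None here, as the table above describes
    obj.restore()
    return obj
```

The connection itself never reaches the store; only input_path is persisted, and restore() rebuilds the connection on load.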
FieldStore
Structural protocol for custom per-field persistence. There is no FieldStore class to import — any object whose shape matches the two methods below can be used as a value in BlazenState.Meta.store_by to route specific fields through custom storage (e.g., S3, a database, Redis).
Implement the shape directly with your own class, or use CallbackFieldStore below for the common case.
| Method | Signature | Description |
|---|---|---|
| save | def save(self, key: str, value: Any, ctx: Context) -> None | Persist the field value under the given key. |
| load | def load(self, key: str, ctx: Context) -> Any | Load and return the field value for the given key. |
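Because the protocol is structural, any class with matching save/load methods qualifies. The sketch spells the shape out as a typing.Protocol so a type checker (or isinstance, via runtime_checkable) can verify it, then implements it with a plain dict standing in for S3 or Redis. FieldStoreShape and DictFieldStore are hypothetical names, and ctx is typed as Any to keep the example free of Blazen imports.

```python
from typing import Any, Protocol, runtime_checkable


@runtime_checkable
class FieldStoreShape(Protocol):
    def save(self, key: str, value: Any, ctx: Any) -> None: ...
    def load(self, key: str, ctx: Any) -> Any: ...


class DictFieldStore:
    """Keeps field values in a plain dict; a stand-in for real storage."""

    def __init__(self) -> None:
        self.data: dict[str, Any] = {}

    def save(self, key: str, value: Any, ctx: Any) -> None:
        self.data[key] = value

    def load(self, key: str, ctx: Any) -> Any:
        return self.data[key]
```

An instance could then be used as a value in Meta.store_by, e.g. store_by = {"embeddings": DictFieldStore()}; note that runtime_checkable only checks that the methods exist, not their signatures.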
CallbackFieldStore
Convenience implementation of the FieldStore structural protocol that delegates to plain callables. Importable directly from blazen.
```python
from blazen import CallbackFieldStore

CallbackFieldStore(
    save_fn: Callable[[str, Any], None],
    load_fn: Callable[[str], Any],
)
```
| Parameter | Type | Description |
|---|---|---|
| save_fn | Callable[[str, Any], None] | Called with (key, value) to persist a field. |
| load_fn | Callable[[str], Any] | Called with (key) to load a field value. |
Also exposes save(key, value, ctx) and load(key, ctx) methods matching the FieldStore protocol — the ctx argument is accepted but not forwarded to your callbacks.
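```python
import boto3

from blazen import CallbackFieldStore

s3 = boto3.client("s3")

# put_object expects a bytes-like Body, so this assumes the field values are bytes.
store = CallbackFieldStore(
    save_fn=lambda k, v: s3.put_object(Bucket="b", Key=k, Body=v),
    load_fn=lambda k: s3.get_object(Bucket="b", Key=k)["Body"].read(),
)
```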
This is a tiny wrapper of roughly 15 lines. If your store needs richer behavior (the ctx argument, async I/O, batching), implement the structural protocol directly with your own class.
CompletionModel
Use static constructor methods to create a model for a specific provider, then call complete() or stream() to generate responses.
API keys are resolved from ProviderOptions(api_key=...) when passed explicitly, or from the provider’s standard environment variable (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY) when options is omitted.
```python
from blazen import CompletionModel, ProviderOptions

# Read key from OPENAI_API_KEY env var
model = CompletionModel.openai()

# Pass an explicit key
model = CompletionModel.anthropic(options=ProviderOptions(api_key="sk-ant-..."))

# Override the default model
model = CompletionModel.openrouter(
    options=ProviderOptions(api_key="sk-or-...", model="meta-llama/llama-3-70b")
)
```
Provider constructors
| Constructor | Signature |
|---|---|
| openai | CompletionModel.openai(*, options: ProviderOptions = None) |
| anthropic | CompletionModel.anthropic(*, options: ProviderOptions = None) |
| gemini | CompletionModel.gemini(*, options: ProviderOptions = None) |
| azure | CompletionModel.azure(*, options: AzureOptions) |
| openrouter | CompletionModel.openrouter(*, options: ProviderOptions = None) |
| groq | CompletionModel.groq(*, options: ProviderOptions = None) |
| together | CompletionModel.together(*, options: ProviderOptions = None) |
| mistral | CompletionModel.mistral(*, options: ProviderOptions = None) |
| deepseek | CompletionModel.deepseek(*, options: ProviderOptions = None) |
| fireworks | CompletionModel.fireworks(*, options: ProviderOptions = None) |
| perplexity | CompletionModel.perplexity(*, options: ProviderOptions = None) |
| xai | CompletionModel.xai(*, options: ProviderOptions = None) |
| cohere | CompletionModel.cohere(*, options: ProviderOptions = None) |
| bedrock | CompletionModel.bedrock(*, options: BedrockOptions) |
| fal | CompletionModel.fal(*, options: FalOptions = None) |
Properties
| Property | Type | Description |
|---|---|---|
| .model_id | str | The string identifier of the active model. |
complete()
```python
response: CompletionResponse = await model.complete(
    messages: list[ChatMessage],
    options: CompletionOptions = None,
)
```
Returns a typed CompletionResponse (see below). Also supports dict-style access for backwards compatibility: response["content"].
```python
opts = CompletionOptions(temperature=0.7, max_tokens=1024)
response = await model.complete(messages, opts)
```
stream()
```python
await model.stream(
    messages: list[ChatMessage],
    on_chunk: Callable[[dict], Any],
    options: CompletionOptions = None,
)
```
Streams a chat completion, calling on_chunk for each chunk received. Each chunk is a dict with the following keys:
| Key | Type | Description |
|---|---|---|
| delta | str \| None | The incremental text content for this chunk. |
| finish_reason | str \| None | Set on the final chunk (e.g. "stop", "tool_calls"). |
| tool_calls | list[dict] | Tool call fragments, if any. |
```python
def handle(chunk):
    if chunk["delta"]:
        print(chunk["delta"], end="")

await model.stream([ChatMessage.user("Tell me a story")], handle)
```
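Since on_chunk is an arbitrary callable, a small stateful object can assemble the full reply as it streams. The Accumulator class below is a hypothetical helper that relies only on the three chunk keys listed above; it is fed hand-written chunk dicts here instead of a live model.

```python
from typing import Any, Optional


class Accumulator:
    """Collects streamed deltas and remembers the final finish_reason."""

    def __init__(self) -> None:
        self.parts: list[str] = []
        self.finish_reason: Optional[str] = None
        self.tool_call_fragments: list[dict[str, Any]] = []

    def __call__(self, chunk: dict[str, Any]) -> None:
        if chunk.get("delta"):
            self.parts.append(chunk["delta"])
        self.tool_call_fragments.extend(chunk.get("tool_calls") or [])
        if chunk.get("finish_reason") is not None:
            self.finish_reason = chunk["finish_reason"]

    @property
    def text(self) -> str:
        return "".join(self.parts)


acc = Accumulator()
for chunk in [
    {"delta": "Once ", "finish_reason": None, "tool_calls": []},
    {"delta": "upon a time", "finish_reason": None, "tool_calls": []},
    {"delta": None, "finish_reason": "stop", "tool_calls": []},
]:
    acc(chunk)
```

With a real model, the same object would be passed as the callback, e.g. await model.stream(messages, acc), and acc.text read afterwards.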
CompletionOptions
Options object for complete() and stream(). All fields are optional keyword arguments.
```python
opts = CompletionOptions(
    temperature=0.7,
    max_tokens=1024,
    top_p=0.9,
    model="gpt-4o",
    tools=[{"name": "search", "description": "...", "parameters": {...}}],
    response_format={"type": "json_schema", "json_schema": {...}},
)
```
| Field | Type | Description |
|---|---|---|
| temperature | float \| None | Sampling temperature (0.0-2.0). |
| max_tokens | int \| None | Maximum tokens to generate. |
| top_p | float \| None | Nucleus sampling parameter (0.0-1.0). |
| model | str \| None | Model override for this request. |
| tools | Any \| None | Tool definitions for function calling. |
| response_format | dict \| None | JSON schema dict for structured output. |
Middleware decorators
Each decorator returns a new CompletionModel wrapping the original with additional behaviour.
| Method | Signature | Description |
|---|---|---|
| with_retry | .with_retry(config: RetryConfig \| None = None) | Automatic retry with exponential backoff on transient failures. |
| with_cache | .with_cache(config: CacheConfig \| None = None) | In-memory response cache for identical non-streaming requests. |
| with_fallback | CompletionModel.with_fallback(models: list[CompletionModel]) | Static method. Tries providers in order; falls back on transient errors. |
RetryConfig and CacheConfig are typed configuration objects:
```python
RetryConfig(*, max_retries=3, initial_delay_ms=1000, max_delay_ms=30000, honor_retry_after=True, jitter=True)
CacheConfig(*, strategy: CacheStrategy | None = None, ttl_seconds=300, max_entries=1000)
```

```python
from blazen import CacheConfig, CompletionModel, RetryConfig

# Chain decorators with typed config objects
model = (
    CompletionModel.openai()
    .with_cache(CacheConfig(ttl_seconds=600))
    .with_retry(RetryConfig(max_retries=5))
)

# Or use defaults (both config args are optional)
model = CompletionModel.openai().with_cache().with_retry()

# Fallback across providers
primary = CompletionModel.openai()
backup = CompletionModel.anthropic()
model = CompletionModel.with_fallback([primary, backup])
```
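When sizing timeouts it can help to compute the delay schedule implied by a RetryConfig. The formula below (double from initial_delay_ms, cap at max_delay_ms, optional "full jitter" drawn uniformly from [0, cap]) is a common exponential-backoff scheme and is an assumption here; this page does not state Blazen's exact formula or how honor_retry_after overrides it. backoff_delays_ms is a hypothetical helper.

```python
import random
from typing import Optional


def backoff_delays_ms(
    max_retries: int = 3,
    initial_delay_ms: int = 1000,
    max_delay_ms: int = 30000,
    jitter: bool = True,
    rng: Optional[random.Random] = None,
) -> list[int]:
    """Delay before each retry attempt, in milliseconds (assumed formula)."""
    rng = rng or random.Random()
    delays = []
    for attempt in range(max_retries):
        # Exponential growth, capped at the configured maximum.
        base = min(initial_delay_ms * 2 ** attempt, max_delay_ms)
        # Full jitter: pick uniformly in [0, base] to spread retries out.
        delays.append(rng.randint(0, base) if jitter else base)
    return delays
```

Without jitter, the defaults give waits of 1000, 2000, and 4000 ms; with max_retries=7 the last two waits hit the 30000 ms cap.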
CompletionResponse
Returned by model.complete(). Supports both attribute access and dict-style access.
| Property | Type | Description |
|---|---|---|
| .content | str \| None | The generated text. |
| .model | str | Model name used for the completion. |
| .finish_reason | str \| None | Why generation stopped ("stop", "tool_calls", etc.). |
| .tool_calls | list[ToolCall] | Tool calls requested by the model. |
| .usage | TokenUsage \| None | Token usage statistics. |
| .cost | float \| None | Estimated cost in USD for this request. |
| .timing | RequestTiming \| None | Timing metadata for the request. |
| .images | list[dict] | Image outputs (provider-dependent). |
| .audio | list[dict] | Audio outputs (provider-dependent). |
| .videos | list[dict] | Video outputs (provider-dependent). |
```python
response = await model.complete([ChatMessage.user("Hello")])
print(response.content)     # attribute access
print(response["content"])  # dict-style access (backwards compatible)
print(response.cost)        # e.g. 0.0023
print(response.timing)      # RequestTiming or None
print(response.keys())      # list of available keys
```
RequestTiming
Timing metadata attached to a CompletionResponse. All fields are optional since not every provider reports timing data.
| Property | Type | Description |
|---|---|---|
| .queue_ms | int \| None | Time spent waiting in the provider’s queue. |
| .execution_ms | int \| None | Time spent executing the request. |
| .total_ms | int \| None | Total round-trip time. |
```python
response = await model.complete([ChatMessage.user("Hello")])
if response.timing:
    print(f"Total: {response.timing.total_ms}ms")
    print(f"Queue: {response.timing.queue_ms}ms")
    print(f"Execution: {response.timing.execution_ms}ms")
```
ChatMessage
A single message in a chat conversation.
```python
msg = ChatMessage(role="user", content="Hello, world!")

# role is optional, defaults to "user"
msg = ChatMessage(content="Hello!")
```
Static constructors
| Method | Description |
|---|---|
| ChatMessage.system(content: str) | Create a system message. |
| ChatMessage.user(content: str) | Create a user message. |
| ChatMessage.assistant(content: str) | Create an assistant message. |
| ChatMessage.tool(content: str) | Create a tool result message. |
| ChatMessage.user_image_url(*, text, url, media_type=None) | Create a user message with text and an image URL. |
| ChatMessage.user_image_base64(*, text, data, media_type) | Create a user message with text and a base64 image. |
| ChatMessage.user_parts(*, parts: list[ContentPart]) | Create a user message with multiple content parts. |
Properties
| Property | Type | Description |
|---|---|---|
| .role | str | One of "system", "user", "assistant", "tool". |
| .content | str \| None | The message text. |
| .tool_call_id | str \| None | For role="tool" messages, the call ID this message responds to. None otherwise. |
| .name | str \| None | For role="tool" messages, the tool/function name (set by some providers). None otherwise. |
| .tool_result | ToolOutput \| None | The structured tool-result payload. None for non-tool messages or when the tool returned a plain string (in which case the string lives in .content instead). See ToolOutput. |
Role
Constants for message roles.
```python
from blazen import Role

Role.SYSTEM     # "system"
Role.USER       # "user"
Role.ASSISTANT  # "assistant"
Role.TOOL       # "tool"
```
ContentPart
Build multimodal content parts for use with ChatMessage.user_parts().
| Factory Method | Description |
|---|---|
| ContentPart.text(*, text=...) | Create a text content part. |
| ContentPart.image_url(*, url=..., media_type=...) | Create an image URL content part. |
| ContentPart.image_base64(*, data=..., media_type=...) | Create a base64 image content part. |
```python
from blazen import ChatMessage, ContentPart, MediaType

msg = ChatMessage.user_parts(parts=[
    ContentPart.text(text="What's in this image?"),
    ContentPart.image_url(url="https://example.com/photo.jpg", media_type=MediaType.JPEG),
])
```
ToolCall
A tool invocation requested by the model.
| Property | Type | Description |
|---|---|---|
| .id | str | Unique identifier for the tool call. |
| .name | str | Name of the tool to invoke. |
| .arguments | dict[str, Any] | Parsed arguments for the tool call. |
Supports dict-style access: tool_call["name"].
ToolOutput
Two-channel return value from a tool handler. Tool results have two distinct audiences. The caller (your Python code) wants the full structured data; the LLM, on the next turn, may need a different shape — sometimes shorter, sometimes provider-specific. ToolOutput carries both channels: data is what the caller sees, llm_override is what the LLM sees.
```python
import blazen

out = blazen.ToolOutput(data={"items": [1, 2, 3]})
out.data          # {"items": [1, 2, 3]}
out.llm_override  # None

# Explicit override: the caller still gets the full dict, but the LLM
# sees a short summary on its next turn.
out2 = blazen.ToolOutput(
    data={"items": [1, 2, 3], "_debug": "..."},
    llm_override=blazen.LlmPayload.text("Found 3 items."),
)
```
Constructor
| Argument | Type | Description |
|---|---|---|
| data | Any | The structured value the caller sees programmatically. Dict, list, scalar, or string: anything JSON-serializable. |
| llm_override | LlmPayload \| None | Optional override for what the LLM sees on the next turn. None means each provider applies its default conversion from data (see Per-provider behavior). |
Properties
| Property | Type | Description |
|---|---|---|
| .data | Any | The user-visible structured payload. Re-materialized as a Python value on each access. |
| .llm_override | LlmPayload \| None | The LLM-side override, if set. |
A tool handler can return a bare dict/list/str/scalar, in which case Blazen auto-wraps it as ToolOutput(data=value, llm_override=None). Returning a ToolOutput instance directly is only required when you want to set llm_override.
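The auto-wrapping rule amounts to a one-step normalization. ToolOutputSketch and normalize are hypothetical stand-ins for blazen.ToolOutput and Blazen's internal handling, included only to make the rule concrete.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class ToolOutputSketch:
    data: Any
    llm_override: Optional[Any] = None


def normalize(handler_result: Any) -> ToolOutputSketch:
    # Explicit outputs pass through untouched; anything else is wrapped
    # with llm_override=None so the provider default applies.
    if isinstance(handler_result, ToolOutputSketch):
        return handler_result
    return ToolOutputSketch(data=handler_result, llm_override=None)
```

Returning the wrapper yourself is therefore only worthwhile when you need to set llm_override; otherwise both forms end up equivalent.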
LlmPayload
Provider-aware override for what the LLM sees as a tool result. Constructed only via the classmethod factories — there is no public __init__.
```python
import blazen

blazen.LlmPayload.text("Found 3 results.")
blazen.LlmPayload.json({"items": [1, 2, 3]})
blazen.LlmPayload.provider_raw(
    provider="anthropic",
    value=[{"type": "text", "text": "..."}],
)
```
Variants
| Variant kind | Constructor | Behavior |
|---|---|---|
| "text" | LlmPayload.text(text: str) | Plain text. Works on every provider. |
| "json" | LlmPayload.json(value: Any) | Structured JSON. Anthropic and Gemini natively consume the structure; the OpenAI family stringifies it once at the wire boundary. |
| "provider_raw" | LlmPayload.provider_raw(*, provider: str, value: Any) | Provider-specific escape hatch. Only the named provider sees value; every other provider falls back to converting ToolOutput.data with its default. provider is one of "openai", "openai_compat", "azure", "anthropic", "gemini", "responses", "fal". |
Properties
| Property | Type | Description |
|---|---|---|
| .kind | str | The variant tag: "text", "json", "parts", or "provider_raw". |
| .text_value | str \| None | The text body for Text payloads. None for other variants. |
| .value | Any \| None | The structured value for Json and ProviderRaw payloads. None otherwise. |
| .provider | str \| None | The provider name for ProviderRaw payloads. None otherwise. |
Per-provider behavior
When a tool returns structured data and no llm_override, each provider sends a sensible default to the LLM:
- OpenAI / OpenAI-compat / Azure / Responses / Fal: data is JSON-stringified into the content field of the tool message.
- Anthropic: structured data becomes [{"type": "text", "text": <stringified-json>}] inside tool_result.content.
- Gemini: structured object data is passed natively as functionResponse.response. Scalar values (numbers, booleans, strings) are wrapped as {"result": <scalar>}.
Plain strings always pass through unchanged on every provider — a tool returning "hello" results in the LLM seeing hello, not "hello".
When an llm_override is set:
- LlmPayload.text(...) is universally accepted.
- LlmPayload.json(...) is consumed natively by Anthropic and Gemini and stringified by the OpenAI family.
- LlmPayload.provider_raw(provider="X", value=V) only takes effect when the active provider matches X. For every other provider the dispatcher falls back to converting ToolOutput.data with the provider’s default rule above.
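The default (no llm_override) rules can be summarized as a small dispatch function. default_tool_content is hypothetical and returns only the relevant fragment, not a full wire message. Two behaviors are assumptions not settled by the bullets: lists are treated like scalars for Gemini (wrapped in {"result": ...}), and plain strings are also placed in a text block for Anthropic.

```python
import json
from typing import Any

OPENAI_FAMILY = {"openai", "openai_compat", "azure", "responses", "fal"}


def default_tool_content(provider: str, data: Any) -> Any:
    """Approximate what each provider family sends when llm_override is None."""
    if provider == "gemini":
        # functionResponse.response must be an object, so wrap non-dicts.
        return data if isinstance(data, dict) else {"result": data}
    # Strings are never JSON-encoded, so "hello" is not sent as "\"hello\"".
    text = data if isinstance(data, str) else json.dumps(data)
    if provider == "anthropic":
        return [{"type": "text", "text": text}]
    if provider in OPENAI_FAMILY:
        return text
    raise ValueError(f"unknown provider: {provider}")
```

For example, {"a": 1} becomes the string '{"a": 1}' for OpenAI, a one-element text-block list for Anthropic, and stays a dict for Gemini.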
TokenUsage
Token usage statistics for a completion.
| Property | Type | Description |
|---|---|---|
| .prompt_tokens | int | Tokens in the prompt. |
| .completion_tokens | int | Tokens in the completion. |
| .total_tokens | int | Total tokens used. |
Supports dict-style access: usage["total_tokens"].
Agent System
The agent system provides an agentic tool-execution loop on top of CompletionModel. Define tools with ToolDef, then call run_agent to let the model iteratively call tools until it produces a final answer.
ToolDef
Define a tool that the model can invoke during an agent run.
```python
ToolDef(
    *,
    name: str,
    description: str,
    parameters: dict[str, Any],
    handler: Callable | AsyncCallable,
)
```
| Parameter | Type | Description |
|---|---|---|
| name | str | Unique tool name exposed to the model. |
| description | str | Description the model uses to decide when to call this tool. |
| parameters | dict | JSON Schema describing the tool’s input parameters. |
| handler | Callable | Function called when the model invokes the tool. Can be sync or async. Receives a dict[str, Any] of arguments and returns either a JSON-serializable value (auto-wrapped into ToolOutput(data=value)) or an explicit ToolOutput when you want to override what the LLM sees. |
A handler may return:
- A bare dict, list, scalar, or str: Blazen wraps it as ToolOutput(data=value, llm_override=None) and each provider applies its default conversion.
- A ToolOutput: full control. Set llm_override to send the LLM a different shape than the structured data your code reads back from the tool message's tool_result.
```python
import blazen
from blazen import ToolDef

# 1. Sync handler: return a bare dict, auto-wrapped.
tool = ToolDef(
    name="search",
    description="Search the web for a query",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"}
        },
        "required": ["query"],
    },
    handler=lambda args: {"results": ["result1", "result2"]},
)

# 2. Async handler: return a bare dict, auto-wrapped.
#    weather_api is a placeholder for your own async weather client.
async def fetch_weather(args):
    data = await weather_api(args["city"])
    return {"temperature": data.temp, "conditions": data.conditions}

weather_tool = ToolDef(
    name="weather",
    description="Get current weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        },
        "required": ["city"],
    },
    handler=fetch_weather,
)

# 3. Explicit ToolOutput: the LLM sees a short summary, but the caller can
#    still read the full structured payload from the tool message's tool_result.data.
def search_with_summary(args):
    return blazen.ToolOutput(
        data={"items": [1, 2, 3], "raw_response": "..."},
        llm_override=blazen.LlmPayload.text("Found 3 items."),
    )

search_tool = ToolDef(
    name="search",
    description="Search for items.",
    parameters={
        "type": "object",
        "properties": {"q": {"type": "string"}},
        "required": ["q"],
    },
    handler=search_with_summary,
)
```
After a tool runs, the result message in the conversation history exposes both channels:
```python
result = await run_agent(model, messages, tools=[search_tool])

# Most recent tool-result message in the conversation history
last = next(m for m in reversed(result.messages) if m.role == "tool")
last.role                      # "tool"
last.tool_call_id              # the call ID this responded to
last.tool_result               # ToolOutput | None; None if the handler returned a plain string
last.tool_result.data          # full caller-visible payload
last.tool_result.llm_override  # the LlmPayload sent to the LLM, if any
```
run_agent
Run an agentic tool-execution loop. The model is called repeatedly, executing any requested tool calls and feeding results back, until the model stops calling tools or max_iterations is reached.
```python
result: AgentResult = await run_agent(
    model: CompletionModel,
    messages: list[ChatMessage],
    *,
    tools: list[ToolDef],
    max_iterations: int = 10,
    system_prompt: str = None,
    temperature: float = None,
    max_tokens: int = None,
    add_finish_tool: bool = False,
)
```
| Parameter | Type | Default | Description |
|---|---|---|---|
| model | CompletionModel | required | The model to use for completions. |
| messages | list[ChatMessage] | required | Initial conversation messages. |
| tools | list[ToolDef] | required | Tools available to the model. |
| max_iterations | int | 10 | Maximum number of tool-call rounds before stopping. |
| system_prompt | str \| None | None | Optional system prompt prepended to messages. |
| temperature | float \| None | None | Sampling temperature override. |
| max_tokens | int \| None | None | Max tokens per completion call. |
| add_finish_tool | bool | False | If True, adds a built-in “finish” tool the model can call to explicitly end the loop. |
```python
from blazen import ChatMessage, CompletionModel, run_agent

model = CompletionModel.openai()
messages = [ChatMessage.user("What's the weather in Paris and London?")]
result = await run_agent(model, messages, tools=[weather_tool])

print(result.response.content)  # Final answer
print(result.iterations)        # Number of tool-call rounds
print(result.total_cost)        # Accumulated cost across all iterations
```
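The control flow run_agent describes can be sketched with a scripted fake model so it runs offline. Everything here is a hypothetical simplification: agent_loop, fake_model, and the plain message dicts replace CompletionModel and ChatMessage, and there is no cost tracking, system prompt, or finish tool. It only mirrors the documented loop: call the model, execute requested tools, feed results back, and stop when no tools are requested or max_iterations rounds have run.

```python
import json
from typing import Any, Callable


def agent_loop(
    model: Callable[[list[dict[str, Any]]], dict[str, Any]],
    messages: list[dict[str, Any]],
    tools: dict[str, Callable[[dict[str, Any]], Any]],
    max_iterations: int = 10,
) -> tuple[dict[str, Any], list[dict[str, Any]], int]:
    history = list(messages)
    iterations = 0
    while True:
        response = model(history)
        calls = response.get("tool_calls") or []
        history.append({"role": "assistant", "content": response.get("content"),
                        "tool_calls": calls})
        # Stop when the model is done or the round budget is spent.
        if not calls or iterations >= max_iterations:
            return response, history, iterations
        iterations += 1
        for call in calls:
            result = tools[call["name"]](call["arguments"])
            history.append({"role": "tool", "tool_call_id": call["id"],
                            "content": json.dumps(result)})


def fake_model(history: list[dict[str, Any]]) -> dict[str, Any]:
    # First turn: request the weather tool. Afterwards: answer from its result.
    tool_msgs = [m for m in history if m["role"] == "tool"]
    if not tool_msgs:
        return {"content": None, "tool_calls": [
            {"id": "call_1", "name": "weather", "arguments": {"city": "Paris"}}]}
    data = json.loads(tool_msgs[-1]["content"])
    return {"content": f"It is {data['temperature']} degrees in Paris.", "tool_calls": []}


response, history, iterations = agent_loop(
    fake_model,
    [{"role": "user", "content": "Weather in Paris?"}],
    tools={"weather": lambda args: {"city": args["city"], "temperature": 21}},
)
```

This run takes one tool round and produces the history user, assistant (tool request), tool, assistant (final answer).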
AgentResult
Returned by run_agent.
| Property | Type | Description |
|---|---|---|
| .response | CompletionResponse | The final completion response from the model. |
| .messages | list[ChatMessage] | The full conversation history including all tool calls and results. |
| .iterations | int | Number of tool-call iterations executed. |
| .total_cost | float \| None | Total cost in USD accumulated across all iterations. |
MediaType
Constants for common MIME types. Useful when constructing ContentPart or compute requests.
```python
from blazen import MediaType

MediaType.PNG  # "image/png"
MediaType.MP4  # "video/mp4"
MediaType.MP3  # "audio/mpeg"
MediaType.GLB  # "model/gltf-binary"
```
Image types
| Constant | MIME Type |
|---|---|
| MediaType.PNG | image/png |
| MediaType.JPEG | image/jpeg |
| MediaType.WEBP | image/webp |
| MediaType.GIF | image/gif |
| MediaType.SVG | image/svg+xml |
| MediaType.BMP | image/bmp |
| MediaType.TIFF | image/tiff |
| MediaType.AVIF | image/avif |

Video types

| Constant | MIME Type |
|---|---|
| MediaType.MP4 | video/mp4 |
| MediaType.WEBM | video/webm |
| MediaType.MOV | video/quicktime |

Audio types

| Constant | MIME Type |
|---|---|
| MediaType.MP3 | audio/mpeg |
| MediaType.WAV | audio/wav |
| MediaType.OGG | audio/ogg |
| MediaType.FLAC | audio/flac |
| MediaType.AAC | audio/aac |
| MediaType.M4A | audio/mp4 |

3D model types

| Constant | MIME Type |
|---|---|
| MediaType.GLB | model/gltf-binary |
| MediaType.GLTF | model/gltf+json |
| MediaType.OBJ | model/obj |
| MediaType.USDZ | model/vnd.usdz+zip |
| MediaType.FBX | model/fbx |
| MediaType.STL | model/stl |

Document types

| Constant | MIME Type |
|---|---|
| MediaType.PDF | application/pdf |
MediaSource alias
MediaSource is a module-level alias for ImageSource, exported from blazen so callers writing media-source-typed code can use the more descriptive name:
```python
from blazen import MediaSource  # alias of ImageSource

src = MediaSource.from_path("./photo.png")
```
The two names refer to the same class — isinstance(src, ImageSource) is True.
Content Subsystem
A pluggable registry for multimodal blobs (images, audio, video, documents, 3D, CAD) that lets tool handlers emit and accept opaque handles instead of inline bytes. Handles are resolved to a wire-renderable MediaSource only when the model actually needs the content.
ContentKind
Taxonomy enum of multimodal content kinds. Used by tool-input declarations and ContentStore routing.
```python
from blazen import ContentKind

ContentKind.Image
ContentKind.from_str("three_d_model")
ContentKind.from_mime("image/png")
ContentKind.from_extension("glb")
```
Members
| Member | Wire name (name_str) |
|---|---|
| ContentKind.Image | "image" |
| ContentKind.Audio | "audio" |
| ContentKind.Video | "video" |
| ContentKind.Document | "document" |
| ContentKind.ThreeDModel | "three_d_model" |
| ContentKind.Cad | "cad" |
| ContentKind.Archive | "archive" |
| ContentKind.Font | "font" |
| ContentKind.Code | "code" |
| ContentKind.Data | "data" |
| ContentKind.Other | "other" |
Methods
| Method | Description |
|---|---|
kind.name_str | Property returning the canonical wire name (matches the JSON / serde tag). |
ContentKind.from_str(value: str) | Parse a kind from its canonical wire name. Unknown names raise ValueError. |
ContentKind.from_mime(mime: str) | Map a MIME type to a kind. Unknown MIME types resolve to ContentKind.Other. |
ContentKind.from_extension(ext: str) | Map a filename extension (no leading dot, case-insensitive) to a kind. Unknown extensions resolve to ContentKind.Other. |
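The parsing rules above differ in strictness: from_str is strict, while from_mime and from_extension fall back to Other. The pure-Python sketch below mirrors that behavior without importing blazen. The wire names come from the Members table. `ContentKindSketch` is a hypothetical name, and the entries in `_EXTENSIONS` are a hypothetical subset chosen for illustration, not Blazen's actual lookup table.

```python
from enum import Enum


class ContentKindSketch(Enum):
    """Illustrative stand-in for blazen.ContentKind; values are the wire names."""

    Image = "image"
    Audio = "audio"
    Video = "video"
    Document = "document"
    ThreeDModel = "three_d_model"
    Cad = "cad"
    Archive = "archive"
    Font = "font"
    Code = "code"
    Data = "data"
    Other = "other"

    @property
    def name_str(self) -> str:
        return self.value

    @classmethod
    def from_str(cls, value: str) -> "ContentKindSketch":
        # Strict: Enum lookup by value raises ValueError for unknown names.
        return cls(value)

    @classmethod
    def from_extension(cls, ext: str) -> "ContentKindSketch":
        # Lenient: case-insensitive, unknown extensions fall back to Other.
        return _EXTENSIONS.get(ext.lower(), cls.Other)


# Hypothetical subset of an extension table, for illustration only.
_EXTENSIONS = {
    "png": ContentKindSketch.Image,
    "mp3": ContentKindSketch.Audio,
    "mp4": ContentKindSketch.Video,
    "pdf": ContentKindSketch.Document,
    "glb": ContentKindSketch.ThreeDModel,
}
```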
ContentHandle
Stable, opaque reference to content registered with a ContentStore. Most callers obtain handles from ContentStore.put rather than constructing them directly.
ContentHandle(
id: str,
kind: ContentKind,
*,
mime_type: str | None = None,
byte_size: int | None = None,
display_name: str | None = None,
)
Properties (all read-only)
| Property | Type | Description |
|---|---|---|
.id | str | Opaque, store-defined identifier. Treat as a black box. |
.kind | ContentKind | What kind of content this handle refers to. |
.mime_type | str \| None | MIME type if known. |
.byte_size | int \| None | Byte size if known. |
.display_name | str \| None | Human-readable display name (e.g. original filename) if known. |
ContentStore
A pluggable content registry that backs handle resolution. Construct via one of the static factories. All instance methods return awaitables.
from blazen import ContentStore, ContentKind
store = ContentStore.in_memory()
handle = await store.put(b"...", kind=ContentKind.Image, mime_type="image/png")
source = await store.resolve(handle) # {"type": "base64", "data": "..."}
Static factories
| Factory | Returns |
|---|---|
ContentStore.in_memory() | ContentStore |
ContentStore.local_file(path: str \| os.PathLike \| pathlib.Path) | ContentStore |
ContentStore.openai_files(api_key: str, *, base_url: str \| None = None) | ContentStore |
ContentStore.anthropic_files(api_key: str, *, base_url: str \| None = None) | ContentStore |
ContentStore.gemini_files(api_key: str, *, base_url: str \| None = None) | ContentStore |
ContentStore.fal_storage(api_key: str, *, base_url: str \| None = None) | ContentStore |
ContentStore.custom(*, put, resolve, fetch_bytes, fetch_stream=None, delete=None, name="custom") | ContentStore |
Instance methods (all awaitable)
| Method | Description |
|---|---|
await store.put(body, *, kind=None, mime_type=None, display_name=None, byte_size=None) | Persist content and return a freshly-issued ContentHandle. body may be bytes, a URL str, or a pathlib.Path. Keyword arguments are optional hints; the store may auto-detect kind / MIME from the bytes. |
await store.resolve(handle) | Resolve a handle to a wire-renderable media source. Returns a serialized MediaSource dict, e.g. {"type": "url", "url": "..."} or {"type": "base64", "data": "..."}. |
await store.fetch_bytes(handle) | Fetch raw bytes for a handle. Stores that hold only references (URL / provider-file / local-path) may raise UnsupportedError. |
await store.metadata(handle) | Cheap metadata lookup with no byte materialization. Returns {"kind": ..., "mime_type": ..., "byte_size": ..., "display_name": ...}. |
await store.delete(handle) | Optional cleanup hook. Default backends drop the entry; provider backends issue a delete call to the upstream API. |
Subclassing ContentStore
ContentStore is subclassable from Python. Override the methods your backend needs; the framework wraps your subclass in a Rust adapter (PyHostContentStore) that dispatches into your Python coroutines.
from blazen import ContentStore, ContentHandle, ContentKind
class S3ContentStore(ContentStore):
def __init__(self, bucket: str):
super().__init__()
self.bucket = bucket
async def put(self, body, hint) -> ContentHandle:
...
async def resolve(self, handle) -> dict:
...
async def fetch_bytes(self, handle) -> bytes:
...
# Optional overrides:
async def fetch_stream(self, handle): ...
async def delete(self, handle) -> None: ...
Subclasses MUST override put, resolve, and fetch_bytes. The base-class default implementations raise NotImplementedError, so a missing override fails immediately with a clear error instead of recursing silently through super().
ContentStore.custom(...)
Callback-based factory. Direct Python mirror of Rust CustomContentStore::builder.
ContentStore.custom(
*,
put: Callable[..., Awaitable[ContentHandle]],
resolve: Callable[[ContentHandle], Awaitable[dict]],
fetch_bytes: Callable[[ContentHandle], Awaitable[bytes]],
fetch_stream: Callable[[ContentHandle], Awaitable[bytes]] | None = None,
delete: Callable[[ContentHandle], Awaitable[None]] | None = None,
name: str = "custom",
) -> ContentStore
put, resolve, and fetch_bytes are required; fetch_stream and delete are optional. The body argument to put arrives as a dict in one of these shapes: {"type": "bytes", "data": [...]}, {"type": "url", "url": "..."}, {"type": "local_path", "path": "..."}, {"type": "provider_file", "provider": "openai", "id": "..."}, or {"type": "stream", "stream": <AsyncByteIter>, "size_hint": int | None}. The hint argument is a dict with optional mime_type, kind_hint, display_name, and byte_size keys.
put must return a ContentHandle. resolve returns a serialized MediaSource dict (e.g. {"type": "url", "url": "..."}). fetch_bytes returns raw bytes. fetch_stream may return either bytes (legacy, single-chunk) or an AsyncIterator[bytes] for true chunk-by-chunk streaming.
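The body shapes listed above are easiest to see in code. Below is a minimal sketch of the first half of a put callback for a hypothetical in-memory backend: it normalizes each documented body shape into something storable. It is plain Python with no blazen import. `normalize_body` is a hypothetical helper, and a real callback would go on to store the payload and return a ContentHandle.

```python
import asyncio
from typing import Any


async def normalize_body(body: dict[str, Any]) -> tuple[str, Any]:
    """Turn a ContentStore.custom `put` body dict into (tag, payload).

    Hypothetical helper: byte-carrying shapes are materialized to bytes,
    reference shapes are kept as references.
    """
    kind = body["type"]
    if kind == "bytes":
        # "data" arrives as a list of ints in the range 0-255.
        return ("bytes", bytes(body["data"]))
    if kind == "stream":
        # Drain the async byte iterator chunk by chunk.
        chunks = [chunk async for chunk in body["stream"]]
        return ("bytes", b"".join(chunks))
    if kind == "url":
        return ("url", body["url"])
    if kind == "local_path":
        return ("local_path", body["path"])
    if kind == "provider_file":
        return ("provider_file", (body["provider"], body["id"]))
    raise ValueError(f"unsupported body type: {kind!r}")


if __name__ == "__main__":
    print(asyncio.run(normalize_body({"type": "bytes", "data": [104, 105]})))
```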
Built-in stores
| Factory | Purpose | Notes |
|---|---|---|
ContentStore.in_memory() | Ephemeral / test workloads. | Holds bytes in process memory; resolves to base64. |
ContentStore.local_file(path) | Filesystem-backed persistence rooted at path. | Directory is created recursively if missing. Resolves to a local path / URL. |
ContentStore.openai_files(api_key, *, base_url=None) | OpenAI Files API. | Resolves to a provider file reference; fetch_bytes may not be supported. |
ContentStore.anthropic_files(api_key, *, base_url=None) | Anthropic Files API (beta). | Provider-file backed. |
ContentStore.gemini_files(api_key, *, base_url=None) | Google Gemini Files API. | Provider-file backed. |
ContentStore.fal_storage(api_key, *, base_url=None) | fal.ai object storage. | Resolves to URL references hosted on fal. |
ContentStore.custom(...) | User-defined backend via async callables (see above). | Required: put, resolve, fetch_bytes. Optional: fetch_stream, delete, name. |
Tool-input schema helpers
Top-level functions that each return a complete JSON Schema object (as a dict) declaring a single required content-reference input. The content-reference property carries an x-blazen-content-ref extension keyed to the appropriate ContentKind.
| Helper | Description |
|---|---|
image_input(name, description) | Schema declaring a single required image input. |
audio_input(name, description) | Schema declaring a single required audio input. |
video_input(name, description) | Schema declaring a single required video input. |
file_input(name, description) | Schema declaring a single required document / file input. |
three_d_input(name, description) | Schema declaring a single required 3D-model input. |
cad_input(name, description) | Schema declaring a single required CAD-file input. |
from blazen import image_input
schema = image_input("photo", "The photo to analyze")
# {
# "type": "object",
# "properties": {
# "photo": {
# "type": "string",
# "description": "The photo to analyze",
# "x-blazen-content-ref": {"kind": "image"}
# }
# },
# "required": ["photo"]
# }
Generic schema builders
Use these builders when a schema needs companion fields alongside the content reference, or when you embed a content-ref property inside a larger object schema you assemble yourself.
| Helper | Description |
|---|---|
content_ref_property(kind: ContentKind, description: str) | Build a single-property fragment of the form {"type": "string", "description": ..., "x-blazen-content-ref": {"kind": ...}}, ready to embed inside an object schema’s properties map. |
content_ref_required_object(name: str, kind: ContentKind, description: str, *, extra_properties: dict | None = None) | Build a complete object-typed schema declaring a single required content-reference input plus optional companion fields merged from extra_properties. |
from blazen import ContentKind, content_ref_property, content_ref_required_object
# Single property fragment for manual assembly.
prop = content_ref_property(ContentKind.Image, "Source frame")
schema = {
"type": "object",
"properties": {"frame": prop, "threshold": {"type": "number"}},
"required": ["frame"],
}
# Or the full-object form with companion fields merged in.
schema = content_ref_required_object(
"frame",
ContentKind.Image,
"Source frame",
extra_properties={"threshold": {"type": "number"}},
)
How resolution works
The x-blazen-content-ref JSON Schema extension is invisible to the model and to the upstream provider, which see a plain string property. When the model emits {"photo": "blazen_a1b2c3..."}, Blazen’s resolver intercepts the tool-call arguments before invoking the handler, looks up each content-ref string in the active ContentStore, and replaces the bare id with a typed content dict of the form:
{
"kind": "image",
"handle_id": "blazen_a1b2c3...",
"mime_type": "image/png",
"byte_size": 48213,
"display_name": "frame.png",
"source": {"type": "url", "url": "..."}, # serialized MediaSource
}
The tool handler receives the substituted shape and never has to call store.resolve itself. Handles produced by a tool’s return value flow back through the same store, so subsequent turns can reference them by id without re-uploading bytes.
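The substitution step can be sketched in a few lines of plain Python. This illustrates the idea only and is not Blazen's resolver: the store is a hypothetical dict mapping handle ids to metadata, and `resolve_content_refs` is a made-up name.

```python
from typing import Any


def resolve_content_refs(
    schema: dict[str, Any],
    args: dict[str, Any],
    store: dict[str, dict[str, Any]],
) -> dict[str, Any]:
    """Replace content-ref ids in tool-call args with typed content dicts."""
    resolved = dict(args)
    for prop_name, prop in schema.get("properties", {}).items():
        ref = prop.get("x-blazen-content-ref")
        if ref is None or prop_name not in args:
            continue  # ordinary property: passed through untouched
        handle_id = args[prop_name]
        entry = store[handle_id]  # KeyError for an unknown handle
        resolved[prop_name] = {
            "kind": ref["kind"],
            "handle_id": handle_id,
            "mime_type": entry.get("mime_type"),
            "byte_size": entry.get("byte_size"),
            "display_name": entry.get("display_name"),
            "source": entry["source"],  # serialized MediaSource dict
        }
    return resolved
```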
Compute Request Types
Compute requests define jobs for media generation and processing. All requests are plain dict objects.
ImageRequest
Generate images from a text prompt. Passed as a plain dict.
{
"prompt": str, # required
"negative_prompt": str, # optional
"width": int, # optional
"height": int, # optional
"num_images": int, # optional
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"prompt" | str | yes | Text description of the image to generate. |
"negative_prompt" | str | no | What to avoid in the generated image. |
"width" | int | no | Image width in pixels. |
"height" | int | no | Image height in pixels. |
"num_images" | int | no | Number of images to generate. |
"model" | str | no | Specific model to use (provider-dependent). |
req = {"prompt": "a cat in space", "width": 1024, "height": 1024, "num_images": 2}
UpscaleRequest
Upscale an existing image to a higher resolution. Passed as a plain dict.
{
"image_url": str, # required
"scale": float, # required
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"image_url" | str | yes | URL of the image to upscale. |
"scale" | float | yes | Upscale factor (e.g. 2.0, 4.0). |
"model" | str | no | Specific model to use. |
req = {"image_url": "https://example.com/photo.jpg", "scale": 4.0}
VideoRequest
Generate a video from a text prompt, optionally with an input image. Passed as a plain dict.
{
"prompt": str, # required
"image_url": str, # optional
"duration_seconds": float, # optional
"negative_prompt": str, # optional
"width": int, # optional
"height": int, # optional
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"prompt" | str | yes | Text description of the video to generate. |
"image_url" | str | no | Optional starting image to animate. |
"duration_seconds" | float | no | Desired video length in seconds. |
"negative_prompt" | str | no | What to avoid in the generated video. |
"width" | int | no | Video width in pixels. |
"height" | int | no | Video height in pixels. |
"model" | str | no | Specific model to use. |
req = {"prompt": "a sunset timelapse", "duration_seconds": 5.0}
req = {"prompt": "animate this scene", "image_url": "https://example.com/frame.jpg"}
SpeechRequest
Generate speech audio from text. Passed as a plain dict.
{
"text": str, # required
"voice": str, # optional
"voice_url": str, # optional
"language": str, # optional
"speed": float, # optional
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"text" | str | yes | The text to convert to speech. |
"voice" | str | no | Voice preset name (e.g. "alloy", "nova"). |
"voice_url" | str | no | URL to a custom voice sample for cloning. |
"language" | str | no | Language code (e.g. "en", "fr"). |
"speed" | float | no | Playback speed multiplier (e.g. 1.2 for 20% faster). |
"model" | str | no | Specific model to use. |
req = {"text": "Hello world", "voice": "alloy", "speed": 1.2}
MusicRequest
Generate music or sound effects from a text prompt. Passed as a plain dict.
{
"prompt": str, # required
"duration_seconds": float, # optional
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"prompt" | str | yes | Description of the music to generate. |
"duration_seconds" | float | no | Desired duration in seconds. |
"model" | str | no | Specific model to use. |
req = {"prompt": "upbeat jazz", "duration_seconds": 30.0}
TranscriptionRequest
Transcribe audio to text. Passed as a plain dict.
{
"audio_url": str, # required
"language": str, # optional
"diarize": bool, # optional
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"audio_url" | str | yes | URL of the audio file to transcribe. |
"language" | str | no | Language hint (e.g. "en"). |
"diarize" | bool | no | If True, identify and label different speakers. |
"model" | str | no | Specific model to use. |
req = {"audio_url": "https://example.com/audio.mp3", "language": "en", "diarize": True}
ThreeDRequest
Generate a 3D model from a text prompt or image. Passed as a plain dict.
{
"prompt": str, # optional (provide at least one of prompt or image_url)
"image_url": str, # optional
"format": str, # optional
"model": str, # optional
}
| Key | Type | Required | Description |
|---|---|---|---|
"prompt" | str | no | Text description of the 3D object to generate. |
"image_url" | str | no | Image to use as reference for 3D generation. |
"format" | str | no | Output format (e.g. "glb", "obj", "usdz"). |
"model" | str | no | Specific model to use. |
Provide at least one of "prompt" or "image_url".
req = {"prompt": "a 3D cat", "format": "glb"}
req = {"image_url": "https://example.com/photo.jpg", "format": "obj"}
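Marking either key as required cannot express the "at least one of" rule, so a caller-side check can catch the mistake before a request is sent. Here is a minimal sketch. `check_three_d_request` is a hypothetical helper, not part of blazen, and treating an empty string as missing is a design choice of this sketch.

```python
from typing import Any


def check_three_d_request(req: dict[str, Any]) -> dict[str, Any]:
    """Raise ValueError unless the request has a prompt or an image_url."""
    # Empty strings count as missing in this sketch.
    if not req.get("prompt") and not req.get("image_url"):
        raise ValueError('ThreeDRequest needs "prompt" or "image_url"')
    return req


check_three_d_request({"prompt": "a 3D cat", "format": "glb"})
check_three_d_request({"image_url": "https://example.com/photo.jpg", "format": "obj"})
```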
StreamChunk
A typed object received by the on_chunk callback during streaming. It replaces the earlier raw-dict interface but stays backwards-compatible: chunk["key"] access still works.
| Property | Type | Description |
|---|---|---|
.delta | str \| None | Incremental text content. |
.finish_reason | str \| None | Present only on the final chunk ("stop", "tool_calls", etc.). |
.tool_calls | list[ToolCall] | Tool invocations completed in this chunk. |
async def on_chunk(chunk):
# Attribute access (preferred)
if chunk.delta:
print(chunk.delta, end="")
# Dict-style access (backwards compatible)
if chunk["finish_reason"]:
print(f"\n[done: {chunk['finish_reason']}]")
EmbeddingModel
Generate vector embeddings from text. Instances are created via static constructor methods, as with CompletionModel. API keys are read from environment variables (OPENAI_API_KEY, etc.) when options is omitted, or can be passed explicitly via ProviderOptions(api_key=...).
from blazen import EmbeddingModel, ProviderOptions
model = EmbeddingModel.openai()
model = EmbeddingModel.openai(model="text-embedding-3-large", dimensions=3072)
model = EmbeddingModel.openai(options=ProviderOptions(api_key="sk-..."))
model = EmbeddingModel.together()
model = EmbeddingModel.cohere()
model = EmbeddingModel.fireworks()
Provider constructors
| Constructor | Signature |
|---|---|
openai | EmbeddingModel.openai(*, options: ProviderOptions = None, model: str = None, dimensions: int = None) |
together | EmbeddingModel.together(*, options: ProviderOptions = None) |
cohere | EmbeddingModel.cohere(*, options: ProviderOptions = None) |
fireworks | EmbeddingModel.fireworks(*, options: ProviderOptions = None) |
Properties
| Property | Type | Description |
|---|---|---|
.model_id | str | The model identifier. |
.dimensions | int | Output vector dimensionality. |
embed()
response: EmbeddingResponse = await model.embed(texts)  # texts: list[str]
Returns an EmbeddingResponse with one vector per input text.
EmbeddingResponse
Returned by EmbeddingModel.embed().
| Property | Type | Description |
|---|---|---|
.embeddings | list[list[float]] | One vector per input text. |
.model | str | Model that produced the embeddings. |
.usage | TokenUsage \| None | Token usage statistics. |
.cost | float \| None | Estimated cost in USD. |
.timing | RequestTiming \| None | Request timing breakdown. |
model = EmbeddingModel.openai()  # uses the default OpenAI embedding model
response = await model.embed(["Hello", "World"])
print(len(response.embeddings)) # 2
print(len(response.embeddings[0])) # 1536
print(response.model) # "text-embedding-3-small"
print(response.cost) # e.g. 0.0001
Token Estimation
Lightweight token-counting functions that work without external data files. Both use a heuristic (~3.5 characters per token) that is suitable for budget checks but is not an exact tokenizer count.
estimate_tokens()
from blazen import estimate_tokens
count = estimate_tokens("Hello, world!") # 4
count = estimate_tokens("Hello, world!", 32000) # same, with custom context size
| Parameter | Type | Default | Description |
|---|---|---|---|
text | str | required | The text to estimate. |
context_size | int | 128000 | Context window size hint. |
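The ~3.5 characters-per-token rule is easy to reproduce when you want a quick budget check outside Blazen. This is a minimal sketch. Rounding up with math.ceil is an assumption, chosen because it matches the documented result of 4 for "Hello, world!" (13 characters). `estimate_tokens_sketch`, `fits_budget`, and its reserve parameter are hypothetical names.

```python
import math


def estimate_tokens_sketch(text: str, chars_per_token: float = 3.5) -> int:
    """Heuristic token count: characters divided by ~3.5, rounded up."""
    if not text:
        return 0
    return math.ceil(len(text) / chars_per_token)


def fits_budget(text: str, context_size: int = 128_000, reserve: int = 0) -> bool:
    # Budget check: leave `reserve` tokens free for the model's reply.
    return estimate_tokens_sketch(text) + reserve <= context_size
```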
count_message_tokens()
from blazen import count_message_tokens, ChatMessage
count = count_message_tokens([
ChatMessage.system("You are helpful."),
ChatMessage.user("Hello!"),
])
Includes per-message overhead (role markers, separators) in addition to content tokens.
| Parameter | Type | Default | Description |
|---|---|---|---|
messages | list[ChatMessage] | required | Messages to count. |
context_size | int | 128000 | Context window size hint. |
Subclassable Providers
CompletionModel, EmbeddingModel, and Transcription can be subclassed to implement custom providers. Override the relevant methods and the framework will dispatch to your implementation.
CompletionModel
from blazen import CompletionModel, ChatMessage
class MyLLM(CompletionModel):
def __init__(self):
super().__init__(model_id="my-llm")
async def complete(self, messages, options=None):
# Your inference logic here
return {"content": "Hello from my custom model"}
async def stream(self, messages, on_chunk, options=None):
on_chunk({"delta": "Hello", "finish_reason": None, "tool_calls": []})
on_chunk({"delta": None, "finish_reason": "stop", "tool_calls": []})
model = MyLLM()
response = await model.complete([ChatMessage.user("Hi")])
EmbeddingModel
from blazen import EmbeddingModel
class MyEmbedder(EmbeddingModel):
def __init__(self):
super().__init__(model_id="my-embedder", dimensions=128)
async def embed(self, texts):
return {"embeddings": [[0.1] * 128 for _ in texts], "model": "my-embedder"}
Transcription
from blazen import Transcription
class MyTranscriber(Transcription):
def __init__(self):
super().__init__(provider_id="my-stt")
async def transcribe(self, request):
return {"text": "transcribed text", "segments": []}
Per-Capability Provider Classes
Seven provider base classes let you implement a single compute capability without dealing with the full ComputeProvider interface. Subclass and override the relevant methods.
| Class | Methods to Override | Rust Trait |
|---|---|---|
TTSProvider | text_to_speech(request) | AudioGeneration |
MusicProvider | generate_music(request), generate_sfx(request) | AudioGeneration |
ImageProvider | generate_image(request), upscale_image(request) | ImageGeneration |
VideoProvider | text_to_video(request), image_to_video(request) | VideoGeneration |
ThreeDProvider | generate_3d(request) | ThreeDGeneration |
BackgroundRemovalProvider | remove_background(request) | BackgroundRemoval |
VoiceProvider | clone_voice(request), list_voices(), delete_voice(voice) | VoiceCloning |
Constructor
All provider classes share the same constructor signature:
TTSProvider(
*,
provider_id: str,
base_url: str | None = None,
pricing: ModelPricing | None = None,
memory_estimate_bytes: int | None = None,
)
| Parameter | Type | Description |
|---|---|---|
provider_id | str | Identifier for the provider instance. |
base_url | str \| None | Optional base URL for the provider API. |
pricing | ModelPricing \| None | Optional pricing info for cost tracking. |
memory_estimate_bytes | int \| None | Estimated memory footprint (host RAM if on CPU, GPU VRAM otherwise) for ModelManager integration. |
Example
from blazen import TTSProvider
class ElevenLabsTTS(TTSProvider):
def __init__(self, api_key: str):
super().__init__(provider_id="elevenlabs")
self.api_key = api_key
async def text_to_speech(self, request):
# Call the ElevenLabs API with self.api_key and request["text"] here.
audio_bytes = b""  # placeholder for the audio returned by the API
return {"audio": audio_bytes, "format": "mp3"}
tts = ElevenLabsTTS(api_key="sk-...")
result = await tts.text_to_speech({"text": "Hello world", "voice": "alice"})
MemoryBackend
Base class for custom memory storage backends. Subclass to implement persistence backed by Postgres, SQLite, DynamoDB, or any other store.
from blazen import MemoryBackend
class PostgresBackend(MemoryBackend):
async def put(self, entry):
# Insert or update entry in Postgres
...
async def get(self, id):
# Retrieve entry by id
...
async def delete(self, id):
# Delete entry, return True if it existed
...
async def list(self):
# Return all entries
...
async def len(self):
# Return count of entries
...
async def search_by_bands(self, bands, limit):
# Return candidates sharing LSH bands with the query
...
Methods to Override
| Method | Signature | Description |
|---|---|---|
put | async def put(self, entry) -> None | Insert or update a stored entry. |
get | async def get(self, id: str) -> dict \| None | Retrieve a stored entry by id. |
delete | async def delete(self, id: str) -> bool | Delete an entry by id. Returns True if it existed. |
list | async def list(self) -> list[dict] | Return all stored entries. |
len | async def len(self) -> int | Return the number of stored entries. |
search_by_bands | async def search_by_bands(self, bands, limit) -> list[dict] | Return candidate entries sharing at least one LSH band. |
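For tests, the same six methods can be backed by a plain dict. The sketch below is a standalone class with the documented method names; to plug it into Blazen you would subclass MemoryBackend instead of object. It assumes each entry is a dict with an "id" key and a "bands" list of LSH band keys. That entry shape is an assumption, not documented fact, and `InMemoryBackendSketch` is a hypothetical name.

```python
# Postpone annotation evaluation: the `list` method below shadows the
# builtin inside the class body, which would break later `list[...]` hints.
from __future__ import annotations

import asyncio
from typing import Any


class InMemoryBackendSketch:
    """Dict-backed store exposing the MemoryBackend method names."""

    def __init__(self) -> None:
        self._entries: dict[str, dict[str, Any]] = {}

    async def put(self, entry: dict[str, Any]) -> None:
        self._entries[entry["id"]] = entry  # insert or overwrite

    async def get(self, id: str) -> dict[str, Any] | None:
        return self._entries.get(id)

    async def delete(self, id: str) -> bool:
        return self._entries.pop(id, None) is not None

    async def list(self) -> list[dict[str, Any]]:
        return list(self._entries.values())

    async def len(self) -> int:
        return len(self._entries)

    async def search_by_bands(self, bands: list[str], limit: int) -> list[dict[str, Any]]:
        # Candidates share at least one LSH band with the query.
        wanted = set(bands)
        hits = [e for e in self._entries.values() if wanted & set(e.get("bands", []))]
        return hits[:limit]


async def _demo() -> None:
    backend = InMemoryBackendSketch()
    await backend.put({"id": "a", "bands": ["x", "y"]})
    print(await backend.search_by_bands(["y"], limit=5))


if __name__ == "__main__":
    asyncio.run(_demo())
```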
ModelManager
Per-pool memory budget-aware model manager with LRU eviction. Tracks registered local models and their estimated memory footprint (host RAM if the model runs on CPU, GPU VRAM otherwise) in distinct pools — one for CPU RAM and one per GPU device. When loading a model that would exceed its pool’s budget, the least-recently-used loaded model in the same pool is unloaded first; models in different pools never evict each other.
ModelManager is a memory budget bookkeeper, not a performance scheduler. It answers “will this fit?” — not “will this run fast?”. Whether a 70B model loaded on CPU is useful at 1-3 tok/s is a workload-choice question the manager intentionally does not answer.
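The eviction policy described above can be modeled as per-pool LRU bookkeeping. This is an illustrative model of the behavior, not Blazen's implementation. The class and method names are hypothetical, loading is synchronous, and a model larger than its whole pool budget simply raises MemoryError.

```python
from collections import OrderedDict


class PoolLRUSketch:
    """Budget bookkeeping with LRU eviction inside each pool."""

    def __init__(self, budgets: dict[str, int]) -> None:
        self.budgets = budgets
        self.models: dict[str, tuple[str, int]] = {}  # id -> (pool, bytes)
        # Per pool: loaded model ids, least recently used first.
        self.loaded = {pool: OrderedDict() for pool in budgets}

    def register(self, model_id: str, pool: str, size: int) -> None:
        self.models[model_id] = (pool, size)  # registered models start unloaded

    def used(self, pool: str) -> int:
        return sum(self.loaded[pool].values())

    def load(self, model_id: str) -> list[str]:
        """Load a model and return the ids evicted from the same pool."""
        pool, size = self.models[model_id]
        lru = self.loaded[pool]
        if model_id in lru:
            lru.move_to_end(model_id)  # already loaded: mark most recent
            return []
        if size > self.budgets[pool]:
            raise MemoryError(f"{model_id} exceeds the {pool} budget")
        evicted = []
        while self.used(pool) + size > self.budgets[pool]:
            victim, _ = lru.popitem(last=False)  # least recently used first
            evicted.append(victim)
        lru[model_id] = size
        return evicted
```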
Constructor
from blazen import ModelManager
# Common case: separate CPU RAM and GPU VRAM budgets in GB.
manager = ModelManager(cpu_ram_gb=100, gpu_vram_gb=24)
# Explicit per-pool budgets (bytes). Pool labels: "cpu", "gpu", or "gpu:N".
manager = ModelManager(pool_budgets={
"cpu": 100 * 1024**3,
"gpu:0": 24 * 1024**3,
})
# Zero-arg: defaults both the CPU pool and GPU 0 pool to "unlimited"
# (u64::MAX) -- the no-budget-enforcement sentinel for tests.
manager = ModelManager()
| Parameter | Type | Description |
|---|---|---|
cpu_ram_gb | float \| None | Budget for the "cpu" pool in gigabytes. |
gpu_vram_gb | float \| None | Budget for the "gpu:0" pool in gigabytes. |
pool_budgets | dict[str, int] \| None | Explicit per-pool budgets in bytes, keyed by pool label ("cpu", "gpu", or "gpu:N"). |
Methods
| Method | Signature | Description |
|---|---|---|
register | await manager.register(id, model, memory_estimate_bytes=...) | Register a model with its estimated memory footprint. Memory is charged to the pool returned by model.device(). The model starts unloaded. |
load | await manager.load(id) | Load a model, evicting LRU same-pool models if needed. |
unload | await manager.unload(id) | Unload a model and free its memory. |
is_loaded | await manager.is_loaded(id) -> bool | Check if a model is currently loaded. |
ensure_loaded | await manager.ensure_loaded(id) | Alias for load(). |
used_bytes | await manager.used_bytes(pool="cpu") -> int | Total memory currently used by loaded models in the given pool. Default "cpu". |
available_bytes | await manager.available_bytes(pool="cpu") -> int | Available memory within the given pool’s budget. Default "cpu". |
pools | manager.pools() -> list[tuple[str, int]] | Sync. Return [(label, budget_bytes), ...] for all configured pools. |
status | await manager.status() -> list[ModelStatus] | Status of all registered models. |
Invalid pool labels passed to used_bytes / available_bytes raise ValueError("invalid pool label '<x>': expected 'cpu', 'gpu', or 'gpu:N' where N is a non-negative integer").
ModelStatus
| Property | Type | Description |
|---|---|---|
.id | str | Model identifier. |
.loaded | bool | Whether the model is currently loaded. |
.memory_estimate_bytes | int | Estimated memory footprint in bytes (host RAM if on CPU, GPU VRAM otherwise). |
.pool | str | Pool label the model is charged against (e.g. "cpu" or "gpu:0"). |
ModelRegistry
An ABC for advertising a model catalog. Subclass to plug a custom catalog (a static manifest, a remote control-plane lookup, a cache of /v1/models results) into Blazen’s model-info surface. Mirrors the Rust trait blazen_llm::traits::ModelRegistry and the equivalent ABCs in the Node and WASM SDKs.
Both methods are abstract — the default implementations raise NotImplementedError, so a subclass must override both.
class ModelRegistry:
async def list_models(self) -> list[ModelInfo]: ...
async def get_model(self, model_id: str) -> ModelInfo | None: ...
Subclass example
from blazen import ModelRegistry, ModelInfo
class StaticRegistry(ModelRegistry):
def __init__(self, models: list[ModelInfo]):
super().__init__()
self._models = {m.id: m for m in models}
async def list_models(self) -> list[ModelInfo]:
return list(self._models.values())
async def get_model(self, model_id: str) -> ModelInfo | None:
return self._models.get(model_id)
Methods
| Method | Signature | Description |
|---|---|---|
list_models | async def list_models(self) -> list[ModelInfo] | List every model the registry advertises. |
get_model | async def get_model(self, model_id: str) -> ModelInfo \| None | Look up a single model. Return None if unknown. |
See ModelInfo for the dataclass shape returned by these methods.
ModelPricing and Pricing Functions
ModelPricing
Pricing metadata for cost tracking.
from blazen import ModelPricing
pricing = ModelPricing(
input_per_million=1.0,
output_per_million=2.0,
per_image=0.02,
per_second=0.001,
)
| Property | Type | Description |
|---|---|---|
.input_per_million | float \| None | Cost per million input tokens (USD). |
.output_per_million | float \| None | Cost per million output tokens (USD). |
.per_image | float \| None | Cost per generated image (USD). |
.per_second | float \| None | Cost per second of compute (USD). |
register_pricing()
Register custom pricing for a model. Overrides any existing pricing for the same model ID.
from blazen import register_pricing, ModelPricing
register_pricing("my-model", ModelPricing(input_per_million=1.0, output_per_million=2.0))
lookup_pricing()
Look up pricing for a model by ID. Returns None if the model is unknown.
from blazen import lookup_pricing
pricing = lookup_pricing("gpt-4o")
if pricing:
print(f"Input: ${pricing.input_per_million}/M tokens")
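The per-million rates convert to a request cost with simple arithmetic. The sketch below shows the token part of that calculation for a hypothetical request. It does not specify how Blazen combines token, per-image, and per-second charges internally, so treat `token_cost` as an illustration rather than the library's formula.

```python
def token_cost(
    prompt_tokens: int,
    completion_tokens: int,
    input_per_million: float,
    output_per_million: float,
) -> float:
    """USD cost of one request priced per million tokens."""
    return (
        prompt_tokens / 1_000_000 * input_per_million
        + completion_tokens / 1_000_000 * output_per_million
    )


# With input_per_million=1.0 and output_per_million=2.0 (the pricing above),
# 12,000 prompt tokens and 3,000 completion tokens cost
# 0.012 + 0.006 = 0.018 USD.
cost = token_cost(12_000, 3_000, 1.0, 2.0)
```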
LocalModel Methods on CompletionModel
CompletionModel instances backed by local inference (not remote APIs) support explicit load/unload lifecycle management.
| Method | Signature | Description |
|---|---|---|
load | await model.load() | Load the model into memory/VRAM. Idempotent. |
unload | await model.unload() | Free the model’s memory/VRAM. Idempotent. |
is_loaded | await model.is_loaded() -> bool | Whether the model is currently loaded. |
memory_bytes | await model.memory_bytes() -> int \| None | Approximate memory footprint in bytes (host RAM if on CPU, GPU VRAM otherwise), or None if unknown. The default implementation returns None. |
device | model.device() -> str | Return device string ('cpu', 'cuda:0', 'metal', etc.). Determines which pool the manager charges. Default implementation raises NotImplementedError — subclasses should override. |
remote = CompletionModel.openai()  # Remote model: these methods are no-ops
# For a model backed by local inference (local_model is a placeholder name):
await local_model.load()
print(await local_model.is_loaded()) # True
print(await local_model.memory_bytes()) # e.g. 4_000_000_000
await local_model.unload()
ProgressCallback
ProgressCallback is a subclassable abstract base class for receiving model-download progress notifications from ModelCache.download(...). Subclass it and override on_progress. The default implementation is a no-op, so a subclass that does not override it simply ignores progress updates.
| Method | Signature | Description |
|---|---|---|
__new__ | ProgressCallback() | Construct an instance. Subclass to override on_progress. |
on_progress | on_progress(downloaded: int, total: int \| None) -> None | Called repeatedly during a download with current byte counts. total is None if the size is unknown. No-op by default. |
from blazen import ProgressCallback
class MyProgress(ProgressCallback):
def on_progress(self, downloaded: int, total: int | None) -> None:
pct = (downloaded / total * 100) if total else 0
print(f"{pct:.1f}%")
cb = MyProgress()
# cache is a ModelCache instance (construction not shown).
await cache.download("bert-base-uncased", "config.json", cb)
You may also pass a plain Callable[[int, int | None], None] to download(progress=...) for the same effect; this ABC simply gives type-checked callers a typed base to inherit from.
Local Inference Types
The local-inference backends expose typed message, chunk, image, and result classes for use with the inherent infer / infer_stream APIs on each *Provider. There are three parallel families: the canonical un-prefixed Inference* types for mistral.rs, the LlamaCpp*-prefixed family for llama.cpp, and a single CandleInferenceResult for Candle (single-shot only — no streaming).
Streaming on both mistral.rs and llama.cpp returns an async iterator (InferenceChunkStream and LlamaCppInferenceChunkStream respectively) that you consume with async for chunk in stream: .... Both implement the __aiter__ / __anext__ protocol and terminate with StopAsyncIteration once the underlying engine stream is exhausted.
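The __aiter__ / __anext__ protocol these stream types implement is standard Python, so a stand-in is easy to write for tests that should not load a real model. This is a minimal sketch: `FakeChunkStream` is hypothetical and yields plain strings, whereas the real streams yield chunk objects.

```python
import asyncio


class FakeChunkStream:
    """Async iterator over pre-baked deltas, ending with StopAsyncIteration."""

    def __init__(self, deltas: list[str]) -> None:
        self._deltas = iter(deltas)

    def __aiter__(self) -> "FakeChunkStream":
        return self

    async def __anext__(self) -> str:
        try:
            return next(self._deltas)
        except StopIteration:
            # Ends the `async for` loop, like the engine-backed streams.
            raise StopAsyncIteration from None


async def collect_text(stream: FakeChunkStream) -> str:
    parts = []
    async for delta in stream:
        parts.append(delta)
    return "".join(parts)


if __name__ == "__main__":
    print(asyncio.run(collect_text(FakeChunkStream(["Hel", "lo"]))))
```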
mistral.rs inference types
| Class | Purpose |
|---|---|
ChatMessageInput | A chat message for mistral.rs inference, optionally carrying image attachments. |
ChatRole | Enum: System, User, Assistant, Tool. |
InferenceImage | Image payload attached to a ChatMessageInput. Build with from_bytes(...) or from_path(...). |
InferenceImageSource | Underlying image source. Variants: bytes and path. Inspect via kind plus the per-variant getters. |
InferenceResult | Result of a single non-streaming call: content, reasoning_content, tool_calls, finish_reason, model, usage. |
InferenceChunk | Streaming delta: delta, reasoning_delta, tool_calls, finish_reason. |
InferenceChunkStream | Async iterator over InferenceChunk items. Implements __aiter__ / __anext__. |
InferenceToolCall | A tool call returned by the engine: id, name, arguments (JSON string). |
InferenceUsage | Token usage stats: prompt_tokens, completion_tokens, total_tokens, total_time_sec. |
from blazen import ChatMessageInput, ChatRole, InferenceImage
msg = ChatMessageInput.with_images(
role=ChatRole.User,
text="Describe this image.",
images=[InferenceImage.from_path("./photo.png")],
)
stream = await provider.infer_stream([msg])
async for chunk in stream:
if chunk.delta:
print(chunk.delta, end="")
llama.cpp inference types
llama.cpp messages are text-only; multimodal inputs are not supported by this backend.
| Class | Purpose |
|---|---|
LlamaCppChatMessageInput | A text-only chat message for llama.cpp inference. |
LlamaCppChatRole | Enum: System, User, Assistant, Tool. |
LlamaCppInferenceResult | Result of a single non-streaming call: content, finish_reason, model, usage. |
LlamaCppInferenceChunk | Streaming delta: delta, finish_reason. |
LlamaCppInferenceChunkStream | Async iterator over LlamaCppInferenceChunk items. Implements __aiter__ / __anext__. |
LlamaCppInferenceUsage | Token usage stats: prompt_tokens, completion_tokens, total_tokens, total_time_sec. |
```python
from blazen import LlamaCppChatMessageInput, LlamaCppChatRole

msg = LlamaCppChatMessageInput(role=LlamaCppChatRole.User, text="Hello")

# Inside an async function; `llama_provider` is a llama.cpp provider created elsewhere.
stream = await llama_provider.infer_stream([msg])
async for chunk in stream:
    if chunk.delta:
        print(chunk.delta, end="", flush=True)
```
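InferenceUsage and LlamaCppInferenceUsage share the same field names, so a single helper can derive throughput from either. The sketch below assumes total_time_sec measures the whole call, prompt processing included, which makes the figure an end-to-end rate rather than pure decode speed; the numbers are made up.

```python
from types import SimpleNamespace
from typing import Optional


def completion_tokens_per_second(usage) -> Optional[float]:
    # Works with any object exposing completion_tokens and total_time_sec,
    # e.g. InferenceUsage or LlamaCppInferenceUsage. Returns None when no
    # timing is available, to avoid dividing by zero.
    if not usage.total_time_sec:
        return None
    return usage.completion_tokens / usage.total_time_sec


# Stand-in values; real numbers come from result.usage.
usage = SimpleNamespace(prompt_tokens=12, completion_tokens=64,
                        total_tokens=76, total_time_sec=2.0)
print(completion_tokens_per_second(usage))  # 32.0
```

CandleInferenceResult spells its timing field total_time_secs, so it would need a small adjustment.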
Candle inference types
Candle exposes a single non-streaming result type. Streaming is not currently supported on this backend.
| Class | Purpose |
|---|---|
CandleInferenceResult | Result of a non-streaming candle call: content, prompt_tokens, completion_tokens, total_time_secs. |
```python
from blazen import CandleInferenceResult

# Inside an async function; `candle_provider` and `messages` are created elsewhere.
result: CandleInferenceResult = await candle_provider.infer(messages)
print(result.content, result.prompt_tokens, result.completion_tokens)
```
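Because Candle only returns a finished result, code that consumes several backends may want to treat it like a one-chunk stream. The adapter below is a hypothetical sketch of that idea, not part of Blazen; fake_candle_infer stands in for candle_provider.infer(messages).

```python
import asyncio
from types import SimpleNamespace


async def as_single_chunk_stream(result_awaitable):
    # Hypothetical adapter: await a non-streaming result and yield its
    # content as one chunk-like object, so one async-for loop can handle
    # streaming and non-streaming backends alike.
    result = await result_awaitable
    yield SimpleNamespace(delta=result.content)


async def fake_candle_infer():
    # Stand-in for candle_provider.infer(messages).
    return SimpleNamespace(content="Hi there", prompt_tokens=3,
                           completion_tokens=2, total_time_secs=0.1)


async def main() -> str:
    parts = []
    async for chunk in as_single_chunk_stream(fake_candle_infer()):
        if chunk.delta:
            parts.append(chunk.delta)
    return "".join(parts)


print(asyncio.run(main()))  # Hi there
```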
Telemetry
Blazen ships three optional exporters for tracing and metrics: init_langfuse (LLM observability), init_otlp (generic OpenTelemetry), and init_prometheus (HTTP-scraped metrics). Each is gated behind a Cargo feature and installs process-global state on first call (a tracing subscriber for Langfuse and OTLP, a metrics recorder for Prometheus), so invoke each one once at process startup, before any traced work. You can combine exporters, but order matters: if you want Langfuse alongside another exporter, call init_langfuse first.
| Function | Config type | Cargo feature | Purpose |
|---|---|---|---|
init_langfuse(config) | LangfuseConfig | langfuse | LLM call traces, token usage, latency to Langfuse. |
init_otlp(config) | OtlpConfig | otlp | Generic OpenTelemetry OTLP gRPC span exporter. |
init_prometheus(port) | int (no config object) | prometheus | Prometheus metrics over HTTP /metrics. |
Initialization functions raise a BlazenError subclass on failure (for example, an invalid endpoint or a port that is already in use).
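Since each initializer installs process-global state, it helps to funnel them through a single startup function that runs once and in a fixed order. make_telemetry_setup is a hypothetical helper sketched with functools.cache; the recorded calls stand in for init_langfuse and init_prometheus.

```python
import functools


def make_telemetry_setup(*initializers):
    # Hypothetical helper: returns a setup() function that runs each
    # zero-argument initializer in the order given, and only once after a
    # successful run. Pass the Langfuse initializer first when combining.
    @functools.cache
    def setup() -> None:
        for init in initializers:
            init()

    return setup


# With Blazen this might look like:
# setup = make_telemetry_setup(
#     lambda: init_langfuse(LangfuseConfig(public_key="pk-lf-...", secret_key="sk-lf-...")),
#     lambda: init_prometheus(9090),
# )
calls = []
setup = make_telemetry_setup(lambda: calls.append("langfuse"),
                             lambda: calls.append("prometheus"))
setup()
setup()  # second call is a no-op
print(calls)  # ['langfuse', 'prometheus']
```

Call setup() from your entry point. If an initializer raises, the error propagates and nothing is cached, so calling setup() again would rerun every initializer, including ones that already succeeded.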
LangfuseConfig
LLM-observability exporter for Langfuse. Behind the langfuse Cargo feature.
| Field | Type | Default | Description |
|---|---|---|---|
public_key | str | required | Langfuse public API key (Basic-auth username). |
secret_key | str | required | Langfuse secret API key (Basic-auth password). |
host | str \| None | None | Langfuse host URL. When None, https://cloud.langfuse.com is used. |
batch_size | int | 100 | Maximum events buffered before an automatic flush. |
flush_interval_ms | int | 5000 | Background flush interval in milliseconds. |
```python
from blazen import LangfuseConfig, init_langfuse

init_langfuse(LangfuseConfig(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com",
))
```
init_langfuse(config) spawns a background tokio task that periodically flushes buffered LLM call traces, token usage, and latency data to the Langfuse ingestion API. If a global tracing subscriber is already installed, the call soft-fails: the underlying dispatcher is still constructed (so background flushing still runs), and the function returns without replacing the existing subscriber.
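Hard-coding keys as in the Langfuse example is fine for a quick test, but production code usually reads them from the environment. This sketch assumes the LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY, and LANGFUSE_HOST variable names used by the Langfuse SDKs; Blazen itself is not assumed to read them, so the helper passes them explicitly.

```python
import os


def langfuse_kwargs_from_env(environ=os.environ) -> dict:
    # Read Langfuse credentials from the environment instead of hard-coding
    # them. The variable names follow the Langfuse SDK convention (assumption).
    kwargs = {
        "public_key": environ["LANGFUSE_PUBLIC_KEY"],
        "secret_key": environ["LANGFUSE_SECRET_KEY"],
    }
    host = environ.get("LANGFUSE_HOST")
    if host:
        kwargs["host"] = host  # otherwise LangfuseConfig's default applies
    return kwargs


# With Blazen:
# init_langfuse(LangfuseConfig(**langfuse_kwargs_from_env()))
```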
OtlpConfig
Generic OpenTelemetry OTLP exporter. Behind the otlp Cargo feature.
| Field | Type | Default | Description |
|---|---|---|---|
endpoint | str | required | OTLP gRPC endpoint URL (e.g. "http://localhost:4317"). |
service_name | str | required | Service name reported to the backend. |
```python
from blazen import OtlpConfig, init_otlp

init_otlp(OtlpConfig(endpoint="http://localhost:4317", service_name="my-app"))
```
init_otlp(config) sets up an OTLP gRPC span exporter and installs a combined tracing subscriber (env-filter + OTel layer + fmt layer).
Prometheus metrics
Behind the prometheus Cargo feature.
```python
from blazen import init_prometheus

init_prometheus(9090)  # serves /metrics on 0.0.0.0:9090
```
init_prometheus(port) installs a global metrics recorder backed by Prometheus and starts an HTTP listener on 0.0.0.0:{port} that serves the /metrics endpoint. From then on, any metrics recorded through the Rust metrics macros (counter!, histogram!, gauge!) are exposed at that endpoint.
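To check what the endpoint exposes, you can fetch /metrics and read the Prometheus text format directly. The parser below is a minimal sketch that handles plain name{labels} value lines and skips # HELP and # TYPE comments; it is not a complete implementation of the format, and the metric names in the sample are hypothetical.

```python
def parse_prometheus_text(text: str) -> dict:
    # Minimal parser for the Prometheus text exposition format: skips
    # comment and blank lines, and maps each "name{labels} value [timestamp]"
    # sample line to a float value.
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "}" in line:
            key, _, rest = line.rpartition("}")
            key += "}"
        else:
            key, _, rest = line.partition(" ")
        samples[key] = float(rest.split()[0])
    return samples


# Against a live server you might fetch the text with:
# text = urllib.request.urlopen("http://localhost:9090/metrics").read().decode()
sample = (
    "# HELP demo_requests_total Total requests.\n"
    "# TYPE demo_requests_total counter\n"
    'demo_requests_total{provider="fal"} 3\n'
    "demo_latency_seconds_sum 1.5\n"
)
metrics = parse_prometheus_text(sample)
print(metrics["demo_latency_seconds_sum"])  # 1.5
```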
Error Handling
All Blazen failures are raised as subclasses of BlazenError, a typed exception hierarchy rooted at builtins.Exception. Catching BlazenError will catch every error raised from any Blazen API; catch a specific subclass to react to one category.
```python
from blazen import AuthError, BlazenError, ChatMessage, RateLimitError

# Inside an async function; `model` is a completion model created elsewhere.
try:
    response = await model.complete([ChatMessage.user("Hello")])
except RateLimitError as e:
    # Provider rate-limited the request: back off and retry.
    print(f"slow down: {e}")
except AuthError as e:
    print(f"bad credentials: {e}")
except BlazenError as e:
    # Any other Blazen failure.
    print(f"blazen failed: {e}")
```
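Reporting a rate limit is only half the job. A minimal retry loop with exponential backoff and jitter might look like this; with_retries and FakeRateLimitError are hypothetical names, and the fake stands in for blazen.RateLimitError so the sketch runs on its own.

```python
import asyncio
import random


async def with_retries(call, retry_on, max_attempts=4, base_delay=0.5):
    # Await call(); on an exception listed in retry_on, sleep with
    # exponential backoff plus random jitter and try again. Any other
    # exception, or the last failed attempt, propagates to the caller.
    for attempt in range(max_attempts):
        try:
            return await call()
        except retry_on:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay))


# With Blazen (inside an async function):
# response = await with_retries(
#     lambda: model.complete([ChatMessage.user("Hello")]),
#     retry_on=(RateLimitError,),
# )

class FakeRateLimitError(Exception):
    # Stand-in for blazen.RateLimitError so the sketch runs on its own.
    pass


attempts = 0


async def flaky():
    # Fails twice, then succeeds.
    global attempts
    attempts += 1
    if attempts < 3:
        raise FakeRateLimitError("slow down")
    return "ok"


result = asyncio.run(with_retries(flaky, FakeRateLimitError, base_delay=0))
print(result, attempts)  # ok 3
```

The helper takes a callable rather than a coroutine because a coroutine object can only be awaited once; each attempt needs a fresh one.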
Base hierarchy
Every class in the table below, apart from BlazenError itself, derives directly from BlazenError, which in turn derives from builtins.Exception.
| Class | Description |
|---|---|
BlazenError | Base class for all Blazen runtime errors. Catches everything. |
AuthError | Authentication failed (invalid or missing API key). |
RateLimitError | Provider rate-limited the request. |
TimeoutError | The operation exceeded its time limit. Distinct from the builtin TimeoutError; import it from blazen, ideally under an alias such as BlazenTimeoutError so it does not shadow the builtin. |
ValidationError | Invalid input rejected before the provider round-trip. |
ContentPolicyError | Provider rejected the request for policy reasons. |
ProviderError | Provider-side error. Carries structured HTTP attributes (see below). |
UnsupportedError | Requested capability is not supported by this provider or backend. |
ComputeError | Compute job error (cancelled, quota exceeded, etc.). |
MediaError | Media handling error (invalid input, size exceeded, etc.). |
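The hierarchy suggests a split between transient failures worth retrying and permanent ones. The policy below is one possible choice, not something Blazen enforces: it retries rate limits and timeouts, retries a ProviderError only for HTTP 429 or 5xx (using its status attribute), and gives up on everything else. It imports the real classes when blazen is installed and otherwise defines stand-ins with the same shape.

```python
try:
    from blazen import (AuthError, BlazenError, ProviderError, RateLimitError,
                        TimeoutError as BlazenTimeoutError)
except ImportError:
    # Stand-ins mirroring the documented hierarchy, so the sketch runs
    # without blazen installed.
    class BlazenError(Exception):
        pass

    class AuthError(BlazenError):
        pass

    class RateLimitError(BlazenError):
        pass

    class BlazenTimeoutError(BlazenError):
        pass

    class ProviderError(BlazenError):
        pass


def is_retryable(exc: BaseException) -> bool:
    # Suggested policy: rate limits and timeouts are transient; provider
    # errors are retried only for HTTP 429 or 5xx. Auth, validation, and
    # content-policy errors would fail the same way again.
    if isinstance(exc, (RateLimitError, BlazenTimeoutError)):
        return True
    if isinstance(exc, ProviderError):
        status = getattr(exc, "status", None)
        return status is not None and (status == 429 or status >= 500)
    return False


server_error = ProviderError("upstream failed")
server_error.status = 503  # normally populated by Blazen for HTTP failures
print(is_retryable(server_error))  # True
```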
ProviderError attributes
For HTTP-attributable failures, ProviderError instances are populated with the following typed attributes (set via setattr on the instance, mirrored in the stub for static type-checking):
| Attribute | Type | Description |
|---|---|---|
provider | str | Provider tag (e.g. "fal", "openrouter"). |
status | int \| None | HTTP status code; None for non-HTTP provider errors. |
endpoint | str \| None | Request URL. |
request_id | str \| None | Value of the x-fal-request-id or x-request-id response header, if present. |
detail | str \| None | Error message parsed from the JSON response body. |
raw_body | str \| None | Raw response body, truncated to 4 KiB. |
retry_after_ms | int \| None | Retry-After header value, converted to milliseconds. |
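```python
import asyncio

from blazen import ChatMessage, ProviderError

# Inside an async function; `model` is a completion model created elsewhere.
try:
    await model.complete([ChatMessage.user("Hello")])
except ProviderError as e:
    print(f"{e.provider} {e.status} on {e.endpoint}")
    if e.retry_after_ms is not None:
        await asyncio.sleep(e.retry_after_ms / 1000)
```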
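A small helper can turn retry_after_ms into a sleep duration, preferring the provider's Retry-After hint and otherwise falling back to capped exponential backoff. Comparing against None rather than testing truthiness keeps a hint of 0 ms meaningful. retry_delay_seconds is a hypothetical name, and the base and cap values are arbitrary assumptions.

```python
from typing import Optional


def retry_delay_seconds(retry_after_ms: Optional[int], attempt: int,
                        base: float = 0.5, cap: float = 30.0) -> float:
    # Honour the provider's Retry-After hint when present (0 ms included);
    # otherwise back off exponentially: base * 2**attempt, capped at `cap`.
    if retry_after_ms is not None:
        return retry_after_ms / 1000
    return min(base * (2 ** attempt), cap)


# Inside an `except ProviderError as e:` block, on retry number `attempt`:
# await asyncio.sleep(retry_delay_seconds(e.retry_after_ms, attempt))
print(retry_delay_seconds(1500, attempt=0))  # 1.5
print(retry_delay_seconds(None, attempt=3))  # 4.0
```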
Per-backend ProviderError subclasses
Each local-inference backend raises its own ProviderError subclass so callers can route errors per-backend. The classes are always declared in the type stub, but only registered at runtime when the corresponding Cargo feature is enabled in your wheel build.
| Class | Cargo feature | Backend |
|---|---|---|
LlamaCppError | llamacpp | llama.cpp local LLM inference. |
CandleLlmError | candle-llm | Candle local LLM inference. |
CandleEmbedError | candle-embed | Candle local embedding backend. |
MistralRsError | mistralrs | mistral.rs local LLM inference. |
WhisperError | whispercpp | whisper.cpp transcription. |
PiperError | piper | Piper text-to-speech. |
DiffusionError | diffusion | Diffusion image generation. |
FastEmbedError | embed | fastembed embedding (non-musl only). |
TractError | tract | Tract ONNX embedding. |
Because they all derive from ProviderError, except ProviderError catches every backend-attributable error, including these subclasses. Use except LlamaCppError (etc.) when you need to react to a single backend.
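```python
from blazen import LlamaCppError, ProviderError

# Inside an async function. LlamaCppError is only registered at runtime
# when the wheel was built with the llamacpp feature.
try:
    await llama_provider.infer(messages)
except LlamaCppError as e:
    print(f"llama.cpp blew up: {e}")
except ProviderError as e:
    # Any other backend
    print(f"{e.provider}: {e}")
```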
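Python tries except clauses from top to bottom and runs the first one that matches, so the per-backend clause has to come before except ProviderError. The sketch below contrasts both orderings; route and route_wrong_order are hypothetical helpers, and the code imports the real classes when available and otherwise uses stand-ins that mirror the documented hierarchy.

```python
try:
    from blazen import BlazenError, LlamaCppError, ProviderError
except ImportError:
    # Stand-ins mirroring the documented hierarchy (LlamaCppError only
    # exists at runtime when the llamacpp feature is enabled).
    class BlazenError(Exception):
        pass

    class ProviderError(BlazenError):
        pass

    class LlamaCppError(ProviderError):
        pass


def route(exc: Exception) -> str:
    # Most specific class first: Python uses the first matching clause.
    try:
        raise exc
    except LlamaCppError:
        return "llama.cpp"
    except ProviderError:
        return "other backend"
    except BlazenError:
        return "blazen"


def route_wrong_order(exc: Exception) -> str:
    try:
        raise exc
    except ProviderError:  # also matches LlamaCppError, so it always wins
        return "other backend"
    except LlamaCppError:  # never reached: ProviderError already matched
        return "llama.cpp"


print(route(LlamaCppError("x")))              # llama.cpp
print(route_wrong_order(LlamaCppError("x")))  # other backend
```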