Python API Reference

Complete API reference for blazen in Python

Event

The preferred way to define events is by subclassing Event. The event_type is automatically set to the class name.

from blazen import Event

class AnalyzeEvent(Event):
    text: str
    score: float

ev = AnalyzeEvent(text="hello", score=0.9)
ev.event_type   # "AnalyzeEvent"
ev.text         # "hello"

You can also construct events inline without a subclass:

Event(event_type: str, **kwargs)

ev = Event("AnalyzeEvent", text="hello", score=0.9)

  • .event_type (str): The event type string. Auto-set to the class name for subclasses.
  • .to_dict() -> dict: Serialize the event data to a plain dictionary.
  • .field_name (Any): Attribute access for any keyword argument supplied at construction.

StartEvent

StartEvent(**kwargs)

Built-in event whose event_type is "blazen::StartEvent". All keyword arguments are available as attributes.


StopEvent

StopEvent(result: Any)

Built-in event whose event_type is "blazen::StopEvent".

  • .result (Any): The value passed via the result keyword argument.

StopEvent(result=x) preserves object identity for non-JSON values. If you pass a class instance, Pydantic model, DB connection, or lambda as the result, await handler.result() returns an event whose .result attribute is the same Python object, so event.result is x holds. Non-JSON values are routed through a per-Context session-ref registry, and __getattr__ on the returned event resolves the stored marker transparently.
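The marker-and-resolve mechanism can be modeled in a few lines of plain Python. This is a sketch under stated assumptions, not blazen's implementation: SessionRef, SessionRegistry, and ResultEvent are hypothetical names, and a json.dumps check stands in for however blazen decides that a value is non-JSON.

```python
import json
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class SessionRef:
    """Hypothetical marker stored in place of a non-JSON value."""

    key: str


class SessionRegistry:
    """Hypothetical per-run table mapping marker keys to live Python objects."""

    def __init__(self) -> None:
        self._objects: dict[str, Any] = {}

    def stash(self, value: Any) -> SessionRef:
        ref = SessionRef(f"ref-{len(self._objects)}")
        self._objects[ref.key] = value
        return ref

    def lookup(self, ref: SessionRef) -> Any:
        return self._objects[ref.key]


def is_json(value: Any) -> bool:
    try:
        json.dumps(value)
    except (TypeError, ValueError):
        return False
    return True


class ResultEvent:
    """Stores JSON results directly and non-JSON results as SessionRef markers."""

    def __init__(self, registry: SessionRegistry, result: Any) -> None:
        self._registry = registry
        self._fields = {"result": result if is_json(result) else registry.stash(result)}

    def __getattr__(self, name: str) -> Any:
        # Only reached for names not found normally, such as "result".
        fields = self.__dict__.get("_fields", {})
        if name not in fields:
            raise AttributeError(name)
        value = fields[name]
        if isinstance(value, SessionRef):
            # Swap the marker back for the original live object.
            return self.__dict__["_registry"].lookup(value)
        return value
```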


step decorator

The @step decorator reads the type hint of the ev parameter to automatically determine which events the step accepts.

from blazen import Context, Event, step

class AnalyzeEvent(Event):
    text: str

@step
async def analyze(ctx: Context, ev: AnalyzeEvent) -> Event | None:
    ...
# Equivalent to @step(accepts=["AnalyzeEvent"])

When the annotation is the base Event class or absent, the step defaults to accepting StartEvent:

@step
async def start(ctx: Context, ev: Event) -> Event | None:
    ...
# Equivalent to @step(accepts=["blazen::StartEvent"])
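A rough model of this inference rule reads the ev annotation with typing.get_type_hints. The Event and AnalyzeEvent classes below are local stand-ins and infer_accepts is a hypothetical helper; it mirrors the documented defaults but is not the decorator's actual code.

```python
import inspect
import typing
from typing import Any, Callable


class Event:
    """Local stand-in for blazen's Event base class."""


class AnalyzeEvent(Event):
    text: str


def infer_accepts(fn: Callable[..., Any]) -> list[str]:
    """Return the event types a step accepts, based on its `ev` annotation."""
    hint = typing.get_type_hints(fn).get("ev")
    # A missing hint or the bare base class falls back to StartEvent.
    if hint is None or hint is Event:
        return ["blazen::StartEvent"]
    if inspect.isclass(hint) and issubclass(hint, Event):
        return [hint.__name__]
    raise TypeError(f"unsupported annotation for ev: {hint!r}")


async def analyze(ctx: Any, ev: AnalyzeEvent) -> None: ...


async def start(ctx: Any, ev: Event) -> None: ...


async def untyped(ctx, ev): ...
```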

Explicit overrides still work:

  • @step
    Infers accepts from the ev type hint. Defaults to StartEvent when the hint is Event or missing.
  • @step(accepts=["EventType"])
    Explicitly sets accepted event types, overriding type-hint inference.
  • @step(emits=["EventType"])
    Declares the event types this step may produce.
  • @step(max_concurrency=N)
    Limits how many instances of this step may run concurrently. 0 means unlimited.

Step signature

async def name(ctx: Context, ev: MyEvent) -> Event | list[Event] | None

Return an Event to emit it, a list[Event] to emit several, or None to emit nothing. Steps can be sync or async.


Workflow

Workflow(name: str, steps: list, timeout: float = None)

Create a workflow from a name and an ordered list of steps. The optional timeout is in seconds.

  • await wf.run(**kwargs) -> WorkflowHandler
    Start the workflow. Keyword arguments become fields on the initial StartEvent.

WorkflowHandler

Returned by Workflow.run(). Provides control over a running workflow instance.

  • await handler.result() -> Event
    Block until the workflow emits a StopEvent and return it.
  • handler.stream_events() -> AsyncIterator[Event]
    Async iterator yielding events written to the stream.

handler = await wf.run(prompt="Hello")

# Stream intermediate events while waiting for the result
async for event in handler.stream_events():
    print(event.event_type, event.to_dict())

result = await handler.result()

Context

Available as the first parameter of every step function. All methods are synchronous.

Two explicit namespaces are exposed alongside the smart-routing shortcuts on ctx itself:

  • state (StateNamespace): Persistable workflow state. Routes through the same 4-tier dispatch as ctx.set (bytes / JSON / pickle / live-ref). Survives pause() / resume() and checkpoint stores.
  • session (SessionNamespace): Live in-process references. Identity is preserved within a single workflow run. Values are deliberately excluded from snapshots.

Methods

  • ctx.set(key: str, value: StateValue) -> None
    Store any Python value via 4-tier dispatch: bytes/bytearray are stored as raw binary; JSON-serializable types (dict, list, str, int, float, bool, None) are stored as JSON; picklable objects (Pydantic models, dataclasses, custom classes) are pickled automatically; unpicklable objects (DB connections, file handles, lambdas) are stored as a live in-process reference (same-process only, excluded from snapshots).
  • ctx.get(key: str) -> StateValue | None
    Retrieve a value by key, or None if absent. Returns the original type transparently: JSON values come back as their Python type, bytes come back as bytes, and both pickled and live-reference values round-trip to the original Python object.
  • ctx.set_bytes(key: str, data: bytes) -> None
    Convenience alias for storing raw binary data. Equivalent to ctx.set(key, data) when data is bytes.
  • ctx.get_bytes(key: str) -> bytes | None
    Convenience alias for retrieving raw binary data, or None if absent.
  • ctx.run_id() -> str
    Return the UUID of the current workflow run.
  • ctx.send_event(event: Event) -> None
    Route an event to matching steps manually.
  • ctx.write_event_to_stream(event: Event) -> None
    Publish an event to the stream visible via WorkflowHandler.stream_events().

StateValue = Any — a type alias defined in the .pyi stubs indicating that any Python value is accepted. The first three storage tiers (bytes / JSON / pickle) persist through pause/resume/checkpoint; the fourth tier (live in-process reference) is same-process only.
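The tier order can be pictured as a classifier that tries each encoding in turn. This is an illustrative sketch only: storage_tier is a hypothetical function, and using json.dumps and pickle.dumps as the tests for tiers 2 and 3 is an assumption about how the dispatch decides.

```python
import json
import pickle
from typing import Any


def storage_tier(value: Any) -> str:
    """Classify a value into one of four tiers, tried in documented order."""
    # Tier 1: raw binary.
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    # Tier 2: anything the json module can encode.
    try:
        json.dumps(value)
        return "json"
    except (TypeError, ValueError):
        pass
    # Tier 3: anything pickle can encode.
    try:
        pickle.dumps(value)
        return "pickle"
    except Exception:  # PicklingError, TypeError, AttributeError, ...
        pass
    # Tier 4: keep a live, same-process reference (not persisted).
    return "live"
```

If the real dispatch behaves like this sketch, note that json.dumps also accepts tuples, which would come back as lists.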


StateNamespace

Namespace for persistable workflow state, accessed via ctx.state. Routes values through the same 4-tier dispatch as Context.set: bytes → JSON → pickle → live-object. The first three tiers survive pause() / resume() and checkpoint stores; the fourth is in-process only.

  • ctx.state.set(key: str, value: StateValue) -> None
    Store a value under key using 4-tier dispatch.
  • ctx.state.get(key: str) -> StateValue | None
    Retrieve the value under key, deserialized to its original Python type, or None if absent.
  • ctx.state.set_bytes(key: str, data: bytes) -> None
    Store raw binary data under key.
  • ctx.state.get_bytes(key: str) -> bytes | None
    Retrieve raw binary data under key, or None if absent.

Dict protocol

Also supports __setitem__, __getitem__, and __contains__ so it can be used like a dict:

ctx.state.set("counter", 5)
ctx.state["counter"] = 5         # equivalent
count = ctx.state.get("counter")
print("counter" in ctx.state)    # True

SessionNamespace

Namespace for live in-process references, accessed via ctx.session. Identity is preserved within a single workflow run; values are deliberately excluded from snapshots, so they are the right place to stash database connections, open file handles, ML model objects, or any other resource that cannot (or should not) be serialized.

  • ctx.session.set(key: str, value: Any) -> None
    Store a live reference to value under key.
  • ctx.session.get(key: str) -> Any | None
    Retrieve the live reference under key, or None if absent. The returned object is the same Python object that was stored.
  • ctx.session.has(key: str) -> bool
    Return whether key is currently set.
  • ctx.session.remove(key: str) -> None
    Drop the entry under key, if any.

Dict protocol

Also supports __setitem__, __getitem__, and __contains__:

import sqlite3

conn = sqlite3.connect(":memory:")
ctx.session.set("db", conn)
assert ctx.session.get("db") is conn   # same object within this run
ctx.session["db"] = conn               # dict-style equivalent
assert "db" in ctx.session

Pause/resume behavior

Session values are not serialized into workflow snapshots. What happens to them at pause time is governed by the workflow’s session_pause_policy (default "pickle_or_error"). The policy is set at the workflow level; see the workflow configuration for details.


BlazenState

Base class for typed workflow state with per-field context storage. Decorate your subclass with @dataclass to get automatic per-field serialization: each field is stored individually, using the storage tier best suited to its value.

import sqlite3
from dataclasses import dataclass

from blazen import BlazenState

@dataclass
class MyState(BlazenState):
    input_path: str = ""
    conn: sqlite3.Connection | None = None

    class Meta:
        transient = {"conn"}
        store_by = {}

    def restore(self):
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)

Store and retrieve via context:

ctx.set("state", my_state)   # Stores each field individually
state = ctx.get("state")     # Reconstructs object, then calls restore()

Meta inner class

  • transient (ClassVar[set[str]]): Field names excluded from serialization. These fields are set to None in snapshots and recreated by restore().
  • store_by (ClassVar[dict[str, FieldStore]]): Custom persistence strategy per field. Fields not listed use the default automatic tier (JSON / bytes / pickle).

Methods

  • def restore(self) -> None
    Override to recreate transient fields after deserialization. Called automatically by ctx.get() once all serializable fields are populated. Transient fields are None at call time.

FieldStore

Structural protocol for custom per-field persistence. There is no FieldStore class to import — any object whose shape matches the two methods below can be used as a value in BlazenState.Meta.store_by to route specific fields through custom storage (e.g., S3, a database, Redis).

Implement the shape directly with your own class, or use CallbackFieldStore below for the common case.

  • def save(self, key: str, value: Any, ctx: Context) -> None
    Persist the field value under the given key.
  • def load(self, key: str, ctx: Context) -> Any
    Load and return the field value for the given key.

CallbackFieldStore

Convenience implementation of the FieldStore structural protocol that delegates to plain callables. Importable directly from blazen.

from blazen import CallbackFieldStore

CallbackFieldStore(
    save_fn: Callable[[str, Any], None],
    load_fn: Callable[[str], Any],
)

  • save_fn (Callable[[str, Any], None]): Called with (key, value) to persist a field.
  • load_fn (Callable[[str], Any]): Called with (key) to load a field value.

Also exposes save(key, value, ctx) and load(key, ctx) methods matching the FieldStore protocol — the ctx argument is accepted but not forwarded to your callbacks.

import boto3
from blazen import CallbackFieldStore

s3 = boto3.client("s3")

# "b" is a placeholder bucket name.
store = CallbackFieldStore(
    save_fn=lambda k, v: s3.put_object(Bucket="b", Key=k, Body=v),
    load_fn=lambda k: s3.get_object(Bucket="b", Key=k)["Body"].read(),
)

This is a tiny ~15-line wrapper. If your store needs richer behavior (the ctx argument, async I/O, batching), implement the structural protocol directly with your own class.
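Because FieldStore is structural, its shape can be written as a typing.Protocol and checked at runtime. In this sketch FieldStoreShape and CallbackStore are hypothetical names, ctx is typed as Any to avoid importing blazen, and a plain dict plays the role of S3.

```python
from typing import Any, Callable, Protocol, runtime_checkable


@runtime_checkable
class FieldStoreShape(Protocol):
    """The two-method shape described for FieldStore."""

    def save(self, key: str, value: Any, ctx: Any) -> None: ...

    def load(self, key: str, ctx: Any) -> Any: ...


class CallbackStore:
    """Delegates to plain callables and ignores ctx, like CallbackFieldStore."""

    def __init__(
        self,
        save_fn: Callable[[str, Any], None],
        load_fn: Callable[[str], Any],
    ) -> None:
        self._save_fn = save_fn
        self._load_fn = load_fn

    def save(self, key: str, value: Any, ctx: Any) -> None:
        self._save_fn(key, value)  # ctx is accepted but not forwarded

    def load(self, key: str, ctx: Any) -> Any:
        return self._load_fn(key)


backend: dict[str, Any] = {}
store = CallbackStore(save_fn=backend.__setitem__, load_fn=backend.__getitem__)
```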


CompletionModel

Use static constructor methods to create a model for a specific provider, then call complete() or stream() to generate responses.

API keys are resolved from ProviderOptions(api_key=...) when passed explicitly, or from the provider’s standard environment variable (e.g. OPENAI_API_KEY, ANTHROPIC_API_KEY) when options is omitted.

from blazen import CompletionModel, ProviderOptions

# Read key from OPENAI_API_KEY env var
model = CompletionModel.openai()

# Pass an explicit key
model = CompletionModel.anthropic(options=ProviderOptions(api_key="sk-ant-..."))

# Override the default model
model = CompletionModel.openrouter(
    options=ProviderOptions(api_key="sk-or-...", model="meta-llama/llama-3-70b")
)

Provider constructors

  • CompletionModel.openai(*, options: ProviderOptions = None)
  • CompletionModel.anthropic(*, options: ProviderOptions = None)
  • CompletionModel.gemini(*, options: ProviderOptions = None)
  • CompletionModel.azure(*, options: AzureOptions)
  • CompletionModel.openrouter(*, options: ProviderOptions = None)
  • CompletionModel.groq(*, options: ProviderOptions = None)
  • CompletionModel.together(*, options: ProviderOptions = None)
  • CompletionModel.mistral(*, options: ProviderOptions = None)
  • CompletionModel.deepseek(*, options: ProviderOptions = None)
  • CompletionModel.fireworks(*, options: ProviderOptions = None)
  • CompletionModel.perplexity(*, options: ProviderOptions = None)
  • CompletionModel.xai(*, options: ProviderOptions = None)
  • CompletionModel.cohere(*, options: ProviderOptions = None)
  • CompletionModel.bedrock(*, options: BedrockOptions)
  • CompletionModel.fal(*, options: FalOptions = None)

Properties

  • .model_id (str): The string identifier of the active model.

complete()

response: CompletionResponse = await model.complete(
    messages: list[ChatMessage],
    options: CompletionOptions = None,
)

Returns a typed CompletionResponse (see below). Also supports dict-style access for backwards compatibility: response["content"].

opts = CompletionOptions(temperature=0.7, max_tokens=1024)
response = await model.complete(messages, opts)

stream()

await model.stream(
    messages: list[ChatMessage],
    on_chunk: Callable[[dict], Any],
    options: CompletionOptions = None,
)

Streams a chat completion, calling on_chunk for each chunk received. Each chunk is a dict with the following keys:

  • delta (str | None): The incremental text content for this chunk.
  • finish_reason (str | None): Set on the final chunk (e.g. "stop", "tool_calls").
  • tool_calls (list[dict]): Tool call fragments, if any.

def handle(chunk):
    if chunk["delta"]:
        print(chunk["delta"], end="")

await model.stream([ChatMessage.user("Tell me a story")], handle)
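For anything beyond printing, it often helps to accumulate the whole reply. A minimal sketch of a callable collector, assuming chunks follow the key layout in the table above; StreamCollector is a hypothetical helper, and it collects tool-call fragments without merging them.

```python
from typing import Any


class StreamCollector:
    """Callable on_chunk handler that accumulates streamed output."""

    def __init__(self) -> None:
        self.parts: list[str] = []
        self.finish_reason: str | None = None
        self.tool_calls: list[dict[str, Any]] = []

    def __call__(self, chunk: dict[str, Any]) -> None:
        if chunk.get("delta"):
            self.parts.append(chunk["delta"])
        if chunk.get("tool_calls"):
            # Fragments are kept raw; merging them is provider-specific.
            self.tool_calls.extend(chunk["tool_calls"])
        if chunk.get("finish_reason"):
            self.finish_reason = chunk["finish_reason"]

    @property
    def text(self) -> str:
        return "".join(self.parts)
```

With a real model the instance is passed as on_chunk, as in await model.stream(messages, collector), after which collector.text holds the full reply.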

CompletionOptions

Options object for complete() and stream(). All fields are optional keyword arguments.

opts = CompletionOptions(
    temperature=0.7,
    max_tokens=1024,
    top_p=0.9,
    model="gpt-4o",
    tools=[{"name": "search", "description": "...", "parameters": {...}}],
    response_format={"type": "json_schema", "json_schema": {...}},
)

  • temperature (float | None): Sampling temperature (0.0-2.0).
  • max_tokens (int | None): Maximum tokens to generate.
  • top_p (float | None): Nucleus sampling parameter (0.0-1.0).
  • model (str | None): Model override for this request.
  • tools (Any | None): Tool definitions for function calling.
  • response_format (dict | None): JSON schema dict for structured output.

Middleware decorators

Each decorator returns a new CompletionModel wrapping the original with additional behaviour.

  • .with_retry(config: RetryConfig | None = None)
    Automatic retry with exponential backoff on transient failures.
  • .with_cache(config: CacheConfig | None = None)
    In-memory response cache for identical non-streaming requests.
  • CompletionModel.with_fallback(models: list[CompletionModel])
    Static method. Tries providers in order; falls back on transient errors.

RetryConfig and CacheConfig are typed configuration objects:

RetryConfig(*, max_retries=3, initial_delay_ms=1000, max_delay_ms=30000, honor_retry_after=True, jitter=True)
CacheConfig(*, strategy: CacheStrategy | None = None, ttl_seconds=300, max_entries=1000)
# Chain decorators with typed config objects
model = (
    CompletionModel.openai()
    .with_cache(CacheConfig(ttl_seconds=600))
    .with_retry(RetryConfig(max_retries=5))
)

# Or use defaults (both config args are optional)
model = CompletionModel.openai().with_cache().with_retry()

# Fallback across providers
primary = CompletionModel.openai()
backup = CompletionModel.anthropic()
model = CompletionModel.with_fallback([primary, backup])

CompletionResponse

Returned by model.complete(). Supports both attribute access and dict-style access.

  • .content (str | None): The generated text.
  • .model (str): Model name used for the completion.
  • .finish_reason (str | None): Why generation stopped ("stop", "tool_calls", etc.).
  • .tool_calls (list[ToolCall]): Tool calls requested by the model.
  • .usage (TokenUsage | None): Token usage statistics.
  • .cost (float | None): Estimated cost in USD for this request.
  • .timing (RequestTiming | None): Timing metadata for the request.
  • .images (list[dict]): Image outputs (provider-dependent).
  • .audio (list[dict]): Audio outputs (provider-dependent).
  • .videos (list[dict]): Video outputs (provider-dependent).

response = await model.complete([ChatMessage.user("Hello")])
print(response.content)        # attribute access
print(response["content"])     # dict-style access (backwards compatible)
print(response.cost)           # e.g. 0.0023
print(response.timing)         # RequestTiming or None
print(response.keys())         # list of available keys

RequestTiming

Timing metadata attached to a CompletionResponse. All fields are optional since not every provider reports timing data.

  • .queue_ms (int | None): Time spent waiting in the provider’s queue, in milliseconds.
  • .execution_ms (int | None): Time spent executing the request, in milliseconds.
  • .total_ms (int | None): Total round-trip time, in milliseconds.

response = await model.complete([ChatMessage.user("Hello")])
if response.timing:
    print(f"Total: {response.timing.total_ms}ms")
    print(f"Queue: {response.timing.queue_ms}ms")
    print(f"Execution: {response.timing.execution_ms}ms")

ChatMessage

A single message in a chat conversation.

msg = ChatMessage(role="user", content="Hello, world!")
# role is optional, defaults to "user"
msg = ChatMessage(content="Hello!")

Static constructors

  • ChatMessage.system(content: str)
    Create a system message.
  • ChatMessage.user(content: str)
    Create a user message.
  • ChatMessage.assistant(content: str)
    Create an assistant message.
  • ChatMessage.tool(content: str)
    Create a tool result message.
  • ChatMessage.user_image_url(*, text, url, media_type=None)
    Create a user message with text and an image URL.
  • ChatMessage.user_image_base64(*, text, data, media_type)
    Create a user message with text and a base64 image.
  • ChatMessage.user_parts(*, parts: list[ContentPart])
    Create a user message with multiple content parts.

Properties

  • .role (str): One of "system", "user", "assistant", "tool".
  • .content (str | None): The message text.
  • .tool_call_id (str | None): For role="tool" messages, the call ID this message responds to. None otherwise.
  • .name (str | None): For role="tool" messages, the tool/function name (set by some providers). None otherwise.
  • .tool_result (ToolOutput | None): The structured tool-result payload. None for non-tool messages or when the tool returned a plain string (in which case the string lives in .content instead). See ToolOutput.

Role

Constants for message roles.

from blazen import Role

Role.SYSTEM     # "system"
Role.USER       # "user"
Role.ASSISTANT  # "assistant"
Role.TOOL       # "tool"

ContentPart

Build multimodal content parts for use with ChatMessage.user_parts().

  • ContentPart.text(*, text=...)
    Create a text content part.
  • ContentPart.image_url(*, url=..., media_type=...)
    Create an image URL content part.
  • ContentPart.image_base64(*, data=..., media_type=...)
    Create a base64 image content part.

from blazen import ChatMessage, ContentPart, MediaType

msg = ChatMessage.user_parts(parts=[
    ContentPart.text(text="What's in this image?"),
    ContentPart.image_url(url="https://example.com/photo.jpg", media_type=MediaType.JPEG),
])

ToolCall

A tool invocation requested by the model.

  • .id (str): Unique identifier for the tool call.
  • .name (str): Name of the tool to invoke.
  • .arguments (dict[str, Any]): Parsed arguments for the tool call.

Supports dict-style access: tool_call["name"].


ToolOutput

Two-channel return value from a tool handler. Tool results have two distinct audiences. The caller (your Python code) wants the full structured data; the LLM, on the next turn, may need a different shape — sometimes shorter, sometimes provider-specific. ToolOutput carries both channels: data is what the caller sees, llm_override is what the LLM sees.

import blazen

out = blazen.ToolOutput(data={"items": [1, 2, 3]})
out.data            # {"items": [1, 2, 3]}
out.llm_override    # None

# Explicit override: the caller still gets the full dict, but the LLM
# sees a short summary on its next turn.
out2 = blazen.ToolOutput(
    data={"items": [1, 2, 3], "_debug": "..."},
    llm_override=blazen.LlmPayload.text("Found 3 items."),
)

Constructor

  • data (Any): The structured value the caller sees programmatically. Dict, list, scalar, or string; anything JSON-serializable.
  • llm_override (LlmPayload | None): Optional override for what the LLM sees on the next turn. None means each provider applies its default conversion from data (see Per-provider behavior).

Properties

  • .data (Any): The user-visible structured payload. Re-materialized as a Python value on each access.
  • .llm_override (LlmPayload | None): The LLM-side override, if set.

A tool handler can return a bare dict/list/str/scalar, in which case Blazen auto-wraps it as ToolOutput(data=value, llm_override=None). Returning a ToolOutput instance directly is only required when you want to set llm_override.
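The auto-wrap rule reduces to one normalization step on whatever the handler returns. A minimal sketch with a local ToolOutput stand-in (not blazen.ToolOutput) and a hypothetical normalize_tool_result helper:

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ToolOutput:
    """Local stand-in with the same two channels as blazen.ToolOutput."""

    data: Any
    llm_override: Any = None


def normalize_tool_result(value: Any) -> ToolOutput:
    # Explicit ToolOutput instances pass through untouched.
    if isinstance(value, ToolOutput):
        return value
    # Bare dict/list/str/scalar values are wrapped with no LLM override.
    return ToolOutput(data=value, llm_override=None)
```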


LlmPayload

Provider-aware override for what the LLM sees as a tool result. Constructed only via the classmethod factories — there is no public __init__.

import blazen

blazen.LlmPayload.text("Found 3 results.")
blazen.LlmPayload.json({"items": [1, 2, 3]})
blazen.LlmPayload.provider_raw(
    provider="anthropic",
    value=[{"type": "text", "text": "..."}],
)

Variants

  • kind "text": LlmPayload.text(text: str)
    Plain text. Works on every provider.
  • kind "json": LlmPayload.json(value: Any)
    Structured JSON. Anthropic and Gemini natively consume the structure; OpenAI-family providers stringify it once at the wire boundary.
  • kind "provider_raw": LlmPayload.provider_raw(*, provider: str, value: Any)
    Provider-specific escape hatch. Only the named provider sees value; every other provider falls back to converting ToolOutput.data with its default. provider is one of "openai", "openai_compat", "azure", "anthropic", "gemini", "responses", "fal".

Properties

  • .kind (str): The variant tag: "text", "json", "parts", or "provider_raw".
  • .text_value (str | None): The text body for Text payloads. None for other variants.
  • .value (Any | None): The structured value for Json and ProviderRaw payloads. None otherwise.
  • .provider (str | None): The provider name for ProviderRaw payloads. None otherwise.

Per-provider behavior

When a tool returns structured data and no llm_override, each provider sends a sensible default to the LLM:

  • OpenAI / OpenAI-compat / Azure / Responses / Fal: data is JSON-stringified into the content field of the tool message.
  • Anthropic: structured data becomes [{"type": "text", "text": <stringified-json>}] inside tool_result.content.
  • Gemini: structured object data is passed natively as functionResponse.response. Scalar values (numbers, booleans, strings) are wrapped as {"result": <scalar>}.

Plain strings always pass through unchanged on every provider — a tool returning "hello" results in the LLM seeing hello, not "hello".

When an llm_override is set:

  • LlmPayload.text(...) is universally accepted.
  • LlmPayload.json(...) is consumed natively by Anthropic and Gemini and stringified by the OpenAI family.
  • LlmPayload.provider_raw(provider="X", value=V) only takes effect when the active provider matches X. For every other provider the dispatcher falls back to converting ToolOutput.data with the provider’s default rule above.

TokenUsage

Token usage statistics for a completion.

  • .prompt_tokens (int): Tokens in the prompt.
  • .completion_tokens (int): Tokens in the completion.
  • .total_tokens (int): Total tokens used.

Supports dict-style access: usage["total_tokens"].


Agent System

The agent system provides an agentic tool-execution loop on top of CompletionModel. Define tools with ToolDef, then call run_agent to let the model iteratively call tools until it produces a final answer.

ToolDef

Define a tool that the model can invoke during an agent run.

ToolDef(
    *,
    name: str,
    description: str,
    parameters: dict[str, Any],
    handler: Callable | AsyncCallable,
)

  • name (str): Unique tool name exposed to the model.
  • description (str): Description the model uses to decide when to call this tool.
  • parameters (dict): JSON Schema describing the tool’s input parameters.
  • handler (Callable): Function called when the model invokes the tool. Can be sync or async. Receives a dict[str, Any] of arguments and returns either a JSON-serializable value (auto-wrapped into ToolOutput(data=value)) or an explicit ToolOutput when you want to override what the LLM sees.

A handler may return:

  1. A bare dict, list, scalar, or str. Blazen wraps it as ToolOutput(data=value, llm_override=None), and each provider applies its default conversion.
  2. A ToolOutput, which gives full control. Set llm_override to send the LLM a different shape than the structured data your code reads back from the tool message's tool_result.

import blazen
from blazen import ToolDef

# 1. Sync handler — return a bare dict, auto-wrapped.
tool = ToolDef(
    name="search",
    description="Search the web for a query",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"}
        },
        "required": ["query"],
    },
    handler=lambda args: {"results": ["result1", "result2"]},
)

# 2. Async handler — return a bare dict, auto-wrapped.
async def fetch_weather(args):
    # weather_api stands in for your own async weather client.
    data = await weather_api(args["city"])
    return {"temperature": data.temp, "conditions": data.conditions}

weather_tool = ToolDef(
    name="weather",
    description="Get current weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        },
        "required": ["city"],
    },
    handler=fetch_weather,
)

# 3. Explicit ToolOutput — LLM sees a short summary, but the caller's
#    `messages[-1].tool_result.data` still has the full structured payload.
def search_with_summary(args):
    return blazen.ToolOutput(
        data={"items": [1, 2, 3], "raw_response": "..."},
        llm_override=blazen.LlmPayload.text("Found 3 items."),
    )

search_tool = ToolDef(
    name="search",
    description="Search for items.",
    parameters={
        "type": "object",
        "properties": {"q": {"type": "string"}},
        "required": ["q"],
    },
    handler=search_with_summary,
)

After a tool runs, the result message in the conversation history exposes both channels:

result = await run_agent(model, messages, tools=[search_tool])
# The final assistant reply may follow the tool message, so search backwards.
last = next(m for m in reversed(result.messages) if m.role == "tool")
last.tool_call_id                      # the call ID this responded to
if last.tool_result is not None:       # None if the handler returned a plain string
    last.tool_result.data              # full caller-visible payload
    last.tool_result.llm_override      # the LlmPayload sent to the LLM, if any

run_agent

Run an agentic tool-execution loop. The model is called repeatedly, executing any requested tool calls and feeding results back, until the model stops calling tools or max_iterations is reached.

result: AgentResult = await run_agent(
    model: CompletionModel,
    messages: list[ChatMessage],
    *,
    tools: list[ToolDef],
    max_iterations: int = 10,
    system_prompt: str = None,
    temperature: float = None,
    max_tokens: int = None,
    add_finish_tool: bool = False,
)
  • model (CompletionModel, required): The model to use for completions.
  • messages (list[ChatMessage], required): Initial conversation messages.
  • tools (list[ToolDef], required): Tools available to the model.
  • max_iterations (int, default 10): Maximum number of tool-call rounds before stopping.
  • system_prompt (str | None, default None): Optional system prompt prepended to messages.
  • temperature (float | None, default None): Sampling temperature override.
  • max_tokens (int | None, default None): Max tokens per completion call.
  • add_finish_tool (bool, default False): If True, adds a built-in “finish” tool the model can call to explicitly end the loop.

model = CompletionModel.openai()
messages = [ChatMessage.user("What's the weather in Paris and London?")]

result = await run_agent(model, messages, tools=[weather_tool])
print(result.response.content)  # Final answer
print(result.iterations)        # Number of tool-call rounds
print(result.total_cost)        # Accumulated cost across all iterations

AgentResult

Returned by run_agent.

  • .response (CompletionResponse): The final completion response from the model.
  • .messages (list[ChatMessage]): The full conversation history including all tool calls and results.
  • .iterations (int): Number of tool-call iterations executed.
  • .total_cost (float | None): Total cost in USD accumulated across all iterations.

MediaType

Constants for common MIME types. Useful when constructing ContentPart or compute requests.

from blazen import MediaType

MediaType.PNG   # "image/png"
MediaType.MP4   # "video/mp4"
MediaType.MP3   # "audio/mpeg"
MediaType.GLB   # "model/gltf-binary"

Image types

  • MediaType.PNG: image/png
  • MediaType.JPEG: image/jpeg
  • MediaType.WEBP: image/webp
  • MediaType.GIF: image/gif
  • MediaType.SVG: image/svg+xml
  • MediaType.BMP: image/bmp
  • MediaType.TIFF: image/tiff
  • MediaType.AVIF: image/avif

Video types

  • MediaType.MP4: video/mp4
  • MediaType.WEBM: video/webm
  • MediaType.MOV: video/quicktime

Audio types

  • MediaType.MP3: audio/mpeg
  • MediaType.WAV: audio/wav
  • MediaType.OGG: audio/ogg
  • MediaType.FLAC: audio/flac
  • MediaType.AAC: audio/aac
  • MediaType.M4A: audio/mp4

3D model types

  • MediaType.GLB: model/gltf-binary
  • MediaType.GLTF: model/gltf+json
  • MediaType.OBJ: model/obj
  • MediaType.USDZ: model/vnd.usdz+zip
  • MediaType.FBX: model/fbx
  • MediaType.STL: model/stl

Document types

  • MediaType.PDF: application/pdf

MediaSource alias

MediaSource is a module-level alias for ImageSource, exported from blazen so callers writing media-source-typed code can use the more descriptive name:

from blazen import MediaSource  # alias of ImageSource

src = MediaSource.from_path("./photo.png")

The two names refer to the same class — isinstance(src, ImageSource) is True.


Content Subsystem

A pluggable registry for multimodal blobs (images, audio, video, documents, 3D, CAD) that lets tool handlers emit and accept opaque handles instead of inline bytes. Handles are resolved to a wire-renderable MediaSource only when the model actually needs the content.

ContentKind

Taxonomy enum of multimodal content kinds. Used by tool-input declarations and ContentStore routing.

from blazen import ContentKind

ContentKind.Image
ContentKind.from_str("three_d_model")
ContentKind.from_mime("image/png")
ContentKind.from_extension("glb")

Members

  • ContentKind.Image (wire name "image")
  • ContentKind.Audio (wire name "audio")
  • ContentKind.Video (wire name "video")
  • ContentKind.Document (wire name "document")
  • ContentKind.ThreeDModel (wire name "three_d_model")
  • ContentKind.Cad (wire name "cad")
  • ContentKind.Archive (wire name "archive")
  • ContentKind.Font (wire name "font")
  • ContentKind.Code (wire name "code")
  • ContentKind.Data (wire name "data")
  • ContentKind.Other (wire name "other")

Methods

  • kind.name_str
    Property returning the canonical wire name (matches the JSON / serde tag).
  • ContentKind.from_str(value: str)
    Parse a kind from its canonical wire name. Unknown names raise ValueError.
  • ContentKind.from_mime(mime: str)
    Map a MIME type to a kind. Unknown MIME types resolve to ContentKind.Other.
  • ContentKind.from_extension(ext: str)
    Map a filename extension (no leading dot, case-insensitive) to a kind. Unknown extensions resolve to ContentKind.Other.
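The fallback behavior of from_mime and from_extension (unknown input resolves to Other instead of raising) can be sketched with lookup tables. The tables below are a small, hypothetical subset chosen for illustration, not blazen's mapping, and kinds are returned as wire-name strings rather than enum members.

```python
# Hypothetical, deliberately incomplete tables keyed to wire names.
MIME_PREFIX_KINDS = {
    "image/": "image",
    "audio/": "audio",
    "video/": "video",
    "model/": "three_d_model",
    "font/": "font",
}
EXACT_MIME_KINDS = {"application/pdf": "document", "application/zip": "archive"}
EXTENSION_KINDS = {
    "png": "image",
    "mp3": "audio",
    "mp4": "video",
    "pdf": "document",
    "glb": "three_d_model",
    "stl": "three_d_model",
    "zip": "archive",
    "csv": "data",
}


def kind_from_mime(mime: str) -> str:
    """Map a MIME type to a kind name; unknown types fall back to "other"."""
    mime = mime.lower()
    if mime in EXACT_MIME_KINDS:
        return EXACT_MIME_KINDS[mime]
    for prefix, kind in MIME_PREFIX_KINDS.items():
        if mime.startswith(prefix):
            return kind
    return "other"


def kind_from_extension(ext: str) -> str:
    """Map an extension (no leading dot, any case) to a kind name; default "other"."""
    return EXTENSION_KINDS.get(ext.lower(), "other")
```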

ContentHandle

Stable, opaque reference to content registered with a ContentStore. Most callers obtain handles from ContentStore.put rather than constructing them directly.

ContentHandle(
    id: str,
    kind: ContentKind,
    *,
    mime_type: str | None = None,
    byte_size: int | None = None,
    display_name: str | None = None,
)

Properties (all read-only)

  • .id (str): Opaque, store-defined identifier. Treat as a black box.
  • .kind (ContentKind): What kind of content this handle refers to.
  • .mime_type (str | None): MIME type if known.
  • .byte_size (int | None): Byte size if known.
  • .display_name (str | None): Human-readable display name (e.g. original filename) if known.

ContentStore

A pluggable content registry that backs handle resolution. Construct via one of the static factories. All instance methods return awaitables.

from blazen import ContentStore, ContentKind

store = ContentStore.in_memory()
handle = await store.put(b"...", kind=ContentKind.Image, mime_type="image/png")
source = await store.resolve(handle)   # {"type": "base64", "data": "..."}

Static factories

Factory | Returns
ContentStore.in_memory() | ContentStore
ContentStore.local_file(path: str | os.PathLike | pathlib.Path) | ContentStore
ContentStore.openai_files(api_key: str, *, base_url: str | None = None) | ContentStore
ContentStore.anthropic_files(api_key: str, *, base_url: str | None = None) | ContentStore
ContentStore.gemini_files(api_key: str, *, base_url: str | None = None) | ContentStore
ContentStore.fal_storage(api_key: str, *, base_url: str | None = None) | ContentStore
ContentStore.custom(*, put, resolve, fetch_bytes, fetch_stream=None, delete=None, name="custom") | ContentStore

Instance methods (all awaitable)

Method | Description
await store.put(body, *, kind=None, mime_type=None, display_name=None, byte_size=None) | Persist content and return a freshly issued ContentHandle. body may be bytes, a URL str, or a pathlib.Path. Keyword arguments are optional hints; the store may auto-detect kind / MIME from the bytes.
await store.resolve(handle) | Resolve a handle to a wire-renderable media source. Returns a serialized MediaSource dict, e.g. {"type": "url", "url": "..."} or {"type": "base64", "data": "..."}.
await store.fetch_bytes(handle) | Fetch raw bytes for a handle. Stores that hold only references (URL / provider-file / local-path) may raise UnsupportedError.
await store.metadata(handle) | Cheap metadata lookup with no byte materialization. Returns {"kind": ..., "mime_type": ..., "byte_size": ..., "display_name": ...}.
await store.delete(handle) | Optional cleanup hook. Default backends drop the entry; provider backends issue a delete call to the upstream API.

Subclassing ContentStore

ContentStore is subclassable from Python. Override the methods your backend needs; the framework wraps your subclass in a Rust adapter (PyHostContentStore) that dispatches into your Python coroutines.

from blazen import ContentStore, ContentHandle, ContentKind

class S3ContentStore(ContentStore):
    def __init__(self, bucket: str):
        super().__init__()
        self.bucket = bucket

    async def put(self, body, hint) -> ContentHandle:
        ...

    async def resolve(self, handle) -> dict:
        ...

    async def fetch_bytes(self, handle) -> bytes:
        ...

    # Optional overrides:
    async def fetch_stream(self, handle): ...
    async def delete(self, handle) -> None: ...

Subclasses MUST override put, resolve, and fetch_bytes. The base-class default implementations raise NotImplementedError, so a missing override fails immediately with a clear error instead of recursing forever through super().

ContentStore.custom(...)

Callback-based factory. Direct Python mirror of Rust CustomContentStore::builder.

ContentStore.custom(
    *,
    put: Callable[..., Awaitable[ContentHandle]],
    resolve: Callable[[ContentHandle], Awaitable[dict]],
    fetch_bytes: Callable[[ContentHandle], Awaitable[bytes]],
    fetch_stream: Callable[[ContentHandle], Awaitable[bytes]] | None = None,
    delete: Callable[[ContentHandle], Awaitable[None]] | None = None,
    name: str = "custom",
) -> ContentStore

put, resolve, and fetch_bytes are required; fetch_stream and delete are optional. The body argument to put arrives as a dict in one of these shapes:

{"type": "bytes", "data": [...]}
{"type": "url", "url": "..."}
{"type": "local_path", "path": "..."}
{"type": "provider_file", "provider": "openai", "id": "..."}
{"type": "stream", "stream": <AsyncByteIter>, "size_hint": int | None}

The hint argument is a dict with the optional keys mime_type, kind_hint, display_name, and byte_size.

put must return a ContentHandle. resolve returns a serialized MediaSource dict (e.g. {"type": "url", "url": "..."}). fetch_bytes returns raw bytes. fetch_stream may return either bytes (legacy, single-chunk) or an AsyncIterator[bytes] for true chunk-by-chunk streaming.
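
A minimal sketch of the callbacks, backed by two plain dicts. It only handles the "bytes" and "url" body shapes listed above. Handle is a hypothetical stand-in for ContentHandle so the snippet runs without blazen installed, the "sketch_" id format is made up, and passing an async generator function as fetch_stream is an assumption based on the AsyncIterator[bytes] return form described above.

```python
from __future__ import annotations

import base64
import uuid
from dataclasses import dataclass


@dataclass
class Handle:
    # Hypothetical stand-in for blazen.ContentHandle so this runs without blazen.
    # Real callbacks must return ContentHandle(id, kind, mime_type=..., ...).
    id: str
    kind: object
    mime_type: str | None = None
    byte_size: int | None = None


# Backing storage for the sketch: raw bytes and URL references, keyed by handle id.
_blobs: dict[str, bytes] = {}
_urls: dict[str, str] = {}


async def put(body: dict, hint: dict) -> Handle:
    handle_id = f"sketch_{uuid.uuid4().hex}"
    if body["type"] == "bytes":
        data = bytes(body["data"])  # "data" arrives as a list of ints
        _blobs[handle_id] = data
        size = len(data)
    elif body["type"] == "url":
        _urls[handle_id] = body["url"]  # keep a reference only, no download
        size = hint.get("byte_size")
    else:
        # local_path / provider_file / stream bodies are not handled in this sketch.
        raise ValueError(f"unsupported body type: {body['type']}")
    kind = hint.get("kind_hint") or "other"
    return Handle(handle_id, kind, hint.get("mime_type"), size)


async def resolve(handle: Handle) -> dict:
    # Return a serialized MediaSource dict.
    if handle.id in _urls:
        return {"type": "url", "url": _urls[handle.id]}
    data = _blobs[handle.id]
    return {"type": "base64", "data": base64.b64encode(data).decode("ascii")}


async def fetch_bytes(handle: Handle) -> bytes:
    # URL entries hold only a reference here, so there are no bytes to return.
    if handle.id not in _blobs:
        raise LookupError(f"no bytes stored for {handle.id}")
    return _blobs[handle.id]


async def fetch_stream(handle: Handle, chunk_size: int = 64 * 1024):
    # Async generator: yields the stored payload chunk by chunk.
    data = await fetch_bytes(handle)
    for start in range(0, len(data), chunk_size):
        yield data[start:start + chunk_size]
```

Wiring these in would then be a call such as ContentStore.custom(put=put, resolve=resolve, fetch_bytes=fetch_bytes, fetch_stream=fetch_stream, name="dict-sketch"), after swapping Handle for the real ContentHandle.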


Built-in stores

Factory | Purpose | Notes
ContentStore.in_memory() | Ephemeral / test workloads. | Holds bytes in process memory; resolves to base64.
ContentStore.local_file(path) | Filesystem-backed persistence rooted at path. | Directory is created recursively if missing. Resolves to a local path / URL.
ContentStore.openai_files(api_key, *, base_url=None) | OpenAI Files API. | Resolves to a provider file reference; fetch_bytes may not be supported.
ContentStore.anthropic_files(api_key, *, base_url=None) | Anthropic Files API (beta). | Provider-file backed.
ContentStore.gemini_files(api_key, *, base_url=None) | Google Gemini Files API. | Provider-file backed.
ContentStore.fal_storage(api_key, *, base_url=None) | fal.ai object storage. | Resolves to URL references hosted on fal.
ContentStore.custom(...) | User-defined backend via async callables (see above). | Required: put, resolve, fetch_bytes. Optional: fetch_stream, delete, name.

Tool-input schema helpers

Top-level functions that return JSON-Schema-shaped dict fragments declaring a single required content-reference input. Each fragment carries an x-blazen-content-ref extension keyed to the appropriate ContentKind.

Helper | Description
image_input(name, description) | Schema declaring a single required image input.
audio_input(name, description) | Schema declaring a single required audio input.
video_input(name, description) | Schema declaring a single required video input.
file_input(name, description) | Schema declaring a single required document / file input.
three_d_input(name, description) | Schema declaring a single required 3D-model input.
cad_input(name, description) | Schema declaring a single required CAD-file input.

from blazen import image_input

schema = image_input("photo", "The photo to analyze")
# {
#   "type": "object",
#   "properties": {
#     "photo": {
#       "type": "string",
#       "description": "The photo to analyze",
#       "x-blazen-content-ref": {"kind": "image"}
#     }
#   },
#   "required": ["photo"]
# }

Generic schema builders

Use these builders when a schema needs companion fields alongside the content reference, or when you want to embed a content-ref property inside a larger object schema that you assemble yourself.

Helper | Description
content_ref_property(kind: ContentKind, description: str) | Build a single-property fragment of the form {"type": "string", "description": ..., "x-blazen-content-ref": {"kind": ...}}, ready to embed inside an object schema’s properties map.
content_ref_required_object(name: str, kind: ContentKind, description: str, *, extra_properties: dict | None = None) | Build a complete object-typed schema declaring a single required content-reference input plus optional companion fields merged from extra_properties.

from blazen import ContentKind, content_ref_property, content_ref_required_object

# Single property fragment for manual assembly.
prop = content_ref_property(ContentKind.Image, "Source frame")
schema = {
    "type": "object",
    "properties": {"frame": prop, "threshold": {"type": "number"}},
    "required": ["frame"],
}

# Or the full-object form with companion fields merged in.
schema = content_ref_required_object(
    "frame",
    ContentKind.Image,
    "Source frame",
    extra_properties={"threshold": {"type": "number"}},
)
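
To make the documented output shapes concrete, here is a pure-Python sketch that builds the same dicts by hand. The sketch_* names are hypothetical, it takes the kind as its wire-name string (e.g. "image") instead of a ContentKind, and it assumes extra_properties are merged into properties without being added to required. It illustrates the shape, not Blazen's implementation.

```python
from __future__ import annotations


def sketch_content_ref_property(kind: str, description: str) -> dict:
    # Fragment shape documented for content_ref_property.
    return {
        "type": "string",
        "description": description,
        "x-blazen-content-ref": {"kind": kind},
    }


def sketch_content_ref_required_object(
    name: str,
    kind: str,
    description: str,
    *,
    extra_properties: dict | None = None,
) -> dict:
    properties = {name: sketch_content_ref_property(kind, description)}
    # Companion fields sit next to the content reference but stay optional.
    properties.update(extra_properties or {})
    return {"type": "object", "properties": properties, "required": [name]}
```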

How resolution works

The x-blazen-content-ref JSON Schema extension is invisible to the model and to the upstream provider; both see a plain string property. When the model emits {"photo": "blazen_a1b2c3..."}, Blazen’s resolver intercepts the tool-call arguments before invoking the handler, looks up each content-ref string in the active ContentStore, and replaces the bare id with a typed content dict of the form:

{
    "kind": "image",
    "handle_id": "blazen_a1b2c3...",
    "mime_type": "image/png",
    "byte_size": 48213,
    "display_name": "frame.png",
    "source": {"type": "url", "url": "..."},   # serialized MediaSource
}

The tool handler receives the substituted shape and never has to call store.resolve itself. Handles produced by a tool’s return value flow back through the same store, so subsequent turns can reference them by id without re-uploading bytes.
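
The substitution step can be modeled in a few lines of plain Python. This is a simplified sketch, not Blazen's resolver: it only inspects top-level properties, represents the store as a dict of hypothetical handle records (metadata plus a serialized MediaSource), and passes arguments without the extension through unchanged.

```python
from __future__ import annotations


def substitute_content_refs(schema: dict, args: dict, records: dict[str, dict]) -> dict:
    """Replace content-ref ids in tool-call args with typed content dicts.

    `records` maps handle ids to metadata plus a "source" MediaSource dict;
    it stands in for lookups against the active ContentStore.
    """
    resolved = dict(args)
    for prop_name, prop_schema in schema.get("properties", {}).items():
        ref = prop_schema.get("x-blazen-content-ref")
        if ref is None or prop_name not in args:
            continue  # plain property: left as the model emitted it
        handle_id = args[prop_name]
        record = records[handle_id]  # unknown ids raise KeyError in this sketch
        resolved[prop_name] = {
            "kind": ref["kind"],
            "handle_id": handle_id,
            "mime_type": record.get("mime_type"),
            "byte_size": record.get("byte_size"),
            "display_name": record.get("display_name"),
            "source": record["source"],
        }
    return resolved
```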


Compute Request Types

Compute requests define jobs for media generation and processing. All requests are plain dict objects.

ImageRequest

Generate images from a text prompt. Passed as a plain dict.

{
    "prompt": str,                # required
    "negative_prompt": str,       # optional
    "width": int,                 # optional
    "height": int,                # optional
    "num_images": int,            # optional
    "model": str,                 # optional
}

Key | Type | Required | Description
"prompt" | str | yes | Text description of the image to generate.
"negative_prompt" | str | no | What to avoid in the generated image.
"width" | int | no | Image width in pixels.
"height" | int | no | Image height in pixels.
"num_images" | int | no | Number of images to generate.
"model" | str | no | Specific model to use (provider-dependent).

req = {"prompt": "a cat in space", "width": 1024, "height": 1024, "num_images": 2}
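
Because requests are plain dicts, a typing.TypedDict can give them static checking. The sketch below encodes the ImageRequest keys from the table (only "prompt" required) and adds a small runtime guard; ImageRequest as a class and check_image_request are hypothetical helpers, not part of blazen.

```python
from typing import TypedDict, cast


class _ImageRequestRequired(TypedDict):
    prompt: str


class ImageRequest(_ImageRequestRequired, total=False):
    # Optional keys: may be omitted entirely.
    negative_prompt: str
    width: int
    height: int
    num_images: int
    model: str


def check_image_request(req: dict) -> ImageRequest:
    # TypedDict annotations are not enforced at runtime, so check by hand.
    if not isinstance(req.get("prompt"), str):
        raise ValueError('"prompt" (str) is required')
    allowed = ImageRequest.__required_keys__ | ImageRequest.__optional_keys__
    unknown = set(req) - allowed
    if unknown:
        raise ValueError(f"unknown keys: {sorted(unknown)}")
    return cast(ImageRequest, req)
```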

UpscaleRequest

Upscale an existing image to a higher resolution. Passed as a plain dict.

{
    "image_url": str,   # required
    "scale": float,     # required
    "model": str,       # optional
}

Key | Type | Required | Description
"image_url" | str | yes | URL of the image to upscale.
"scale" | float | yes | Upscale factor (e.g. 2.0, 4.0).
"model" | str | no | Specific model to use.

req = {"image_url": "https://example.com/photo.jpg", "scale": 4.0}

VideoRequest

Generate a video from a text prompt, optionally with an input image. Passed as a plain dict.

{
    "prompt": str,                # required
    "image_url": str,             # optional
    "duration_seconds": float,    # optional
    "negative_prompt": str,       # optional
    "width": int,                 # optional
    "height": int,                # optional
    "model": str,                 # optional
}

Key | Type | Required | Description
"prompt" | str | yes | Text description of the video to generate.
"image_url" | str | no | Optional starting image to animate.
"duration_seconds" | float | no | Desired video length in seconds.
"negative_prompt" | str | no | What to avoid in the generated video.
"width" | int | no | Video width in pixels.
"height" | int | no | Video height in pixels.
"model" | str | no | Specific model to use.

req = {"prompt": "a sunset timelapse", "duration_seconds": 5.0}
req = {"prompt": "animate this scene", "image_url": "https://example.com/frame.jpg"}

SpeechRequest

Generate speech audio from text. Passed as a plain dict.

{
    "text": str,          # required
    "voice": str,         # optional
    "voice_url": str,     # optional
    "language": str,      # optional
    "speed": float,       # optional
    "model": str,         # optional
}

Key | Type | Required | Description
"text" | str | yes | The text to convert to speech.
"voice" | str | no | Voice preset name (e.g. "alloy", "nova").
"voice_url" | str | no | URL to a custom voice sample for cloning.
"language" | str | no | Language code (e.g. "en", "fr").
"speed" | float | no | Playback speed multiplier (e.g. 1.2 for 20% faster).
"model" | str | no | Specific model to use.

req = {"text": "Hello world", "voice": "alloy", "speed": 1.2}

MusicRequest

Generate music or sound effects from a text prompt. Passed as a plain dict.

{
    "prompt": str,                # required
    "duration_seconds": float,    # optional
    "model": str,                 # optional
}

Key | Type | Required | Description
"prompt" | str | yes | Description of the music to generate.
"duration_seconds" | float | no | Desired duration in seconds.
"model" | str | no | Specific model to use.

req = {"prompt": "upbeat jazz", "duration_seconds": 30.0}

TranscriptionRequest

Transcribe audio to text. Passed as a plain dict.

{
    "audio_url": str,    # required
    "language": str,     # optional
    "diarize": bool,     # optional
    "model": str,        # optional
}

Key | Type | Required | Description
"audio_url" | str | yes | URL of the audio file to transcribe.
"language" | str | no | Language hint (e.g. "en").
"diarize" | bool | no | If True, identify and label different speakers.
"model" | str | no | Specific model to use.

req = {"audio_url": "https://example.com/audio.mp3", "language": "en", "diarize": True}

ThreeDRequest

Generate a 3D model from a text prompt or image. Passed as a plain dict.

{
    "prompt": str,       # optional (provide at least one of prompt or image_url)
    "image_url": str,    # optional
    "format": str,       # optional
    "model": str,        # optional
}

Key | Type | Required | Description
"prompt" | str | no | Text description of the 3D object to generate.
"image_url" | str | no | Image to use as reference for 3D generation.
"format" | str | no | Output format (e.g. "glb", "obj", "usdz").
"model" | str | no | Specific model to use.

Provide at least one of "prompt" or "image_url".

req = {"prompt": "a 3D cat", "format": "glb"}
req = {"image_url": "https://example.com/photo.jpg", "format": "obj"}
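
Since neither key is individually required, the "at least one of" rule has to be checked somewhere. A hypothetical pre-flight check on the caller's side might look like this; whether Blazen itself rejects a request with neither key is not stated here.

```python
def validate_three_d_request(req: dict) -> None:
    # A ThreeDRequest needs a text prompt, a reference image, or both.
    has_prompt = bool(req.get("prompt"))
    has_image = bool(req.get("image_url"))
    if not (has_prompt or has_image):
        raise ValueError('ThreeDRequest needs "prompt", "image_url", or both')
```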

StreamChunk

A typed object received by the on_chunk callback during streaming. Replaces the raw dict interface while remaining backwards-compatible via chunk["key"] access.

Property | Type | Description
.delta | str | None | Incremental text content.
.finish_reason | str | None | Present only on the final chunk ("stop", "tool_calls", etc.).
.tool_calls | list[ToolCall] | Tool invocations completed in this chunk.

async def on_chunk(chunk):
    # Attribute access (preferred)
    if chunk.delta:
        print(chunk.delta, end="")

    # Dict-style access (backwards compatible)
    if chunk["finish_reason"]:
        print(f"\n[done: {chunk['finish_reason']}]")
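
The dual access style can be reproduced with a small stand-in, which is also handy as a test double for on_chunk callbacks. FakeChunk and TextCollector are hypothetical; the sketch only assumes the three documented properties and that chunk["key"] mirrors the attribute of the same name.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class FakeChunk:
    # Hypothetical stand-in for StreamChunk with the three documented properties.
    delta: str | None = None
    finish_reason: str | None = None
    tool_calls: list = field(default_factory=list)

    def __getitem__(self, key: str):
        # Dict-style access mirrors the attribute of the same name.
        if key not in ("delta", "finish_reason", "tool_calls"):
            raise KeyError(key)
        return getattr(self, key)


class TextCollector:
    """Accumulates streamed deltas into the full reply text."""

    def __init__(self) -> None:
        self.parts: list[str] = []
        self.finish_reason: str | None = None

    async def on_chunk(self, chunk) -> None:
        if chunk.delta:
            self.parts.append(chunk.delta)
        if chunk["finish_reason"]:
            self.finish_reason = chunk["finish_reason"]

    @property
    def text(self) -> str:
        return "".join(self.parts)
```

With a real model, collector.on_chunk (a bound async method) would be passed wherever an on_chunk callback is expected.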

EmbeddingModel

Generate vector embeddings from text. Instances are created with static constructor methods, as with CompletionModel. API keys are read from environment variables (OPENAI_API_KEY, etc.) when options is omitted, or can be passed explicitly via ProviderOptions(api_key=...).

from blazen import EmbeddingModel, ProviderOptions

model = EmbeddingModel.openai()
model = EmbeddingModel.openai(model="text-embedding-3-large", dimensions=3072)
model = EmbeddingModel.openai(options=ProviderOptions(api_key="sk-..."))
model = EmbeddingModel.together()
model = EmbeddingModel.cohere()
model = EmbeddingModel.fireworks()

Provider constructors

Constructor | Signature
openai | EmbeddingModel.openai(*, options: ProviderOptions | None = None, model: str | None = None, dimensions: int | None = None)
together | EmbeddingModel.together(*, options: ProviderOptions | None = None)
cohere | EmbeddingModel.cohere(*, options: ProviderOptions | None = None)
fireworks | EmbeddingModel.fireworks(*, options: ProviderOptions | None = None)

Properties

Property | Type | Description
.model_id | str | The model identifier.
.dimensions | int | Output vector dimensionality.

embed()

response: EmbeddingResponse = await model.embed(texts: list[str])

Returns an EmbeddingResponse with one vector per input text.


EmbeddingResponse

Returned by EmbeddingModel.embed().

Property | Type | Description
.embeddings | list[list[float]] | One vector per input text.
.model | str | Model that produced the embeddings.
.usage | TokenUsage | None | Token usage statistics.
.cost | float | None | Estimated cost in USD.
.timing | RequestTiming | None | Request timing breakdown.

response = await model.embed(["Hello", "World"])
print(len(response.embeddings))       # 2
print(len(response.embeddings[0]))    # 1536
print(response.model)                 # "text-embedding-3-small"
print(response.cost)                  # e.g. 0.0001
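
A common next step is comparing the returned vectors. Because each vector is a plain list[float], a dependency-free cosine-similarity helper (not part of blazen) works directly on response.embeddings.

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    if len(a) != len(b):
        raise ValueError("vectors must have the same dimensionality")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cannot compare a zero vector")
    return dot / (norm_a * norm_b)


def rank_by_similarity(query: list[float], candidates: list[list[float]]) -> list[int]:
    # Indices of candidates, most similar to the query first.
    scores = [cosine_similarity(query, c) for c in candidates]
    return sorted(range(len(candidates)), key=lambda i: scores[i], reverse=True)
```

For example, rank_by_similarity(response.embeddings[0], response.embeddings[1:]) orders the remaining texts by closeness to the first one.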

Token Estimation

Lightweight token counting functions that work without external data files. Uses a heuristic (~3.5 characters per token) suitable for budget checks.

estimate_tokens()

from blazen import estimate_tokens

count = estimate_tokens("Hello, world!")          # 4
count = estimate_tokens("Hello, world!", 32000)   # same, with custom context size

Parameter | Type | Default | Description
text | str | required | The text to estimate.
context_size | int | 128000 | Context window size hint.

count_message_tokens()

from blazen import count_message_tokens, ChatMessage

count = count_message_tokens([
    ChatMessage.system("You are helpful."),
    ChatMessage.user("Hello!"),
])

Includes per-message overhead (role markers, separators) in addition to content tokens.

Parameter | Type | Default | Description
messages | list[ChatMessage] | required | Messages to count.
context_size | int | 128000 | Context window size hint.
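
For intuition, the character-ratio heuristic can be approximated as below. Rounding up, the 4-token per-message overhead, and the reply reserve are assumptions chosen for illustration; Blazen's exact constants are not documented here, so use estimate_tokens and count_message_tokens for real budget checks.

```python
from __future__ import annotations

import math

CHARS_PER_TOKEN = 3.5   # ratio stated above
MESSAGE_OVERHEAD = 4    # hypothetical per-message cost for role markers / separators


def approx_tokens(text: str) -> int:
    # Round up so any non-empty text counts as at least one token.
    return math.ceil(len(text) / CHARS_PER_TOKEN)


def approx_message_tokens(messages: list[tuple[str, str]]) -> int:
    # messages are (role, content) pairs; each adds content tokens plus overhead.
    return sum(MESSAGE_OVERHEAD + approx_tokens(content) for _role, content in messages)


def fits_budget(
    messages: list[tuple[str, str]],
    context_size: int = 128_000,
    reserve_for_reply: int = 1_024,
) -> bool:
    # Leave room for the model's answer inside the context window.
    return approx_message_tokens(messages) + reserve_for_reply <= context_size
```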

Subclassable Providers

CompletionModel, EmbeddingModel, and Transcription can be subclassed to implement custom providers. Override the relevant methods and the framework will dispatch to your implementation.

CompletionModel

from blazen import CompletionModel, ChatMessage

class MyLLM(CompletionModel):
    def __init__(self):
        super().__init__(model_id="my-llm")

    async def complete(self, messages, options=None):
        # Your inference logic here
        return {"content": "Hello from my custom model"}

    async def stream(self, messages, on_chunk, options=None):
        on_chunk({"delta": "Hello", "finish_reason": None, "tool_calls": []})
        on_chunk({"delta": None, "finish_reason": "stop", "tool_calls": []})

model = MyLLM()
response = await model.complete([ChatMessage.user("Hi")])

EmbeddingModel

from blazen import EmbeddingModel

class MyEmbedder(EmbeddingModel):
    def __init__(self):
        super().__init__(model_id="my-embedder", dimensions=128)

    async def embed(self, texts):
        return {"embeddings": [[0.1] * 128 for _ in texts], "model": "my-embedder"}

Transcription

from blazen import Transcription

class MyTranscriber(Transcription):
    def __init__(self):
        super().__init__(provider_id="my-stt")

    async def transcribe(self, request):
        return {"text": "transcribed text", "segments": []}

Per-Capability Provider Classes

Seven provider base classes let you implement a single compute capability without dealing with the full ComputeProvider interface. Subclass and override the relevant methods.

Class | Methods to override | Rust trait
TTSProvider | text_to_speech(request) | AudioGeneration
MusicProvider | generate_music(request), generate_sfx(request) | AudioGeneration
ImageProvider | generate_image(request), upscale_image(request) | ImageGeneration
VideoProvider | text_to_video(request), image_to_video(request) | VideoGeneration
ThreeDProvider | generate_3d(request) | ThreeDGeneration
BackgroundRemovalProvider | remove_background(request) | BackgroundRemoval
VoiceProvider | clone_voice(request), list_voices(), delete_voice(voice) | VoiceCloning

Constructor

All provider classes share the same constructor signature:

TTSProvider(
    *,
    provider_id: str,
    base_url: str | None = None,
    pricing: ModelPricing | None = None,
    memory_estimate_bytes: int | None = None,
)

Parameter | Type | Description
provider_id | str | Identifier for the provider instance.
base_url | str | None | Optional base URL for the provider API.
pricing | ModelPricing | None | Optional pricing info for cost tracking.
memory_estimate_bytes | int | None | Estimated memory footprint (host RAM if on CPU, GPU VRAM otherwise) for ModelManager integration.

Example

from blazen import TTSProvider

class ElevenLabsTTS(TTSProvider):
    def __init__(self, api_key: str):
        super().__init__(provider_id="elevenlabs")
        self.api_key = api_key

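    async def text_to_speech(self, request):
        # Call the ElevenLabs API with self.api_key, request["text"], and request["voice"].
        audio_bytes = b""  # placeholder: replace with the audio bytes returned by the API
        return {"audio": audio_bytes, "format": "mp3"}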

tts = ElevenLabsTTS(api_key="sk-...")
result = await tts.text_to_speech({"text": "Hello world", "voice": "alice"})

MemoryBackend

Base class for custom memory storage backends. Subclass to implement persistence backed by Postgres, SQLite, DynamoDB, or any other store.

from blazen import MemoryBackend

class PostgresBackend(MemoryBackend):
    async def put(self, entry):
        # Insert or update entry in Postgres
        ...

    async def get(self, id):
        # Retrieve entry by id
        ...

    async def delete(self, id):
        # Delete entry, return True if it existed
        ...

    async def list(self):
        # Return all entries
        ...

    async def len(self):
        # Return count of entries
        ...

    async def search_by_bands(self, bands, limit):
        # Return candidates sharing LSH bands with the query
        ...

Methods to Override

Method | Signature | Description
put | async def put(self, entry) -> None | Insert or update a stored entry.
get | async def get(self, id: str) -> dict | None | Retrieve a stored entry by id.
delete | async def delete(self, id: str) -> bool | Delete an entry by id. Returns True if it existed.
list | async def list(self) -> list[dict] | Return all stored entries.
len | async def len(self) -> int | Return the number of stored entries.
search_by_bands | async def search_by_bands(self, bands, limit) -> list[dict] | Return candidate entries sharing at least one LSH band.
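
As a test double or reference point, here is a dict-backed sketch of the same six methods. It is a standalone class so it runs without blazen (in real code you would subclass MemoryBackend), and it assumes each entry is a dict with an "id" key and a "bands" list of LSH band keys; the real entry shape may differ.

```python
from __future__ import annotations


class InMemoryBackendSketch:
    """Hypothetical in-memory backend mirroring the MemoryBackend methods."""

    def __init__(self) -> None:
        self._entries: dict[str, dict] = {}

    async def put(self, entry: dict) -> None:
        self._entries[entry["id"]] = entry  # insert or overwrite

    async def get(self, id: str) -> dict | None:
        return self._entries.get(id)

    async def delete(self, id: str) -> bool:
        return self._entries.pop(id, None) is not None

    async def list(self) -> list[dict]:
        # Method bodies do not see class scope, so this is the builtin list.
        return list(self._entries.values())

    async def len(self) -> int:
        return len(self._entries)  # builtin len, same reason as above

    async def search_by_bands(self, bands, limit: int) -> list[dict]:
        # Candidates share at least one band key with the query.
        wanted = set(bands)
        hits = [e for e in self._entries.values() if wanted.intersection(e.get("bands", ()))]
        return hits[:limit]
```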

ModelManager

Per-pool memory budget-aware model manager with LRU eviction. Tracks registered local models and their estimated memory footprint (host RAM if the model runs on CPU, GPU VRAM otherwise) in distinct pools — one for CPU RAM and one per GPU device. When loading a model that would exceed its pool’s budget, the least-recently-used loaded model in the same pool is unloaded first; models in different pools never evict each other.

ModelManager is a memory budget bookkeeper, not a performance scheduler. It answers “will this fit?” — not “will this run fast?”. Whether a 70B model loaded on CPU is useful at 1-3 tok/s is a workload-choice question the manager intentionally does not answer.
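
The eviction policy can be modeled in a few lines to see why pools never evict each other. This synchronous, standalone sketch is an illustration of per-pool LRU bookkeeping, not the manager's implementation; refreshing recency when an already-loaded model is loaded again, and raising when a single model exceeds its pool's budget, are assumptions of the sketch.

```python
from __future__ import annotations

from collections import OrderedDict


class PoolLRUSketch:
    def __init__(self, budgets: dict[str, int]) -> None:
        self.budgets = budgets                        # pool label -> budget in bytes
        self.models: dict[str, tuple[str, int]] = {}  # model id -> (pool, bytes)
        # One recency-ordered map of loaded models per pool (oldest first).
        self.loaded = {pool: OrderedDict() for pool in budgets}

    def register(self, model_id: str, pool: str, size: int) -> None:
        self.models[model_id] = (pool, size)          # starts unloaded

    def used(self, pool: str) -> int:
        return sum(self.loaded[pool].values())

    def load(self, model_id: str) -> list[str]:
        """Load a model; return the ids evicted from the same pool, LRU first."""
        pool, size = self.models[model_id]
        lru = self.loaded[pool]
        if model_id in lru:
            lru.move_to_end(model_id)                 # already loaded: refresh recency
            return []
        if size > self.budgets[pool]:
            raise MemoryError(f"{model_id} exceeds the {pool} budget on its own")
        evicted = []
        while self.used(pool) + size > self.budgets[pool]:
            victim, _ = lru.popitem(last=False)       # least recently used in this pool
            evicted.append(victim)
        lru[model_id] = size
        return evicted

    def is_loaded(self, model_id: str) -> bool:
        pool, _ = self.models[model_id]
        return model_id in self.loaded[pool]
```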

Constructor

from blazen import ModelManager

# Common case: separate CPU RAM and GPU VRAM budgets in GB.
manager = ModelManager(cpu_ram_gb=100, gpu_vram_gb=24)

# Explicit per-pool budgets (bytes). Pool labels: "cpu", "gpu", or "gpu:N".
manager = ModelManager(pool_budgets={
    "cpu": 100 * 1024**3,
    "gpu:0": 24 * 1024**3,
})

# Zero-arg: defaults both the CPU pool and GPU 0 pool to "unlimited"
# (u64::MAX) -- the no-budget-enforcement sentinel for tests.
manager = ModelManager()

Parameter | Type | Description
cpu_ram_gb | float | None | Budget for the "cpu" pool in gigabytes.
gpu_vram_gb | float | None | Budget for the "gpu:0" pool in gigabytes.
pool_budgets | dict[str, int] | None | Explicit per-pool budgets in bytes, keyed by pool label ("cpu", "gpu", or "gpu:N").

Methods

Method | Signature | Description
register | await manager.register(id, model, memory_estimate_bytes=...) | Register a model with its estimated memory footprint. Memory is charged to the pool returned by model.device(). Starts unloaded.
load | await manager.load(id) | Load a model, evicting LRU same-pool models if needed.
unload | await manager.unload(id) | Unload a model and free its memory.
is_loaded | await manager.is_loaded(id) -> bool | Check if a model is currently loaded.
ensure_loaded | await manager.ensure_loaded(id) | Alias for load().
used_bytes | await manager.used_bytes(pool="cpu") -> int | Total memory currently used by loaded models in the given pool. Default "cpu".
available_bytes | await manager.available_bytes(pool="cpu") -> int | Available memory within the given pool’s budget. Default "cpu".
pools | manager.pools() -> list[tuple[str, int]] | Sync. Return [(label, budget_bytes), ...] for all configured pools.
status | await manager.status() -> list[ModelStatus] | Status of all registered models.

Invalid pool labels passed to used_bytes / available_bytes raise ValueError("invalid pool label '<x>': expected 'cpu', 'gpu', or 'gpu:N' where N is a non-negative integer").
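
The accepted label grammar is small enough to check before calling the manager. A hypothetical validator mirroring the documented rule and error text might look like this:

```python
import re

# "cpu", "gpu", or "gpu:N" where N is a non-negative integer.
_POOL_LABEL = re.compile(r"cpu|gpu(?::[0-9]+)?")


def check_pool_label(label: str) -> str:
    if not _POOL_LABEL.fullmatch(label):
        raise ValueError(
            f"invalid pool label '{label}': expected 'cpu', 'gpu', "
            "or 'gpu:N' where N is a non-negative integer"
        )
    return label
```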

ModelStatus

Property | Type | Description
.id | str | Model identifier.
.loaded | bool | Whether the model is currently loaded.
.memory_estimate_bytes | int | Estimated memory footprint in bytes (host RAM if on CPU, GPU VRAM otherwise).
.pool | str | Pool label the model is charged against (e.g. "cpu" or "gpu:0").

ModelRegistry

An ABC for advertising a model catalog. Subclass to plug a custom catalog (a static manifest, a remote control-plane lookup, a cache of /v1/models results) into Blazen’s model-info surface. Mirrors the Rust trait blazen_llm::traits::ModelRegistry and the equivalent ABCs in the Node and WASM SDKs.

Both methods are abstract — the default implementations raise NotImplementedError, so a subclass must override both.

class ModelRegistry:
    async def list_models(self) -> list[ModelInfo]: ...
    async def get_model(self, model_id: str) -> ModelInfo | None: ...

Subclass example

from blazen import ModelRegistry, ModelInfo

class StaticRegistry(ModelRegistry):
    def __init__(self, models: list[ModelInfo]):
        super().__init__()
        self._models = {m.id: m for m in models}

    async def list_models(self) -> list[ModelInfo]:
        return list(self._models.values())

    async def get_model(self, model_id: str) -> ModelInfo | None:
        return self._models.get(model_id)

Methods

Method | Signature | Description
list_models | async def list_models(self) -> list[ModelInfo] | List every model the registry advertises.
get_model | async def get_model(self, model_id: str) -> ModelInfo | None | Look up a single model. Return None if unknown.

See ModelInfo for the dataclass shape returned by these methods.


ModelPricing and Pricing Functions

ModelPricing

Pricing metadata for cost tracking.

from blazen import ModelPricing

pricing = ModelPricing(
    input_per_million=1.0,
    output_per_million=2.0,
    per_image=0.02,
    per_second=0.001,
)

Property | Type | Description
.input_per_million | float | None | Cost per million input tokens (USD).
.output_per_million | float | None | Cost per million output tokens (USD).
.per_image | float | None | Cost per generated image (USD).
.per_second | float | None | Cost per second of compute (USD).
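
How these rates could turn into a dollar figure is sketched below, assuming cost is a linear sum of the applicable terms (tokens priced per million, images per unit, compute per second) and that unset rates contribute nothing. Blazen's own cost accounting is not specified here; PricingSketch is a hypothetical stand-in for ModelPricing so the snippet runs on its own.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class PricingSketch:
    # Hypothetical stand-in mirroring the ModelPricing fields (all USD).
    input_per_million: float | None = None
    output_per_million: float | None = None
    per_image: float | None = None
    per_second: float | None = None


def estimate_cost(
    p: PricingSketch,
    *,
    input_tokens: int = 0,
    output_tokens: int = 0,
    images: int = 0,
    seconds: float = 0.0,
) -> float:
    cost = 0.0
    # Each rate that is set (not None) adds its term; unset rates add nothing.
    if p.input_per_million is not None:
        cost += input_tokens / 1_000_000 * p.input_per_million
    if p.output_per_million is not None:
        cost += output_tokens / 1_000_000 * p.output_per_million
    if p.per_image is not None:
        cost += images * p.per_image
    if p.per_second is not None:
        cost += seconds * p.per_second
    return cost
```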

register_pricing()

Register custom pricing for a model. Overrides any existing pricing for the same model ID.

from blazen import register_pricing, ModelPricing

register_pricing("my-model", ModelPricing(input_per_million=1.0, output_per_million=2.0))

lookup_pricing()

Look up pricing for a model by ID. Returns None if the model is unknown.

from blazen import lookup_pricing

pricing = lookup_pricing("gpt-4o")
if pricing:
    print(f"Input: ${pricing.input_per_million}/M tokens")

LocalModel Methods on CompletionModel

CompletionModel instances backed by local inference (not remote APIs) support explicit load/unload lifecycle management.

Method | Signature | Description
load | await model.load() | Load the model into memory/VRAM. Idempotent.
unload | await model.unload() | Free the model’s memory/VRAM. Idempotent.
is_loaded | await model.is_loaded() -> bool | Whether the model is currently loaded.
memory_bytes | await model.memory_bytes() -> int | None | Approximate memory footprint in bytes (host RAM if on CPU, GPU VRAM otherwise), or None if unknown. Default returns None.
device | model.device() -> str | Return device string ('cpu', 'cuda:0', 'metal', etc.). Determines which pool the manager charges. The default implementation raises NotImplementedError; subclasses should override it.

remote = CompletionModel.openai()  # Remote API model: these lifecycle methods are no-ops

# For a local model (`model` below stands for any locally backed CompletionModel):
await model.load()
print(await model.is_loaded())    # True
print(await model.memory_bytes()) # e.g. 4_000_000_000
await model.unload()

ProgressCallback

ProgressCallback is a subclassable abstract base class for receiving model-download progress notifications from ModelCache.download(...). Subclass it and override on_progress. The default implementation is a no-op, so an instance that does not override on_progress simply ignores every update.

Method | Signature | Description
__new__ | ProgressCallback() | Construct an instance. Subclass to override on_progress.
on_progress | on_progress(downloaded: int, total: int | None) -> None | Called repeatedly during a download with current byte counts. total is None if the size is unknown. No-op default.

from blazen import ProgressCallback

class MyProgress(ProgressCallback):
    def on_progress(self, downloaded: int, total: int | None) -> None:
        pct = (downloaded / total * 100) if total else 0
        print(f"{pct:.1f}%")

cb = MyProgress()
await cache.download("bert-base-uncased", "config.json", cb)

You may also pass a plain Callable[[int, int | None], None] to download(progress=...) for the same effect; this ABC simply gives type-checked callers a typed base to inherit from.
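
The plain-callable form also makes it easy to handle total=None more usefully than printing 0%. A minimal sketch (format_progress and print_progress are hypothetical helpers) that falls back to a raw byte count when the size is unknown:

```python
from __future__ import annotations


def format_progress(downloaded: int, total: int | None) -> str:
    if total:
        return f"{downloaded / total * 100:.1f}% ({downloaded}/{total} bytes)"
    # Size unknown (or zero): report raw bytes instead of a misleading 0%.
    return f"{downloaded} bytes"


def print_progress(downloaded: int, total: int | None) -> None:
    # Overwrite the same terminal line on each update.
    print(format_progress(downloaded, total), end="\r", flush=True)
```

Usage would then be await cache.download("bert-base-uncased", "config.json", progress=print_progress).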


Local Inference Types

The local-inference backends expose typed message, chunk, image, and result classes for use with the infer / infer_stream methods defined directly on each *Provider. There are three parallel families: the canonical un-prefixed Inference* types for mistral.rs, the LlamaCpp*-prefixed family for llama.cpp, and a single CandleInferenceResult for Candle (single-shot only; no streaming).

Streaming on both mistral.rs and llama.cpp returns an async iterator (InferenceChunkStream and LlamaCppInferenceChunkStream respectively) that you consume with async for chunk in stream: .... Both implement the __aiter__ / __anext__ protocol and terminate with StopAsyncIteration once the underlying engine stream is exhausted.
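
That protocol is ordinary Python async iteration. As a minimal sketch, a fake stream (hypothetical, e.g. for unit-testing code that consumes infer_stream output) only needs __aiter__ and an __anext__ that raises StopAsyncIteration when it runs out of chunks:

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass
class FakeInferenceChunk:
    delta: str | None = None
    finish_reason: str | None = None


class FakeChunkStream:
    def __init__(self, chunks: list[FakeInferenceChunk]) -> None:
        self._chunks = iter(chunks)

    def __aiter__(self) -> FakeChunkStream:
        return self

    async def __anext__(self) -> FakeInferenceChunk:
        try:
            return next(self._chunks)
        except StopIteration:
            raise StopAsyncIteration from None  # ends the async for loop


async def collect_text(stream) -> str:
    # Same consumption pattern as the examples below.
    parts = []
    async for chunk in stream:
        if chunk.delta:
            parts.append(chunk.delta)
    return "".join(parts)
```

With a real provider the stream is awaited first, e.g. await collect_text(await provider.infer_stream([msg])).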

mistral.rs inference types

Class | Purpose
ChatMessageInput | A chat message for mistral.rs inference, optionally carrying image attachments.
ChatRole | Enum: System, User, Assistant, Tool.
InferenceImage | Image payload attached to a ChatMessageInput. Build with from_bytes(...) or from_path(...).
InferenceImageSource | Underlying image source. Variants: bytes and path. Inspect via kind plus the per-variant getters.
InferenceResult | Result of a single non-streaming call: content, reasoning_content, tool_calls, finish_reason, model, usage.
InferenceChunk | Streaming delta: delta, reasoning_delta, tool_calls, finish_reason.
InferenceChunkStream | Async iterator over InferenceChunk items. Implements __aiter__ / __anext__.
InferenceToolCall | A tool call returned by the engine: id, name, arguments (JSON string).
InferenceUsage | Token usage stats: prompt_tokens, completion_tokens, total_tokens, total_time_sec.

from blazen import ChatMessageInput, ChatRole, InferenceImage

msg = ChatMessageInput.with_images(
    role=ChatRole.User,
    text="Describe this image.",
    images=[InferenceImage.from_path("./photo.png")],
)
stream = await provider.infer_stream([msg])
async for chunk in stream:
    if chunk.delta:
        print(chunk.delta, end="")

llama.cpp inference types

llama.cpp messages are text-only; multimodal inputs are not supported by this backend.

Class | Purpose
LlamaCppChatMessageInput | A text-only chat message for llama.cpp inference.
LlamaCppChatRole | Enum: System, User, Assistant, Tool.
LlamaCppInferenceResult | Result of a single non-streaming call: content, finish_reason, model, usage.
LlamaCppInferenceChunk | Streaming delta: delta, finish_reason.
LlamaCppInferenceChunkStream | Async iterator over LlamaCppInferenceChunk items. Implements __aiter__ / __anext__.
LlamaCppInferenceUsage | Token usage stats: prompt_tokens, completion_tokens, total_tokens, total_time_sec.

from blazen import LlamaCppChatMessageInput, LlamaCppChatRole

msg = LlamaCppChatMessageInput(role=LlamaCppChatRole.User, text="Hello")
stream = await llama_provider.infer_stream([msg])
async for chunk in stream:
    if chunk.delta:
        print(chunk.delta, end="")

Candle inference types

Candle exposes a single non-streaming result type. Streaming is not currently supported on this backend.

Class | Purpose
CandleInferenceResult | Result of a non-streaming candle call: content, prompt_tokens, completion_tokens, total_time_secs.

from blazen import CandleInferenceResult

result: CandleInferenceResult = await candle_provider.infer(messages)
print(result.content, result.prompt_tokens, result.completion_tokens)

Telemetry

Blazen ships three optional exporters for tracing and metrics: init_langfuse (LLM observability), init_otlp (generic OpenTelemetry), and init_prometheus (HTTP-scraped metrics). Each is gated behind a Cargo feature and installs process-global state on first call (a tracing subscriber for Langfuse and OTLP, a metrics recorder for Prometheus), so invoke each one once at process startup, before any traced work. Combining exporters is supported only in the documented order: if you use Langfuse together with another exporter, call init_langfuse first.

Function | Config type | Cargo feature | Purpose
init_langfuse(config) | LangfuseConfig | langfuse | LLM call traces, token usage, latency to Langfuse.
init_otlp(config) | OtlpConfig | otlp | Generic OpenTelemetry OTLP gRPC span exporter.
init_prometheus(port) | int (no config object) | prometheus | Prometheus metrics over HTTP /metrics.

Initialization functions raise a BlazenError subclass on failure (e.g. an invalid endpoint or port already in use).
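
Each initializer installs process-global state and should run once. When several entry points (tests, a CLI, a server) might each try to initialize telemetry, it can help to route startup calls through a small guard. The sketch below is plain Python with a hypothetical call_once helper; it is not part of Blazen. If the wrapped call raises (for example, the BlazenError for a port already in use), the guard stays open so the call can be retried.

```python
import threading
from typing import Callable


def call_once(init: Callable[..., None]) -> Callable[..., bool]:
    """Wrap an initializer so only the first successful call runs it.

    The wrapper returns True if this call ran the initializer and False if it
    was skipped. If the initializer raises, the guard stays open for a retry.
    """
    lock = threading.Lock()
    done = False

    def wrapper(*args, **kwargs) -> bool:
        nonlocal done
        with lock:  # serialize concurrent first calls
            if done:
                return False
            init(*args, **kwargs)  # an exception here leaves done == False
            done = True
            return True

    return wrapper
```

For example, start_metrics = call_once(init_prometheus) followed by start_metrics(9090) from any entry point. The guard only covers calls made through it. It cannot detect a subscriber that other code has already installed.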

LangfuseConfig

LLM-observability exporter for Langfuse. Behind the langfuse Cargo feature.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| public_key | str | required | Langfuse public API key (Basic-auth username). |
| secret_key | str | required | Langfuse secret API key (Basic-auth password). |
| host | str \| None | None | Langfuse host URL. Defaults to https://cloud.langfuse.com. |
| batch_size | int | 100 | Maximum events buffered before an automatic flush. |
| flush_interval_ms | int | 5000 | Background flush interval in milliseconds. |

from blazen import LangfuseConfig, init_langfuse

init_langfuse(LangfuseConfig(
    public_key="pk-lf-...",
    secret_key="sk-lf-...",
    host="https://cloud.langfuse.com",
))

init_langfuse(config) spawns a background tokio task that periodically flushes buffered LLM call traces, token usage, and latency data to the Langfuse ingestion API. If a global tracing subscriber is already installed, the call soft-fails. The underlying dispatcher is still constructed, so background flushing keeps running, and the function returns without overwriting the existing subscriber.
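
The exact flushing policy is internal to Blazen. One reasonable mental model of batch_size and flush_interval_ms assumes "whichever comes first" semantics, and the plain-Python sketch below illustrates that model. BatchBuffer, tick, and now_ms are hypothetical names chosen for the illustration. tick() stands in for one wake-up of the background task.

```python
import time
from typing import Callable, List


def _monotonic_ms() -> int:
    return int(time.monotonic() * 1000)


class BatchBuffer:
    """Conceptual model of size- or time-triggered flushing; not Blazen's code.

    A flush happens when batch_size events are buffered, or when tick() runs
    and at least flush_interval_ms has passed since the previous flush.
    """

    def __init__(self, flush: Callable[[List[dict]], None], batch_size: int = 100,
                 flush_interval_ms: int = 5000, now_ms: Callable[[], int] = _monotonic_ms):
        self._flush = flush
        self._batch_size = batch_size
        self._interval_ms = flush_interval_ms
        self._now_ms = now_ms  # injectable clock so the behavior is testable
        self._events: List[dict] = []
        self._last_flush = now_ms()

    def add(self, event: dict) -> None:
        self._events.append(event)
        if len(self._events) >= self._batch_size:  # size trigger
            self._do_flush()

    def tick(self) -> None:
        """Stand-in for one wake-up of the background flush task (time trigger)."""
        if self._events and self._now_ms() - self._last_flush >= self._interval_ms:
            self._do_flush()

    def _do_flush(self) -> None:
        batch, self._events = self._events, []
        self._flush(batch)
        self._last_flush = self._now_ms()
```

One consequence of this model is that, in a short-lived script, events recorded just before exit may still be sitting in the buffer. This section does not say whether Blazen flushes on shutdown.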

OtlpConfig

Generic OpenTelemetry OTLP exporter. Behind the otlp Cargo feature.

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| endpoint | str | required | OTLP gRPC endpoint URL (e.g. "http://localhost:4317"). |
| service_name | str | required | Service name reported to the backend. |

from blazen import OtlpConfig, init_otlp

init_otlp(OtlpConfig(endpoint="http://localhost:4317", service_name="my-app"))

init_otlp(config) sets up an OTLP gRPC span exporter and installs a combined tracing subscriber (env-filter + OTel layer + fmt layer).
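
Hard-coding the endpoint and service name is fine for a demo, but in deployment you may prefer to read them from the environment. The sketch below builds the keyword arguments for OtlpConfig from OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_SERVICE_NAME, the variable names defined by the OpenTelemetry specification. This section does not say whether init_otlp reads these variables itself, so the hypothetical otlp_settings helper reads them explicitly. Its fallback values mirror the example above.

```python
import os
from typing import Mapping, Optional


def otlp_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Build OtlpConfig keyword arguments from standard OpenTelemetry env vars.

    Reading these variables is this sketch's choice, not documented init_otlp
    behavior. Pass a mapping explicitly to avoid touching os.environ (e.g. in tests).
    """
    env = os.environ if env is None else env
    return {
        "endpoint": env.get("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"),
        "service_name": env.get("OTEL_SERVICE_NAME", "my-app"),
    }
```

You would then call init_otlp(OtlpConfig(**otlp_settings())).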

Prometheus metrics

Behind the prometheus Cargo feature.

from blazen import init_prometheus

init_prometheus(9090)  # serves /metrics on 0.0.0.0:9090

init_prometheus(port) installs a global metrics recorder backed by Prometheus and starts an HTTP listener on 0.0.0.0:{port} that serves the /metrics endpoint. After this call, any metrics recorded through the Rust metrics macros (counter!, histogram!, gauge!) are exposed at that endpoint.
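
A quick way to confirm the listener is up is to fetch /metrics and parse the Prometheus text exposition format. The sketch uses only the standard library. scrape and parse_samples are hypothetical helpers, and parse_samples is simplified: it handles the common name{labels} value [timestamp] line shape and skips comment lines. This reference does not list Blazen's own metric names, so the parser makes no assumptions about them.

```python
import urllib.request


def scrape(port: int, host: str = "127.0.0.1", timeout: float = 5.0) -> str:
    """Fetch the raw text served at /metrics."""
    url = f"http://{host}:{port}/metrics"
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return resp.read().decode("utf-8")


def parse_samples(text: str) -> dict:
    """Map each sample key (name plus labels) to its float value.

    Simplified: expects `name{labels} value [timestamp]` or `name value [timestamp]`
    and skips blank lines and `#` comment, HELP, and TYPE lines.
    """
    samples = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if "}" in line:
            end = line.rindex("}") + 1  # label values may contain spaces
            key, rest = line[:end], line[end:]
        else:
            key, _, rest = line.partition(" ")
        samples[key] = float(rest.split()[0])  # ignore an optional timestamp
    return samples
```

For example, call parse_samples(scrape(9090)) after init_prometheus(9090). The listener binds 0.0.0.0, so it accepts connections on every network interface. Restrict the port at the firewall if it should not be reachable from outside the host.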


Error Handling

All Blazen failures are raised as subclasses of BlazenError, a typed exception hierarchy rooted at builtins.Exception. Catching BlazenError will catch every error raised from any Blazen API; catch a specific subclass to react to one category.

from blazen import AuthError, BlazenError, ChatMessage, RateLimitError

# Assumes `model` is a completion model constructed earlier.
try:
    response = await model.complete([ChatMessage.user("Hello")])
except RateLimitError as e:
    # Provider rate-limited the request -- back off and retry.
    print(f"slow down: {e}")
except AuthError as e:
    print(f"bad credentials: {e}")
except BlazenError as e:
    print(f"blazen failed: {e}")
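
The RateLimitError branch above only reports the error. A common next step is to retry with exponential backoff. This is a minimal sketch, not a Blazen API. with_retries is a hypothetical helper, and the RateLimitError defined here is a local stand-in so the block runs without blazen installed. In real code, pass the blazen class via retry_on. If the exception carries a retry_after_ms attribute (documented below for ProviderError), the helper uses that hint instead of its own schedule.

```python
import asyncio
import random


class RateLimitError(Exception):
    """Hypothetical stand-in for blazen.RateLimitError so this sketch runs on its own."""


async def with_retries(call, *, retry_on=(RateLimitError,), attempts=4,
                       base_delay=0.5, max_delay=30.0):
    """Await call() and retry on retry_on exceptions, up to `attempts` tries in total.

    Delays grow as base_delay * 2**attempt, are capped at max_delay, and are
    jittered. A retry_after_ms attribute on the exception, when present, is
    used as the delay instead (still capped at max_delay).
    """
    for attempt in range(attempts):
        try:
            return await call()
        except retry_on as exc:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            hint_ms = getattr(exc, "retry_after_ms", None)
            if hint_ms is not None:
                delay = min(max_delay, hint_ms / 1000)
            else:
                delay = min(max_delay, base_delay * 2 ** attempt)
                delay *= random.uniform(0.5, 1.0)  # jitter de-synchronizes concurrent clients
            await asyncio.sleep(delay)
    raise ValueError("attempts must be at least 1")
```

For example: response = await with_retries(lambda: model.complete([ChatMessage.user("Hello")]), retry_on=(blazen.RateLimitError,)). The lambda matters because each retry needs a fresh coroutine, and a coroutine object can only be awaited once.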

Base hierarchy

Every other class in the table below derives directly from BlazenError, which itself derives from builtins.Exception.

| Class | Description |
| --- | --- |
| BlazenError | Base class for all Blazen runtime errors. Catches everything. |
| AuthError | Authentication failed (invalid or missing API key). |
| RateLimitError | Provider rate-limited the request. |
| TimeoutError | The operation exceeded its time limit. Distinct from the builtin TimeoutError; import it from blazen. |
| ValidationError | Invalid input rejected before the provider round-trip. |
| ContentPolicyError | Provider rejected the request for policy reasons. |
| ProviderError | Provider-side error. Carries structured HTTP attributes (see below). |
| UnsupportedError | Requested capability is not supported by this provider or backend. |
| ComputeError | Compute job error (cancelled, quota exceeded, etc.). |
| MediaError | Media handling error (invalid input, size exceeded, etc.). |
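
The hierarchy is flat, so one isinstance check per category is enough to express a retry policy. The classes below are local stand-ins with the same names and parent relationships as the table, which lets the sketch run without blazen. The choice of which categories count as retryable is an assumption for illustration, not Blazen guidance. The stand-in TimeoutError shadows the builtin, just as from blazen import TimeoutError would. Writing import blazen and catching blazen.TimeoutError avoids that.

```python
import builtins


# Hypothetical stand-ins mirroring the documented hierarchy (no blazen needed).
class BlazenError(Exception): pass
class AuthError(BlazenError): pass
class RateLimitError(BlazenError): pass
class TimeoutError(BlazenError): pass  # shadows builtins.TimeoutError in this module
class ValidationError(BlazenError): pass
class ProviderError(BlazenError): pass


def is_retryable(exc: BaseException) -> bool:
    """Sample policy: retry throttling, timeouts, and 5xx provider errors."""
    if isinstance(exc, (RateLimitError, TimeoutError)):
        return True
    if isinstance(exc, ProviderError):
        status = getattr(exc, "status", None)
        return status is not None and status >= 500
    return False  # auth, validation, policy, unsupported: retrying will not help
```

In this sketch, is_retryable(builtins.TimeoutError()) returns False. That is the practical meaning of "distinct": an except clause for one TimeoutError does not catch the other.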

ProviderError attributes

For HTTP-attributable failures, ProviderError instances are populated with the following typed attributes (set via setattr on the instance, mirrored in the stub for static type-checking):

| Attribute | Type | Description |
| --- | --- | --- |
| provider | str | Provider tag (e.g. "fal", "openrouter"). |
| status | int \| None | HTTP status code; None for non-HTTP provider errors. |
| endpoint | str \| None | Request URL. |
| request_id | str \| None | x-fal-request-id / x-request-id header if present. |
| detail | str \| None | Parsed error message extracted from the JSON body. |
| raw_body | str \| None | Response body, capped at 4 KiB. |
| retry_after_ms | int \| None | Parsed Retry-After header in milliseconds. |

import asyncio

from blazen import ChatMessage, ProviderError

try:
    await model.complete([ChatMessage.user("Hello")])
except ProviderError as e:
    print(f"{e.provider} {e.status} on {e.endpoint}")
    if e.retry_after_ms:
        await asyncio.sleep(e.retry_after_ms / 1000)
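
These attributes make ProviderError convenient to log in structured form. The hypothetical helper below collects them with getattr, so it also works on errors where some attributes were never set, and it drops None values. It leaves raw_body out because that field can be up to 4 KiB; add it back if you need it.

```python
from typing import Any, Dict

# Attribute names documented for ProviderError above; raw_body is deliberately omitted.
HTTP_FIELDS = ("provider", "status", "endpoint", "request_id", "detail", "retry_after_ms")


def provider_error_fields(exc: BaseException) -> Dict[str, Any]:
    """Flatten an error's documented HTTP attributes into a dict for structured logs.

    getattr() with a default keeps this safe on errors that never had the
    attributes set (for example, non-HTTP failures). None values are dropped.
    """
    fields: Dict[str, Any] = {"error_type": type(exc).__name__, "error_message": str(exc)}
    for name in HTTP_FIELDS:
        value = getattr(exc, name, None)
        if value is not None:
            fields[name] = value
    return fields
```

You can pass the returned dict to json.dumps or to a structured-logging library.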

Per-backend ProviderError subclasses

Each local-inference backend raises its own ProviderError subclass so callers can route errors per-backend. The classes are always declared in the type stub, but only registered at runtime when the corresponding Cargo feature is enabled in your wheel build.

| Class | Cargo feature | Backend |
| --- | --- | --- |
| LlamaCppError | llamacpp | llama.cpp local LLM inference. |
| CandleLlmError | candle-llm | Candle local LLM inference. |
| CandleEmbedError | candle-embed | Candle local embedding backend. |
| MistralRsError | mistralrs | mistral.rs local LLM inference. |
| WhisperError | whispercpp | whisper.cpp transcription. |
| PiperError | piper | Piper text-to-speech. |
| DiffusionError | diffusion | Diffusion image generation. |
| FastEmbedError | embed | fastembed embedding (non-musl only). |
| TractError | tract | Tract ONNX embedding. |

Because they all derive from ProviderError, except ProviderError catches every backend-attributable error, including these subclasses. Use except LlamaCppError (etc.) when you need to react to a single backend.

from blazen import LlamaCppError, ProviderError

try:
    await llama_provider.infer(messages)
except LlamaCppError as e:
    print(f"llama.cpp blew up: {e}")
except ProviderError as e:
    # Any other backend
    print(f"{e.provider}: {e}")
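
Backend classes are registered at runtime only when their Cargo feature is built in. On a wheel without that feature, a hard from blazen import WhisperError can therefore fail (presumably with ImportError). One way to write code that runs against any build is to look the classes up dynamically and catch whatever exists. optional_error_classes is a hypothetical helper, not part of Blazen.

```python
from typing import Tuple, Type


def optional_error_classes(module, *names: str) -> Tuple[Type[BaseException], ...]:
    """Return the named exception classes that exist on `module`, skipping absent ones.

    An empty tuple is valid in an `except` clause and simply matches nothing,
    so the result can always be used as `except <tuple> as e:`.
    """
    found = []
    for name in names:
        cls = getattr(module, name, None)
        if isinstance(cls, type) and issubclass(cls, BaseException):
            found.append(cls)
    return tuple(found)
```

For example, after import blazen you could write LOCAL_ERRORS = optional_error_classes(blazen, "LlamaCppError", "WhisperError", "PiperError") and then catch them with except LOCAL_ERRORS as e:. Keep a broader except ProviderError after it for everything else.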