Python API Reference

Complete API reference for blazen in Python.

Event

The preferred way to define events is by subclassing Event. The event_type is automatically set to the class name.

class AnalyzeEvent(Event):
    text: str
    score: float

ev = AnalyzeEvent(text="hello", score=0.9)
ev.event_type   # "AnalyzeEvent"
ev.text          # "hello"

You can also construct events inline without a subclass:

Event(event_type: str, **kwargs)

ev = Event("AnalyzeEvent", text="hello", score=0.9)

| Member | Type | Description |
|---|---|---|
| .event_type | str | The event type string. Auto-set to the class name for subclasses. |
| .to_dict() | -> dict | Serialize the event data to a plain dictionary. |
| .field_name | Any | Attribute access for any keyword argument supplied at construction. |

StartEvent

StartEvent(**kwargs)

Built-in event whose event_type is "blazen::StartEvent". All keyword arguments are available as attributes.


StopEvent

StopEvent(result: dict)

Built-in event whose event_type is "blazen::StopEvent".

| Member | Type | Description |
|---|---|---|
| .result | dict | The value passed via the result keyword argument. |
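
StopEvent carries its payload on .result. A minimal stand-in class illustrating the documented shape (blazen provides the real implementation):

```python
# Stand-in mirroring StopEvent's documented shape; illustrative only,
# the real class comes from blazen.
class StopEvent:
    event_type = "blazen::StopEvent"

    def __init__(self, result=None):
        self.result = result

ev = StopEvent(result={"answer": 42})
print(ev.event_type)  # blazen::StopEvent
print(ev.result)      # {'answer': 42}
```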

step decorator

The @step decorator reads the type hint of the ev parameter to automatically determine which events the step accepts.

class AnalyzeEvent(Event):
    text: str

@step
async def analyze(ctx: Context, ev: AnalyzeEvent) -> Event | None:
    ...
# Equivalent to @step(accepts=["AnalyzeEvent"])

When the annotation is the base Event class or absent, the step defaults to accepting StartEvent:

@step
async def start(ctx: Context, ev: Event) -> Event | None:
    ...
# Equivalent to @step(accepts=["blazen::StartEvent"])

Explicit overrides still work:

| Variant | Description |
|---|---|
| @step | Infers accepts from the ev type hint. Defaults to StartEvent when the hint is Event or missing. |
| @step(accepts=["EventType"]) | Explicitly sets accepted event types, overriding type-hint inference. |
| @step(emits=["EventType"]) | Declares the event types this step may produce. |
| @step(max_concurrency=N) | Limits how many instances of this step may run concurrently. 0 means unlimited. |
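
The inference described above can be sketched in plain Python using `inspect` (illustrative only; blazen's actual decorator may handle more cases, such as union hints):

```python
import inspect

START_EVENT = "blazen::StartEvent"

class Event:  # stand-in for blazen's Event base class
    pass

class AnalyzeEvent(Event):
    pass

def infer_accepts(fn):
    """Read the `ev` annotation the way @step's inference is described."""
    hint = inspect.signature(fn).parameters["ev"].annotation
    if hint is inspect.Parameter.empty or hint is Event:
        return [START_EVENT]
    return [hint.__name__]

async def analyze(ctx, ev: AnalyzeEvent):
    ...

async def start(ctx, ev):
    ...

print(infer_accepts(analyze))  # ['AnalyzeEvent']
print(infer_accepts(start))    # ['blazen::StartEvent']
```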

Step signature

async def name(ctx: Context, ev: MyEvent) -> Event | list[Event] | None

Return an Event to emit it, a list[Event] to emit several, or None to emit nothing. Steps can be sync or async.


Workflow

Workflow(name: str, steps: list, timeout: float = None)

Create a workflow from a name and an ordered list of steps. The optional timeout is in seconds.

| Method | Signature | Description |
|---|---|---|
| run | await wf.run(**kwargs) -> WorkflowHandler | Start the workflow. Keyword arguments become fields on the initial StartEvent. |
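
The run lifecycle can be sketched as an event-routing loop (illustrative only: events are plain dicts and steps are (accepts, handler) pairs here, unlike real blazen):

```python
import asyncio

async def run_workflow(steps, start_event):
    # Dispatch each event to the steps whose accepts list matches its
    # event_type; the run ends when a StopEvent is produced.
    queue = [start_event]
    while queue:
        event = queue.pop(0)
        if event["event_type"] == "blazen::StopEvent":
            return event
        for accepts, handler in steps:
            if event["event_type"] in accepts:
                out = await handler(event)
                if out is not None:
                    queue.extend(out if isinstance(out, list) else [out])
    return None

async def start(ev):
    return {"event_type": "blazen::StopEvent", "result": {"ok": True}}

steps = [(["blazen::StartEvent"], start)]
stop = asyncio.run(run_workflow(steps, {"event_type": "blazen::StartEvent"}))
print(stop["result"])  # {'ok': True}
```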

WorkflowHandler

Returned by Workflow.run(). Provides control over a running workflow instance.

| Method | Signature | Description |
|---|---|---|
| result | await handler.result() -> Event | Block until the workflow emits a StopEvent and return it. |
| stream_events | handler.stream_events() -> AsyncIterator[Event] | Async iterator yielding events written to the stream. |

handler = await wf.run(prompt="Hello")

# Stream intermediate events while waiting for the result
async for event in handler.stream_events():
    print(event.event_type, event.to_dict())

result = await handler.result()

Context

Available as the first parameter of every step function. All methods are synchronous.

| Method | Signature | Description |
|---|---|---|
| set | ctx.set(key: str, value: StateValue) -> None | Store any Python value. bytes/bytearray are stored as raw binary; JSON-serializable types (dict, list, str, int, float, bool, None) are stored as JSON; all other objects (Pydantic models, dataclasses, custom classes) are pickled automatically. |
| get | ctx.get(key: str) -> StateValue \| None | Retrieve a value by key, or None if absent. Returns the original type transparently: JSON values come back as their Python type, bytes come back as bytes, and pickled objects are unpickled to their original class. |
| set_bytes | ctx.set_bytes(key: str, data: bytes) -> None | Convenience alias for storing raw binary data. Equivalent to ctx.set(key, data) when data is bytes. |
| get_bytes | ctx.get_bytes(key: str) -> bytes \| None | Convenience alias for retrieving raw binary data, or None if absent. |
| run_id | ctx.run_id() -> str | Return the UUID of the current workflow run. |
| send_event | ctx.send_event(event: Event) -> None | Route an event to matching steps manually. |
| write_event_to_stream | ctx.write_event_to_stream(event: Event) -> None | Publish an event to the stream visible via WorkflowHandler.stream_events(). |

StateValue = Any — a type alias defined in the .pyi stubs indicating that any Python value is accepted. All stored values persist through pause/resume/checkpoint.
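
The storage tiering described for ctx.set/ctx.get can be illustrated with a plain-Python sketch (not blazen's actual implementation):

```python
import json
import pickle

def encode_state(value):
    """Pick a storage form the way ctx.set is described: raw bytes,
    JSON for JSON-serializable values, pickle for everything else."""
    if isinstance(value, (bytes, bytearray)):
        return ("raw", bytes(value))
    try:
        return ("json", json.dumps(value))
    except TypeError:
        return ("pickle", pickle.dumps(value))

def decode_state(tag, payload):
    """Reverse the tiering transparently, as ctx.get is described."""
    if tag == "raw":
        return payload
    if tag == "json":
        return json.loads(payload)
    return pickle.loads(payload)

print(encode_state({"a": 1})[0])  # json
print(encode_state(b"\x00")[0])   # raw
print(encode_state(object())[0])  # pickle
```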


CompletionModel

Use static constructor methods to create a model for a specific provider, then call complete() or stream() to generate responses.

model = CompletionModel.openai("sk-...")
model = CompletionModel.anthropic("sk-ant-...")
model = CompletionModel.openrouter("sk-or-...", model="meta-llama/llama-3-70b")

Provider constructors

| Constructor | Signature |
|---|---|
| openai | CompletionModel.openai(api_key: str, model: str = None) |
| anthropic | CompletionModel.anthropic(api_key: str, model: str = None) |
| gemini | CompletionModel.gemini(api_key: str, model: str = None) |
| azure | CompletionModel.azure(api_key: str, resource_name: str, deployment_name: str) |
| openrouter | CompletionModel.openrouter(api_key: str, model: str = None) |
| groq | CompletionModel.groq(api_key: str, model: str = None) |
| together | CompletionModel.together(api_key: str, model: str = None) |
| mistral | CompletionModel.mistral(api_key: str, model: str = None) |
| deepseek | CompletionModel.deepseek(api_key: str, model: str = None) |
| fireworks | CompletionModel.fireworks(api_key: str, model: str = None) |
| perplexity | CompletionModel.perplexity(api_key: str, model: str = None) |
| xai | CompletionModel.xai(api_key: str, model: str = None) |
| cohere | CompletionModel.cohere(api_key: str, model: str = None) |
| bedrock | CompletionModel.bedrock(api_key: str, region: str, model: str = None) |
| fal | CompletionModel.fal(api_key: str, model: str = None) |

Properties

| Property | Type | Description |
|---|---|---|
| .model_id | str | The string identifier of the active model. |

complete()

response: CompletionResponse = await model.complete(
    messages: list[ChatMessage],
    temperature: float = None,
    max_tokens: int = None,
    model: str = None,
)

Returns a typed CompletionResponse (see below). Also supports dict-style access for backwards compatibility: response["content"].

stream()

await model.stream(
    messages: list[ChatMessage],
    on_chunk: Callable[[StreamChunk], Any],
    *,
    temperature: float = None,
    max_tokens: int = None,
    model: str = None,
)

Streams a chat completion, calling on_chunk for each chunk received. Each chunk is a StreamChunk (documented below) that supports both attribute and dict-style access and carries the following keys:

| Key | Type | Description |
|---|---|---|
| delta | str \| None | The incremental text content for this chunk. |
| finish_reason | str \| None | Set on the final chunk (e.g. "stop", "tool_calls"). |
| tool_calls | list[dict] | Tool call fragments, if any. |

def handle(chunk):
    if chunk["delta"]:
        print(chunk["delta"], end="")

await model.stream([ChatMessage.user("Tell me a story")], handle)

Middleware decorators

Each decorator returns a new CompletionModel wrapping the original with additional behaviour.

| Method | Signature | Description |
|---|---|---|
| with_retry | .with_retry(*, max_retries=3, initial_delay_ms=1000, max_delay_ms=30000) | Automatic retry with exponential backoff on transient failures. |
| with_cache | .with_cache(*, ttl_seconds=300, max_entries=1000) | In-memory response cache for identical non-streaming requests. |
| with_fallback | CompletionModel.with_fallback(models: list[CompletionModel]) | Static method. Tries providers in order; falls back on transient errors. |

# Chain decorators
model = CompletionModel.openai("sk-...").with_cache().with_retry(max_retries=5)

# Fallback across providers
primary = CompletionModel.openai("sk-...")
backup = CompletionModel.anthropic("sk-ant-...")
model = CompletionModel.with_fallback([primary, backup])

CompletionResponse

Returned by model.complete(). Supports both attribute access and dict-style access.

| Property | Type | Description |
|---|---|---|
| .content | str \| None | The generated text. |
| .model | str | Model name used for the completion. |
| .finish_reason | str \| None | Why generation stopped ("stop", "tool_calls", etc.). |
| .tool_calls | list[ToolCall] | Tool calls requested by the model. |
| .usage | TokenUsage \| None | Token usage statistics. |
| .cost | float \| None | Estimated cost in USD for this request. |
| .timing | RequestTiming \| None | Timing metadata for the request. |
| .images | list[dict] | Image outputs (provider-dependent). |
| .audio | list[dict] | Audio outputs (provider-dependent). |
| .videos | list[dict] | Video outputs (provider-dependent). |

response = await model.complete([ChatMessage.user("Hello")])
print(response.content)        # attribute access
print(response["content"])     # dict-style access (backwards compatible)
print(response.cost)           # e.g. 0.0023
print(response.timing)         # RequestTiming or None
print(response.keys())         # list of available keys

RequestTiming

Timing metadata attached to a CompletionResponse. All fields are optional since not every provider reports timing data.

| Property | Type | Description |
|---|---|---|
| .queue_ms | int \| None | Time spent waiting in the provider’s queue. |
| .execution_ms | int \| None | Time spent executing the request. |
| .total_ms | int \| None | Total round-trip time. |

response = await model.complete([ChatMessage.user("Hello")])
if response.timing:
    print(f"Total: {response.timing.total_ms}ms")
    print(f"Queue: {response.timing.queue_ms}ms")
    print(f"Execution: {response.timing.execution_ms}ms")

ChatMessage

A single message in a chat conversation.

msg = ChatMessage(role="user", content="Hello, world!")
# role is optional, defaults to "user"
msg = ChatMessage(content="Hello!")

Static constructors

| Method | Description |
|---|---|
| ChatMessage.system(content: str) | Create a system message. |
| ChatMessage.user(content: str) | Create a user message. |
| ChatMessage.assistant(content: str) | Create an assistant message. |
| ChatMessage.tool(content: str) | Create a tool result message. |
| ChatMessage.user_image_url(*, text, url, media_type=None) | Create a user message with text and an image URL. |
| ChatMessage.user_image_base64(*, text, data, media_type) | Create a user message with text and a base64 image. |
| ChatMessage.user_parts(*, parts: list[ContentPart]) | Create a user message with multiple content parts. |

Properties

| Property | Type | Description |
|---|---|---|
| .role | str | One of "system", "user", "assistant", "tool". |
| .content | str \| None | The message text. |

Role

Constants for message roles.

from blazen import Role

Role.SYSTEM     # "system"
Role.USER       # "user"
Role.ASSISTANT  # "assistant"
Role.TOOL       # "tool"

ContentPart

Build multimodal content parts for use with ChatMessage.user_parts().

| Factory Method | Description |
|---|---|
| ContentPart.text(*, text=...) | Create a text content part. |
| ContentPart.image_url(*, url=..., media_type=...) | Create an image URL content part. |
| ContentPart.image_base64(*, data=..., media_type=...) | Create a base64 image content part. |

msg = ChatMessage.user_parts(parts=[
    ContentPart.text(text="What's in this image?"),
    ContentPart.image_url(url="https://example.com/photo.jpg", media_type=MediaType.JPEG),
])

ToolCall

A tool invocation requested by the model.

| Property | Type | Description |
|---|---|---|
| .id | str | Unique identifier for the tool call. |
| .name | str | Name of the tool to invoke. |
| .arguments | dict[str, Any] | Parsed arguments for the tool call. |

Supports dict-style access: tool_call["name"].


TokenUsage

Token usage statistics for a completion.

| Property | Type | Description |
|---|---|---|
| .prompt_tokens | int | Tokens in the prompt. |
| .completion_tokens | int | Tokens in the completion. |
| .total_tokens | int | Total tokens used. |

Supports dict-style access: usage["total_tokens"].


Agent System

The agent system provides an agentic tool-execution loop on top of CompletionModel. Define tools with ToolDef, then call run_agent to let the model iteratively call tools until it produces a final answer.

ToolDef

Define a tool that the model can invoke during an agent run.

ToolDef(
    *,
    name: str,
    description: str,
    parameters: dict[str, Any],
    handler: Callable | AsyncCallable,
)

| Parameter | Type | Description |
|---|---|---|
| name | str | Unique tool name exposed to the model. |
| description | str | Description the model uses to decide when to call this tool. |
| parameters | dict | JSON Schema describing the tool’s input parameters. |
| handler | Callable | Function called when the model invokes the tool. Can be sync or async. Receives a dict[str, Any] of arguments and should return a JSON-serializable value. |

# Sync handler
tool = ToolDef(
    name="search",
    description="Search the web for a query",
    parameters={
        "type": "object",
        "properties": {
            "query": {"type": "string", "description": "Search query"}
        },
        "required": ["query"],
    },
    handler=lambda args: {"results": ["result1", "result2"]},
)

# Async handler
async def fetch_weather(args):
    data = await weather_api(args["city"])
    return {"temperature": data.temp, "conditions": data.conditions}

weather_tool = ToolDef(
    name="weather",
    description="Get current weather for a city",
    parameters={
        "type": "object",
        "properties": {
            "city": {"type": "string"}
        },
        "required": ["city"],
    },
    handler=fetch_weather,
)

run_agent

Run an agentic tool-execution loop. The model is called repeatedly, executing any requested tool calls and feeding results back, until the model stops calling tools or max_iterations is reached.

result: AgentResult = await run_agent(
    model: CompletionModel,
    messages: list[ChatMessage],
    *,
    tools: list[ToolDef],
    max_iterations: int = 10,
    system_prompt: str = None,
    temperature: float = None,
    max_tokens: int = None,
    add_finish_tool: bool = False,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| model | CompletionModel | required | The model to use for completions. |
| messages | list[ChatMessage] | required | Initial conversation messages. |
| tools | list[ToolDef] | required | Tools available to the model. |
| max_iterations | int | 10 | Maximum number of tool-call rounds before stopping. |
| system_prompt | str \| None | None | Optional system prompt prepended to messages. |
| temperature | float \| None | None | Sampling temperature override. |
| max_tokens | int \| None | None | Max tokens per completion call. |
| add_finish_tool | bool | False | If True, adds a built-in “finish” tool the model can call to explicitly end the loop. |

model = CompletionModel.openai("sk-...")
messages = [ChatMessage.user("What's the weather in Paris and London?")]

result = await run_agent(model, messages, tools=[weather_tool])
print(result.response.content)  # Final answer
print(result.iterations)        # Number of tool-call rounds
print(result.total_cost)        # Accumulated cost across all iterations
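
The loop run_agent performs can be sketched as follows (illustrative only: dict-shaped responses and a fake model stand in for blazen's types):

```python
import asyncio
import json

async def agent_loop(call_model, messages, handlers, max_iterations=10):
    """Call the model repeatedly, executing requested tools and feeding
    results back, until no tools are requested or the cap is reached."""
    iterations = 0
    while True:
        response = await call_model(messages)
        calls = response.get("tool_calls") or []
        if not calls or iterations >= max_iterations:
            return response, iterations
        iterations += 1
        for call in calls:
            result = handlers[call["name"]](call["arguments"])
            messages.append({"role": "tool", "content": json.dumps(result)})

async def fake_model(messages):
    # First call requests a tool; once a tool result exists, answer.
    if not any(m["role"] == "tool" for m in messages):
        return {"tool_calls": [{"name": "weather", "arguments": {"city": "Paris"}}]}
    return {"content": "It is sunny in Paris.", "tool_calls": []}

handlers = {"weather": lambda args: {"temp_c": 21}}
messages = [{"role": "user", "content": "Weather in Paris?"}]
final, rounds = asyncio.run(agent_loop(fake_model, messages, handlers))
print(final["content"], rounds)  # It is sunny in Paris. 1
```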

AgentResult

Returned by run_agent.

| Property | Type | Description |
|---|---|---|
| .response | CompletionResponse | The final completion response from the model. |
| .messages | list[ChatMessage] | The full conversation history including all tool calls and results. |
| .iterations | int | Number of tool-call iterations executed. |
| .total_cost | float \| None | Total cost in USD accumulated across all iterations. |

MediaType

Constants for common MIME types. Useful when constructing ContentPart or compute requests.

from blazen import MediaType

MediaType.PNG   # "image/png"
MediaType.MP4   # "video/mp4"
MediaType.MP3   # "audio/mpeg"
MediaType.GLB   # "model/gltf-binary"

Image types

| Constant | MIME Type |
|---|---|
| MediaType.PNG | image/png |
| MediaType.JPEG | image/jpeg |
| MediaType.WEBP | image/webp |
| MediaType.GIF | image/gif |
| MediaType.SVG | image/svg+xml |
| MediaType.BMP | image/bmp |
| MediaType.TIFF | image/tiff |
| MediaType.AVIF | image/avif |

Video types

| Constant | MIME Type |
|---|---|
| MediaType.MP4 | video/mp4 |
| MediaType.WEBM | video/webm |
| MediaType.MOV | video/quicktime |

Audio types

| Constant | MIME Type |
|---|---|
| MediaType.MP3 | audio/mpeg |
| MediaType.WAV | audio/wav |
| MediaType.OGG | audio/ogg |
| MediaType.FLAC | audio/flac |
| MediaType.AAC | audio/aac |
| MediaType.M4A | audio/m4a |

3D model types

| Constant | MIME Type |
|---|---|
| MediaType.GLB | model/gltf-binary |
| MediaType.GLTF | model/gltf+json |
| MediaType.OBJ | model/obj |
| MediaType.USDZ | model/vnd.usdz+zip |
| MediaType.FBX | model/fbx |
| MediaType.STL | model/stl |

Document types

| Constant | MIME Type |
|---|---|
| MediaType.PDF | application/pdf |

Compute Request Types

Compute requests define jobs for media generation and processing. All constructors use keyword-only arguments.

ImageRequest

Generate images from a text prompt.

ImageRequest(
    *,
    prompt: str,
    negative_prompt: str = None,
    width: int = None,
    height: int = None,
    num_images: int = None,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| prompt | str | required | Text description of the image to generate. |
| negative_prompt | str \| None | None | What to avoid in the generated image. |
| width | int \| None | None | Image width in pixels. |
| height | int \| None | None | Image height in pixels. |
| num_images | int \| None | None | Number of images to generate. |
| model | str \| None | None | Specific model to use (provider-dependent). |

req = ImageRequest(prompt="a cat in space", width=1024, height=1024, num_images=2)

UpscaleRequest

Upscale an existing image to a higher resolution.

UpscaleRequest(
    *,
    image_url: str,
    scale: float,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| image_url | str | required | URL of the image to upscale. |
| scale | float | required | Upscale factor (e.g. 2.0, 4.0). |
| model | str \| None | None | Specific model to use. |

req = UpscaleRequest(image_url="https://example.com/photo.jpg", scale=4.0)

VideoRequest

Generate a video from a text prompt, optionally with an input image.

VideoRequest(
    *,
    prompt: str,
    image_url: str = None,
    duration_seconds: float = None,
    negative_prompt: str = None,
    width: int = None,
    height: int = None,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| prompt | str | required | Text description of the video to generate. |
| image_url | str \| None | None | Optional starting image to animate. |
| duration_seconds | float \| None | None | Desired video length in seconds. |
| negative_prompt | str \| None | None | What to avoid in the generated video. |
| width | int \| None | None | Video width in pixels. |
| height | int \| None | None | Video height in pixels. |
| model | str \| None | None | Specific model to use. |

req = VideoRequest(prompt="a sunset timelapse", duration_seconds=5.0)
req = VideoRequest(prompt="animate this scene", image_url="https://example.com/frame.jpg")

SpeechRequest

Generate speech audio from text.

SpeechRequest(
    *,
    text: str,
    voice: str = None,
    voice_url: str = None,
    language: str = None,
    speed: float = None,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| text | str | required | The text to convert to speech. |
| voice | str \| None | None | Voice preset name (e.g. "alloy", "nova"). |
| voice_url | str \| None | None | URL to a custom voice sample for cloning. |
| language | str \| None | None | Language code (e.g. "en", "fr"). |
| speed | float \| None | None | Playback speed multiplier (e.g. 1.2 for 20% faster). |
| model | str \| None | None | Specific model to use. |

req = SpeechRequest(text="Hello world", voice="alloy", speed=1.2)

MusicRequest

Generate music or sound effects from a text prompt.

MusicRequest(
    *,
    prompt: str,
    duration_seconds: float = None,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| prompt | str | required | Description of the music to generate. |
| duration_seconds | float \| None | None | Desired duration in seconds. |
| model | str \| None | None | Specific model to use. |

req = MusicRequest(prompt="upbeat jazz", duration_seconds=30.0)

TranscriptionRequest

Transcribe audio to text.

TranscriptionRequest(
    *,
    audio_url: str,
    language: str = None,
    diarize: bool = None,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| audio_url | str | required | URL of the audio file to transcribe. |
| language | str \| None | None | Language hint (e.g. "en"). |
| diarize | bool \| None | None | If True, identify and label different speakers. |
| model | str \| None | None | Specific model to use. |

req = TranscriptionRequest(audio_url="https://example.com/audio.mp3", language="en", diarize=True)

ThreeDRequest

Generate a 3D model from a text prompt or image.

ThreeDRequest(
    *,
    prompt: str = None,
    image_url: str = None,
    format: str = None,
    model: str = None,
)

| Parameter | Type | Default | Description |
|---|---|---|---|
| prompt | str \| None | None | Text description of the 3D object to generate. |
| image_url | str \| None | None | Image to use as reference for 3D generation. |
| format | str \| None | None | Output format (e.g. "glb", "obj", "usdz"). |
| model | str \| None | None | Specific model to use. |

Provide at least one of prompt or image_url.

req = ThreeDRequest(prompt="a 3D cat", format="glb")
req = ThreeDRequest(image_url="https://example.com/photo.jpg", format="obj")

StreamChunk

A typed object received by the on_chunk callback during streaming. Replaces the raw dict interface while remaining backwards-compatible via chunk["key"] access.

| Property | Type | Description |
|---|---|---|
| .delta | str \| None | Incremental text content. |
| .finish_reason | str \| None | Present only on the final chunk ("stop", "tool_calls", etc.). |
| .tool_calls | list[ToolCall] | Tool invocations completed in this chunk. |

async def on_chunk(chunk):
    # Attribute access (preferred)
    if chunk.delta:
        print(chunk.delta, end="")

    # Dict-style access (backwards compatible)
    if chunk["finish_reason"]:
        print(f"\n[done: {chunk['finish_reason']}]")

EmbeddingModel

Generate vector embeddings from text. Created via static constructor methods, similar to CompletionModel.

model = EmbeddingModel.openai("sk-...")
model = EmbeddingModel.openai("sk-...", model="text-embedding-3-large", dimensions=3072)
model = EmbeddingModel.together("tok-...")
model = EmbeddingModel.cohere("co-...")
model = EmbeddingModel.fireworks("fw-...")

Provider constructors

| Constructor | Signature |
|---|---|
| openai | EmbeddingModel.openai(api_key: str, model: str = None, dimensions: int = None) |
| together | EmbeddingModel.together(api_key: str) |
| cohere | EmbeddingModel.cohere(api_key: str) |
| fireworks | EmbeddingModel.fireworks(api_key: str) |

Properties

| Property | Type | Description |
|---|---|---|
| .model_id | str | The model identifier. |
| .dimensions | int | Output vector dimensionality. |

embed()

response: EmbeddingResponse = await model.embed(texts: list[str])

Returns an EmbeddingResponse with one vector per input text.


EmbeddingResponse

Returned by EmbeddingModel.embed().

| Property | Type | Description |
|---|---|---|
| .embeddings | list[list[float]] | One vector per input text. |
| .model | str | Model that produced the embeddings. |
| .usage | TokenUsage \| None | Token usage statistics. |
| .cost | float \| None | Estimated cost in USD. |
| .timing | RequestTiming \| None | Request timing breakdown. |

response = await model.embed(["Hello", "World"])
print(len(response.embeddings))       # 2
print(len(response.embeddings[0]))    # 1536
print(response.model)                 # "text-embedding-3-small"
print(response.cost)                  # e.g. 0.0001

Token Estimation

Lightweight token-counting functions that work without external data files. They use a heuristic (~3.5 characters per token) suitable for budget checks.

estimate_tokens()

from blazen import estimate_tokens

count = estimate_tokens("Hello, world!")          # 4
count = estimate_tokens("Hello, world!", 32000)   # same, with custom context size

| Parameter | Type | Default | Description |
|---|---|---|---|
| text | str | required | The text to estimate. |
| context_size | int | 128000 | Context window size hint. |
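
One plausible reading of the heuristic (illustrative; the real function may round or weight differently):

```python
import math

def estimate_tokens_sketch(text: str) -> int:
    # ~3.5 characters per token, as described above.
    return max(1, math.ceil(len(text) / 3.5))

print(estimate_tokens_sketch("Hello, world!"))  # 4
```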

count_message_tokens()

from blazen import count_message_tokens, ChatMessage

count = count_message_tokens([
    ChatMessage.system("You are helpful."),
    ChatMessage.user("Hello!"),
])

Includes per-message overhead (role markers, separators) in addition to content tokens.

| Parameter | Type | Default | Description |
|---|---|---|---|
| messages | list[ChatMessage] | required | Messages to count. |
| context_size | int | 128000 | Context window size hint. |

Error Handling

All errors from blazen are raised as standard Python exceptions. LLM and compute errors originate from a unified BlazenError hierarchy internally, which is mapped to Python exceptions as follows:

| Error Condition | Python Exception | Description |
|---|---|---|
| Invalid arguments, auth failure, validation | ValueError | Bad input, missing/invalid API keys, schema violations. |
| Timeout | TimeoutError | The operation exceeded its time limit. |
| All other errors | RuntimeError | Provider errors, rate limits, content policy, model not found, job failures, etc. |

The error message includes a descriptive prefix indicating the category:

try:
    response = await model.complete([ChatMessage.user("Hello")])
except ValueError as e:
    # Authentication, validation errors
    print(f"Invalid input: {e}")
except TimeoutError as e:
    # Request timed out
    print(f"Timed out: {e}")
except RuntimeError as e:
    # Provider errors, rate limits, content policy, etc.
    print(f"Error: {e}")

Underlying error categories (reflected in the error message string):

| Category | Scope | Example Message |
|---|---|---|
| Auth | Shared | "authentication failed: invalid API key" |
| RateLimit | Shared | "rate limited: retry after 1000ms" |
| Timeout | Shared | "timed out after 30000ms" |
| Provider | Shared | "openai error: server overloaded" |
| Validation | Shared | "invalid input: temperature must be >= 0" |
| ContentPolicy | Shared | "content policy violation: ..." |
| Unsupported | Shared | "unsupported: model does not support streaming" |
| NoContent | Completion | "model returned no content" |
| ModelNotFound | Completion | "model not found: gpt-5-turbo" |
| InvalidResponse | Completion | "invalid response: malformed JSON" |
| Stream | Completion | "stream error: connection reset" |
| JobFailed | Compute | "job failed: out of memory" |
| Cancelled | Compute | "job cancelled" |
| QuotaExceeded | Compute | "quota exceeded: monthly limit reached" |
| Invalid | Media | "invalid media: unsupported format" |
| TooLarge | Media | "media too large: 52428800 bytes (max 10485760)" |