Python Examples
Complete runnable Python examples for Blazen
Python Examples
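Complete, runnable examples that demonstrate core Blazen patterns in Python. The first six show workflows built with the subclass event model; Examples 7–14 cover custom providers, custom memory backends, model memory budgets, pricing and cost tracking, typed error handling, progress reporting, and telemetry.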
Basic Workflow
A 3-step sequential pipeline: StartEvent → GreetEvent → FormattedEvent → StopEvent.
class GreetEvent(Event):
    """Carries the parsed name from the input step to the greeting step."""

    name: str


@step
async def parse_input(ctx: Context, ev: Event):
    """Pull the name off the incoming event and forward it as a GreetEvent."""
    parsed_name = ev.name
    return GreetEvent(name=parsed_name)


@step
async def greet(ctx: Context, ev: GreetEvent):
    """End the workflow with a greeting built from the GreetEvent's name."""
    message = f"Hello, {ev.name}!"
    return StopEvent(result={"greeting": message})
python examples/basic_workflow.py
Streaming Workflow
Publishes typed progress events while processing via ctx.write_event_to_stream().
# Typed event written to the workflow's event stream to report progress.
class ProgressEvent(Event):
    step: int  # value supplied by the emitting code (here, `i`)


# Inside a step: publish a progress event to the stream.
ctx.write_event_to_stream(ProgressEvent(step=i))

# Caller side: iterate over the events the workflow writes to its stream.
async for event in handler.stream_events():
    print(event.event_type)
python examples/streaming_workflow.py
Branching Workflow
Conditional fan-out by returning a list of typed events.
# One typed event per branch; both carry the input text forward.
class PositiveEvent(Event):
    text: str


class NegativeEvent(Event):
    text: str


# Inside a step: returning a list of typed events fans out to every event in it.
return [PositiveEvent(text=text), NegativeEvent(text=text)]
python examples/branching_workflow.py
LLM RAG Workflow
Multi-step RAG pipeline with context sharing between steps. Uses typed ChatMessage and CompletionResponse:
from blazen import CompletionModel, ChatMessage, CompletionResponse
# Reads OPENAI_API_KEY from the environment by default.
model = CompletionModel.openai()
# Runs inside an async step (it uses `await`); `query` and `docs` come from
# earlier in the step and are not shown in this excerpt.
response: CompletionResponse = await model.complete([
    ChatMessage.system("Answer based on the provided documents."),
    ChatMessage.user(query),
])
print(response.content) # typed attribute access
print(response.usage) # TokenUsage with .prompt_tokens, .completion_tokens, .total_tokens
# Store the documents in the workflow context so other steps can read them.
ctx.set("documents", docs)
python examples/llm_rag_workflow.py
Human-in-the-Loop
Side-effect steps that pause for external input with typed review events.
# Typed event used by the review flow; it carries no fields.
class ReviewComplete(Event):
    pass


# Inside a side-effect step: the event is emitted explicitly through the context...
ctx.send_event(ReviewComplete())
# ...so the step itself returns None rather than an event.
return None
python examples/human_in_the_loop.py
Stateful Workflow
Showcases the two explicit context namespaces and identity-preserving event payloads:
ctx.state for persistable values (counters, paths) — survives pause()/resume().
ctx.session for live in-process references (DB connections, handles) — excluded from snapshots.
StopEvent(result=conn) preserves is-identity — the caller gets back the same Python object.
import sqlite3
from blazen import Workflow, step, Context, StartEvent, StopEvent
@step
async def setup(ctx: Context, ev: StartEvent):
    """Open an in-memory SQLite database and return it as the workflow result.

    The plain counter goes into ``ctx.state`` (persistable across
    pause/resume); the live connection goes into ``ctx.session``
    (in-process only, excluded from snapshots).
    """
    connection = sqlite3.connect(":memory:")
    ctx.state["row_count"] = 0
    # Live handle: stored by reference, so identity is preserved.
    ctx.session["db"] = connection
    return StopEvent(result=connection)

# Caller side:
# result = await handler.result()
# result.result is the very same sqlite3 connection object created in setup()
python examples/stateful_workflow.py
Example 7: Subclassing CompletionModel (Custom Provider)
Build a custom provider by subclassing CompletionModel. Override complete() and/or stream() to plug in any backend (local inference, a proxy, a mock, etc.). Once subclassed, the model is a first-class citizen — it works with run_agent, with_retry(), with_cache(), and every other helper.
import asyncio
from blazen import ChatMessage, CompletionModel, run_agent
class EchoLLM(CompletionModel):
    """Toy completion provider that replies with the most recent user message."""

    def __init__(self):
        super().__init__(model_id="echo-llm", context_length=4096)

    async def complete(self, messages, options=None):
        # Walk the conversation from newest to oldest and take the first
        # user turn; fall back to an empty string when there is none.
        last = ""
        for message in reversed(messages):
            if message.role == "user":
                last = message.content
                break
        # A plain dict is fine: the dispatcher depythonizes it into a
        # CompletionResponse. Only `content` and `model` are required;
        # every other field takes its default.
        return {"content": f"echo: {last}", "model": self.model_id}
async def main():
    """Drive EchoLLM through run_agent and print the echoed reply."""
    model = EchoLLM()
    conversation = [ChatMessage.user("hello world")]
    # A subclassed model plugs into run_agent exactly like a built-in provider.
    result = await run_agent(model, conversation, tools=[])
    print(result.response.content)  # -> "echo: hello world"


asyncio.run(main())
python examples/subclass_completion_model.py
Example 8: Custom MemoryBackend (DictBackend)
Subclass MemoryBackend to plug in any storage layer — Postgres, DynamoDB, SQLite, a plain dict. Every async method is dispatched from Rust back into Python, so you get full control while reusing Blazen’s embedding and SimHash search pipeline.
import asyncio
from blazen import EmbeddingModel, Memory, MemoryBackend
class DictBackend(MemoryBackend):
    """Memory backend that keeps every entry in a plain in-process dict."""

    def __init__(self):
        super().__init__()
        # Entries keyed by their "id" field.
        self._store: dict[str, dict] = {}

    async def put(self, entry):
        """Insert the entry under its "id", replacing any previous one."""
        key = entry["id"]
        self._store[key] = entry

    async def get(self, entry_id):
        """Return the stored entry, or None if the id is unknown."""
        return self._store.get(entry_id)

    async def delete(self, entry_id):
        """Remove the entry and report whether one was actually stored."""
        removed = self._store.pop(entry_id, None)
        return removed is not None

    async def list(self):
        """Return a snapshot list of every stored entry."""
        return [*self._store.values()]

    async def len(self):
        """Return how many entries are stored."""
        return len(self._store)

    async def search_by_bands(self, bands, limit):
        """Return up to ``limit`` entries sharing at least one LSH band with ``bands``."""
        query_bands = set(bands)
        matches = []
        for candidate in self._store.values():
            if query_bands.intersection(candidate.get("bands", [])):
                matches.append(candidate)
        return matches[:limit]
async def main():
    """Store two facts with a local embedder, then run a similarity search."""
    memory = Memory(EmbeddingModel.local(), DictBackend())
    facts = {
        "fact-1": "Rust has ownership and borrowing.",
        "fact-2": "Python uses reference counting.",
    }
    for fact_id, text in facts.items():
        await memory.add(fact_id, text)
    for hit in await memory.search("memory management", limit=2):
        print(hit.score, hit.text)


asyncio.run(main())
python examples/custom_memory_backend.py
Example 9: ModelManager with Memory Budgets
Track memory budgets across multiple local models, with separate pools for CPU RAM and GPU VRAM. Register each model with an estimated footprint; when loading a new one would exceed its pool’s budget, the least-recently-used model in the same pool is automatically unloaded.
import asyncio
from blazen import CompletionModel, MistralRsOptions, ModelManager
async def main():
    """Register three local models and demonstrate LRU eviction in the GPU pool."""
    # 64 GB of CPU RAM and 40 GB of GPU VRAM (e.g. an A100 40GB). The GPU
    # budget is sized so the first two models fit together and loading the
    # third forces exactly one least-recently-used eviction.
    manager = ModelManager(cpu_ram_gb=64, gpu_vram_gb=40)

    llama_8b = CompletionModel.mistralrs(
        options=MistralRsOptions("meta-llama/Llama-3.1-8B-Instruct"),
    )
    qwen_14b = CompletionModel.mistralrs(
        options=MistralRsOptions("Qwen/Qwen2.5-14B-Instruct"),
    )
    mistral_24b = CompletionModel.mistralrs(
        options=MistralRsOptions("mistralai/Mistral-Small-24B"),
    )

    await manager.register("llama-8b", llama_8b, memory_estimate_bytes=8 * 1024**3)
    await manager.register("qwen-14b", qwen_14b, memory_estimate_bytes=14 * 1024**3)
    await manager.register("mistral-24b", mistral_24b, memory_estimate_bytes=20 * 1024**3)

    # llama-8b and qwen-14b fit together in the GPU pool: 8 + 14 = 22 GB <= 40 GB.
    await manager.load("llama-8b")
    await manager.load("qwen-14b")

    # mistral-24b would need 8 + 14 + 20 = 42 GB > 40 GB, so the least-recently
    # used model in the same pool (llama-8b) is evicted, leaving
    # 14 + 20 = 34 GB loaded. (With a 24 GB budget, evicting llama-8b alone
    # would not be enough: 14 + 20 = 34 GB > 24 GB.)
    await manager.load("mistral-24b")

    for s in await manager.status():
        print(
            f"{s.id}: loaded={s.loaded}, pool={s.pool}, "
            f"memory={s.memory_estimate_bytes:,} bytes"
        )


asyncio.run(main())
python examples/model_manager_budget.py
Example 10: Pricing Registration and Cost Tracking
Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every CompletionResponse for that model then carries a .cost field computed from the registered per-million-token rates and the response's token usage.
import asyncio
from blazen import (
ChatMessage,
CompletionModel,
ModelPricing,
lookup_pricing,
register_pricing,
)
class MyFinetune(CompletionModel):
    """Stand-in for a custom deployment that reports fixed token usage."""

    def __init__(self):
        super().__init__(model_id="my-finetuned-model")

    async def complete(self, messages, options=None):
        # Fixed usage numbers, so the registered pricing yields a predictable cost.
        usage = {
            "prompt_tokens": 150,
            "completion_tokens": 80,
            "total_tokens": 230,
        }
        # Collection-valued response fields, all explicitly empty (a fresh
        # list per field).
        empty_fields = {
            name: []
            for name in ("tool_calls", "citations", "artifacts", "images", "audio", "videos")
        }
        return {
            "content": "Rust is a systems language with memory safety without GC.",
            "model": self.model_id,
            **empty_fields,
            "usage": usage,
            "metadata": {},
        }
async def main():
    """Register pricing for the finetune, read it back, and print one call's cost."""
    # Register pricing once, globally, for any model ID.
    register_pricing(
        "my-finetuned-model",
        ModelPricing(input_per_million=1.0, output_per_million=2.0),
    )

    # Readback -- pricing is centrally stored. Raise instead of `assert` so
    # the check still runs under `python -O`.
    pricing = lookup_pricing("my-finetuned-model")
    if pricing is None:
        raise RuntimeError("pricing for 'my-finetuned-model' was not registered")
    print(f"input: ${pricing.input_per_million}/M, output: ${pricing.output_per_million}/M")

    model = MyFinetune()
    response = await model.complete([ChatMessage.user("Summarize Rust in one line.")])
    print(response.content)
    print(f"usage: {response.usage}")

    # cost is computed from the registered pricing + usage; with the rates
    # above it should be (150 * 1.0 + 80 * 2.0) / 1_000_000 = $0.000310.
    # Guard against a missing cost (e.g. no pricing matched) so the float
    # format spec does not raise TypeError on None.
    if response.cost is None:
        print("cost: n/a (no pricing matched this response)")
    else:
        print(f"cost: ${response.cost:.6f}")


asyncio.run(main())
python examples/pricing_and_cost.py
Example 11: Per-Capability Provider (Custom TTS)
Subclass TTSProvider to plug in any TTS backend (ElevenLabs, Coqui, a local model). The per-capability base classes (TTSProvider, ImageProvider, VideoProvider, MusicProvider, ThreeDProvider, BackgroundRemovalProvider, VoiceProvider) exist for users who only need to implement one capability.
import asyncio
from blazen import TTSProvider, SpeechRequest
class MyElevenLabs(TTSProvider):
    """Bare-bones custom TTS provider pointed at the ElevenLabs API."""

    def __init__(self, api_key: str):
        super().__init__(
            provider_id="elevenlabs",
            base_url="https://api.elevenlabs.io/v1",
        )
        # Held for the real HTTP call; this stub never uses it.
        self._api_key = api_key

    async def text_to_speech(self, request):
        # A real implementation would call the API with self._api_key and
        # return the synthesized audio. This stub returns placeholder bytes
        # plus the voice and text taken from the request.
        payload = dict(
            audio_data=b"<wav-bytes>",
            format="wav",
            voice=request.voice,
            text=request.text,
        )
        return payload
async def main():
    """Synthesize one sentence with the stub provider and report the payload size."""
    provider = MyElevenLabs(api_key="sk-...")
    speech_request = SpeechRequest(text="Hello from Blazen!", voice="alice")
    result = await provider.text_to_speech(speech_request)
    audio = result["audio_data"]
    print(result["format"], len(audio), "bytes")


asyncio.run(main())
python examples/custom_tts_provider.py
Example 12: Typed Error Handling
Blazen exposes a typed exception hierarchy rooted at BlazenError. ProviderError carries structured HTTP context (provider, status, endpoint, request_id, detail, raw_body, retry_after_ms) so callers can branch on the failure mode instead of regex-matching error strings.
import asyncio
from blazen import (
BlazenError,
ChatMessage,
CompletionModel,
ProviderError,
RateLimitError,
)
async def main():
    """Make one completion call and branch on whichever typed Blazen error occurs."""
    model = CompletionModel.openai()
    prompt = [ChatMessage.user("Hello")]
    try:
        reply = await model.complete(prompt)
        print(reply.content)
    except RateLimitError:
        # Most specific handler first.
        print("rate limited; backing off")
    except ProviderError as err:
        # Structured HTTP context instead of parsing the message text.
        print(f"provider {err.provider} returned {err.status}: {err.detail}")
    except BlazenError as err:
        # Catch-all for any other Blazen failure.
        print(f"blazen error: {err}")


asyncio.run(main())
python examples/typed_error_handling.py
Example 13: Custom Progress Reporting via ProgressCallback
Subclass ProgressCallback to receive download progress notifications from ModelCache.download(). The same instance can be reused across multiple downloads — the cache calls on_progress(downloaded, total) repeatedly as bytes arrive.
import asyncio
from blazen import ModelCache, ProgressCallback
class TerminalProgress(ProgressCallback):
    """Render download progress on a single, repeatedly overwritten terminal line."""

    def on_progress(self, downloaded: int, total: int | None) -> None:
        # A missing (None) or zero total means no percentage can be computed,
        # so only the byte count is shown.
        if not total:
            line = f"\r{downloaded} bytes"
        else:
            percent = downloaded / total * 100
            line = f"\r{percent:.1f}% ({downloaded}/{total})"
        print(line, end="", flush=True)
async def main():
    """Download config.json for one model repo, reporting progress on the terminal."""
    cache = ModelCache()
    reporter = TerminalProgress()
    path = await cache.download(
        "mistralai/Mistral-7B-Instruct-v0.3",
        "config.json",
        reporter,
    )
    # Leading newline moves past the carriage-return progress line.
    print(f"\ndownloaded to {path}")


asyncio.run(main())
python examples/progress_callback.py
Example 14: Telemetry — Langfuse Trace Export
Wire Blazen into Langfuse with a single init_langfuse() call. Once configured, every workflow run, agent step, and LLM call ships traces to Langfuse in the background. Tune batch_size and flush_interval_ms to balance latency against API request volume.
import os
from blazen import LangfuseConfig, init_langfuse
# Configure Langfuse export once at startup. The keys are read from the
# environment (a KeyError is raised if either variable is unset).
init_langfuse(
    LangfuseConfig(
        public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
        secret_key=os.environ["LANGFUSE_SECRET_KEY"],
        host="https://cloud.langfuse.com",
        # batch_size and flush_interval_ms trade latency against the
        # number of API requests sent to Langfuse.
        batch_size=100,
        flush_interval_ms=5000,
    )
)
# All subsequent workflow runs ship traces to Langfuse.
python examples/langfuse_telemetry.py