Python Examples

Complete runnable Python examples for Blazen

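The first six examples demonstrate core Blazen workflow patterns in Python using the subclass event model, where each event is a subclass of Event with typed fields. Examples 7 through 15 cover custom providers, memory backends, model management, pricing, error handling, progress reporting, telemetry, and an always-on bot. Each example is a runnable script under examples/; the workflow snippets below show only the key lines.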


Basic Workflow

A 3-step sequential pipeline: StartEvent → GreetEvent → FormattedEvent → StopEvent.

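from blazen import Context, Event, StartEvent, StopEvent, step

class GreetEvent(Event):
    name: str

@step
async def parse_input(ctx: Context, ev: StartEvent):
    return GreetEvent(name=ev.name)

@step
async def greet(ctx: Context, ev: GreetEvent):
    # Abridged: the full script routes through a FormattedEvent step before stopping.
    return StopEvent(result={"greeting": f"Hello, {ev.name}!"})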
python examples/basic_workflow.py
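Annotating each step with the event type it accepts is what chains the pipeline together. As a minimal stdlib-only sketch of that idea (not Blazen's engine), the hypothetical run() below looks up the step registered for each event's class and keeps dispatching until a StopEvent appears; the dataclass events are stand-ins for Blazen's Event subclasses, and the FormattedEvent field is an assumption.

```python
import asyncio
from dataclasses import dataclass


# Hypothetical stand-ins for Blazen's event classes.
@dataclass
class StartEvent:
    name: str


@dataclass
class GreetEvent:
    name: str


@dataclass
class FormattedEvent:
    message: str


@dataclass
class StopEvent:
    result: dict


async def parse_input(ev: StartEvent) -> GreetEvent:
    return GreetEvent(name=ev.name)


async def greet(ev: GreetEvent) -> FormattedEvent:
    return FormattedEvent(message=f"Hello, {ev.name}!")


async def finish(ev: FormattedEvent) -> StopEvent:
    return StopEvent(result={"greeting": ev.message})


# Routing table: event class -> the step that accepts it.
STEPS = {StartEvent: parse_input, GreetEvent: greet, FormattedEvent: finish}


async def run(start: StartEvent) -> dict:
    ev = start
    # Keep dispatching on the event's type until a step emits StopEvent.
    while not isinstance(ev, StopEvent):
        ev = await STEPS[type(ev)](ev)
    return ev.result


result = asyncio.run(run(StartEvent(name="Ada")))
print(result)
```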

Streaming Workflow

Publishes typed progress events from inside a step with ctx.write_event_to_stream(); the caller consumes them with handler.stream_events().

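class ProgressEvent(Event):
    step: int

# Inside a step: publish a typed progress event for stream consumers.
ctx.write_event_to_stream(ProgressEvent(step=i))

# Caller side: iterate the events published through the running workflow's handler.
async for event in handler.stream_events():
    print(event.event_type)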
python examples/streaming_workflow.py
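The relationship between ctx.write_event_to_stream() and handler.stream_events() can be pictured as a producer and a consumer sharing a queue. The sketch below is an assumption about that shape, not Blazen's implementation: the hypothetical EventStream publishes without blocking and exposes an async generator that ends once the stream is closed.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ProgressEvent:
    step: int


class EventStream:
    """Hypothetical stand-in for a workflow's event stream."""

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def write_event(self, ev) -> None:
        # Non-blocking publish, in the spirit of ctx.write_event_to_stream().
        self._queue.put_nowait(ev)

    def close(self) -> None:
        self._queue.put_nowait(self._DONE)

    async def events(self):
        # Async generator, in the spirit of handler.stream_events().
        while True:
            ev = await self._queue.get()
            if ev is self._DONE:
                return
            yield ev


async def worker(stream: EventStream, n: int) -> None:
    for i in range(n):
        stream.write_event(ProgressEvent(step=i))
        await asyncio.sleep(0)  # yield control so the consumer can run
    stream.close()


async def main() -> list[int]:
    stream = EventStream()
    task = asyncio.create_task(worker(stream, 3))
    seen = [ev.step async for ev in stream.events()]
    await task
    return seen


steps_seen = asyncio.run(main())
print(steps_seen)
```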

Branching Workflow

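Conditional branching and fan-out: a step picks one branch by returning a single typed event, or fans out to several branches at once by returning a list of typed events.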

class PositiveEvent(Event):
    text: str

class NegativeEvent(Event):
    text: str

# Inside a step: returning a list emits every event in it, one per branch.
return [PositiveEvent(text=text), NegativeEvent(text=text)]
python examples/branching_workflow.py
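Assuming events are routed by type, a step's return value can be normalized to a list and each event dispatched to its handler concurrently. In this stdlib sketch, classify(), HANDLERS, and dispatch() are hypothetical, and the keyword rule stands in for a real branching condition.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class PositiveEvent:
    text: str


@dataclass
class NegativeEvent:
    text: str


def classify(text: str):
    # Hypothetical rule: pick one branch, or fan out to both when unsure.
    if "good" in text:
        return PositiveEvent(text=text)
    if "bad" in text:
        return NegativeEvent(text=text)
    return [PositiveEvent(text=text), NegativeEvent(text=text)]


async def on_positive(ev: PositiveEvent) -> str:
    return f"+ {ev.text}"


async def on_negative(ev: NegativeEvent) -> str:
    return f"- {ev.text}"


HANDLERS = {PositiveEvent: on_positive, NegativeEvent: on_negative}


async def dispatch(output) -> list[str]:
    # Wrap a single event in a list, then run every branch concurrently.
    events = output if isinstance(output, list) else [output]
    return await asyncio.gather(*(HANDLERS[type(ev)](ev) for ev in events))


single = asyncio.run(dispatch(classify("good day")))
fanned = asyncio.run(dispatch(classify("so-so")))
print(single, fanned)
```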

LLM RAG Workflow

Multi-step RAG pipeline that shares retrieved documents between steps through the context. Model calls use the typed ChatMessage and ModelResponse classes:

from blazen import OpenAiProvider, ProviderOptions, ChatMessage, ModelResponse

# Pass api_key explicitly, or omit it to read OPENAI_API_KEY from the environment.
model = OpenAiProvider(options=ProviderOptions(api_key="sk-..."))

# Inside an async step; `query` and `docs` are assumed to be defined earlier.
response: ModelResponse = await model.complete([
    ChatMessage.system("Answer based on the provided documents."),
    ChatMessage.user(query),
])
print(response.content)       # typed attribute access
print(response.usage)         # TokenUsage with .prompt_tokens, .completion_tokens, .total_tokens

# Share the retrieved documents with later steps through the context.
ctx.set("documents", docs)
python examples/llm_rag_workflow.py
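The system prompt above refers to "the provided documents", so they have to reach the model somehow. One common approach, sketched here with the standard library only, is to rank documents against the query and fold the best matches into the user message. The word-overlap ranking and the retrieve() and build_prompt() helpers are hypothetical stand-ins for a real retriever; the resulting string would be passed to ChatMessage.user() in place of the bare query.

```python
import re


def tokens(text: str) -> set[str]:
    # Lowercase word set; a crude stand-in for a real embedding.
    return set(re.findall(r"[a-z0-9]+", text.lower()))


def retrieve(query: str, corpus: list[str], k: int = 2) -> list[str]:
    q = tokens(query)
    # Rank documents by how many query words they share (stable for ties).
    scored = sorted(((len(q & tokens(d)), d) for d in corpus),
                    key=lambda pair: pair[0], reverse=True)
    # Keep the top k, dropping documents with no overlap at all.
    return [d for score, d in scored[:k] if score > 0]


def build_prompt(query: str, docs: list[str]) -> str:
    # Number each document so the answer can refer back to it.
    context = "\n".join(f"[{i}] {d}" for i, d in enumerate(docs, start=1))
    return f"Documents:\n{context}\n\nQuestion: {query}"


corpus = [
    "Blazen workflows are built from async steps.",
    "Paris is the capital of France.",
    "Steps exchange typed events.",
]
question = "How do workflow steps communicate?"
docs = retrieve(question, corpus)
prompt = build_prompt(question, docs)
print(prompt)
```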

Human-in-the-Loop

Side-effect steps pause for external input and signal completion with typed review events: the step emits the event itself via ctx.send_event() and returns None.

class ReviewComplete(Event):
    pass

# Inside the side-effect step: send the event explicitly instead of returning it.
ctx.send_event(ReviewComplete())
return None
python examples/human_in_the_loop.py
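The pause itself amounts to awaiting a value that only arrives from outside the workflow. This stdlib sketch models that with an asyncio.Queue acting as the reviewer's inbox; review_step(), human(), and the approved field are hypothetical and do not describe how Blazen actually delivers external input.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class ReviewComplete:
    approved: bool  # hypothetical field; the example's event has none


async def review_step(inbox: asyncio.Queue) -> ReviewComplete:
    # Pause here until someone outside the workflow delivers a decision.
    decision = await inbox.get()
    return ReviewComplete(approved=decision)


async def human(inbox: asyncio.Queue) -> None:
    await asyncio.sleep(0.01)  # simulate the reviewer taking a moment
    inbox.put_nowait(True)


async def main() -> ReviewComplete:
    inbox: asyncio.Queue = asyncio.Queue()
    result, _ = await asyncio.gather(review_step(inbox), human(inbox))
    return result


outcome = asyncio.run(main())
print(outcome)
```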

Stateful Workflow

Showcases the two explicit context namespaces and identity-preserving event payloads:

  • ctx.state holds persistable values (counters, paths) and survives pause()/resume().
  • ctx.session holds live in-process references (DB connections, handles) and is excluded from snapshots.
  • StopEvent(result=conn) preserves object identity: the caller gets back the same Python object, not a copy.
import sqlite3
from blazen import Workflow, step, Context, StartEvent, StopEvent

@step
async def setup(ctx: Context, ev: StartEvent):
    conn = sqlite3.connect(":memory:")
    ctx.state["row_count"] = 0
    ctx.session["db"] = conn  # identity preserved
    return StopEvent(result=conn)

# result = await handler.result()
# assert result.result is conn  # same Python object
python examples/stateful_workflow.py
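To make the state/session split concrete, here is a stdlib sketch of a context whose snapshot writes only the persistable namespace. SketchContext is hypothetical, and JSON stands in for Blazen's snapshot format, which this page does not specify.

```python
import json
import sqlite3


class SketchContext:
    """Hypothetical context with a persistable and a live namespace."""

    def __init__(self) -> None:
        self.state: dict = {}    # persistable values; must be serializable
        self.session: dict = {}  # live objects; never serialized

    def snapshot(self) -> str:
        # Only `state` is written out; `session` is deliberately skipped.
        return json.dumps(self.state)

    @classmethod
    def restore(cls, blob: str) -> "SketchContext":
        ctx = cls()
        ctx.state = json.loads(blob)
        return ctx  # `session` starts empty: live handles must be reopened


conn = sqlite3.connect(":memory:")
ctx = SketchContext()
ctx.state["row_count"] = 0
ctx.session["db"] = conn

# A live connection cannot be serialized, which is why it belongs in `session`.
try:
    json.dumps({"db": conn})
    conn_serializable = True
except TypeError:
    conn_serializable = False

blob = ctx.snapshot()
resumed = SketchContext.restore(blob)
print(blob, resumed.session, conn_serializable)
conn.close()
```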

Example 7: Subclassing Model (Custom Provider)

Build a custom provider by subclassing Model. Override complete() and/or stream() to plug in any backend (local inference, a proxy, a mock, etc.). Once subclassed, the model is a first-class citizen: it works with run_agent, with_retry(), with_cache(), and every other helper.

import asyncio

from blazen import ChatMessage, Model, run_agent


class EchoLLM(Model):
    """A toy provider that echoes the last user message back."""

    def __init__(self):
        super().__init__(model_id="echo-llm", context_length=4096)

    async def complete(self, messages, options=None):
        last = next(
            (m.content for m in reversed(messages) if m.role == "user"), ""
        )
        # Return a plain dict; Blazen converts it into a ModelResponse.
        # Only `content` and `model` are required; every other field has a default.
        return {"content": f"echo: {last}", "model": self.model_id}


async def main():
    model = EchoLLM()

    # Works with run_agent just like any built-in provider.
    result = await run_agent(
        model,
        [ChatMessage.user("hello world")],
        tools=[],
    )
    print(result.response.content)  # -> "echo: hello world"


asyncio.run(main())
python examples/subclass_model.py

Example 8: Custom MemoryBackend (DictBackend)

Subclass MemoryBackend to plug in any storage layer: Postgres, DynamoDB, SQLite, or a plain dict. A backend implements put, get, delete, list, len, and search_by_bands. Every async method is dispatched from Rust back into Python, so you keep full control of storage while reusing Blazen's embedding and SimHash search pipeline.

import asyncio

from blazen import EmbeddingModel, Memory, MemoryBackend


class DictBackend(MemoryBackend):
    """In-process dict backed memory backend."""

    def __init__(self):
        super().__init__()
        self._store: dict[str, dict] = {}

    async def put(self, entry):
        self._store[entry["id"]] = entry

    async def get(self, entry_id):
        return self._store.get(entry_id)

    async def delete(self, entry_id):
        return self._store.pop(entry_id, None) is not None

    async def list(self):
        return list(self._store.values())

    async def len(self):
        return len(self._store)

    async def search_by_bands(self, bands, limit):
        # Return any entry that shares at least one LSH band with the query.
        band_set = set(bands)
        hits = [
            e for e in self._store.values()
            if band_set.intersection(e.get("bands", []))
        ]
        return hits[:limit]


async def main():
    embedder = EmbeddingModel.local()
    memory = Memory(embedder, DictBackend())

    await memory.add("fact-1", "Rust has ownership and borrowing.")
    await memory.add("fact-2", "Python uses reference counting.")

    results = await memory.search("memory management", limit=2)
    for r in results:
        print(r.score, r.text)


asyncio.run(main())
python examples/custom_memory_backend.py
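The search_by_bands hook receives band keys rather than raw vectors. As a rough illustration of where such bands can come from, here is random-hyperplane SimHash over an embedding vector, cut into eight 8-bit bands: vectors pointing in similar directions agree on most bits and so tend to share at least one band. Blazen's real fingerprint width, band count, and band encoding are not documented on this page, so make_planes(), simhash(), to_bands(), and the "index:hex" key format are hypothetical.

```python
import random


def make_planes(dim: int, bits: int = 64, seed: int = 0) -> list[list[float]]:
    # Random hyperplanes through the origin; a fixed seed keeps fingerprints reproducible.
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(dim)] for _ in range(bits)]


def simhash(vec: list[float], planes: list[list[float]]) -> int:
    # Bit i is set when the vector lies on the non-negative side of hyperplane i.
    fp = 0
    for i, plane in enumerate(planes):
        if sum(p * v for p, v in zip(plane, vec)) >= 0:
            fp |= 1 << i
    return fp


def to_bands(fp: int, bits: int = 64, num_bands: int = 8) -> list[str]:
    # Cut the fingerprint into equal slices, tagging each with its position.
    width = bits // num_bands
    mask = (1 << width) - 1
    return [f"{i}:{(fp >> (i * width)) & mask:x}" for i in range(num_bands)]


planes = make_planes(dim=8)
stored = [0.9, 0.1, 0.3, 0.0, 0.5, 0.2, 0.7, 0.4]    # pretend embedding
near = [0.9, 0.1, 0.3, 0.0, 0.5, 0.2, 0.7, 0.35]     # almost the same direction
opposite = [-x for x in stored]                       # pointing the other way

stored_bands = set(to_bands(simhash(stored, planes)))
near_shared = len(stored_bands & set(to_bands(simhash(near, planes))))
opposite_shared = len(stored_bands & set(to_bands(simhash(opposite, planes))))
print(near_shared, opposite_shared)
```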

Example 9: ModelManager with Memory Budgets

ModelManager is the unified registry: register local models and remote providers by name, then dispatch with complete(id, messages). The manager also offers stream(id, messages, on_chunk) and get(id). Local models additionally take part in memory budgeting, with separate pools for CPU RAM and GPU VRAM. Register each local model with an estimated footprint; when loading a new one would exceed its pool's budget, the least-recently-used models in the same pool are unloaded automatically to make room. Remote providers hold no local weights, so they dispatch straight through and never count against a budget.

import asyncio

from blazen import (
    ChatMessage,
    Model,
    MistralRsOptions,
    ModelManager,
    OpenAiProvider,
    ProviderOptions,
)


async def main():
    # Budgets: 64 GB of CPU RAM and 24 GB of GPU VRAM (typical of a high-end consumer card).
    manager = ModelManager(cpu_ram_gb=64, gpu_vram_gb=24)

    # Remote provider: dispatch-only, no footprint (pass 0 / omit estimate).
    await manager.register(
        "gpt", OpenAiProvider(options=ProviderOptions(api_key="sk-...")),
    )

    llama_8b = Model.mistralrs(
        options=MistralRsOptions("meta-llama/Llama-3.1-8B-Instruct"),
    )
    qwen_14b = Model.mistralrs(
        options=MistralRsOptions("Qwen/Qwen2.5-14B-Instruct"),
    )
    mistral_24b = Model.mistralrs(
        options=MistralRsOptions("mistralai/Mistral-Small-24B"),
    )

    await manager.register("llama-8b", llama_8b, memory_estimate_bytes=8 * 1024**3)
    await manager.register("qwen-14b", qwen_14b, memory_estimate_bytes=14 * 1024**3)
    await manager.register("mistral-24b", mistral_24b, memory_estimate_bytes=20 * 1024**3)

    # Dispatch by name — same call shape for the remote and local entries.
    remote = await manager.complete("gpt", [ChatMessage.user("Hello from the cloud!")])
    print(remote.content)

    # llama-8b and qwen-14b fit together in the 24 GB GPU pool (8 + 14 = 22 GB).
    await manager.load("llama-8b")
    await manager.load("qwen-14b")

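    # mistral-24b needs 20 GB, but 22 GB is already in use (22 + 20 > 24).
    # Unloading the LRU model (llama-8b) frees only 8 GB (14 + 20 = 34 > 24),
    # so qwen-14b must be unloaded as well before mistral-24b fits.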
    await manager.load("mistral-24b")

    for s in await manager.status():
        print(
            f"{s.id}: loaded={s.loaded}, pool={s.pool}, "
            f"memory={s.memory_estimate_bytes:,} bytes"
        )


asyncio.run(main())
python examples/model_manager_budget.py
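ModelManager's eviction internals are not shown on this page. The following stdlib sketch models a single pool as an LRU-ordered dict that unloads the oldest entries until a new model fits, which is enough to check the arithmetic for the three local models in this example: freeing llama-8b alone does not make room for mistral-24b. PoolBudget is hypothetical.

```python
from collections import OrderedDict

GB = 1024**3


class PoolBudget:
    """Hypothetical single-pool LRU budget, for illustration only."""

    def __init__(self, budget_bytes: int) -> None:
        self.budget = budget_bytes
        self.loaded: OrderedDict[str, int] = OrderedDict()  # least recent first

    def load(self, model_id: str, size: int) -> list[str]:
        if model_id in self.loaded:
            self.loaded.move_to_end(model_id)  # touching a model makes it most recent
            return []
        if size > self.budget:
            raise MemoryError(f"{model_id} can never fit in this pool")
        evicted = []
        # Unload least-recently-used models until the new one fits.
        while sum(self.loaded.values()) + size > self.budget:
            victim, _ = self.loaded.popitem(last=False)
            evicted.append(victim)
        self.loaded[model_id] = size
        return evicted


gpu = PoolBudget(24 * GB)
gpu.load("llama-8b", 8 * GB)
gpu.load("qwen-14b", 14 * GB)
evicted = gpu.load("mistral-24b", 20 * GB)
print(evicted, list(gpu.loaded))
```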

Example 10: Pricing Registration and Cost Tracking

Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every ModelResponse for that model ID then carries a .cost field computed from the registered rates and the response's token usage.

import asyncio

from blazen import (
    ChatMessage,
    Model,
    ModelPricing,
    lookup_pricing,
    register_pricing,
)


class MyFinetune(Model):
    """A stand-in for a custom deployment that reports its usage."""

    def __init__(self):
        super().__init__(model_id="my-finetuned-model")

    async def complete(self, messages, options=None):
        return {
            "content": "Rust is a systems language with memory safety without GC.",
            "model": self.model_id,
            "tool_calls": [],
            "citations": [],
            "artifacts": [],
            "images": [],
            "audio": [],
            "videos": [],
            "usage": {
                "prompt_tokens": 150,
                "completion_tokens": 80,
                "total_tokens": 230,
            },
            "metadata": {},
        }


async def main():
    # Register pricing once, globally, for any model ID.
    register_pricing(
        "my-finetuned-model",
        ModelPricing(input_per_million=1.0, output_per_million=2.0),
    )

    # Read the pricing back from the global registry.
    pricing = lookup_pricing("my-finetuned-model")
    assert pricing is not None
    print(f"input: ${pricing.input_per_million}/M, output: ${pricing.output_per_million}/M")

    model = MyFinetune()
    response = await model.complete([ChatMessage.user("Summarize Rust in one line.")])
    print(response.content)
    print(f"usage: {response.usage}")
    # cost is computed from the registered pricing + usage.
    print(f"cost: ${response.cost:.6f}")


asyncio.run(main())
python examples/pricing_and_cost.py
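The exact formula behind .cost is not spelled out above. Assuming it is the usual per-million-token calculation (prompt tokens times the input rate plus completion tokens times the output rate, divided by one million), the usage reported by MyFinetune works out as follows; estimate_cost() is a hypothetical helper, not part of Blazen.

```python
def estimate_cost(prompt_tokens: int, completion_tokens: int,
                  input_per_million: float, output_per_million: float) -> float:
    # Price each side per million tokens, then add them.
    return (prompt_tokens * input_per_million
            + completion_tokens * output_per_million) / 1_000_000


# 150 * 1.0 + 80 * 2.0 = 310 token-dollars per million
cost = estimate_cost(150, 80, input_per_million=1.0, output_per_million=2.0)
print(f"${cost:.6f}")
```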

Example 11: Per-Capability Provider (Custom TTS)

Subclass TTSProvider to plug in any TTS backend (ElevenLabs, Coqui, a local model). The per-capability base classes (TTSProvider, ImageProvider, VideoProvider, MusicProvider, ThreeDProvider, BackgroundRemovalProvider, VoiceProvider) exist for users who only need to implement one capability.

import asyncio

from blazen import TTSProvider, SpeechRequest


class MyElevenLabs(TTSProvider):
    """A minimal custom TTS provider."""

    def __init__(self, api_key: str):
        super().__init__(
            provider_id="elevenlabs",
            base_url="https://api.elevenlabs.io/v1",
        )
        self._api_key = api_key

    async def text_to_speech(self, request):
        # In a real implementation, make an HTTP call with self._api_key
        # and return audio bytes. Here we just echo the request.
        return {
            "audio_data": b"<wav-bytes>",
            "format": "wav",
            "voice": request.voice,
            "text": request.text,
        }


async def main():
    tts = MyElevenLabs(api_key="sk-...")
    result = await tts.text_to_speech(
        SpeechRequest(text="Hello from Blazen!", voice="alice")
    )
    print(result["format"], len(result["audio_data"]), "bytes")


asyncio.run(main())
python examples/custom_tts_provider.py

Example 12: Typed Error Handling

Blazen exposes a typed exception hierarchy rooted at BlazenError. ProviderError carries structured HTTP context (provider, status, endpoint, request_id, detail, raw_body, retry_after_ms) so callers can branch on the failure mode instead of regex-matching error strings.

import asyncio

from blazen import (
    BlazenError,
    ChatMessage,
    OpenAiProvider,
    ProviderError,
    ProviderOptions,
    RateLimitError,
)


async def main():
    model = OpenAiProvider(options=ProviderOptions(api_key="sk-..."))
    try:
        response = await model.complete([ChatMessage.user("Hello")])
        print(response.content)
    except RateLimitError:
        print("rate limited; backing off")
    except ProviderError as e:
        print(f"provider {e.provider} returned {e.status}: {e.detail}")
    except BlazenError as e:
        print(f"blazen error: {e}")


asyncio.run(main())
python examples/typed_error_handling.py
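When handling rate limits by hand rather than through a helper such as with_retry(), a common pattern is to honor the server's retry_after_ms hint when present and fall back to exponential backoff otherwise. In this stdlib sketch, RateLimited, with_backoff(), and flaky() are hypothetical stand-ins; with Blazen you would catch the typed exception instead and read retry_after_ms from it where available.

```python
import asyncio


class RateLimited(Exception):
    """Hypothetical stand-in for a rate-limit error carrying retry_after_ms."""

    def __init__(self, retry_after_ms=None):
        super().__init__("rate limited")
        self.retry_after_ms = retry_after_ms


async def with_backoff(call, attempts: int = 4, base_delay_s: float = 0.01):
    for attempt in range(attempts):
        try:
            return await call()
        except RateLimited as e:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the original error
            # Prefer the server's hint; otherwise back off exponentially.
            if e.retry_after_ms is not None:
                delay = e.retry_after_ms / 1000
            else:
                delay = base_delay_s * 2 ** attempt
            await asyncio.sleep(delay)


calls = 0


async def flaky():
    # Fails twice (first with a hint, then without), then succeeds.
    global calls
    calls += 1
    if calls < 3:
        raise RateLimited(retry_after_ms=5 if calls == 1 else None)
    return "ok"


result = asyncio.run(with_backoff(flaky))
print(result, calls)
```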

Example 13: Custom Progress Reporting via ProgressCallback

Subclass ProgressCallback to receive download progress notifications from ModelCache.download(). The cache calls on_progress(downloaded, total) repeatedly as bytes arrive; total is None when the size is unknown, so handle both cases. The same instance can be reused across multiple downloads.

import asyncio

from blazen import ModelCache, ProgressCallback


class TerminalProgress(ProgressCallback):
    def on_progress(self, downloaded: int, total: int | None) -> None:
        if total:
            pct = downloaded / total * 100
            print(f"\r{pct:.1f}% ({downloaded}/{total})", end="", flush=True)
        else:
            print(f"\r{downloaded} bytes", end="", flush=True)


async def main():
    cache = ModelCache()
    path = await cache.download(
        "mistralai/Mistral-7B-Instruct-v0.3",
        "config.json",
        TerminalProgress(),
    )
    print(f"\ndownloaded to {path}")


asyncio.run(main())
python examples/progress_callback.py

Example 14: Telemetry — Langfuse Trace Export

Wire Blazen into Langfuse with a single init_langfuse() call. Once configured, every workflow run, agent step, and LLM call ships traces to Langfuse in the background. Tune batch_size and flush_interval_ms to balance latency against API request volume.

import os

from blazen import LangfuseConfig, init_langfuse


init_langfuse(
    LangfuseConfig(
        public_key=os.environ["LANGFUSE_PUBLIC_KEY"],
        secret_key=os.environ["LANGFUSE_SECRET_KEY"],
        host="https://cloud.langfuse.com",
        batch_size=100,
        flush_interval_ms=5000,
    )
)

# All subsequent workflow runs ship traces to Langfuse.
python examples/langfuse_telemetry.py
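To see how batch_size and flush_interval_ms trade latency against request volume, here is a simplified stdlib sketch of a batching exporter: it sends a batch when the buffer is full or when the interval has elapsed, whichever comes first. BatchExporter is hypothetical, and unlike a real background exporter it only checks the interval when a new span is recorded; an injectable clock keeps the demo deterministic.

```python
import time


class BatchExporter:
    """Hypothetical exporter showing how batch_size and flush_interval_ms interact."""

    def __init__(self, batch_size: int, flush_interval_ms: int, clock=time.monotonic):
        self.batch_size = batch_size
        self.flush_interval_s = flush_interval_ms / 1000
        self.clock = clock
        self.buffer: list[dict] = []
        self.sent_batches: list[list[dict]] = []  # stands in for HTTP requests
        self.last_flush = clock()

    def record(self, span: dict) -> None:
        self.buffer.append(span)
        # Flush when the batch is full or the interval has elapsed.
        if (len(self.buffer) >= self.batch_size
                or self.clock() - self.last_flush >= self.flush_interval_s):
            self.flush()

    def flush(self) -> None:
        if self.buffer:
            self.sent_batches.append(self.buffer)
            self.buffer = []
        self.last_flush = self.clock()


now = [0.0]
exporter = BatchExporter(batch_size=3, flush_interval_ms=5000, clock=lambda: now[0])
for i in range(7):
    exporter.record({"span": i})   # 7 spans in quick succession
now[0] = 6.0                       # 6 s later, the interval has elapsed
exporter.record({"span": 7})
print([len(b) for b in exporter.sent_batches])
```

A larger batch_size means fewer requests but longer waits before a partly filled batch goes out; a shorter flush_interval_ms caps that wait at the cost of more, smaller requests.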

Example 15: Always-on chat bot

Bot is a persistent, event-driven agent. Build it once with await Bot.builder(model, ...).build(), subscribe to responses(), then call send() whenever a new message comes in; each reply is delivered on the responses() async stream, which you consume with async for. send() and shutdown() are synchronous.

import asyncio

from blazen import Bot, Model


async def main():
    bot = await Bot.builder(
        Model.openai(),
        system_prompt="You are a terse assistant. Answer in one sentence.",
    ).build()

    # Subscribe *before* sending so the first reply isn't missed.
    stream = bot.responses()

    async def consume():
        # Print each reply as it arrives on the bot's event loop.
        async for reply in stream:
            print("bot:", reply)

    reader = asyncio.create_task(consume())

    # send() is non-blocking: the turn runs on the bot's loop and the
    # reply shows up on the responses() stream.
    bot.send("What is the capital of France?")
    await asyncio.sleep(2)
    bot.send("And its population, roughly?")
    await asyncio.sleep(2)

    # Tear down: the responses() stream closes and the reader task ends.
    bot.shutdown()
    await reader


asyncio.run(main())
python examples/always_on_bot.py