Rust Examples

Complete runnable Rust examples for Blazen

Complete, runnable examples that demonstrate core Blazen workflow patterns, followed by examples for custom providers, model management, pricing, and telemetry.


Basic Workflow

A 3-step sequential pipeline: StartEvent → GreetEvent → StopEvent.

#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct GreetEvent { name: String }

#[step]
async fn parse_input(event: StartEvent, _ctx: Context) -> Result<GreetEvent, WorkflowError> {
    Ok(GreetEvent { name: event.data["name"].as_str().unwrap_or("World").to_string() })
}

cargo run -p blazen --example basic_workflow

Streaming Workflow

Publishes progress events while processing, observable via stream_events().

ctx.write_event_to_stream(ProgressEvent { step: i, message: format!("Step {}", i) });

cargo run -p blazen --example streaming_workflow

Branching Workflow

Conditional routing based on sentiment analysis using #[step(emits = [...])].

#[step(emits = [PositiveEvent, NegativeEvent])]
async fn classify(event: AnalyzeEvent, _ctx: Context) -> Result<StepOutput, WorkflowError> {
    // route to PositiveEvent or NegativeEvent based on sentiment
}

cargo run -p blazen --example branching_workflow
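
The body of classify is elided above. As a rough, std-only sketch of the routing decision itself, the following uses a hypothetical keyword score and a hypothetical Route enum in place of PositiveEvent and NegativeEvent. None of these names come from Blazen, and a real step would return one of the events listed in emits rather than this enum.

```rust
/// Hypothetical stand-ins for the two events a branching step can emit.
#[derive(Debug, PartialEq)]
enum Route {
    Positive { text: String },
    Negative { text: String },
}

/// Toy sentiment score: +1 per positive keyword, -1 per negative keyword.
/// A real workflow would call a model here instead.
fn score(text: &str) -> i32 {
    const POSITIVE: [&str; 3] = ["good", "great", "love"];
    const NEGATIVE: [&str; 3] = ["bad", "awful", "hate"];
    text.split_whitespace()
        // Strip surrounding punctuation and normalize case before matching.
        .map(|w| w.trim_matches(|c: char| !c.is_alphanumeric()).to_lowercase())
        .map(|w| -> i32 {
            if POSITIVE.contains(&w.as_str()) {
                1
            } else if NEGATIVE.contains(&w.as_str()) {
                -1
            } else {
                0
            }
        })
        .sum()
}

/// Pick exactly one branch; ties (score 0) go to the positive branch.
fn classify(text: &str) -> Route {
    if score(text) >= 0 {
        Route::Positive { text: text.to_string() }
    } else {
        Route::Negative { text: text.to_string() }
    }
}

fn main() {
    for input in ["I love this, great work!", "This is awful."] {
        println!("{input:?} -> {:?}", classify(input));
    }
}
```

The point of the sketch is that a branching step returns exactly one of its declared outputs per invocation, and downstream steps are selected by the type of that output.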

LLM RAG Workflow

Multi-step RAG pipeline using context for shared state between steps.

// Typed JSON via set/get
ctx.set("documents", serde_json::json!(docs));
let docs = ctx.get("documents").unwrap();

// Direct StateValue access for cross-language or binary data
ctx.set_value("embeddings", StateValue::Bytes(embedding_bytes.into()));

cargo run -p blazen --example llm_rag_workflow

Custom CompletionModel (trait impl)

In Rust, custom providers are built by implementing the CompletionModel trait. A trait implementation is a first-class citizen: it works with run_agent, with_retry, with_cache, and every other helper.

use async_trait::async_trait;
use futures::stream::{self, Stream};
use std::pin::Pin;

use blazen_llm::{
    BlazenError, CompletionRequest, CompletionResponse, StreamChunk,
    traits::CompletionModel,
    types::Role,
};

struct EchoLLM;

#[async_trait]
impl CompletionModel for EchoLLM {
    fn model_id(&self) -> &str {
        "echo-llm"
    }

    async fn complete(
        &self,
        request: CompletionRequest,
    ) -> Result<CompletionResponse, BlazenError> {
        let last = request
            .messages
            .iter()
            .rev()
            .find(|m| m.role == Role::User)
            .and_then(|m| m.content.text_content())
            .unwrap_or_default();

        Ok(CompletionResponse {
            content: Some(format!("echo: {last}")),
            tool_calls: Vec::new(),
            reasoning: None,
            citations: Vec::new(),
            artifacts: Vec::new(),
            usage: None,
            model: self.model_id().to_string(),
            finish_reason: Some("stop".to_string()),
            cost: None,
            timing: None,
            images: Vec::new(),
            audio: Vec::new(),
            videos: Vec::new(),
            metadata: serde_json::Value::Null,
        })
    }

    async fn stream(
        &self,
        request: CompletionRequest,
    ) -> Result<
        Pin<Box<dyn Stream<Item = Result<StreamChunk, BlazenError>> + Send>>,
        BlazenError,
    > {
        let response = self.complete(request).await?;
        let content = response.content.unwrap_or_default();
        let chunks: Vec<Result<StreamChunk, BlazenError>> = content
            .split(' ')
            .map(|word| {
                Ok(StreamChunk {
                    delta: Some(format!("{word} ")),
                    ..Default::default()
                })
            })
            .collect();
        Ok(Box::pin(stream::iter(chunks)))
    }
}

cargo run -p blazen --example custom_completion_model
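
The chunking inside stream can be checked in isolation. This std-only sketch reproduces the split(' ') and format!("{word} ") logic from the example and shows what a consumer sees when it concatenates the deltas. The name word_deltas is illustrative and not a Blazen function.

```rust
/// Split a completed response into word-sized deltas, mirroring the
/// `split(' ')` + `format!("{word} ")` logic in the example's `stream` method.
fn word_deltas(content: &str) -> Vec<String> {
    content.split(' ').map(|word| format!("{word} ")).collect()
}

fn main() {
    let deltas = word_deltas("echo: hello world");
    // A consumer typically concatenates deltas as they arrive.
    let reassembled: String = deltas.concat();
    println!("{deltas:?}");
    println!("{reassembled:?}");
}
```

Every delta carries a trailing space, so the reassembled text ends with one extra space compared with the original content. Trim it on the consumer side if exact equality matters.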

Custom MemoryBackend (trait impl)

Implement the MemoryBackend trait from blazen-memory to plug in any storage layer (Postgres, SQLite, DynamoDB, a DashMap). The reference InMemoryBackend is already provided; this example shows the pattern for a custom one.

use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use tokio::sync::RwLock;

use blazen_memory::{Memory, MemoryBackend, MemoryError, StoredEntry};

struct DictBackend {
    store: RwLock<HashMap<String, StoredEntry>>,
}

impl DictBackend {
    fn new() -> Self {
        Self {
            store: RwLock::new(HashMap::new()),
        }
    }
}

#[async_trait]
impl MemoryBackend for DictBackend {
    async fn put(&self, entry: StoredEntry) -> Result<(), MemoryError> {
        self.store.write().await.insert(entry.id.clone(), entry);
        Ok(())
    }

    async fn get(&self, id: &str) -> Result<Option<StoredEntry>, MemoryError> {
        Ok(self.store.read().await.get(id).cloned())
    }

    async fn delete(&self, id: &str) -> Result<bool, MemoryError> {
        Ok(self.store.write().await.remove(id).is_some())
    }

    async fn list(&self) -> Result<Vec<StoredEntry>, MemoryError> {
        Ok(self.store.read().await.values().cloned().collect())
    }

    async fn len(&self) -> Result<usize, MemoryError> {
        Ok(self.store.read().await.len())
    }

    async fn search_by_bands(
        &self,
        bands: &[String],
        limit: usize,
    ) -> Result<Vec<StoredEntry>, MemoryError> {
        let set: std::collections::HashSet<_> = bands.iter().cloned().collect();
        Ok(self
            .store
            .read()
            .await
            .values()
            .filter(|e| e.bands.iter().any(|b| set.contains(b)))
            .take(limit)
            .cloned()
            .collect())
    }
}

// Usage -- the custom backend is a drop-in for the built-in ones:
// let embedder = Arc::new(
//     blazen_embed::EmbedModel::from_options(blazen_embed::EmbedOptions::default()).await?,
// ) as Arc<dyn blazen_llm::EmbeddingModel>;
// let memory = Memory::new(embedder, DictBackend::new());

cargo run -p blazen --example custom_memory_backend

ModelManager with Memory Budgets

The blazen-manager crate tracks per-pool memory budgets across multiple local models and runs LRU eviction within each pool when loading would exceed that pool’s budget.

use std::sync::Arc;

use blazen_manager::ModelManager;
use blazen_llm::{LocalModel, Pool};

// Replace with your local model constructors (mistral.rs, llama.cpp, candle).
async fn load_local_model(_id: &str) -> Arc<dyn LocalModel> {
    unimplemented!("construct your local model here")
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Budgets in GB: 64 GB of host RAM and 24 GB of GPU VRAM
    // (a typical consumer card on a roomy host).
    let manager = ModelManager::with_budgets_gb(64.0, 24.0);

    let llama_8b = load_local_model("llama-8b").await;
    let qwen_14b = load_local_model("qwen-14b").await;
    let mistral_24b = load_local_model("mistral-24b").await;

    manager.register("llama-8b", llama_8b, 8 * 1024 * 1024 * 1024).await;
    manager.register("qwen-14b", qwen_14b, 14 * 1024 * 1024 * 1024).await;
    manager.register("mistral-24b", mistral_24b, 20 * 1024 * 1024 * 1024).await;

    // Both fit together within a 24 GB budget (8 + 14 = 22 GB).
    manager.load("llama-8b").await?;
    manager.load("qwen-14b").await?;

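    // 20 GB does not fit next to 8 + 14 = 22 GB in a 24 GB pool. Evicting the
    // LRU model (llama-8b) alone still leaves 14 + 20 = 34 GB, so, assuming all
    // three models share that pool, qwen-14b has to be evicted as well.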
    manager.load("mistral-24b").await?;

    for s in manager.status().await {
        println!(
            "{}: loaded={}, pool={}, memory={} bytes",
            s.id, s.loaded, s.pool, s.memory_estimate_bytes
        );
    }
    Ok(())
}

cargo run -p blazen --example model_manager_budget
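
The eviction rule described in this section can be illustrated without the blazen-manager crate. The following is a minimal, std-only sketch of one pool with a byte budget and LRU eviction. BudgetPool and its methods are hypothetical names, not the ModelManager API, and the sketch assumes all three models from the example are assigned to the same 24 GB pool.

```rust
use std::collections::VecDeque;

const GB: u64 = 1024 * 1024 * 1024;

/// Hypothetical single-pool budget tracker; not the blazen-manager API.
struct BudgetPool {
    budget_bytes: u64,
    /// Loaded models with their size, least recently used at the front.
    loaded: VecDeque<(String, u64)>,
}

impl BudgetPool {
    fn new(budget_bytes: u64) -> Self {
        Self { budget_bytes, loaded: VecDeque::new() }
    }

    fn used_bytes(&self) -> u64 {
        self.loaded.iter().map(|(_, size)| *size).sum()
    }

    /// Load a model, evicting least-recently-used models until it fits.
    /// Returns the ids that were evicted, or an error if the model can
    /// never fit in this pool.
    fn load(&mut self, id: &str, size: u64) -> Result<Vec<String>, String> {
        if size > self.budget_bytes {
            return Err(format!("{id} exceeds the pool budget"));
        }
        // Reloading an already-loaded model just marks it most recently used.
        if let Some(pos) = self.loaded.iter().position(|(m, _)| m == id) {
            let entry = self.loaded.remove(pos).expect("position is in range");
            self.loaded.push_back(entry);
            return Ok(Vec::new());
        }
        let mut evicted = Vec::new();
        while self.used_bytes() + size > self.budget_bytes {
            // Cannot be empty here: size <= budget, so an empty pool always fits.
            let (victim, _) = self.loaded.pop_front().expect("pool is non-empty");
            evicted.push(victim);
        }
        self.loaded.push_back((id.to_string(), size));
        Ok(evicted)
    }
}

fn main() {
    let mut gpu = BudgetPool::new(24 * GB);
    gpu.load("llama-8b", 8 * GB).unwrap();
    gpu.load("qwen-14b", 14 * GB).unwrap();
    let evicted = gpu.load("mistral-24b", 20 * GB).unwrap();
    println!("evicted: {evicted:?}, used: {} GB", gpu.used_bytes() / GB);
}
```

Under these assumptions the third load evicts both earlier models: removing only the 8 GB model would leave 14 + 20 = 34 GB, which still exceeds the 24 GB budget.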

Pricing Registration and Cost Tracking

Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every CompletionResponse carries a .cost field computed from the registered rate.

use blazen_llm::{
    ChatMessage, PricingEntry,
    providers::openai_compat::{AuthMethod, OpenAiCompatConfig, OpenAiCompatProvider},
    register_pricing, traits::CompletionModel, types::CompletionRequest,
};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Register pricing once, globally, for any model ID.
    register_pricing(
        "my-finetuned-model",
        PricingEntry {
            input_per_million: 1.0,
            output_per_million: 2.0,
        },
    );

    // Point a provider at your deployment using the registered ID.
    let model = OpenAiCompatProvider::new(OpenAiCompatConfig {
        provider_name: "local".to_string(),
        base_url: "http://localhost:8080/v1".to_string(),
        api_key: "local".to_string(),
        default_model: "my-finetuned-model".to_string(),
        auth_method: AuthMethod::Bearer,
        extra_headers: Vec::new(),
        query_params: Vec::new(),
        supports_model_listing: true,
    });

    let request = CompletionRequest::new(vec![
        ChatMessage::user("Summarize Rust in one line."),
    ]);

    let response = model.complete(request).await?;
    println!("{}", response.content.unwrap_or_default());
    println!("usage: {:?}", response.usage);
    if let Some(cost) = response.cost {
        println!("cost: ${cost:.6}"); // computed from registered pricing
    }
    Ok(())
}

cargo run -p blazen --example pricing_and_cost
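
The cost arithmetic can be sketched as follows, assuming the rates are expressed in US dollars per one million tokens and that cost is linear in the token counts. The Rates struct and estimate_cost function are hypothetical stand-ins, not the blazen_llm implementation, and the token counts are made up for illustration.

```rust
/// Hypothetical mirror of the two rate fields shown in the example.
struct Rates {
    input_per_million: f64,
    output_per_million: f64,
}

/// Assumed formula: tokens / 1_000_000 * rate, summed over input and output.
fn estimate_cost(rates: &Rates, input_tokens: u64, output_tokens: u64) -> f64 {
    (input_tokens as f64 / 1_000_000.0) * rates.input_per_million
        + (output_tokens as f64 / 1_000_000.0) * rates.output_per_million
}

fn main() {
    let rates = Rates { input_per_million: 1.0, output_per_million: 2.0 };
    // Illustrative counts: 1,200 prompt tokens and 300 completion tokens.
    let cost = estimate_cost(&rates, 1_200, 300);
    println!("cost: ${cost:.6}");
}
```

With the rates registered in the example (1.0 for input, 2.0 for output), 1,200 input tokens contribute 0.0012 and 300 output tokens contribute 0.0006, for a total of 0.0018.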

Custom TTS Provider (AudioGeneration trait)

In Rust, per-capability custom providers are built by implementing the capability trait from blazen_llm::compute (e.g. AudioGeneration, ImageGeneration, VideoGeneration). Every capability trait extends ComputeProvider, so you implement both.

use async_trait::async_trait;

use blazen_llm::{
    BlazenError, GeneratedAudio, MediaOutput, MediaType, RequestTiming,
    compute::{
        AudioGeneration, AudioResult, ComputeProvider, ComputeRequest, ComputeResult,
        JobHandle, JobStatus, SpeechRequest,
    },
};

struct MyElevenLabs {
    api_key: String,
}

#[async_trait]
impl ComputeProvider for MyElevenLabs {
    fn provider_id(&self) -> &str {
        "elevenlabs"
    }

    // TTS is synchronous -- we don't use the submit/status/result flow.
    // Mark those endpoints as unsupported so callers can't accidentally
    // queue jobs.
    async fn submit(&self, _r: ComputeRequest) -> Result<JobHandle, BlazenError> {
        Err(BlazenError::unsupported("use text_to_speech() directly"))
    }
    async fn status(&self, _j: &JobHandle) -> Result<JobStatus, BlazenError> {
        Err(BlazenError::unsupported("no job queue"))
    }
    async fn result(&self, _j: JobHandle) -> Result<ComputeResult, BlazenError> {
        Err(BlazenError::unsupported("no job queue"))
    }
    async fn cancel(&self, _j: &JobHandle) -> Result<(), BlazenError> {
        Err(BlazenError::unsupported("no job queue"))
    }
}

#[async_trait]
impl AudioGeneration for MyElevenLabs {
    async fn text_to_speech(
        &self,
        request: SpeechRequest,
    ) -> Result<AudioResult, BlazenError> {
        // In a real implementation, make an HTTP call with self.api_key.
        let _api_key = &self.api_key;
        Ok(AudioResult {
            audio: vec![GeneratedAudio {
                media: MediaOutput::from_base64("AAEC", MediaType::Wav),
                duration_seconds: None,
                sample_rate: Some(44_100),
                channels: Some(1),
            }],
            timing: RequestTiming::default(),
            cost: None,
            metadata: serde_json::json!({
                "voice": request.voice,
                "text": request.text,
            }),
        })
    }

    // generate_music / generate_sfx default to BlazenError::Unsupported.
}

cargo run -p blazen --example custom_tts_provider

Langfuse Exporter

The blazen-telemetry crate ships a tracing_subscriber::Layer that maps Blazen’s workflow.run, workflow.step, and llm.complete spans to Langfuse traces, spans, and generations. init_langfuse returns the LangfuseLayer — you compose it into the registry yourself, which lets you stack it alongside fmt, EnvFilter, or other exporters.

LangfuseConfig::new(public_key, secret_key) defaults to the cloud.langfuse.com host, batch size 100, and a 5 s flush interval. Override any of those with the chained builders.

use blazen_telemetry::{LangfuseConfig, init_langfuse};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cfg = LangfuseConfig::new(
        std::env::var("LANGFUSE_PUBLIC_KEY")?,
        std::env::var("LANGFUSE_SECRET_KEY")?,
    )
    .with_host("https://cloud.langfuse.com")
    .with_batch_size(100)
    .with_flush_interval_ms(5000);

    let layer = init_langfuse(cfg)?;
    tracing_subscriber::registry().with(layer).init();

    // ... your workflow code ...
    Ok(())
}

cargo run -p blazen --example langfuse_exporter

OTLP HTTP Exporter

init_otlp_http is the wasm-compatible variant of init_otlp (which uses gRPC + tonic). It is gated behind the otlp-http Cargo feature and builds the OpenTelemetry HTTP/protobuf span exporter with a target-appropriate HttpClient (reqwest on native, web_sys::fetch on wasm32). Unlike init_langfuse, this function installs the global tracing subscriber internally — you do not need to call tracing_subscriber::registry().init() yourself.

OtlpConfig is a plain struct — there is no ::new() constructor; populate endpoint and service_name directly. For HTTP, point endpoint at the /v1/traces path on your collector.

use blazen_telemetry::{OtlpConfig, init_otlp_http};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cfg = OtlpConfig {
        endpoint: "https://otel-collector.example.com:4318/v1/traces".to_string(),
        service_name: "my-service".to_string(),
    };
    init_otlp_http(cfg)?;

    // ... workflow code ...
    Ok(())
}

cargo run -p blazen --example otlp_http_exporter
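
Because the HTTP endpoint has to include the /v1/traces path, a small helper can guard against passing a bare collector URL. This is a std-only sketch; traces_endpoint is a hypothetical helper, not part of blazen-telemetry, and it only does string handling without validating the URL.

```rust
/// Append the OTLP/HTTP traces path to a collector base URL unless it is
/// already present. Trailing slashes on the input are ignored.
fn traces_endpoint(base: &str) -> String {
    let trimmed = base.trim_end_matches('/');
    if trimmed.ends_with("/v1/traces") {
        trimmed.to_string()
    } else {
        format!("{trimmed}/v1/traces")
    }
}

fn main() {
    // 4318 is the conventional OTLP/HTTP port.
    let endpoint = traces_endpoint("https://otel-collector.example.com:4318");
    println!("{endpoint}");
}
```

The returned string could then be used as the endpoint field of OtlpConfig in the example.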