Rust Examples
Complete runnable Rust examples for Blazen
Rust Examples
Complete, runnable examples: four core Blazen workflow patterns, followed by custom trait implementations, model management, pricing, and telemetry exporters.
Basic Workflow
A 3-step sequential pipeline: StartEvent → GreetEvent → StopEvent.
/// Intermediate event of the basic pipeline; carries the name to greet.
#[derive(Debug, Clone, Serialize, Deserialize, Event)]
struct GreetEvent { name: String }
/// First step of the pipeline: reads the `"name"` entry from the start
/// event's data and wraps it in a `GreetEvent`.
///
/// Uses `"World"` whenever `as_str()` yields `None` for that entry.
#[step]
async fn parse_input(event: StartEvent, _ctx: Context) -> Result<GreetEvent, WorkflowError> {
    let name = event.data["name"]
        .as_str()
        .unwrap_or("World")
        .to_owned();
    Ok(GreetEvent { name })
}
cargo run -p blazen --example basic_workflow
Streaming Workflow
Publishes progress events while processing, observable via stream_events().
ctx.write_event_to_stream(ProgressEvent { step: i, message: format!("Step {}", i) });
cargo run -p blazen --example streaming_workflow
Branching Workflow
Conditional routing based on sentiment analysis using #[step(emits = [...])].
/// Branching step: the `emits` list declares that it may produce either a
/// `PositiveEvent` or a `NegativeEvent`. The body is elided in this excerpt;
/// see the full example for the routing logic.
#[step(emits = [PositiveEvent, NegativeEvent])]
async fn classify(event: AnalyzeEvent, _ctx: Context) -> Result<StepOutput, WorkflowError> {
// route to PositiveEvent or NegativeEvent based on sentiment
}
cargo run -p blazen --example branching_workflow
LLM RAG Workflow
Multi-step RAG pipeline using context for shared state between steps.
// Typed JSON via set/get: store `docs` under the "documents" key as a JSON value.
ctx.set("documents", serde_json::json!(docs));
// Read the value back (typically from a later step). `unwrap()` keeps the
// excerpt short. NOTE(review): presumably this panics when the key is
// absent -- handle that case explicitly in real code.
let docs = ctx.get("documents").unwrap();
// Direct StateValue access for cross-language or binary data: store the raw
// embedding bytes under the "embeddings" key.
ctx.set_value("embeddings", StateValue::Bytes(embedding_bytes.into()));
cargo run -p blazen --example llm_rag_workflow
Custom CompletionModel (trait impl)
In Rust, custom providers are built by implementing the CompletionModel trait. The trait-impl is a first-class citizen — it works with run_agent, with_retry, with_cache, and every other helper.
use async_trait::async_trait;
use futures::stream::{self, Stream};
use std::pin::Pin;
use blazen_llm::{
BlazenError, CompletionRequest, CompletionResponse, StreamChunk,
traits::CompletionModel,
types::Role,
};
/// Minimal custom model that echoes the most recent user message back.
struct EchoLLM;

#[async_trait]
impl CompletionModel for EchoLLM {
    /// Identifier reported for this model; also copied into every response.
    fn model_id(&self) -> &str {
        "echo-llm"
    }

    /// Replies with `"echo: <text>"`, where `<text>` is the text content of
    /// the last user-role message in `request` (empty when there is none).
    async fn complete(
        &self,
        request: CompletionRequest,
    ) -> Result<CompletionResponse, BlazenError> {
        // Walk the history backwards so the newest user message wins.
        let last = request
            .messages
            .iter()
            .rev()
            .find(|m| m.role == Role::User)
            .and_then(|m| m.content.text_content())
            .unwrap_or_default();
        Ok(CompletionResponse {
            content: Some(format!("echo: {last}")),
            tool_calls: Vec::new(),
            reasoning: None,
            citations: Vec::new(),
            artifacts: Vec::new(),
            usage: None,
            model: self.model_id().to_string(),
            finish_reason: Some("stop".to_string()),
            cost: None,
            timing: None,
            images: Vec::new(),
            audio: Vec::new(),
            videos: Vec::new(),
            metadata: serde_json::Value::Null,
        })
    }

    /// Streams the same reply `complete` would return, one word per chunk.
    ///
    /// Concatenating every chunk's `delta` reproduces the `complete` content
    /// exactly; empty content yields an empty stream.
    async fn stream(
        &self,
        request: CompletionRequest,
    ) -> Result<
        Pin<Box<dyn Stream<Item = Result<StreamChunk, BlazenError>> + Send>>,
        BlazenError,
    > {
        let response = self.complete(request).await?;
        let content = response.content.unwrap_or_default();
        // `split_inclusive` keeps each separating space attached to the word
        // before it, so no extra trailing space is invented after the last
        // word and runs of spaces survive unchanged.
        let chunks: Vec<Result<StreamChunk, BlazenError>> = content
            .split_inclusive(' ')
            .map(|piece| {
                Ok(StreamChunk {
                    delta: Some(piece.to_string()),
                    ..Default::default()
                })
            })
            .collect();
        Ok(Box::pin(stream::iter(chunks)))
    }
}
cargo run -p blazen --example custom_completion_model
Custom MemoryBackend (trait impl)
Implement the MemoryBackend trait from blazen-memory to plug in any storage layer (Postgres, SQLite, DynamoDB, a DashMap). The reference InMemoryBackend is already provided; this example shows the pattern for a custom one.
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::RwLock;
use blazen_memory::{Memory, MemoryBackend, MemoryError, StoredEntry};
/// In-memory backend: a `HashMap` of stored entries behind an async `RwLock`.
struct DictBackend {
    store: RwLock<HashMap<String, StoredEntry>>,
}

impl DictBackend {
    /// Creates a backend whose store starts out empty.
    fn new() -> Self {
        let store = RwLock::new(HashMap::new());
        Self { store }
    }
}
#[async_trait]
impl MemoryBackend for DictBackend {
    /// Inserts `entry` under its own `id`, replacing any entry already there.
    async fn put(&self, entry: StoredEntry) -> Result<(), MemoryError> {
        let key = entry.id.clone();
        let mut map = self.store.write().await;
        map.insert(key, entry);
        Ok(())
    }

    /// Returns a clone of the entry stored under `id`, if any.
    async fn get(&self, id: &str) -> Result<Option<StoredEntry>, MemoryError> {
        let map = self.store.read().await;
        Ok(map.get(id).cloned())
    }

    /// Removes the entry stored under `id`; `true` means something was removed.
    async fn delete(&self, id: &str) -> Result<bool, MemoryError> {
        let removed = self.store.write().await.remove(id);
        Ok(removed.is_some())
    }

    /// Returns clones of all stored entries, in the map's iteration order.
    async fn list(&self) -> Result<Vec<StoredEntry>, MemoryError> {
        let map = self.store.read().await;
        let entries: Vec<StoredEntry> = map.values().cloned().collect();
        Ok(entries)
    }

    /// Number of entries currently stored.
    async fn len(&self) -> Result<usize, MemoryError> {
        let map = self.store.read().await;
        Ok(map.len())
    }

    /// Returns up to `limit` entries that share at least one band with `bands`.
    async fn search_by_bands(
        &self,
        bands: &[String],
        limit: usize,
    ) -> Result<Vec<StoredEntry>, MemoryError> {
        // Build the lookup set once so each entry's bands are checked in O(1).
        let wanted: std::collections::HashSet<_> = bands.iter().cloned().collect();
        let map = self.store.read().await;
        let hits: Vec<StoredEntry> = map
            .values()
            .filter(|entry| entry.bands.iter().any(|band| wanted.contains(band)))
            .take(limit)
            .cloned()
            .collect();
        Ok(hits)
    }
}
// Usage -- the custom backend is a drop-in for the built-in ones:
// let embedder = Arc::new(
// blazen_embed::EmbedModel::from_options(blazen_embed::EmbedOptions::default()).await?,
// ) as Arc<dyn blazen_llm::EmbeddingModel>;
// let memory = Memory::new(embedder, DictBackend::new());
cargo run -p blazen --example custom_memory_backend
ModelManager with Memory Budgets
The blazen-manager crate tracks per-pool memory budgets across multiple local models and runs LRU eviction within each pool when loading would exceed that pool’s budget.
use std::sync::Arc;
use blazen_manager::ModelManager;
use blazen_llm::{LocalModel, Pool};
/// Placeholder for constructing a local model (mistral.rs, llama.cpp, candle).
///
/// Replace the body with your own constructor. As written it ignores `_id`
/// and panics via `unimplemented!`.
async fn load_local_model(_id: &str) -> Arc<dyn LocalModel> {
unimplemented!("construct your local model here")
}
/// Registers three local models with size estimates, loads them one after
/// another, and prints the manager's status for each model.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Budgets in GB: CPU RAM first, then GPU VRAM (a 64 GB host with a 24 GB card).
let manager = ModelManager::with_budgets_gb(64.0, 24.0);
let llama_8b = load_local_model("llama-8b").await;
let qwen_14b = load_local_model("qwen-14b").await;
let mistral_24b = load_local_model("mistral-24b").await;
// Register each model with its estimated footprint in bytes (8, 14 and 20 GiB).
manager.register("llama-8b", llama_8b, 8 * 1024 * 1024 * 1024).await;
manager.register("qwen-14b", qwen_14b, 14 * 1024 * 1024 * 1024).await;
manager.register("mistral-24b", mistral_24b, 20 * 1024 * 1024 * 1024).await;
// These two fit together: 8 + 14 = 22 GB.
manager.load("llama-8b").await?;
manager.load("qwen-14b").await?;
// 20 GB does not fit next to 8 + 14 = 22 GB, so the manager runs LRU eviction
// within that pool, starting with llama-8b (the least recently used).
// NOTE(review): against a 24 GB budget, evicting llama-8b alone still leaves
// 14 + 20 = 34 GB, so qwen-14b would presumably be evicted too -- confirm
// against ModelManager's eviction behavior and the pool each model reports.
manager.load("mistral-24b").await?;
// Print one status line per model the manager reports.
for s in manager.status().await {
println!(
"{}: loaded={}, pool={}, memory={} bytes",
s.id, s.loaded, s.pool, s.memory_estimate_bytes
);
}
Ok(())
}
cargo run -p blazen --example model_manager_budget
Pricing Registration and Cost Tracking
Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every CompletionResponse carries an optional .cost field; when it is present, it is computed from the registered rate.
use blazen_llm::{
ChatMessage, PricingEntry,
providers::openai_compat::{AuthMethod, OpenAiCompatConfig, OpenAiCompatProvider},
register_pricing, traits::CompletionModel, types::CompletionRequest,
};
/// Registers pricing for a custom model ID, sends one completion request to
/// an OpenAI-compatible endpoint configured with that ID, and prints the
/// reply, the usage, and (when present) the cost.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Register pricing once, globally, for any model ID.
    let pricing = PricingEntry {
        input_per_million: 1.0,
        output_per_million: 2.0,
    };
    register_pricing("my-finetuned-model", pricing);

    // Point a provider at your deployment using the registered ID.
    let config = OpenAiCompatConfig {
        provider_name: String::from("local"),
        base_url: String::from("http://localhost:8080/v1"),
        api_key: String::from("local"),
        default_model: String::from("my-finetuned-model"),
        auth_method: AuthMethod::Bearer,
        extra_headers: Vec::new(),
        query_params: Vec::new(),
        supports_model_listing: true,
    };
    let model = OpenAiCompatProvider::new(config);

    let messages = vec![ChatMessage::user("Summarize Rust in one line.")];
    let response = model.complete(CompletionRequest::new(messages)).await?;

    println!("{}", response.content.unwrap_or_default());
    println!("usage: {:?}", response.usage);
    // `cost` is optional; when set it is computed from the registered pricing.
    if let Some(cost) = response.cost {
        println!("cost: ${cost:.6}");
    }
    Ok(())
}
cargo run -p blazen --example pricing_and_cost
Custom TTS Provider (AudioGeneration trait)
For Rust, per-capability custom providers are built by implementing the capability trait from the blazen_llm::compute module (e.g. AudioGeneration, ImageGeneration, VideoGeneration). Every capability trait extends ComputeProvider, so you implement both.
use async_trait::async_trait;
use blazen_llm::{
BlazenError, GeneratedAudio, MediaOutput, MediaType, RequestTiming,
compute::{
AudioGeneration, AudioResult, ComputeProvider, ComputeRequest, ComputeResult,
JobHandle, JobStatus, SpeechRequest,
},
};
/// Custom text-to-speech provider used to demonstrate the capability traits.
struct MyElevenLabs {
// Credential a real implementation would send with its HTTP call.
api_key: String,
}
#[async_trait]
impl ComputeProvider for MyElevenLabs {
    /// Identifier this provider reports.
    fn provider_id(&self) -> &str {
        "elevenlabs"
    }

    // Speech is produced directly by `text_to_speech()`, so the
    // submit/status/result/cancel job flow is not used. Every job endpoint
    // returns an "unsupported" error so callers can't accidentally queue jobs.

    /// Always fails; callers should use `text_to_speech()` instead.
    async fn submit(&self, _request: ComputeRequest) -> Result<JobHandle, BlazenError> {
        let reason = "use text_to_speech() directly";
        Err(BlazenError::unsupported(reason))
    }

    /// Always fails; this provider has no job queue.
    async fn status(&self, _job: &JobHandle) -> Result<JobStatus, BlazenError> {
        let reason = "no job queue";
        Err(BlazenError::unsupported(reason))
    }

    /// Always fails; this provider has no job queue.
    async fn result(&self, _job: JobHandle) -> Result<ComputeResult, BlazenError> {
        let reason = "no job queue";
        Err(BlazenError::unsupported(reason))
    }

    /// Always fails; this provider has no job queue.
    async fn cancel(&self, _job: &JobHandle) -> Result<(), BlazenError> {
        let reason = "no job queue";
        Err(BlazenError::unsupported(reason))
    }
}
#[async_trait]
impl AudioGeneration for MyElevenLabs {
    /// Returns a canned single-clip WAV result and echoes the request's
    /// `voice` and `text` back in the result metadata.
    async fn text_to_speech(
        &self,
        request: SpeechRequest,
    ) -> Result<AudioResult, BlazenError> {
        // In a real implementation, make an HTTP call with self.api_key.
        let _api_key = &self.api_key;

        // Placeholder audio payload standing in for the service's response.
        let clip = GeneratedAudio {
            media: MediaOutput::from_base64("AAEC", MediaType::Wav),
            duration_seconds: None,
            sample_rate: Some(44_100),
            channels: Some(1),
        };
        let timing = RequestTiming::default();
        let metadata = serde_json::json!({
            "voice": request.voice,
            "text": request.text,
        });

        Ok(AudioResult {
            audio: vec![clip],
            timing,
            cost: None,
            metadata,
        })
    }
    // generate_music / generate_sfx default to BlazenError::Unsupported.
}
cargo run -p blazen --example custom_tts_provider
Langfuse Exporter
The blazen-telemetry crate ships a tracing_subscriber::Layer that maps Blazen’s workflow.run, workflow.step, and llm.complete spans to Langfuse traces, spans, and generations. init_langfuse returns the LangfuseLayer — you compose it into the registry yourself, which lets you stack it alongside fmt, EnvFilter, or other exporters.
LangfuseConfig::new(public_key, secret_key) defaults to the cloud.langfuse.com host, batch size 100, and a 5 s flush interval. Override any of those with the chained builders.
use blazen_telemetry::{LangfuseConfig, init_langfuse};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
/// Builds a Langfuse configuration from environment variables, creates the
/// Langfuse layer, and installs it in a `tracing_subscriber` registry.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Both keys are required; a missing variable aborts with an error.
    let public_key = std::env::var("LANGFUSE_PUBLIC_KEY")?;
    let secret_key = std::env::var("LANGFUSE_SECRET_KEY")?;

    let cfg = LangfuseConfig::new(public_key, secret_key)
        .with_host("https://cloud.langfuse.com")
        .with_batch_size(100)
        .with_flush_interval_ms(5000);

    // `init_langfuse` only returns the layer; composing it into the registry
    // is done here.
    let langfuse_layer = init_langfuse(cfg)?;
    tracing_subscriber::registry().with(langfuse_layer).init();
    // ... your workflow code ...
    Ok(())
}
cargo run -p blazen --example langfuse_exporter
OTLP HTTP Exporter
init_otlp_http is the wasm-compatible variant of init_otlp (which uses gRPC + tonic). It is gated behind the otlp-http Cargo feature and builds the OpenTelemetry HTTP/protobuf span exporter with a target-appropriate HttpClient (reqwest on native, web_sys::fetch on wasm32). Unlike init_langfuse, this function installs the global tracing subscriber internally — you do not need to call tracing_subscriber::registry().init() yourself.
OtlpConfig is a plain struct — there is no ::new() constructor; populate endpoint and service_name directly. For HTTP, point endpoint at the /v1/traces path on your collector.
use blazen_telemetry::{OtlpConfig, init_otlp_http};
/// Configures the OTLP HTTP exporter for a collector endpoint and service
/// name, then initializes it.
#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // `OtlpConfig` is populated field by field; the endpoint targets the
    // collector's /v1/traces path.
    let endpoint = "https://otel-collector.example.com:4318/v1/traces".to_string();
    let service_name = "my-service".to_string();
    init_otlp_http(OtlpConfig {
        endpoint,
        service_name,
    })?;
    // ... workflow code ...
    Ok(())
}
cargo run -p blazen --example otlp_http_exporter