Node.js Examples
Complete runnable Node.js examples for Blazen
Fifteen complete, runnable examples that demonstrate core Blazen workflow patterns.
Basic Workflow
A 3-step sequential pipeline: StartEvent → GreetEvent → FormattedEvent → StopEvent.
wf.addStep("greet", ["GreetEvent"], async (event, ctx) => {
return { type: "blazen::StopEvent", result: { greeting: `Hello, ${event.name}!` } };
});
npx tsx examples/basic_workflow.ts
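To make the StartEvent → GreetEvent → FormattedEvent → StopEvent chain concrete, here is a rough, self-contained sketch of event-type routing. It does not use Blazen at all: the `steps` map and `runPipeline` function are hypothetical stand-ins, and only the event type names come from this page.

```typescript
// Hypothetical mini-dispatcher, NOT Blazen's implementation: it only
// illustrates how event types route a payload through three steps.
type WfEvent = { type: string; [key: string]: unknown };
type Step = (event: WfEvent) => WfEvent;

const steps = new Map<string, Step>();

// Step 1: StartEvent -> GreetEvent
steps.set("blazen::StartEvent", (e) => ({ type: "GreetEvent", name: e.name }));
// Step 2: GreetEvent -> FormattedEvent
steps.set("GreetEvent", (e) => ({
  type: "FormattedEvent",
  greeting: `Hello, ${String(e.name)}!`,
}));
// Step 3: FormattedEvent -> StopEvent
steps.set("FormattedEvent", (e) => ({
  type: "blazen::StopEvent",
  result: { greeting: e.greeting },
}));

function runPipeline(start: WfEvent): WfEvent {
  let current = start;
  // Keep dispatching until a step emits the terminal StopEvent.
  while (current.type !== "blazen::StopEvent") {
    const step = steps.get(current.type);
    if (!step) throw new Error(`no step accepts ${current.type}`);
    current = step(current);
  }
  return current;
}

const stop = runPipeline({ type: "blazen::StartEvent", name: "Ada" });
console.log(stop.result); // { greeting: 'Hello, Ada!' }
```

Each step declares which event type it accepts and returns the next event, which is the same shape the addStep handler above follows.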
Streaming Workflow
Publishes progress events while processing via ctx.writeEventToStream().
await ctx.writeEventToStream({ type: "Progress", step: i });
const result = await wf.runStreaming({}, (event) => console.log(event));
npx tsx examples/streaming_workflow.ts
Branching Workflow
Conditional fan-out by returning an array of events.
return [
{ type: "PositiveEvent", text },
{ type: "NegativeEvent", text },
];
npx tsx examples/branching_workflow.ts
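The "conditional" part of the fan-out can be sketched as a plain function that decides which events to return. The keyword lists and the `route` helper below are assumptions made for illustration; they are not taken from the Blazen example file.

```typescript
// Hypothetical routing rule; the word lists and the "text" payload shape
// are assumptions, not part of Blazen.
type BranchEvent = { type: "PositiveEvent" | "NegativeEvent"; text: string };

const POSITIVE_WORDS = ["good", "great", "fast"];
const NEGATIVE_WORDS = ["bad", "slow", "broken"];

function route(text: string): BranchEvent[] {
  const lower = text.toLowerCase();
  const positive = POSITIVE_WORDS.some((w) => lower.includes(w));
  const negative = NEGATIVE_WORDS.some((w) => lower.includes(w));
  const events: BranchEvent[] = [];
  // Neutral text defaults to the positive branch so at least one event is emitted.
  if (positive || !negative) events.push({ type: "PositiveEvent", text });
  if (negative) events.push({ type: "NegativeEvent", text });
  return events;
}

console.log(route("Great docs").map((e) => e.type));      // [ 'PositiveEvent' ]
console.log(route("Fast but broken").map((e) => e.type)); // [ 'PositiveEvent', 'NegativeEvent' ]
```

Returning one event follows a single branch, while returning two fans out to both.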
LLM RAG Workflow
Multi-step RAG pipeline using context for shared state between steps. Uses typed ChatMessage and ModelResponse:
import { OpenAiProvider, ChatMessage } from "blazen";
// Construct the provider directly. With no apiKey, it reads OPENAI_API_KEY.
const model = OpenAiProvider.create({ apiKey: "sk-..." });
const response = await model.complete([
ChatMessage.system("Answer based on the provided documents."),
ChatMessage.user(query),
]);
console.log(response.content); // typed string
console.log(response.usage); // { promptTokens, completionTokens, totalTokens }
await ctx.set("documents", docs);
npx tsx examples/llm_rag_workflow.ts
Human-in-the-Loop
Side-effect steps that pause for external input via ctx.sendEvent().
await ctx.sendEvent({ type: "ReviewComplete" });
return null;
npx tsx examples/human_in_the_loop.ts
Stateful Workflow
Demonstrates the two explicit context namespaces on the Node bindings:
ctx.state: persistable values (survives pause()/resume()).
ctx.session: in-process-only values (excluded from snapshots).
Important: On the Node binding, ctx.session values are routed through serde_json::Value, so JS object identity is NOT preserved across session.get(key). This is a napi-rs threading limitation (Reference<T> is !Send because its Drop must run on the V8 main thread). For true identity preservation of live JS objects, use the Python or WASM bindings.
const wf = new Workflow("stateful-example");
wf.addStep("setup", ["blazen::StartEvent"], async (event, ctx) => {
await ctx.state.set("counter", 5); // persistable
await ctx.session.set("reqId", "abc123"); // in-process only
return { type: "blazen::StopEvent", result: {} };
});
npx tsx examples/stateful_workflow.ts
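The identity caveat for ctx.session can be approximated with an ordinary JSON round trip. This is only an analogy for the serde_json::Value hop; it runs without Blazen, and the binding's exact handling of non-JSON types such as Date is not documented on this page.

```typescript
// Plain JSON round trip, used as a stand-in for a serialize/deserialize hop.
// This is an analogy, not the binding's actual code path.
const original = { user: "ada", createdAt: new Date(0), hits: 1 };
const roundTripped = JSON.parse(JSON.stringify(original));

console.log(roundTripped === original);     // false: a new object every time
console.log(roundTripped.user);             // ada: plain data survives
console.log(typeof roundTripped.createdAt); // string: JSON has no Date type

// Mutating the copy does not touch the original.
roundTripped.hits += 1;
console.log(original.hits);                 // 1
```

In practice this suggests treating a value read from the session as a copy and writing it back with set() after changing it.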
Example 7: Subclassing Model (Custom Provider)
Extend Model in TypeScript/JavaScript to build a custom provider. Override complete() and/or stream() with any backend — local inference, a proxy, a mock, etc. Subclass instances work with runAgent, withRetry(), withCache(), and every other helper.
import {
ChatMessage,
Model,
runAgent,
type ModelResponse,
} from "blazen";
class EchoLLM extends Model {
  constructor() {
    super({ modelId: "echo-llm", contextLength: 4096 });
  }
  async complete(messages: ChatMessage[]): Promise<ModelResponse> {
    const last = [...messages].reverse().find((m) => m.role === "user");
    return {
      content: `echo: ${last?.content ?? ""}`,
      model: this.modelId,
      toolCalls: [],
    } as ModelResponse;
  }
  // Override stream() when you want incremental output.
  async *stream(messages: ChatMessage[]) {
    const last = [...messages].reverse().find((m) => m.role === "user");
    for (const word of `echo: ${last?.content ?? ""}`.split(" ")) {
      yield { delta: word + " " };
    }
  }
}
const model = new EchoLLM();
const result = await runAgent(
model,
[ChatMessage.user("hello world")],
[], // no tools
async () => { throw new Error("no tools"); }, // toolHandler -- not called without tools
);
console.log(result.response.content); // -> "echo: hello world"
npx tsx examples/subclass_model.ts
Example 8: Custom MemoryBackend (DictBackend)
Extend MemoryBackend to plug in any storage layer (Postgres, DynamoDB, SQLite, a plain Map). Each async method is dispatched from Rust back into JS, so you get full control while reusing Blazen’s embedding and SimHash search pipeline.
import { EmbeddingModel, Memory, MemoryBackend } from "blazen";
class DictBackend extends MemoryBackend {
  private store = new Map<string, any>();
  async put(entry: any): Promise<void> {
    this.store.set(entry.id, entry);
  }
  async get(id: string): Promise<any | null> {
    return this.store.get(id) ?? null;
  }
  async delete(id: string): Promise<boolean> {
    return this.store.delete(id);
  }
  async list(): Promise<any[]> {
    return Array.from(this.store.values());
  }
  async len(): Promise<number> {
    return this.store.size;
  }
  async searchByBands(bands: string[], limit: number): Promise<any[]> {
    const set = new Set(bands);
    const hits: any[] = [];
    for (const entry of this.store.values()) {
      const entryBands: string[] = entry.bands ?? [];
      if (entryBands.some((b) => set.has(b))) {
        hits.push(entry);
        if (hits.length >= limit) break;
      }
    }
    return hits;
  }
}
const embedder = EmbeddingModel.embed();
const memory = new Memory(embedder, new DictBackend());
await memory.add("fact-1", "Rust has ownership and borrowing.");
await memory.add("fact-2", "Python uses reference counting.");
const results = await memory.search("memory management", 2);
for (const r of results) {
console.log(r.score, r.text);
}
npx tsx examples/custom_memory_backend.ts
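For readers unfamiliar with band-based lookup, the idea behind searchByBands can be sketched generically: a hash is cut into bands, and two entries become candidates when they share at least one band. The 64-bit width, the four 16-bit bands, and the "index:hex" string format below are all assumptions; this page does not show the band format Blazen actually passes to the backend.

```typescript
// Generic banded-LSH sketch. Hash width, band count, and the band string
// format are illustrative assumptions, not Blazen's real encoding.
function toBands(hash: bigint, bandCount = 4, bitsPerBand = 16): string[] {
  const mask = (1n << BigInt(bitsPerBand)) - 1n;
  const bands: string[] = [];
  for (let i = 0; i < bandCount; i++) {
    const chunk = (hash >> BigInt(i * bitsPerBand)) & mask;
    // Prefix with the band index so equal chunks in different positions differ.
    bands.push(`${i}:${chunk.toString(16)}`);
  }
  return bands;
}

const bandsA = toBands(0x1111222233334444n);
const bandsB = toBands(0x111122223333ffffn); // differs only in the lowest band
const sharedBands = bandsA.filter((band) => bandsB.includes(band));
console.log(sharedBands); // [ '1:3333', '2:2222', '3:1111' ]
```

Under this scheme, the `entryBands.some((b) => set.has(b))` check in DictBackend is the "shares at least one band" test.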
Example 9: ModelManager with Memory Budgets
ModelManager is the unified registry — register local models and remote
providers by name, then dispatch with complete(id, messages) (also
stream(id, messages, onChunk) and get(id)). Each local model is charged to a
pool (CPU RAM vs GPU VRAM) based on its device; LRU eviction is per-pool, so a
GPU LLM never evicts a CPU embedder. Register each local model with an estimated
footprint; when loading a new one would exceed its pool’s budget, the
least-recently-used model within the same pool is automatically unloaded.
Remote providers own no local weights, so they dispatch straight through and
never count against a budget.
Note: poolBudgets values, memoryEstimateBytes, usedBytes, availableBytes, and the memoryEstimateBytes field on ModelStatus are JS bigints (since the Rust side uses u64 and >4 GiB budgets used to silently truncate). cpuRamGb / gpuVramGb are still regular numbers. pool is a plain string ("cpu", "gpu:0", …).
import { Model, ModelManager, OpenAiProvider, ChatMessage } from "blazen";
// 64 GB host RAM pool + 24 GB GPU pool (GPU-typical for a consumer card).
const manager = new ModelManager({ cpuRamGb: 64, gpuVramGb: 24 });
// Remote provider: dispatch-only, no footprint (pass 0n).
await manager.register("gpt", OpenAiProvider.create({ apiKey: "sk-..." }), 0n);
const llama8b = Model.mistralrs({
modelId: "meta-llama/Llama-3.1-8B-Instruct",
});
const qwen14b = Model.mistralrs({
modelId: "Qwen/Qwen2.5-14B-Instruct",
});
const mistral24b = Model.mistralrs({
modelId: "mistralai/Mistral-Small-24B",
});
// memoryEstimateBytes is a bigint -- use bigint literal arithmetic.
await manager.register("llama-8b", llama8b, 8n * 1024n ** 3n);
await manager.register("qwen-14b", qwen14b, 14n * 1024n ** 3n);
await manager.register("mistral-24b", mistral24b, 20n * 1024n ** 3n);
// Fits alongside qwen-14b (8 + 14 = 22 GB).
await manager.load("llama-8b");
await manager.load("qwen-14b");
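// 20 GB does not fit next to 8 + 14 = 22 GB in the same pool, so LRU eviction starts with llama-8b.
// If all three share the 24 GB GPU pool, qwen-14b must be unloaded as well (14 + 20 = 34 GB > 24 GB).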
await manager.load("mistral-24b");
for (const s of await manager.status()) {
// s.memoryEstimateBytes is a bigint; toLocaleString() works directly on bigints.
console.log(`${s.id}: loaded=${s.loaded}, pool=${s.pool}, memory=${s.memoryEstimateBytes.toLocaleString()} bytes`);
}
// usedBytes() / availableBytes() take an optional pool string (default "cpu"); divide by a bigint to get GB.
const usedGpu = await manager.usedBytes("gpu:0");
const availableGpu = await manager.availableBytes("gpu:0");
console.log(`gpu:0 used=${usedGpu / (1024n ** 3n)} GB, available=${availableGpu / (1024n ** 3n)} GB`);
npx tsx examples/model_manager_budget.ts
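The per-pool LRU rule described above can be checked with a toy simulation. `PoolSim` is hypothetical and uses plain GB numbers instead of bigint byte counts; it is a rough model of the eviction arithmetic, not ModelManager's internals.

```typescript
// Toy model of LRU eviction inside a single pool. Illustration only.
class PoolSim {
  private budgetGb: number;
  private loaded = new Map<string, number>(); // insertion order = LRU order

  constructor(budgetGb: number) {
    this.budgetGb = budgetGb;
  }

  usedGb(): number {
    let total = 0;
    for (const size of this.loaded.values()) total += size;
    return total;
  }

  // Loads a model and returns the ids evicted to make room for it.
  load(id: string, sizeGb: number): string[] {
    if (sizeGb > this.budgetGb) throw new Error(`${id} exceeds the pool budget`);
    this.loaded.delete(id); // re-loading refreshes recency
    const evicted: string[] = [];
    while (this.usedGb() + sizeGb > this.budgetGb) {
      const oldest = this.loaded.keys().next().value as string;
      this.loaded.delete(oldest);
      evicted.push(oldest);
    }
    this.loaded.set(id, sizeGb);
    return evicted;
  }
}

const gpuPool = new PoolSim(24);
gpuPool.load("llama-8b", 8);
gpuPool.load("qwen-14b", 14);
const evictedIds = gpuPool.load("mistral-24b", 20);
console.log(evictedIds);       // [ 'llama-8b', 'qwen-14b' ]
console.log(gpuPool.usedGb()); // 20
```

With a 24 GB budget, a 20 GB load leaves room for at most 4 GB of other models, so in this model both earlier entries are evicted in least-recently-used order. A separate PoolSim instance per pool would mirror the rule that a GPU model never evicts a CPU one.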
Example 10: Pricing Registration and Cost Tracking
Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every ModelResponse then carries a cost field computed from the registered rate.
import {
ChatMessage,
Model,
lookupPricing,
registerPricing,
type ModelResponse,
} from "blazen";
class MyFinetune extends Model {
  constructor() {
    super({ modelId: "my-finetuned-model" });
  }
  async complete(messages: ChatMessage[]): Promise<ModelResponse> {
    return {
      content: "Rust is a systems language with memory safety without GC.",
      model: this.modelId,
      toolCalls: [],
      usage: { promptTokens: 150, completionTokens: 80, totalTokens: 230 },
    } as ModelResponse;
  }
}
// Register pricing once, globally, for any model ID.
registerPricing("my-finetuned-model", {
inputPerMillion: 1.0,
outputPerMillion: 2.0,
});
// Readback -- pricing is centrally stored.
const pricing = lookupPricing("my-finetuned-model");
if (pricing) {
console.log(
`input: $${pricing.inputPerMillion}/M, output: $${pricing.outputPerMillion}/M`,
);
}
const model = new MyFinetune();
const response = await model.complete([
ChatMessage.user("Summarize Rust in one line."),
]);
console.log(response.content);
console.log("usage:", response.usage);
// cost is computed from the registered pricing + usage.
console.log(`cost: $${response.cost?.toFixed(6)}`);
npx tsx examples/pricing_and_cost.ts
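As a sanity check on the numbers in this example, the cost can be worked out by hand. The sketch below assumes the usual per-million-token formula; `estimateCostUsd` is a hypothetical helper, and whether Blazen applies exactly this formula is an assumption.

```typescript
// Assumed formula: each rate is USD per one million tokens.
// estimateCostUsd is hypothetical and not part of Blazen.
interface Pricing { inputPerMillion: number; outputPerMillion: number }
interface Usage { promptTokens: number; completionTokens: number }

function estimateCostUsd(pricing: Pricing, usage: Usage): number {
  const input = (usage.promptTokens / 1e6) * pricing.inputPerMillion;
  const output = (usage.completionTokens / 1e6) * pricing.outputPerMillion;
  return input + output;
}

const estimated = estimateCostUsd(
  { inputPerMillion: 1.0, outputPerMillion: 2.0 },
  { promptTokens: 150, completionTokens: 80 },
);
// 150 tokens at $1/M = $0.00015, plus 80 tokens at $2/M = $0.00016.
console.log(estimated.toFixed(6)); // 0.000310
```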
Example 11: Per-Capability Provider (Custom TTS)
Extend TTSProvider to plug in any TTS backend (ElevenLabs, Coqui, a local model). The per-capability base classes (TTSProvider, ImageProvider, VideoProvider, MusicProvider, ThreeDProvider, BackgroundRemovalProvider, VoiceProvider) exist for users who only need to implement one capability.
import { TTSProvider } from "blazen";
class MyElevenLabs extends TTSProvider {
  private apiKey: string;
  constructor(apiKey: string) {
    super({
      providerId: "elevenlabs",
      baseUrl: "https://api.elevenlabs.io/v1",
    });
    this.apiKey = apiKey;
  }
  async textToSpeech(request: any): Promise<any> {
    // In a real implementation, make an HTTP call with this.apiKey
    // and return audio bytes. Here we just echo the request.
    return {
      audioData: new Uint8Array([0, 1, 2]),
      format: "wav",
      voice: request.voice,
      text: request.text,
    };
  }
}
const tts = new MyElevenLabs("sk-...");
const result = await tts.textToSpeech({
text: "Hello from Blazen!",
voice: "alice",
});
console.log(result.format, result.audioData.length, "bytes");
npx tsx examples/custom_tts_provider.ts
Example 12: Typed Error Handling
Blazen exports a typed error hierarchy so you can branch on failure modes with instanceof instead of string-matching messages. ProviderError carries structured fields (provider, status, endpoint, requestId, detail, retryAfterMs) populated from the underlying HTTP response. RateLimitError, AuthError, TimeoutError, ContentPolicyError, and the per-provider classes (MistralRsError, WhisperError, PiperError, …) all extend BlazenError, so a single instanceof BlazenError check catches everything Blazen raises while letting unrelated runtime errors keep propagating.
import {
OpenAiProvider,
ChatMessage,
RateLimitError,
ProviderError,
BlazenError,
} from "blazen";
const model = OpenAiProvider.create({ apiKey: "sk-..." });
try {
  const response = await model.complete([ChatMessage.user("Hello")]);
  console.log(response.content);
} catch (e) {
  if (e instanceof RateLimitError) {
    // Backoff and retry -- retryAfterMs is populated from the Retry-After header
    // when the upstream provider supplies one.
    console.warn("Rate limited; retry later");
  } else if (e instanceof ProviderError) {
    console.error(
      `Provider ${e.provider} returned ${e.status}: ${e.detail}`,
    );
  } else if (e instanceof BlazenError) {
    console.error("Blazen error:", e.message);
  } else {
    throw e;
  }
}
npx tsx examples/typed_error_handling.ts
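The rate-limit branch above only logs a warning. One possible way to turn retryAfterMs into an actual wait time is sketched below. `retryDelayMs` and the `MaybeRetryable` shape are hypothetical; the only thing assumed about Blazen is that the caught error may expose a numeric retryAfterMs field, as described for ProviderError.

```typescript
// Hypothetical helper, not a Blazen API.
interface MaybeRetryable { retryAfterMs?: number | null }

function retryDelayMs(
  error: MaybeRetryable,
  attempt: number, // 0 for the first retry
  baseMs = 500,
  capMs = 30000,
): number {
  // Prefer the server's hint when one was supplied.
  if (typeof error.retryAfterMs === "number" && error.retryAfterMs >= 0) {
    return Math.min(error.retryAfterMs, capMs);
  }
  // Otherwise fall back to capped exponential backoff: base * 2^attempt.
  return Math.min(baseMs * 2 ** attempt, capMs);
}

console.log(retryDelayMs({ retryAfterMs: 1200 }, 0)); // 1200
console.log(retryDelayMs({}, 0));                     // 500
console.log(retryDelayMs({}, 3));                     // 4000
console.log(retryDelayMs({}, 10));                    // 30000
```

A caller could sleep for the returned duration inside the RateLimitError branch and then retry, stopping after a fixed number of attempts.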
Example 13: Custom Progress Reporting via ProgressCallback
Subclass ProgressCallback to plug structured download progress into any UI. The base class exists as a real Rust-backed type so subclass instances can be passed straight into ModelCache.download (and any other Blazen API that accepts a progress hook). onProgress receives byte counts as bigint to safely represent multi-gigabyte downloads; total is null when the server does not advertise Content-Length.
import { ModelCache, ProgressCallback } from "blazen";
class TerminalProgress extends ProgressCallback {
  override onProgress(downloaded: bigint, total?: bigint | null): void {
    if (total) {
      const pct = ((Number(downloaded) / Number(total)) * 100).toFixed(1);
      process.stderr.write(`\r${pct}% (${downloaded}/${total})`);
    } else {
      process.stderr.write(`\r${downloaded} bytes`);
    }
  }
}
const cache = ModelCache.create();
const path = await cache.download(
"mistralai/Mistral-7B-Instruct-v0.3",
"config.json",
new TerminalProgress(),
);
console.log(`\ncached at ${path}`);
npx tsx examples/progress_callback.ts
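Since the byte counts arrive as bigint, the percentage can also be computed without converting to Number. The formatter below is a hypothetical, Blazen-independent sketch of that approach; it truncates to one decimal place rather than rounding.

```typescript
// Hypothetical formatter. Staying in bigint arithmetic avoids precision
// loss for byte counts above 2^53.
function formatProgress(downloaded: bigint, total?: bigint | null): string {
  if (total === undefined || total === null || total <= 0n) {
    return `${downloaded} bytes`; // size unknown: show the raw byte count
  }
  // Tenths of a percent as an integer, e.g. 425n means 42.5%.
  const tenths = (downloaded * 1000n) / total;
  return `${tenths / 10n}.${tenths % 10n}% (${downloaded}/${total})`;
}

console.log(formatProgress(425n, 1000n)); // 42.5% (425/1000)
console.log(formatProgress(2048n, null)); // 2048 bytes
```

The function could be called from an onProgress override in place of the Number-based division.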
Example 14: Pipeline State Persistence
Pipelines are multi-stage workflows with built-in checkpoint support. Register a JSON persistence callback with onPersistJson and Blazen will hand you a snapshot string after each stage completes — ideal for shipping checkpoints to durable storage (S3, Postgres, an HTTP service) so a crashed run can be resumed later via Pipeline.resume(snapshot). Use onPersist instead if you’d rather receive the typed PipelineSnapshot object directly.
import { PipelineBuilder, Stage, Workflow } from "blazen";
const workflowA = new Workflow("step-1");
// ... addStep(...) calls populating workflowA ...
const workflowB = new Workflow("step-2");
// ... addStep(...) calls populating workflowB ...
const pipeline = new PipelineBuilder("ingest")
  .stage(new Stage("step-1", workflowA))
  .stage(new Stage("step-2", workflowB))
  .onPersistJson(async (snapshot: string) => {
    await fetch("/api/checkpoint", {
      method: "POST",
      body: snapshot,
      headers: { "Content-Type": "application/json" },
    });
  })
  .build();
const handler = await pipeline.start({ input: "..." });
const result = await handler.result();
console.log(result.finalOutput);
npx tsx examples/pipeline_persistence.ts
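The checkpoint-then-resume flow can be illustrated with a toy stage runner that needs no Blazen import. The `Snapshot` shape and `runStages` function are hypothetical; the real PipelineSnapshot format is not shown on this page.

```typescript
// Toy checkpoint/resume loop, for illustration only.
interface Snapshot { nextStage: number; value: number }
type StageFn = (value: number) => number;

function runStages(
  stages: StageFn[],
  start: Snapshot,
  persist: (json: string) => void,
  failAt?: number, // simulate a crash before this stage index
): number {
  let { nextStage, value } = start;
  while (nextStage < stages.length) {
    if (nextStage === failAt) throw new Error("simulated crash");
    value = stages[nextStage](value);
    nextStage += 1;
    // Persist after each completed stage, like an onPersistJson callback.
    persist(JSON.stringify({ nextStage, value }));
  }
  return value;
}

const toyStages: StageFn[] = [(v) => v + 1, (v) => v * 10, (v) => v - 3];
let lastCheckpoint = "";
const save = (json: string): void => { lastCheckpoint = json; };

try {
  runStages(toyStages, { nextStage: 0, value: 1 }, save, 2); // crashes before stage 2
} catch {
  // Stages 0 and 1 completed, so the checkpoint holds their result.
}
const resumed = runStages(toyStages, JSON.parse(lastCheckpoint) as Snapshot, save);
console.log(resumed); // 17
```

The resumed run skips the two completed stages and executes only the remaining one, which is the behavior a durable snapshot is meant to enable.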
Example 15: Always-On Chat Bot
A Bot is a persistent, event-driven agent. Build it once with Bot.builder(model), subscribe to replies with bot.responses(...) before the first message (replies emitted before you subscribe are not replayed), drive it with bot.send(...), and tear it down with bot.shutdown(). The conversation-memory window persists across turns, so the bot remembers earlier messages.
import { Model, Bot } from "blazen";
// Build a running bot. With no apiKey, Model.openai() reads OPENAI_API_KEY.
const bot = await Bot.builder(Model.openai())
.systemPrompt("You are a concise assistant.")
.idleTimeout(300) // shut down after 5 min idle
.costBudgetUsd(1.0) // abort once accumulated cost exceeds $1
.build();
// Subscribe BEFORE sending so we don't miss the first reply.
bot.responses((response) => {
console.log("bot:", response.text);
});
// Drive a couple of turns. send() is non-blocking; replies arrive on the
// responses() callback as each agentic turn completes.
bot.send("Hi! My name is Ada.");
bot.send("What's my name?"); // the bot remembers across turns
// Give the turns time to complete, then tear the bot down.
await new Promise((resolve) => setTimeout(resolve, 5000));
bot.shutdown();
npx tsx examples/chat_bot.ts
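The subscribe-before-send rule follows from replies not being replayed. A minimal non-replaying emitter shows the effect; `TinyBus` is hypothetical and says nothing about how Bot is implemented internally.

```typescript
// Minimal non-replaying emitter, used only to illustrate the ordering rule.
class TinyBus {
  private listeners: Array<(msg: string) => void> = [];

  subscribe(listener: (msg: string) => void): void {
    this.listeners.push(listener);
  }

  emit(msg: string): void {
    // Delivered only to listeners registered at this moment; nothing is buffered.
    for (const listener of this.listeners) listener(msg);
  }
}

const bus = new TinyBus();
const received: string[] = [];

bus.emit("reply 1"); // no subscriber yet: dropped
bus.subscribe((msg) => {
  received.push(msg);
});
bus.emit("reply 2"); // delivered

console.log(received); // [ 'reply 2' ]
```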