Node.js Examples
Complete runnable Node.js examples for Blazen
Fourteen complete, runnable examples, starting with six core Blazen workflow patterns.
Basic Workflow
A 3-step sequential pipeline: StartEvent → GreetEvent → FormattedEvent → StopEvent.
wf.addStep("greet", ["GreetEvent"], async (event, ctx) => {
return { type: "blazen::StopEvent", result: { greeting: `Hello, ${event.name}!` } };
});
npx tsx examples/basic_workflow.ts
Streaming Workflow
Publishes progress events while processing via ctx.writeEventToStream().
await ctx.writeEventToStream({ type: "Progress", step: i });
const result = await wf.runStreaming({}, (event) => console.log(event));
npx tsx examples/streaming_workflow.ts
Branching Workflow
Conditional fan-out by returning an array of events.
return [
{ type: "PositiveEvent", text },
{ type: "NegativeEvent", text },
];
npx tsx examples/branching_workflow.ts
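The excerpt above shows the array return form with both events present. As a rough sketch of the conditional part, the helper below decides which events to emit from a plain function. `routeBySentiment` and its keyword heuristic are hypothetical and not part of Blazen; only the `{ type, text }` event shape is taken from the excerpt.

```typescript
// Hypothetical helper: decides which branch events a step should return.
// Only the { type, text } event shape comes from the excerpt above.
type BranchEvent = { type: "PositiveEvent" | "NegativeEvent"; text: string };

function routeBySentiment(text: string): BranchEvent[] {
  const lower = text.toLowerCase();
  const events: BranchEvent[] = [];
  // Toy keyword heuristic, for illustration only.
  if (/\b(good|great|love)\b/.test(lower)) {
    events.push({ type: "PositiveEvent", text });
  }
  if (/\b(bad|awful|hate)\b/.test(lower)) {
    events.push({ type: "NegativeEvent", text });
  }
  return events;
}

// Mixed input matches both branches, so both events are emitted.
console.log(routeBySentiment("I love the API but hate the docs").map((e) => e.type));
```

A step body would return the result of such a helper directly. What a step should return when neither branch matches is left open here.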
LLM RAG Workflow
Multi-step RAG pipeline using context for shared state between steps. Uses typed ChatMessage and CompletionResponse:
import { CompletionModel, ChatMessage } from "blazen";
// Reads OPENAI_API_KEY from the environment by default.
const model = CompletionModel.openai();
const response = await model.complete([
ChatMessage.system("Answer based on the provided documents."),
ChatMessage.user(query),
]);
console.log(response.content); // typed string
console.log(response.usage); // { promptTokens, completionTokens, totalTokens }
await ctx.set("documents", docs);
npx tsx examples/llm_rag_workflow.ts
Human-in-the-Loop
Side-effect steps that pause for external input via ctx.sendEvent().
await ctx.sendEvent({ type: "ReviewComplete" });
return null;
npx tsx examples/human_in_the_loop.ts
Stateful Workflow
Demonstrates the two explicit context namespaces on the Node bindings:
ctx.state: persistable values (survive pause()/resume()).
ctx.session: in-process-only values (excluded from snapshots).
Important: On the Node binding, ctx.session values are routed through serde_json::Value, so JS object identity is NOT preserved across session.get(key). This is a napi-rs threading limitation (Reference<T> is !Send because its Drop must run on the V8 main thread). For true identity preservation of live JS objects, use the Python or WASM bindings.
const wf = new Workflow("stateful-example");
wf.addStep("setup", ["blazen::StartEvent"], async (event, ctx) => {
await ctx.state.set("counter", 5); // persistable
await ctx.session.set("reqId", "abc123"); // in-process only
return { type: "blazen::StopEvent", result: {} };
});
npx tsx examples/stateful_workflow.ts
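To see what the ctx.session caveat above means in practice, the sketch below imitates a JSON round trip in plain TypeScript. It does not call Blazen: `FakeSession` is a hypothetical, synchronous stand-in that assumes the behavior is equivalent to serializing on set() and deserializing on get().

```typescript
// Hypothetical stand-in for a session store that serializes values to JSON,
// which is roughly what routing through serde_json::Value implies.
class FakeSession {
  private store = new Map<string, string>();

  set(key: string, value: unknown): void {
    this.store.set(key, JSON.stringify(value));
  }

  get(key: string): unknown {
    const raw = this.store.get(key);
    return raw === undefined ? undefined : JSON.parse(raw);
  }
}

const session = new FakeSession();
const original = { reqId: "abc123", startedAt: new Date(0) };
session.set("req", original);
const copy = session.get("req") as { reqId: string; startedAt: unknown };

console.log(copy === original);             // false: every get() builds a fresh object
console.log(copy.reqId === original.reqId); // true: plain data survives the trip
console.log(typeof copy.startedAt);         // "string": the Date became an ISO string
```

Under that assumption, anything that depends on identity or on a live prototype (class instances, Dates, Maps, open handles) should stay outside ctx.session on the Node binding.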
Example 7: Subclassing CompletionModel (Custom Provider)
Extend CompletionModel in TypeScript/JavaScript to build a custom provider. Override complete() and/or stream() with any backend — local inference, a proxy, a mock, etc. Subclass instances work with runAgent, withRetry(), withCache(), and every other helper.
import {
ChatMessage,
CompletionModel,
runAgent,
type CompletionResponse,
} from "blazen";
class EchoLLM extends CompletionModel {
constructor() {
super({ modelId: "echo-llm", contextLength: 4096 });
}
async complete(messages: ChatMessage[]): Promise<CompletionResponse> {
const last = [...messages].reverse().find((m) => m.role === "user");
return {
content: `echo: ${last?.content ?? ""}`,
model: this.modelId,
toolCalls: [],
} as CompletionResponse;
}
// Override stream() when you want incremental output.
async *stream(messages: ChatMessage[]) {
const last = [...messages].reverse().find((m) => m.role === "user");
for (const word of `echo: ${last?.content ?? ""}`.split(" ")) {
yield { delta: word + " " };
}
}
}
const model = new EchoLLM();
const result = await runAgent(
model,
[ChatMessage.user("hello world")],
[], // no tools
async () => { throw new Error("no tools"); }, // toolHandler -- not called without tools
);
console.log(result.response.content); // -> "echo: hello world"
npx tsx examples/subclass_completion_model.ts
Example 8: Custom MemoryBackend (DictBackend)
Extend MemoryBackend to plug in any storage layer (Postgres, DynamoDB, SQLite, a plain Map). Each async method is dispatched from Rust back into JS, so you get full control while reusing Blazen’s embedding and SimHash search pipeline.
import { EmbeddingModel, Memory, MemoryBackend } from "blazen";
class DictBackend extends MemoryBackend {
private store = new Map<string, any>();
async put(entry: any): Promise<void> {
this.store.set(entry.id, entry);
}
async get(id: string): Promise<any | null> {
return this.store.get(id) ?? null;
}
async delete(id: string): Promise<boolean> {
return this.store.delete(id);
}
async list(): Promise<any[]> {
return Array.from(this.store.values());
}
async len(): Promise<number> {
return this.store.size;
}
async searchByBands(bands: string[], limit: number): Promise<any[]> {
const set = new Set(bands);
const hits: any[] = [];
for (const entry of this.store.values()) {
const entryBands: string[] = entry.bands ?? [];
if (entryBands.some((b) => set.has(b))) {
hits.push(entry);
if (hits.length >= limit) break;
}
}
return hits;
}
}
const embedder = EmbeddingModel.embed();
const memory = new Memory(embedder, new DictBackend());
await memory.add("fact-1", "Rust has ownership and borrowing.");
await memory.add("fact-2", "Python uses reference counting.");
const results = await memory.search("memory management", 2);
for (const r of results) {
console.log(r.score, r.text);
}
npx tsx examples/custom_memory_backend.ts
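searchByBands receives ready-made band strings, so the backend never has to understand the hashing itself. As a rough illustration of why one shared band is a useful similarity signal, the sketch below cuts a 32-bit fingerprint into four 8-bit bands. The band count, width, and string format are assumptions made for this illustration; this page does not say how Blazen encodes its bands.

```typescript
// Assumed layout: a 32-bit fingerprint cut into four 8-bit bands, each
// rendered as "<index>:<hex>". Blazen's real encoding may differ.
function bandsOf(fingerprint: number): string[] {
  const bands: string[] = [];
  for (let i = 0; i < 4; i++) {
    const byte = (fingerprint >>> (i * 8)) & 0xff;
    const hex = ("0" + byte.toString(16)).slice(-2);
    bands.push(`${i}:${hex}`);
  }
  return bands;
}

// Number of bands two fingerprints have in common.
function sharedBands(a: number, b: number): number {
  const set = new Set(bandsOf(a));
  return bandsOf(b).filter((band) => set.has(band)).length;
}

const base = 0xdeadbeef;
const oneBitOff = (base ^ 0x00000100) >>> 0; // flip a single bit inside band 1

console.log(bandsOf(base));                // [ '0:ef', '1:be', '2:ad', '3:de' ]
console.log(sharedBands(base, oneBitOff)); // 3
```

Fingerprints that differ in only a few bits still agree on most bands, which is why the `some(...)` check in DictBackend is enough to surface near-duplicates as candidates.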
Example 9: ModelManager with Memory Budgets
Track memory across multiple local models. Each model is charged to a pool (CPU RAM vs GPU VRAM) based on its device; LRU eviction is per-pool, so a GPU LLM never evicts a CPU embedder. Register each model with an estimated footprint; when loading a new one would exceed its pool’s budget, the least-recently-used model within the same pool is automatically unloaded.
Note: poolBudgets values, memoryEstimateBytes, usedBytes, availableBytes, and the memoryEstimateBytes field on ModelStatus are JS bigints (since the Rust side uses u64 and >4 GiB budgets used to silently truncate). cpuRamGb / gpuVramGb are still regular numbers. pool is a plain string ("cpu", "gpu:0", …).
import { CompletionModel, ModelManager } from "blazen";
// 64 GB host RAM pool + 24 GB GPU VRAM pool (a typical size for a consumer card).
const manager = new ModelManager({ cpuRamGb: 64, gpuVramGb: 24 });
const llama8b = CompletionModel.mistralrs({
modelId: "meta-llama/Llama-3.1-8B-Instruct",
});
const qwen14b = CompletionModel.mistralrs({
modelId: "Qwen/Qwen2.5-14B-Instruct",
});
const mistral24b = CompletionModel.mistralrs({
modelId: "mistralai/Mistral-Small-24B",
});
// memoryEstimateBytes is a bigint -- use bigint literal arithmetic.
await manager.register("llama-8b", llama8b, 8n * 1024n ** 3n);
await manager.register("qwen-14b", qwen14b, 14n * 1024n ** 3n);
await manager.register("mistral-24b", mistral24b, 20n * 1024n ** 3n);
// Fits alongside qwen-14b (8 + 14 = 22 GB).
await manager.load("llama-8b");
await manager.load("qwen-14b");
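// 20 GB does not fit next to 8 + 14 = 22 GB in a 24 GB pool. Evicting the LRU model (llama-8b)
// still leaves 14 + 20 = 34 GB, so qwen-14b would have to be evicted too for the load to succeed.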
await manager.load("mistral-24b");
for (const s of await manager.status()) {
// s.memoryEstimateBytes is a bigint; toLocaleString() works directly on bigints.
console.log(`${s.id}: loaded=${s.loaded}, pool=${s.pool}, memory=${s.memoryEstimateBytes.toLocaleString()} bytes`);
}
// usedBytes() / availableBytes() take an optional pool string (default "cpu"); divide by a bigint to get GB.
const usedGpu = await manager.usedBytes("gpu:0");
const availableGpu = await manager.availableBytes("gpu:0");
console.log(`gpu:0 used=${usedGpu / (1024n ** 3n)} GB, available=${availableGpu / (1024n ** 3n)} GB`);
npx tsx examples/model_manager_budget.ts
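The per-pool eviction rule described above can be sketched without Blazen. The class below is a toy model, not the ModelManager implementation: it assumes eviction repeats in least-recently-used order until the new model fits, and it tracks sizes in whole GB as plain numbers to stay short (the real API takes bigint byte counts, per the note above). `ToyBudget` and the "embedder" entry are hypothetical.

```typescript
// Toy per-pool LRU budget tracker. Names and behavior are illustrative only.
type Entry = { id: string; pool: string; sizeGb: number };

class ToyBudget {
  private loaded: Entry[] = []; // oldest (least recently used) first
  private budgets: Record<string, number>;

  constructor(budgets: Record<string, number>) {
    this.budgets = budgets;
  }

  // Total size currently charged to one pool.
  used(pool: string): number {
    return this.loaded
      .filter((e) => e.pool === pool)
      .reduce((sum, e) => sum + e.sizeGb, 0);
  }

  // Loads a model, evicting LRU entries from the SAME pool only.
  // Returns the ids that were evicted, in eviction order.
  load(entry: Entry): string[] {
    const budget = this.budgets[entry.pool] ?? 0;
    if (entry.sizeGb > budget) {
      throw new Error(`${entry.id} is larger than the ${entry.pool} budget`);
    }
    // Re-loading an already loaded model just refreshes its recency.
    this.loaded = this.loaded.filter((e) => e.id !== entry.id);
    const evicted: string[] = [];
    while (this.used(entry.pool) + entry.sizeGb > budget) {
      const idx = this.loaded.findIndex((e) => e.pool === entry.pool);
      evicted.push(this.loaded[idx].id);
      this.loaded.splice(idx, 1);
    }
    this.loaded.push(entry);
    return evicted;
  }
}

const budget = new ToyBudget({ cpu: 64, "gpu:0": 24 });
budget.load({ id: "embedder", pool: "cpu", sizeGb: 2 });
budget.load({ id: "llama-8b", pool: "gpu:0", sizeGb: 8 });
budget.load({ id: "qwen-14b", pool: "gpu:0", sizeGb: 14 });
const evicted = budget.load({ id: "mistral-24b", pool: "gpu:0", sizeGb: 20 });

console.log(evicted);            // [ 'llama-8b', 'qwen-14b' ]
console.log(budget.used("cpu")); // 2: the CPU embedder was never a candidate
```

With a 24 GB pool, a 20 GB model cannot coexist with either of the other two, so both are evicted in this model while the CPU pool is left alone.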
Example 10: Pricing Registration and Cost Tracking
Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every CompletionResponse then carries a cost field computed from the registered rate.
import {
ChatMessage,
CompletionModel,
lookupPricing,
registerPricing,
type CompletionResponse,
} from "blazen";
class MyFinetune extends CompletionModel {
constructor() {
super({ modelId: "my-finetuned-model" });
}
async complete(messages: ChatMessage[]): Promise<CompletionResponse> {
return {
content: "Rust is a systems language with memory safety without GC.",
model: this.modelId,
toolCalls: [],
usage: { promptTokens: 150, completionTokens: 80, totalTokens: 230 },
} as CompletionResponse;
}
}
// Register pricing once, globally, for any model ID.
registerPricing("my-finetuned-model", {
inputPerMillion: 1.0,
outputPerMillion: 2.0,
});
// Readback -- pricing is centrally stored.
const pricing = lookupPricing("my-finetuned-model");
if (pricing) {
console.log(
`input: $${pricing.inputPerMillion}/M, output: $${pricing.outputPerMillion}/M`,
);
}
const model = new MyFinetune();
const response = await model.complete([
ChatMessage.user("Summarize Rust in one line."),
]);
console.log(response.content);
console.log("usage:", response.usage);
// cost is computed from the registered pricing + usage.
console.log(`cost: $${response.cost?.toFixed(6)}`);
npx tsx examples/pricing_and_cost.ts
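As a sanity check on the numbers in this example, the sketch below works the cost out by hand. It assumes cost is simply token count times the per-million rate, summed over input and output; `estimateCost` is a hypothetical helper, and the exact formula Blazen applies is not spelled out on this page.

```typescript
type Usage = { promptTokens: number; completionTokens: number };
type Pricing = { inputPerMillion: number; outputPerMillion: number };

// Assumed formula: linear per-token pricing, with rates quoted per million tokens.
function estimateCost(usage: Usage, pricing: Pricing): number {
  const weighted =
    usage.promptTokens * pricing.inputPerMillion +
    usage.completionTokens * pricing.outputPerMillion;
  return weighted / 1e6;
}

// Same usage and rates as the example above:
// (150 * 1.0 + 80 * 2.0) / 1,000,000 = 310 / 1,000,000
const cost = estimateCost(
  { promptTokens: 150, completionTokens: 80 },
  { inputPerMillion: 1.0, outputPerMillion: 2.0 },
);
console.log(cost.toFixed(6)); // "0.000310"
```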
Example 11: Per-Capability Provider (Custom TTS)
Extend TTSProvider to plug in any TTS backend (ElevenLabs, Coqui, a local model). The per-capability base classes (TTSProvider, ImageProvider, VideoProvider, MusicProvider, ThreeDProvider, BackgroundRemovalProvider, VoiceProvider) exist for users who only need to implement one capability.
import { TTSProvider } from "blazen";
class MyElevenLabs extends TTSProvider {
private apiKey: string;
constructor(apiKey: string) {
super({
providerId: "elevenlabs",
baseUrl: "https://api.elevenlabs.io/v1",
});
this.apiKey = apiKey;
}
async textToSpeech(request: any): Promise<any> {
// In a real implementation, make an HTTP call with this.apiKey
// and return audio bytes. Here we just echo the request.
return {
audioData: new Uint8Array([0, 1, 2]),
format: "wav",
voice: request.voice,
text: request.text,
};
}
}
const tts = new MyElevenLabs("sk-...");
const result = await tts.textToSpeech({
text: "Hello from Blazen!",
voice: "alice",
});
console.log(result.format, result.audioData.length, "bytes");
npx tsx examples/custom_tts_provider.ts
Example 12: Typed Error Handling
Blazen exports a typed error hierarchy so you can branch on failure modes with instanceof instead of string-matching messages. ProviderError carries structured fields (provider, status, endpoint, requestId, detail, retryAfterMs) populated from the underlying HTTP response. RateLimitError, AuthError, TimeoutError, ContentPolicyError, and the per-provider classes (MistralRsError, WhisperError, PiperError, …) all extend BlazenError, so a single instanceof BlazenError check catches everything Blazen raises while letting unrelated runtime errors keep propagating.
import {
CompletionModel,
ChatMessage,
RateLimitError,
ProviderError,
BlazenError,
} from "blazen";
const model = CompletionModel.openai();
try {
const response = await model.complete([ChatMessage.user("Hello")]);
console.log(response.content);
} catch (e) {
if (e instanceof RateLimitError) {
// Backoff and retry -- retryAfterMs is populated from the Retry-After header
// when the upstream provider supplies one.
console.warn("Rate limited; retry later");
} else if (e instanceof ProviderError) {
console.error(
`Provider ${e.provider} returned ${e.status}: ${e.detail}`,
);
} else if (e instanceof BlazenError) {
console.error("Blazen error:", e.message);
} else {
throw e;
}
}
npx tsx examples/typed_error_handling.ts
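The rate-limit branch above only logs a warning. One way to turn it into an actual backoff decision is sketched below. `retryDelayMs` is a hypothetical helper, not a Blazen export; the only Blazen detail it relies on is the retryAfterMs field described above, modeled here as an optional number.

```typescript
// Hypothetical helper: how long to wait before retry number `attempt` (0-based).
// Prefers the server-provided hint, otherwise falls back to capped exponential backoff.
function retryDelayMs(
  err: { retryAfterMs?: number | null },
  attempt: number,
  baseMs = 500,
  capMs = 30000,
): number {
  if (typeof err.retryAfterMs === "number" && err.retryAfterMs >= 0) {
    return Math.min(err.retryAfterMs, capMs);
  }
  return Math.min(baseMs * Math.pow(2, attempt), capMs);
}

console.log(retryDelayMs({ retryAfterMs: 1200 }, 0)); // 1200: the hint wins
console.log(retryDelayMs({}, 3));                     // 4000: 500 * 2^3
console.log(retryDelayMs({}, 10));                    // 30000: capped
```

Inside the `e instanceof RateLimitError` branch, the result could feed a `setTimeout`-based sleep before calling model.complete() again, with a maximum attempt count so the loop cannot spin forever.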
Example 13: Custom Progress Reporting via ProgressCallback
Subclass ProgressCallback to plug structured download progress into any UI. The base class exists as a real Rust-backed type so subclass instances can be passed straight into ModelCache.download (and any other Blazen API that accepts a progress hook). onProgress receives byte counts as bigint to safely represent multi-gigabyte downloads; total is null when the server does not advertise Content-Length.
import { ModelCache, ProgressCallback } from "blazen";
class TerminalProgress extends ProgressCallback {
override onProgress(downloaded: bigint, total?: bigint | null): void {
if (total) {
const pct = ((Number(downloaded) / Number(total)) * 100).toFixed(1);
process.stderr.write(`\r${pct}% (${downloaded}/${total})`);
} else {
process.stderr.write(`\r${downloaded} bytes`);
}
}
}
const cache = ModelCache.create();
const path = await cache.download(
"mistralai/Mistral-7B-Instruct-v0.3",
"config.json",
new TerminalProgress(),
);
console.log(`\ncached at ${path}`);
npx tsx examples/progress_callback.ts
Example 14: Pipeline State Persistence
Pipelines are multi-stage workflows with built-in checkpoint support. Register a JSON persistence callback with onPersistJson and Blazen will hand you a snapshot string after each stage completes — ideal for shipping checkpoints to durable storage (S3, Postgres, an HTTP service) so a crashed run can be resumed later via Pipeline.resume(snapshot). Use onPersist instead if you’d rather receive the typed PipelineSnapshot object directly.
import { PipelineBuilder, Stage, Workflow } from "blazen";
const workflowA = new Workflow("step-1");
// ... addStep(...) calls populating workflowA ...
const workflowB = new Workflow("step-2");
// ... addStep(...) calls populating workflowB ...
const pipeline = new PipelineBuilder("ingest")
.stage(new Stage("step-1", workflowA))
.stage(new Stage("step-2", workflowB))
.onPersistJson(async (snapshot: string) => {
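// Node's fetch requires an absolute URL; example.com is a placeholder endpoint.
await fetch("https://example.com/api/checkpoint", {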
method: "POST",
body: snapshot,
headers: { "Content-Type": "application/json" },
});
})
.build();
const handler = await pipeline.start({ input: "..." });
const result = await handler.result();
console.log(result.finalOutput);
npx tsx examples/pipeline_persistence.ts