Node.js Examples

Complete runnable Node.js examples for Blazen

Node.js Examples

Six complete, runnable examples that demonstrate core Blazen workflow patterns.


Basic Workflow

A 3-step sequential pipeline: StartEventGreetEventFormattedEventStopEvent.

wf.addStep("greet", ["GreetEvent"], async (event, ctx) => {
  return { type: "blazen::StopEvent", result: { greeting: `Hello, ${event.name}!` } };
});
npx tsx examples/basic_workflow.ts

Streaming Workflow

Publishes progress events while processing via ctx.writeEventToStream().

await ctx.writeEventToStream({ type: "Progress", step: i });
const result = await wf.runStreaming({}, (event) => console.log(event));
npx tsx examples/streaming_workflow.ts

Branching Workflow

Conditional fan-out by returning an array of events.

return [
  { type: "PositiveEvent", text },
  { type: "NegativeEvent", text },
];
npx tsx examples/branching_workflow.ts

LLM RAG Workflow

Multi-step RAG pipeline using context for shared state between steps. Uses typed ChatMessage and CompletionResponse:

import { CompletionModel, ChatMessage } from "blazen";

// Reads OPENAI_API_KEY from the environment by default.
const model = CompletionModel.openai();
const response = await model.complete([
  ChatMessage.system("Answer based on the provided documents."),
  ChatMessage.user(query),
]);
console.log(response.content);     // typed string
console.log(response.usage);       // { promptTokens, completionTokens, totalTokens }
await ctx.set("documents", docs);
npx tsx examples/llm_rag_workflow.ts

Human-in-the-Loop

Side-effect steps that pause for external input via ctx.sendEvent().

await ctx.sendEvent({ type: "ReviewComplete" });
return null;
npx tsx examples/human_in_the_loop.ts

Stateful Workflow

Demonstrates the two explicit context namespaces on the Node bindings:

  • ctx.state — persistable values (survives pause()/resume()).
  • ctx.session — in-process-only values (excluded from snapshots).

Important: On the Node binding, ctx.session values are routed through serde_json::Value — JS object identity is NOT preserved across session.get(key). This is a napi-rs threading limitation (Reference<T> is !Send because its Drop must run on the v8 main thread). For true identity preservation of live JS objects, use the Python or WASM bindings.

const wf = new Workflow("stateful-example");
wf.addStep("setup", ["blazen::StartEvent"], async (event, ctx) => {
  await ctx.state.set("counter", 5);              // persistable
  await ctx.session.set("reqId", "abc123");       // in-process only
  return { type: "blazen::StopEvent", result: {} };
});
npx tsx examples/stateful_workflow.ts

Example 7: Subclassing CompletionModel (Custom Provider)

Extend CompletionModel in TypeScript/JavaScript to build a custom provider. Override complete() and/or stream() with any backend — local inference, a proxy, a mock, etc. Subclass instances work with runAgent, withRetry(), withCache(), and every other helper.

import {
  ChatMessage,
  CompletionModel,
  runAgent,
  type CompletionResponse,
} from "blazen";

class EchoLLM extends CompletionModel {
  constructor() {
    super({ modelId: "echo-llm", contextLength: 4096 });
  }

  async complete(messages: ChatMessage[]): Promise<CompletionResponse> {
    const last = [...messages].reverse().find((m) => m.role === "user");
    return {
      content: `echo: ${last?.content ?? ""}`,
      model: this.modelId,
      toolCalls: [],
    } as CompletionResponse;
  }

  // Override stream() when you want incremental output.
  async *stream(messages: ChatMessage[]) {
    const last = [...messages].reverse().find((m) => m.role === "user");
    for (const word of `echo: ${last?.content ?? ""}`.split(" ")) {
      yield { delta: word + " " };
    }
  }
}

const model = new EchoLLM();
const result = await runAgent(
  model,
  [ChatMessage.user("hello world")],
  [], // no tools
  async () => { throw new Error("no tools"); }, // toolHandler -- not called without tools
);
console.log(result.response.content); // -> "echo: hello world"
npx tsx examples/subclass_completion_model.ts

Example 8: Custom MemoryBackend (DictBackend)

Extend MemoryBackend to plug in any storage layer (Postgres, DynamoDB, SQLite, a plain Map). Each async method is dispatched from Rust back into JS, so you get full control while reusing Blazen’s embedding and SimHash search pipeline.

import { EmbeddingModel, Memory, MemoryBackend } from "blazen";

class DictBackend extends MemoryBackend {
  private store = new Map<string, any>();

  async put(entry: any): Promise<void> {
    this.store.set(entry.id, entry);
  }

  async get(id: string): Promise<any | null> {
    return this.store.get(id) ?? null;
  }

  async delete(id: string): Promise<boolean> {
    return this.store.delete(id);
  }

  async list(): Promise<any[]> {
    return Array.from(this.store.values());
  }

  async len(): Promise<number> {
    return this.store.size;
  }

  async searchByBands(bands: string[], limit: number): Promise<any[]> {
    const set = new Set(bands);
    const hits: any[] = [];
    for (const entry of this.store.values()) {
      const entryBands: string[] = entry.bands ?? [];
      if (entryBands.some((b) => set.has(b))) {
        hits.push(entry);
        if (hits.length >= limit) break;
      }
    }
    return hits;
  }
}

const embedder = EmbeddingModel.embed();
const memory = new Memory(embedder, new DictBackend());

await memory.add("fact-1", "Rust has ownership and borrowing.");
await memory.add("fact-2", "Python uses reference counting.");

const results = await memory.search("memory management", 2);
for (const r of results) {
  console.log(r.score, r.text);
}
npx tsx examples/custom_memory_backend.ts

Example 9: ModelManager with Memory Budgets

Track memory across multiple local models. Each model is charged to a pool (CPU RAM vs GPU VRAM) based on its device; LRU eviction is per-pool, so a GPU LLM never evicts a CPU embedder. Register each model with an estimated footprint; when loading a new one would exceed its pool’s budget, the least-recently-used model within the same pool is automatically unloaded.

Note: poolBudgets values, memoryEstimateBytes, usedBytes, availableBytes, and the memoryEstimateBytes field on ModelStatus are JS bigints (since the Rust side uses u64 and >4 GiB budgets used to silently truncate). cpuRamGb / gpuVramGb are still regular numbers. pool is a plain string ("cpu", "gpu:0", …).

import { CompletionModel, ModelManager } from "blazen";

// 64 GB host RAM pool + 24 GB GPU pool (GPU-typical for a consumer card).
const manager = new ModelManager({ cpuRamGb: 64, gpuVramGb: 24 });

const llama8b = CompletionModel.mistralrs({
  modelId: "meta-llama/Llama-3.1-8B-Instruct",
});
const qwen14b = CompletionModel.mistralrs({
  modelId: "Qwen/Qwen2.5-14B-Instruct",
});
const mistral24b = CompletionModel.mistralrs({
  modelId: "mistralai/Mistral-Small-24B",
});

// memoryEstimateBytes is a bigint -- use bigint literal arithmetic.
await manager.register("llama-8b", llama8b, 8n * 1024n ** 3n);
await manager.register("qwen-14b", qwen14b, 14n * 1024n ** 3n);
await manager.register("mistral-24b", mistral24b, 20n * 1024n ** 3n);

// Fits alongside qwen-14b (8 + 14 = 22 GB).
await manager.load("llama-8b");
await manager.load("qwen-14b");

// 20 GB does not fit next to 8 + 14 = 22 GB in the same pool -- LRU (llama-8b) is evicted.
await manager.load("mistral-24b");

for (const s of await manager.status()) {
  // s.memoryEstimateBytes is a bigint; toLocaleString() works directly on bigints.
  console.log(`${s.id}: loaded=${s.loaded}, pool=${s.pool}, memory=${s.memoryEstimateBytes.toLocaleString()} bytes`);
}

// usedBytes() / availableBytes() take an optional pool string (default "cpu"); divide by a bigint to get GB.
const usedGpu = await manager.usedBytes("gpu:0");
const availableGpu = await manager.availableBytes("gpu:0");
console.log(`gpu:0 used=${usedGpu / (1024n ** 3n)} GB, available=${availableGpu / (1024n ** 3n)} GB`);
npx tsx examples/model_manager_budget.ts

Example 10: Pricing Registration and Cost Tracking

Register pricing for any model ID (your own model, a local finetune, a custom deployment). Every CompletionResponse then carries a cost field computed from the registered rate.

import {
  ChatMessage,
  CompletionModel,
  lookupPricing,
  registerPricing,
  type CompletionResponse,
} from "blazen";

class MyFinetune extends CompletionModel {
  constructor() {
    super({ modelId: "my-finetuned-model" });
  }

  async complete(messages: ChatMessage[]): Promise<CompletionResponse> {
    return {
      content: "Rust is a systems language with memory safety without GC.",
      model: this.modelId,
      toolCalls: [],
      usage: { promptTokens: 150, completionTokens: 80, totalTokens: 230 },
    } as CompletionResponse;
  }
}

// Register pricing once, globally, for any model ID.
registerPricing("my-finetuned-model", {
  inputPerMillion: 1.0,
  outputPerMillion: 2.0,
});

// Readback -- pricing is centrally stored.
const pricing = lookupPricing("my-finetuned-model");
if (pricing) {
  console.log(
    `input: $${pricing.inputPerMillion}/M, output: $${pricing.outputPerMillion}/M`,
  );
}

const model = new MyFinetune();
const response = await model.complete([
  ChatMessage.user("Summarize Rust in one line."),
]);
console.log(response.content);
console.log("usage:", response.usage);
// cost is computed from the registered pricing + usage.
console.log(`cost: $${response.cost?.toFixed(6)}`);
npx tsx examples/pricing_and_cost.ts

Example 11: Per-Capability Provider (Custom TTS)

Extend TTSProvider to plug in any TTS backend (ElevenLabs, Coqui, a local model). The per-capability base classes (TTSProvider, ImageProvider, VideoProvider, MusicProvider, ThreeDProvider, BackgroundRemovalProvider, VoiceProvider) exist for users who only need to implement one capability.

import { TTSProvider } from "blazen";

class MyElevenLabs extends TTSProvider {
  private apiKey: string;

  constructor(apiKey: string) {
    super({
      providerId: "elevenlabs",
      baseUrl: "https://api.elevenlabs.io/v1",
    });
    this.apiKey = apiKey;
  }

  async textToSpeech(request: any): Promise<any> {
    // In a real implementation, make an HTTP call with this.apiKey
    // and return audio bytes. Here we just echo the request.
    return {
      audioData: new Uint8Array([0, 1, 2]),
      format: "wav",
      voice: request.voice,
      text: request.text,
    };
  }
}

const tts = new MyElevenLabs("sk-...");
const result = await tts.textToSpeech({
  text: "Hello from Blazen!",
  voice: "alice",
});
console.log(result.format, result.audioData.length, "bytes");
npx tsx examples/custom_tts_provider.ts

Example 12: Typed Error Handling

Blazen exports a typed error hierarchy so you can branch on failure modes with instanceof instead of string-matching messages. ProviderError carries structured fields (provider, status, endpoint, requestId, detail, retryAfterMs) populated from the underlying HTTP response. RateLimitError, AuthError, TimeoutError, ContentPolicyError, and the per-provider classes (MistralRsError, WhisperError, PiperError, …) all extend BlazenError, so a single instanceof BlazenError check catches everything Blazen raises while letting unrelated runtime errors keep propagating.

import {
  CompletionModel,
  ChatMessage,
  RateLimitError,
  ProviderError,
  BlazenError,
} from "blazen";

const model = CompletionModel.openai();
try {
  const response = await model.complete([ChatMessage.user("Hello")]);
  console.log(response.content);
} catch (e) {
  if (e instanceof RateLimitError) {
    // Backoff and retry -- retryAfterMs is populated from the Retry-After header
    // when the upstream provider supplies one.
    console.warn("Rate limited; retry later");
  } else if (e instanceof ProviderError) {
    console.error(
      `Provider ${e.provider} returned ${e.status}: ${e.detail}`,
    );
  } else if (e instanceof BlazenError) {
    console.error("Blazen error:", e.message);
  } else {
    throw e;
  }
}
npx tsx examples/typed_error_handling.ts

Example 13: Custom Progress Reporting via ProgressCallback

Subclass ProgressCallback to plug structured download progress into any UI. The base class exists as a real Rust-backed type so subclass instances can be passed straight into ModelCache.download (and any other Blazen API that accepts a progress hook). onProgress receives byte counts as bigint to safely represent multi-gigabyte downloads; total is null when the server does not advertise Content-Length.

import { ModelCache, ProgressCallback } from "blazen";

class TerminalProgress extends ProgressCallback {
  override onProgress(downloaded: bigint, total?: bigint | null): void {
    if (total) {
      const pct = ((Number(downloaded) / Number(total)) * 100).toFixed(1);
      process.stderr.write(`\r${pct}% (${downloaded}/${total})`);
    } else {
      process.stderr.write(`\r${downloaded} bytes`);
    }
  }
}

const cache = ModelCache.create();
const path = await cache.download(
  "mistralai/Mistral-7B-Instruct-v0.3",
  "config.json",
  new TerminalProgress(),
);
console.log(`\ncached at ${path}`);
npx tsx examples/progress_callback.ts

Example 14: Pipeline State Persistence

Pipelines are multi-stage workflows with built-in checkpoint support. Register a JSON persistence callback with onPersistJson and Blazen will hand you a snapshot string after each stage completes — ideal for shipping checkpoints to durable storage (S3, Postgres, an HTTP service) so a crashed run can be resumed later via Pipeline.resume(snapshot). Use onPersist instead if you’d rather receive the typed PipelineSnapshot object directly.

import { PipelineBuilder, Stage, Workflow } from "blazen";

const workflowA = new Workflow("step-1");
// ... addStep(...) calls populating workflowA ...
const workflowB = new Workflow("step-2");
// ... addStep(...) calls populating workflowB ...

const pipeline = new PipelineBuilder("ingest")
  .stage(new Stage("step-1", workflowA))
  .stage(new Stage("step-2", workflowB))
  .onPersistJson(async (snapshot: string) => {
    await fetch("/api/checkpoint", {
      method: "POST",
      body: snapshot,
      headers: { "Content-Type": "application/json" },
    });
  })
  .build();

const handler = await pipeline.start({ input: "..." });
const result = await handler.result();
console.log(result.finalOutput);
npx tsx examples/pipeline_persistence.ts