Context

Share state between workflow steps in Rust

What is Context?

Context is a key-value store shared across all steps in a workflow run. Use it to pass data between steps that doesn’t fit neatly into events — configuration, intermediate results, or anything you want accessible from any step without threading it through event payloads.

Setting and Getting Values

Store values with ctx.set() and retrieve them with ctx.get(). Values are stored as serde_json::Value, so any serializable type works:

use blazen::prelude::*;

#[step]
async fn store_data(event: StartEvent, ctx: Context) -> Result<NextEvent, WorkflowError> {
    ctx.set("user_id", serde_json::json!("user_123"));
    ctx.set("doc_count", serde_json::json!(5));
    Ok(NextEvent { /* ... */ })
}

#[step]
async fn use_data(event: NextEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    let user_id: String = serde_json::from_value(ctx.get("user_id").unwrap()).unwrap();
    let doc_count: i64 = serde_json::from_value(ctx.get("doc_count").unwrap()).unwrap();
    Ok(StopEvent { result: serde_json::json!({"user": user_id, "docs": doc_count}) })
}

Run ID

Each workflow run is assigned a unique identifier. Access it from any step via ctx.run_id():

#[step]
async fn log_run(event: StartEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    println!("Executing run: {}", ctx.run_id());
    Ok(StopEvent { result: serde_json::json!({"run": ctx.run_id()}) })
}

Binary Storage

Use ctx.set_bytes() and ctx.get_bytes() to store raw binary data. No serialization requirement — store any type by converting to bytes yourself (e.g., MessagePack, protobuf, bincode). Binary data persists through pause/resume/checkpoint via efficient serde_bytes serialization.

#[step]
async fn store_model(event: StartEvent, ctx: Context) -> Result<NextEvent, WorkflowError> {
    let weights: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
    ctx.set_bytes("model-weights", weights);
    Ok(NextEvent { /* ... */ })
}

#[step]
async fn use_model(event: NextEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    let weights = ctx.get_bytes("model-weights").expect("weights should exist");
    Ok(StopEvent { result: serde_json::json!({"weight_count": weights.len()}) })
}

StateValue and Direct Access

Under the hood, every context value is stored as a StateValue enum:

pub enum StateValue {
    Json(serde_json::Value),   // structured, serializable data
    Bytes(BytesWrapper),       // raw binary data
    Native(BytesWrapper),      // platform-serialized opaque objects
}

The Json and Bytes variants correspond to the typed and binary APIs shown above. The Native variant holds opaque bytes produced by a platform serializer (e.g., Python pickle). It exists so that a value set in a Python or Node.js binding step can round-trip through Rust steps without losing fidelity — Rust code can inspect or forward the raw bytes, but shouldn’t attempt to deserialize them as JSON.

Use ctx.set_value() and ctx.get_value() to work with StateValue directly:

use blazen::prelude::*;
use blazen::context::StateValue;

#[step]
async fn store_raw(event: StartEvent, ctx: Context) -> Result<NextEvent, WorkflowError> {
    // Store a JSON StateValue directly
    ctx.set_value("config", StateValue::Json(serde_json::json!({"retries": 3})));

    // Store raw bytes as a Native value (e.g., forwarded from a Python step)
    let opaque: Vec<u8> = vec![0x80, 0x04, 0x95]; // pickle header bytes
    ctx.set_value("py_obj", StateValue::Native(opaque.into()));

    Ok(NextEvent { /* ... */ })
}

#[step]
async fn read_raw(event: NextEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    match ctx.get_value("config") {
        Some(StateValue::Json(v)) => println!("retries = {}", v["retries"]),
        Some(StateValue::Bytes(b)) => println!("got {} raw bytes", b.len()),
        Some(StateValue::Native(b)) => println!("got {} native bytes", b.len()),
        None => println!("key not found"),
    }
    Ok(StopEvent { result: serde_json::json!("done") })
}

When to use each API:

MethodStored asBest for
ctx.set(key, value) / ctx.get(key)StateValue::JsonTyped Rust data (String, structs, numbers)
ctx.set_bytes(key, data) / ctx.get_bytes(key)StateValue::BytesRaw binary blobs (model weights, protobuf)
ctx.set_value(key, sv) / ctx.get_value(key)Any StateValueDirect control, cross-language interop, forwarding Native values

Manual Event Routing

Instead of returning an event from a step, you can use ctx.send_event() to route events programmatically. This is useful when a step needs to emit multiple events or decide at runtime which path the workflow takes:

#[step]
async fn branch(event: StartEvent, ctx: Context) -> Result<(), WorkflowError> {
    let score: f64 = serde_json::from_value(ctx.get("score").unwrap()).unwrap();

    if score > 0.8 {
        ctx.send_event(HighScoreEvent { score });
    } else {
        ctx.send_event(LowScoreEvent { score });
    }
    Ok(())
}

When using send_event, the step returns () since routing is handled explicitly rather than through the return type.