Context
Share state between workflow steps in Rust
What is Context?
Context is a key-value store shared across all steps in a workflow run. Use it to pass data that doesn't fit neatly into events, such as configuration, intermediate results, or anything else you want accessible from any step without threading it through event payloads.
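As a mental model only, the sketch below imitates that behavior with the standard library: a cloneable handle around one shared map. `SharedStore`, `first_step`, and `later_step` are hypothetical names for illustration, values are plain strings rather than serde_json::Value, and none of this is Blazen's actual implementation.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a workflow context:
/// every clone of the handle points at the same underlying map.
#[derive(Clone, Default)]
pub struct SharedStore {
    inner: Arc<Mutex<HashMap<String, String>>>,
}

impl SharedStore {
    /// Insert or overwrite the value stored under `key`.
    pub fn set(&self, key: &str, value: &str) {
        self.inner
            .lock()
            .unwrap()
            .insert(key.to_string(), value.to_string());
    }

    /// Return a copy of the value stored under `key`, if any.
    pub fn get(&self, key: &str) -> Option<String> {
        self.inner.lock().unwrap().get(key).cloned()
    }
}

/// An early "step" records a value without putting it in an event payload.
pub fn first_step(store: &SharedStore) {
    store.set("user_id", "user_123");
}

/// A later "step" reads the same value through its own handle.
pub fn later_step(store: &SharedStore) -> Option<String> {
    store.get("user_id")
}
```

The point of the model is that steps never hand the value to each other directly; they only agree on the key.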
Setting and Getting Values
Store values with ctx.set() and retrieve them with ctx.get(). Values are stored as serde_json::Value, so any serializable type works:
use blazen::prelude::*;

#[step]
async fn store_data(event: StartEvent, ctx: Context) -> Result<NextEvent, WorkflowError> {
    ctx.set("user_id", serde_json::json!("user_123"));
    ctx.set("doc_count", serde_json::json!(5));
    Ok(NextEvent { /* ... */ })
}

#[step]
async fn use_data(event: NextEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    let user_id: String = serde_json::from_value(ctx.get("user_id").unwrap()).unwrap();
    let doc_count: i64 = serde_json::from_value(ctx.get("doc_count").unwrap()).unwrap();
    Ok(StopEvent { result: serde_json::json!({"user": user_id, "docs": doc_count}) })
}
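The examples above call unwrap(), which panics if a key was never set. Here is a minimal sketch of a gentler pattern, assuming ctx.get() returns an Option: a small helper that turns a missing value into an error naming the key. `MissingKey` and `require` are hypothetical helpers, not part of Blazen, and mapping the error into WorkflowError is left to the caller because that conversion is not covered in this guide.

```rust
use std::fmt;

/// Hypothetical error describing a context key that was never set.
#[derive(Debug, PartialEq)]
pub struct MissingKey(pub String);

impl fmt::Display for MissingKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "context key not found: {}", self.0)
    }
}

impl std::error::Error for MissingKey {}

/// Convert the `Option` from a lookup into a `Result` that records which key was missing.
pub fn require<T>(key: &str, value: Option<T>) -> Result<T, MissingKey> {
    value.ok_or_else(|| MissingKey(key.to_string()))
}
```

Inside a step this might be called as `require("user_id", ctx.get("user_id"))`, followed by whatever error conversion your workflow uses.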
Run ID
Each workflow run is assigned a unique identifier. Access it from any step via ctx.run_id():
#[step]
async fn log_run(event: StartEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    println!("Executing run: {}", ctx.run_id());
    Ok(StopEvent { result: serde_json::json!({"run": ctx.run_id()}) })
}
Binary Storage
Use ctx.set_bytes() and ctx.get_bytes() to store raw binary data. These methods impose no serialization requirement: you can store any type by converting it to bytes yourself (e.g., with MessagePack, protobuf, or bincode). Binary data persists through pause/resume/checkpoint via efficient serde_bytes serialization.
#[step]
async fn store_model(event: StartEvent, ctx: Context) -> Result<NextEvent, WorkflowError> {
    let weights: Vec<u8> = vec![0x01, 0x02, 0x03, 0x04];
    ctx.set_bytes("model-weights", weights);
    Ok(NextEvent { /* ... */ })
}

#[step]
async fn use_model(event: NextEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    let weights = ctx.get_bytes("model-weights").expect("weights should exist");
    Ok(StopEvent { result: serde_json::json!({"weight_count": weights.len()}) })
}
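To illustrate converting to bytes yourself, here is a minimal std-only sketch that packs f32 weights into little-endian bytes before a call to ctx.set_bytes() and unpacks them after ctx.get_bytes(). The helper names `weights_to_bytes` and `bytes_to_weights` are hypothetical, and the little-endian layout is just one reasonable choice; a format such as bincode or protobuf would serve the same purpose.

```rust
/// Encode each `f32` as 4 little-endian bytes, in order.
pub fn weights_to_bytes(weights: &[f32]) -> Vec<u8> {
    weights.iter().flat_map(|w| w.to_le_bytes()).collect()
}

/// Decode bytes produced by `weights_to_bytes`.
/// Returns `None` if the length is not a multiple of 4.
pub fn bytes_to_weights(bytes: &[u8]) -> Option<Vec<f32>> {
    if bytes.len() % 4 != 0 {
        return None;
    }
    Some(
        bytes
            .chunks_exact(4)
            .map(|chunk| f32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]))
            .collect(),
    )
}
```

Fixing the byte order explicitly keeps the stored data readable if a workflow is paused on one machine and resumed on another.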
StateValue and Direct Access
Under the hood, every context value is stored as a StateValue enum:
pub enum StateValue {
    Json(serde_json::Value), // structured, serializable data
    Bytes(BytesWrapper),     // raw binary data
    Native(BytesWrapper),    // platform-serialized opaque objects
}
The Json and Bytes variants correspond to the typed and binary APIs shown above. The Native variant holds opaque bytes produced by a platform serializer (e.g., Python pickle). It exists so that a value set in a Python or Node.js binding step can round-trip through Rust steps without losing fidelity — Rust code can inspect or forward the raw bytes, but shouldn’t attempt to deserialize them as JSON.
Use ctx.set_value() and ctx.get_value() to work with StateValue directly:
use blazen::prelude::*;
use blazen::context::StateValue;

#[step]
async fn store_raw(event: StartEvent, ctx: Context) -> Result<NextEvent, WorkflowError> {
    // Store a JSON StateValue directly
    ctx.set_value("config", StateValue::Json(serde_json::json!({"retries": 3})));
    // Store raw bytes as a Native value (e.g., forwarded from a Python step)
    let opaque: Vec<u8> = vec![0x80, 0x04, 0x95]; // pickle header bytes
    ctx.set_value("py_obj", StateValue::Native(opaque.into()));
    Ok(NextEvent { /* ... */ })
}

#[step]
async fn read_raw(event: NextEvent, ctx: Context) -> Result<StopEvent, WorkflowError> {
    match ctx.get_value("config") {
        Some(StateValue::Json(v)) => println!("retries = {}", v["retries"]),
        Some(StateValue::Bytes(b)) => println!("got {} raw bytes", b.len()),
        Some(StateValue::Native(b)) => println!("got {} native bytes", b.len()),
        None => println!("key not found"),
    }
    Ok(StopEvent { result: serde_json::json!("done") })
}
When to use each API:
| Method | Stored as | Best for |
|---|---|---|
| ctx.set(key, value) / ctx.get(key) | StateValue::Json | Typed Rust data (String, structs, numbers) |
| ctx.set_bytes(key, data) / ctx.get_bytes(key) | StateValue::Bytes | Raw binary blobs (model weights, protobuf) |
| ctx.set_value(key, sv) / ctx.get_value(key) | Any StateValue | Direct control, cross-language interop, forwarding Native values |
Manual Event Routing
Instead of returning an event from a step, you can use ctx.send_event() to route events programmatically. This is useful when a step needs to emit multiple events or decide at runtime which path the workflow takes:
#[step]
async fn branch(event: StartEvent, ctx: Context) -> Result<(), WorkflowError> {
    let score: f64 = serde_json::from_value(ctx.get("score").unwrap()).unwrap();
    if score > 0.8 {
        ctx.send_event(HighScoreEvent { score });
    } else {
        ctx.send_event(LowScoreEvent { score });
    }
    Ok(())
}
When using send_event, the step's return type is Result<(), WorkflowError> and it returns Ok(()), since routing is handled explicitly rather than through the returned event.
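The example above sends one of two events. To show the multiple-event case, the sketch below models the idea with a plain queue from the standard library. `Event`, `EventQueue`, and its methods are hypothetical stand-ins, not Blazen types, and the first-in, first-out delivery order is an assumption of this model rather than a documented guarantee.

```rust
use std::collections::VecDeque;

/// Hypothetical events, for illustration only.
#[derive(Debug, PartialEq)]
pub enum Event {
    HighScore(f64),
    LowScore(f64),
    Audit(String),
}

/// Hypothetical stand-in for the queue behind `send_event`.
#[derive(Default)]
pub struct EventQueue {
    pending: VecDeque<Event>,
}

impl EventQueue {
    /// Queue an event for later delivery.
    pub fn send_event(&mut self, event: Event) {
        self.pending.push_back(event);
    }

    /// Take the oldest queued event, if any.
    pub fn next_event(&mut self) -> Option<Event> {
        self.pending.pop_front()
    }
}

/// A step body that always emits an audit event and then picks a branch at runtime.
/// It returns nothing because all routing happens through the queue.
pub fn branch(score: f64, queue: &mut EventQueue) {
    queue.send_event(Event::Audit(format!("scored {}", score)));
    if score > 0.8 {
        queue.send_event(Event::HighScore(score));
    } else {
        queue.send_event(Event::LowScore(score));
    }
}
```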
BlazenState
BlazenState is primarily a Python, Node.js, and WASM concept. In those languages, you define a class that extends BlazenState to get automatic per-field persistence — each field is individually stored and retrieved from the context without manual key management.
In Rust, there is no BlazenState base class. Instead, you get the same per-field storage by calling set_value() and get_value() with explicit keys, using the StateValue variants described in the StateValue and Direct Access section above. This keeps every key and storage format visible at the call site, in keeping with idiomatic Rust.
Per-Field Storage in Rust
A common pattern is to store some fields as JSON (for structured, inspectable data) and others as raw bytes (for binary or pre-serialized data):
use blazen::prelude::*;
use blazen::context::StateValue;

/// A struct whose fields are stored individually in the context.
struct PipelineState {
    config: serde_json::Value,
    embeddings: Vec<u8>,
}

impl PipelineState {
    fn save(&self, ctx: &Context) {
        ctx.set_value("pipeline:config", StateValue::Json(self.config.clone()));
        ctx.set_value("pipeline:embeddings", StateValue::Bytes(self.embeddings.clone().into()));
    }

    fn load(ctx: &Context) -> Option<Self> {
        let config = match ctx.get_value("pipeline:config")? {
            StateValue::Json(v) => v,
            _ => return None,
        };
        let embeddings = match ctx.get_value("pipeline:embeddings")? {
            StateValue::Bytes(b) => b.to_vec(),
            _ => return None,
        };
        Some(Self { config, embeddings })
    }
}
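The same pattern can be exercised without Blazen to see how the keys and variant checks behave. The sketch below is a std-only model: `Stored`, `Store`, and `field_key` are hypothetical stand-ins (a HashMap replaces the context, and JSON is kept as text so no crates are needed). It also adds a namespace parameter, which is not in the example above, so that two instances do not overwrite each other's fields.

```rust
use std::collections::HashMap;

/// Local stand-in for the two `StateValue` variants used by this pattern.
#[derive(Clone, Debug, PartialEq)]
pub enum Stored {
    Json(String), // JSON kept as text so the sketch needs no external crates
    Bytes(Vec<u8>),
}

/// Hypothetical replacement for the context: a plain map from key to value.
pub type Store = HashMap<String, Stored>;

/// Build a namespaced key such as "pipeline:config".
pub fn field_key(namespace: &str, field: &str) -> String {
    format!("{}:{}", namespace, field)
}

#[derive(Debug, PartialEq)]
pub struct PipelineState {
    pub config: String,
    pub embeddings: Vec<u8>,
}

impl PipelineState {
    /// Write each field under its own namespaced key.
    pub fn save(&self, store: &mut Store, namespace: &str) {
        store.insert(field_key(namespace, "config"), Stored::Json(self.config.clone()));
        store.insert(
            field_key(namespace, "embeddings"),
            Stored::Bytes(self.embeddings.clone()),
        );
    }

    /// Read the fields back; a missing key or an unexpected variant yields `None`.
    pub fn load(store: &Store, namespace: &str) -> Option<Self> {
        let config = match store.get(&field_key(namespace, "config"))? {
            Stored::Json(text) => text.clone(),
            _ => return None,
        };
        let embeddings = match store.get(&field_key(namespace, "embeddings"))? {
            Stored::Bytes(bytes) => bytes.clone(),
            _ => return None,
        };
        Some(Self { config, embeddings })
    }
}
```

Checking the variant on load means that a key overwritten with the wrong kind of value surfaces as `None` instead of being misread.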
Binding Authors: Using StateValue::Native
If you are building a language binding (e.g., via PyO3 or napi-rs), the Native(BytesWrapper) variant lets you store platform-serialized objects that Rust steps can forward without deserializing:
use blazen::prelude::*; // brings Context into scope, as in the earlier examples
use blazen::context::StateValue;

// In a binding layer: serialize a Python/Node object to bytes and store it
fn store_platform_object(ctx: &Context, key: &str, serialized: Vec<u8>) {
    ctx.set_value(key, StateValue::Native(serialized.into()));
}

// A Rust step can forward the value without interpreting its contents
fn forward_native(ctx: &Context, src_key: &str, dst_key: &str) {
    if let Some(val @ StateValue::Native(_)) = ctx.get_value(src_key) {
        ctx.set_value(dst_key, val);
    }
}
This is how the Python and Node.js BlazenState implementations persist fields that have no natural JSON representation — they serialize to platform-native bytes and store them as Native values that survive round-trips through Rust workflow steps.