Context
Share state between workflow steps in Python
What is Context?
A key-value store shared across all steps in a workflow run.
Blazen exposes two explicit namespaces alongside the smart-routing ctx.set / ctx.get shortcuts: ctx.state for persistable values (survives pause/resume and checkpoints) and ctx.session for live in-process references (identity-preserving within a run, excluded from snapshots). See State vs Session below.
Setting and Getting Values
ctx.set(key, value) stores any Python value using a 4-tier dispatch:
- bytes / bytearray → raw binary (survives snapshots)
- JSON-serializable (dict, list, str, int, float, bool, None) → JSON (survives snapshots)
- Picklable objects (Pydantic models, dataclasses, custom classes) → pickled automatically (survives snapshots)
- Unpicklable objects (DB connections, file handles, sockets, lambdas) → live in-process reference (same-process only, excluded from snapshots)
ctx.get returns the original Python type for all four tiers.
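The standalone sketch below makes the dispatch order concrete by reporting which tier a value would fall into. It is not Blazen's code. storage_tier is a hypothetical helper. The JSON round-trip check is also an assumption: it sends tuples and int-keyed dicts to the pickle tier, which is one way a value could come back with its original type.

```python
import datetime
import json
import pickle
import sqlite3
from typing import Any


def storage_tier(value: Any) -> str:
    """Hypothetical helper: report which of the four tiers a value would use."""
    # Tier 1: bytes and bytearray are stored as raw binary.
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    # Tier 2: JSON, but only if decoding gives back an equal value
    # (assumption: a tuple would otherwise come back as a list).
    try:
        if json.loads(json.dumps(value)) == value:
            return "json"
    except (TypeError, ValueError):
        pass  # not JSON-serializable (or circular); try pickle next
    # Tier 3: anything pickle can serialize.
    try:
        pickle.dumps(value)
        return "pickle"
    except (pickle.PicklingError, TypeError, AttributeError):
        # Tier 4: unpicklable objects stay as live in-process references.
        return "live"


examples = {
    "thumbnail": b"\x89PNG\r\n",
    "tags": ["admin", "active"],
    "created": datetime.date(2024, 1, 1),
    "db": sqlite3.connect(":memory:"),
}
for name, value in examples.items():
    print(f"{name}: {storage_tier(value)}")
```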
```python
from pydantic import BaseModel
from blazen import Context, Event, StopEvent, step


class UserProfile(BaseModel):
    name: str
    score: float


class NextEvent(Event):
    pass


@step
async def store_data(ctx: Context, ev: Event):
    # JSON-serializable values (stored as JSON)
    ctx.set("user_id", "user_123")
    ctx.set("doc_count", 5)
    ctx.set("tags", ["admin", "active"])
    # Raw bytes (stored as binary)
    ctx.set("thumbnail", b"\x89PNG\r\n...")
    # Pydantic model (pickled automatically)
    ctx.set("profile", UserProfile(name="Alice", score=0.95))
    return NextEvent()


@step
async def use_data(ctx: Context, ev: NextEvent):
    user_id = ctx.get("user_id")  # str
    doc_count = ctx.get("doc_count")  # int
    thumbnail = ctx.get("thumbnail")  # bytes
    profile = ctx.get("profile")  # UserProfile
    return StopEvent(result={"user": user_id, "name": profile.name})
```
Important: ctx.set() and ctx.get() are synchronous — no await.
Run ID
```python
run_id = ctx.run_id()  # Synchronous, returns a UUID string
```
Binary Storage
Since ctx.set() now handles bytes and bytearray natively (stored as raw binary), you can pass binary data directly:
```python
from blazen import Context, Event, StopEvent, step


class NextEvent(Event):
    pass


@step
async def store(ctx: Context, ev: Event):
    ctx.set("model", b"\x00\x01\x02...")  # stored as raw bytes
    return NextEvent()


@step
async def load(ctx: Context, ev: NextEvent):
    raw = ctx.get("model")  # bytes
    return StopEvent(result=raw)
```
ctx.set_bytes() and ctx.get_bytes() remain available as explicit convenience aliases for binary data. They behave identically to calling ctx.set() / ctx.get() with bytes values. Binary data persists through pause/resume/checkpoint.
Manual Event Routing
```python
ctx.send_event(ContinueEvent())  # Synchronous, routes manually
return None  # Don't return an event when using send_event
```
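The snippet above only makes sense inside a step. The sketch below is a toy asyncio loop, not Blazen's runtime, that shows the pattern. A step calls send_event to queue the next event and returns None. If the step returns an event instead, the loop queues that event itself. ToyContext, Continue, Done, count_step, and run are all hypothetical names.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Continue:
    n: int


@dataclass
class Done:
    result: int


class ToyContext:
    """Hypothetical stand-in for Context that only models send_event."""

    def __init__(self) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()

    def send_event(self, event) -> None:
        # Synchronous: the event is queued immediately, nothing is awaited.
        self.queue.put_nowait(event)


async def count_step(ctx: ToyContext, ev: Continue):
    if ev.n >= 3:
        return Done(result=ev.n)
    ctx.send_event(Continue(ev.n + 1))  # route the next event manually...
    return None  # ...and return nothing, so it is not routed twice


async def run() -> int:
    ctx = ToyContext()
    ctx.send_event(Continue(0))
    while True:
        ev = await ctx.queue.get()
        out = await count_step(ctx, ev)
        if isinstance(out, Done):
            return out.result
        if out is not None:
            ctx.queue.put_nowait(out)  # a returned event is routed by the loop


print(asyncio.run(run()))
```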
Streaming Events Externally
```python
ctx.write_event_to_stream(ProgressEvent(...))  # Synchronous, external broadcast
```
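The next sketch is a toy model of the external stream, written in plain asyncio rather than Blazen. write_event_to_stream is synchronous and only enqueues the event. A consumer outside the step then drains the queue. ToyStreamContext, Progress, and the end-of-stream sentinel are assumptions made for the sketch. How Blazen exposes the stream to callers is not covered here.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Progress:
    done: int
    total: int


_END = object()  # sentinel marking the end of the toy stream


class ToyStreamContext:
    """Hypothetical stand-in that only models write_event_to_stream."""

    def __init__(self) -> None:
        self.stream: asyncio.Queue = asyncio.Queue()

    def write_event_to_stream(self, event) -> None:
        # Synchronous broadcast: the step never waits for consumers.
        self.stream.put_nowait(event)


async def work(ctx: ToyStreamContext, total: int) -> None:
    for i in range(1, total + 1):
        await asyncio.sleep(0)  # stand-in for real work
        ctx.write_event_to_stream(Progress(done=i, total=total))
    ctx.stream.put_nowait(_END)


async def consume(total: int) -> list[Progress]:
    ctx = ToyStreamContext()
    task = asyncio.create_task(work(ctx, total))
    seen = []
    while (ev := await ctx.stream.get()) is not _END:
        seen.append(ev)  # an external observer sees each progress event
    await task
    return seen


print(asyncio.run(consume(3)))
```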
State vs Session
Context exposes two explicit namespaces that make your intent clear at the call site:
| Namespace | Survives pause/resume | Use for |
|---|---|---|
| ctx.state | yes | persistable values (JSON, bytes, picklable objects) |
| ctx.session | no (see pause policy) | live in-process references (identity-preserving) |
```python
import sqlite3
from blazen import step, Context, StartEvent, StopEvent


@step
async def setup(ctx: Context, ev: StartEvent) -> StopEvent:
    # Persistable state: survives pause/resume and checkpoints.
    ctx.state["input_path"] = "data.csv"
    ctx.state["row_count"] = 0
    # Live in-process references: identity is preserved.
    conn = sqlite3.connect(":memory:")
    ctx.session["db"] = conn
    assert ctx.session["db"] is conn  # same object, always
    return StopEvent(result={"ok": True})
```
Both namespaces support the Python dict protocol (ctx.state["k"] = v, "k" in ctx.session, etc.). ctx.state routes through the same 4-tier dispatch as ctx.set and exposes set / get / set_bytes / get_bytes plus the dict protocol. ctx.session exposes set / get / has / remove plus the dict protocol. The legacy ctx.set / ctx.get still work as smart-routing shortcuts.
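The following in-memory model illustrates why identity matters. It is hypothetical and is not Blazen's implementation. For brevity, the state namespace pickles every value instead of using the 4-tier dispatch, and snapshot() simply serializes the state entries. In this model, reading a state value returns an equal but distinct copy, while the session namespace returns the exact object that was stored.

```python
import pickle
import sqlite3
from typing import Any


class StateNamespace:
    """Toy ctx.state: values are serialized on write, so they can be snapshotted."""

    def __init__(self) -> None:
        self._data: dict[str, bytes] = {}

    def __setitem__(self, key: str, value: Any) -> None:
        self._data[key] = pickle.dumps(value)

    def __getitem__(self, key: str) -> Any:
        return pickle.loads(self._data[key])  # a fresh copy on every read

    def __contains__(self, key: str) -> bool:
        return key in self._data


class ToyContext:
    """Hypothetical model of Context's two namespaces."""

    def __init__(self) -> None:
        self.state = StateNamespace()
        self.session: dict[str, Any] = {}  # live references, stored as-is

    def snapshot(self) -> bytes:
        # Only state entries are written into the snapshot.
        return pickle.dumps(self.state._data)


ctx = ToyContext()
config = {"input_path": "data.csv", "row_count": 0}
ctx.state["config"] = config
conn = sqlite3.connect(":memory:")
ctx.session["db"] = conn

print(ctx.session["db"] is conn)  # True: same object
print(ctx.state["config"] == config)  # True: equal value
print(ctx.state["config"] is config)  # False: a copy in this model
print(sorted(pickle.loads(ctx.snapshot())))  # ['config']: session is not included
```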
result.result preserves is-identity for non-JSON values: you can pass class instances, Pydantic models, and live DB connections through StopEvent.result and get the same object back.
Pause policy for ctx.session
Because session entries are live references, they are deliberately kept out of the persistable state. When you call handler.pause(), the workflow's session_pause_policy decides what happens to them. The default (pickle_or_error) attempts to pickle each entry into the snapshot and raises a clear error if any entry can't be serialized. The other policies (warn_drop, hard_error) let workflows opt into ephemeral or strict behavior instead. The practical rule: put anything that must survive pause() / resume() in ctx.state, and everything else in ctx.session.
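The three policies can be sketched as a function over the session dict. Only pickle_or_error is described in detail above. The behavior assigned here to warn_drop (drop every session entry with a warning) and hard_error (refuse to pause while any session entry exists) is an assumption based on the policy names and on "ephemeral or strict". snapshot_session and SessionPauseError are hypothetical names.

```python
import pickle
import warnings
from typing import Any


class SessionPauseError(RuntimeError):
    """Hypothetical error type for this sketch."""


def snapshot_session(
    session: dict[str, Any], policy: str = "pickle_or_error"
) -> dict[str, bytes]:
    """Toy model of what a pause might do with session entries under each policy."""
    if policy == "pickle_or_error":
        out = {}
        for key, value in session.items():
            try:
                out[key] = pickle.dumps(value)
            except (pickle.PicklingError, TypeError, AttributeError) as exc:
                raise SessionPauseError(
                    f"session entry {key!r} cannot be pickled: {exc}"
                ) from exc
        return out
    if policy == "warn_drop":  # assumed: ephemeral, everything is dropped
        if session:
            warnings.warn(f"dropping session entries on pause: {sorted(session)}")
        return {}
    if policy == "hard_error":  # assumed: strict, no live entries allowed
        if session:
            raise SessionPauseError(
                f"cannot pause with live session entries: {sorted(session)}"
            )
        return {}
    raise ValueError(f"unknown policy: {policy!r}")
```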
BlazenState
For most cases prefer ctx.state and ctx.session directly — they’re simpler and cover the common patterns. BlazenState is for codebases that want typed, structured state with per-field custom storage (e.g. mapping one field to a file on disk, another to a database row). If your state is just a bag of values, use the namespaces.
BlazenState is a base class for typed workflow state with per-field context storage. Instead of manually calling ctx.set() for each piece of data, you define a @dataclass subclass and let Blazen store each field individually using the optimal storage tier.
Basic Example
```python
import sqlite3
from dataclasses import dataclass, field
from blazen import BlazenState, Context, Event, StartEvent, StopEvent, step, Workflow


@dataclass
class PipelineState(BlazenState):
    input_path: str = ""
    doc_count: int = 0
    conn: sqlite3.Connection | None = None

    class Meta:
        transient = {"conn"}
        store_by = {}

    def restore(self):
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)


class ProcessEvent(Event):
    pass


@step
async def setup(ctx: Context, ev: Event):
    state = PipelineState(input_path="/tmp/data.db", doc_count=0)
    state.restore()  # Opens the sqlite3 connection
    ctx.set("state", state)
    return ProcessEvent()


@step
async def process(ctx: Context, ev: ProcessEvent):
    state = ctx.get("state")  # PipelineState with all fields restored
    cursor = state.conn.cursor()  # Transient field recreated by restore()
    cursor.execute("SELECT count(*) FROM docs")
    state.doc_count = cursor.fetchone()[0]
    ctx.set("state", state)  # Persist updated state
    return StopEvent(result={"docs": state.doc_count})
```
When you call ctx.set("state", my_state) with a BlazenState subclass, Blazen stores each field individually under the hood. When you call ctx.get("state"), it reconstructs the object field-by-field and then calls restore() to recreate any transient fields.
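The field-by-field round trip can be modeled with a plain dict standing in for context storage. The helpers store_fields and load_fields are hypothetical, and the "key.field" entry naming is chosen only for illustration. The sketch also skips per-field tier selection. Transient fields are skipped on write and rebuilt by restore() on read. It uses an in-memory SQLite database so it runs anywhere.

```python
import sqlite3
from dataclasses import dataclass, fields
from typing import Any, Optional


@dataclass
class ToyPipelineState:
    """Hypothetical stand-in for a BlazenState subclass."""
    input_path: str = ""
    doc_count: int = 0
    conn: Optional[sqlite3.Connection] = None

    class Meta:
        transient = {"conn"}

    def restore(self) -> None:
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)


def store_fields(store: dict[str, Any], key: str, state: Any) -> None:
    """Write each non-transient field as its own entry, e.g. 'state.doc_count'."""
    transient = getattr(state.Meta, "transient", set())
    for f in fields(state):
        if f.name not in transient:
            store[f"{key}.{f.name}"] = getattr(state, f.name)


def load_fields(store: dict[str, Any], key: str, cls: type) -> Any:
    """Rebuild the object field by field, then let restore() recreate transients."""
    transient = getattr(cls.Meta, "transient", set())
    kwargs = {
        f.name: store[f"{key}.{f.name}"]
        for f in fields(cls)
        if f.name not in transient and f"{key}.{f.name}" in store
    }
    state = cls(**kwargs)  # transient fields start at their default (None)
    state.restore()
    return state


store: dict[str, Any] = {}
state = ToyPipelineState(input_path=":memory:", doc_count=3)
state.restore()
store_fields(store, "state", state)
print(sorted(store))  # ['state.doc_count', 'state.input_path']
loaded = load_fields(store, "state", ToyPipelineState)
```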
Storage Tiers
Each field is automatically stored using the best tier based on its type:
| Tier | When Used | Survives Snapshots |
|---|---|---|
| JSON | str, int, float, bool, None, dict, list | Yes |
| Bytes | bytes, bytearray | Yes |
| Pickle | Pydantic models, dataclasses, other serializable objects | Yes |
| Live reference | Objects listed in Meta.transient | No |
Transient fields (like database connections, file handles, sockets) are excluded from serialization entirely. They are set to None in the snapshot and recreated by your restore() method when the state is loaded back.
Custom Persistence with FieldStore
For fields that need custom storage logic (e.g., writing large blobs to S3 instead of the context), implement the FieldStore protocol or use CallbackFieldStore:
```python
from dataclasses import dataclass

import boto3  # assumption: S3 access via boto3, with credentials already configured
from blazen import BlazenState, CallbackFieldStore

s3 = boto3.client("s3")


def save_to_s3(key: str, value: bytes) -> None:
    s3.put_object(Bucket="my-bucket", Key=key, Body=value)


def load_from_s3(key: str) -> bytes:
    return s3.get_object(Bucket="my-bucket", Key=key)["Body"].read()


@dataclass
class ModelState(BlazenState):
    name: str = ""
    weights: bytes = b""

    class Meta:
        transient = set()
        store_by = {
            "weights": CallbackFieldStore(
                save_fn=save_to_s3,
                load_fn=load_from_s3,
            ),
        }
```
When Blazen stores ModelState, the weights field is routed through your CallbackFieldStore instead of the default context storage. All other fields use their automatic tier.
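To exercise a state like ModelState without S3, you can back callbacks with the same signatures using a dict. The sketch below pairs that with a toy router. Fields listed in a store_by mapping go to their store, and everything else goes to a plain dict standing in for the context. ToyCallbackStore, save_state, load_state, and the "key.field" entry names are hypothetical. The exact key Blazen passes to save_fn is not specified here.

```python
from dataclasses import dataclass, fields
from typing import Any, Callable

# In-memory stand-in for the S3 bucket. These callbacks have the same
# signatures as save_to_s3 / load_from_s3, so they can be swapped in for tests.
fake_bucket: dict[str, bytes] = {}


def save_to_memory(key: str, value: bytes) -> None:
    fake_bucket[key] = value


def load_from_memory(key: str) -> bytes:
    return fake_bucket[key]


@dataclass
class ToyCallbackStore:
    """Hypothetical stand-in holding the same two callbacks as CallbackFieldStore."""
    save_fn: Callable[[str, bytes], None]
    load_fn: Callable[[str], bytes]


@dataclass
class ToyModelState:
    name: str = ""
    weights: bytes = b""


def save_state(context: dict, key: str, state: Any, store_by: dict) -> None:
    for f in fields(state):
        entry = f"{key}.{f.name}"  # hypothetical per-field key
        value = getattr(state, f.name)
        if f.name in store_by:
            store_by[f.name].save_fn(entry, value)  # routed to the custom store
        else:
            context[entry] = value  # default context storage


def load_state(context: dict, key: str, cls: type, store_by: dict) -> Any:
    kwargs = {}
    for f in fields(cls):
        entry = f"{key}.{f.name}"
        if f.name in store_by:
            kwargs[f.name] = store_by[f.name].load_fn(entry)
        else:
            kwargs[f.name] = context[entry]
    return cls(**kwargs)


store_by = {"weights": ToyCallbackStore(save_to_memory, load_from_memory)}
context: dict = {}
save_state(context, "model", ToyModelState("m1", b"\x00\x01"), store_by)
print(context)  # only the name field lives in the context
print(fake_bucket)  # the weights went through the callbacks
```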
Key Points
- Transient fields are excluded from serialization. They are None after a snapshot restore until restore() recreates them.
- restore() is called automatically by ctx.get() after all serializable fields are populated. Override it to reconnect databases, reopen files, or rebuild caches.
- Per-field storage means each field is an independent context entry. Updating one field and calling ctx.set() again only overwrites the changed fields, not the entire object.