Context

Share state between workflow steps in Python

What is Context?

A key-value store shared across all steps in a workflow run.

Blazen exposes two explicit namespaces alongside the smart-routing ctx.set / ctx.get shortcuts: ctx.state for persistable values (survives pause/resume and checkpoints) and ctx.session for live in-process references (identity-preserving within a run, excluded from snapshots). See State vs Session below.

Setting and Getting Values

ctx.set(key, value) stores any Python value using a 4-tier dispatch:

  1. bytes / bytearray → raw binary (survives snapshots)
  2. JSON-serializable (dict, list, str, int, float, bool, None) → JSON (survives snapshots)
  3. Picklable objects (Pydantic models, dataclasses, custom classes) → pickled automatically (survives snapshots)
  4. Unpicklable objects (DB connections, file handles, sockets, lambdas) → live in-process reference (same-process only, excluded from snapshots)

ctx.get returns the original Python type for all four tiers.
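
The tier ordering above can be approximated with the standard library alone. The following is a rough sketch, not Blazen's actual implementation: the function name classify_tier and the tier labels are hypothetical, and it assumes "JSON-serializable" means json.dumps succeeds and "picklable" means pickle.dumps succeeds.

```python
import datetime
import json
import pickle
import threading


def classify_tier(value):
    """Guess which tier a value would land in, following the order listed above."""
    # Tier 1: raw binary is checked first, before any serialization attempt.
    if isinstance(value, (bytes, bytearray)):
        return "bytes"
    # Tier 2: anything json.dumps accepts.
    try:
        json.dumps(value)
        return "json"
    except (TypeError, ValueError):
        pass
    # Tier 3: anything pickle can serialize.
    try:
        pickle.dumps(value)
        return "pickle"
    except Exception:
        # Tier 4: nothing worked, so only a live in-process reference remains.
        return "live"


print(classify_tier(b"\x89PNG"))                  # bytes
print(classify_tier({"tags": ["admin"]}))         # json
print(classify_tier(datetime.date(2024, 1, 1)))   # pickle
print(classify_tier(threading.Lock()))            # live
```

A helper like this can be handy in tests to check, before a workflow runs, whether a value would survive a snapshot or fall through to the live-reference tier.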

from pydantic import BaseModel
from blazen import Context, Event, StopEvent, step

class UserProfile(BaseModel):
    name: str
    score: float

class NextEvent(Event):
    pass

@step
async def store_data(ctx: Context, ev: Event):
    # JSON-serializable values (stored as JSON)
    ctx.set("user_id", "user_123")
    ctx.set("doc_count", 5)
    ctx.set("tags", ["admin", "active"])

    # Raw bytes (stored as binary)
    ctx.set("thumbnail", b"\x89PNG\r\n...")

    # Pydantic model (pickled automatically)
    ctx.set("profile", UserProfile(name="Alice", score=0.95))
    return NextEvent()

@step
async def use_data(ctx: Context, ev: NextEvent):
    user_id = ctx.get("user_id")          # str
    doc_count = ctx.get("doc_count")      # int
    thumbnail = ctx.get("thumbnail")      # bytes
    profile = ctx.get("profile")          # UserProfile
    return StopEvent(result={"user": user_id, "name": profile.name})

Important: ctx.set() and ctx.get() are synchronous — no await.

Run ID

run_id = ctx.run_id()  # Synchronous, returns a UUID string

Binary Storage

ctx.set() handles bytes and bytearray natively (stored as raw binary), so you can pass binary data directly:

@step
async def store(ctx: Context, ev: Event):
    ctx.set("model", b"\x00\x01\x02...")  # stored as raw bytes
    return NextEvent()

@step
async def load(ctx: Context, ev: NextEvent):
    raw = ctx.get("model")  # bytes
    return StopEvent(result=raw)

ctx.set_bytes() and ctx.get_bytes() remain available as explicit convenience aliases for binary data. They behave identically to calling ctx.set() / ctx.get() with bytes values. Binary data persists through pause/resume/checkpoint.

Manual Event Routing

# Inside a step body:
ctx.send_event(ContinueEvent())  # Synchronous, routes manually
return None  # Don't return an event when using send_event

Streaming Events Externally

ctx.write_event_to_stream(ProgressEvent(...))  # Synchronous, external broadcast

State vs Session

Context exposes two explicit namespaces that make your intent clear at the call site:

Namespace    Survives pause/resume   Use for
ctx.state    yes                     persistable values (JSON, bytes, picklable objects)
ctx.session  no (see pause policy)   live in-process references (identity-preserving)

import sqlite3
from blazen import step, Context, StartEvent, StopEvent

@step
async def setup(ctx: Context, ev: StartEvent) -> StopEvent:
    # Persistable state — survives pause/resume and checkpoints.
    ctx.state["input_path"] = "data.csv"
    ctx.state["row_count"] = 0

    # Live in-process references — identity is preserved.
    conn = sqlite3.connect(":memory:")
    ctx.session["db"] = conn
    assert ctx.session["db"] is conn  # same object, always

    return StopEvent(result={"ok": True})

Both namespaces support the Python dict protocol (ctx.state["k"] = v, "k" in ctx.session, etc.). ctx.state routes through the same 4-tier dispatch as ctx.set and exposes set / get / set_bytes / get_bytes plus the dict protocol. ctx.session exposes set / get / has / remove plus the dict protocol. The legacy ctx.set / ctx.get still work as smart-routing shortcuts.
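
The difference between the two namespaces comes down to equality versus identity. The sketch below uses only the standard library and does not call Blazen: it models persisted storage as a pickle round trip and session storage as a plain dict of references. Both models are simplifying assumptions for illustration.

```python
import pickle

settings = {"retries": 3, "hosts": ["a", "b"]}

# Persisted storage, modelled as a serialize/deserialize round trip.
restored = pickle.loads(pickle.dumps(settings))
print(restored == settings)   # True: equal contents
print(restored is settings)   # False: a different object

# Session-style storage, modelled as a dict that holds references.
session = {}
session["settings"] = settings
print(session["settings"] is settings)  # True: the very same object
```

If a step relies on `is` comparisons or on mutating a shared object in place, that is a hint the value belongs in ctx.session rather than ctx.state.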

The final result also preserves object identity for non-JSON values: class instances, Pydantic models, and live DB connections passed through StopEvent.result come back from result.result as the same object, not a copy.

Pause policy for ctx.session

Because session entries are live references, they are not snapshotted automatically the way ctx.state entries are. When you call handler.pause(), the workflow’s session_pause_policy governs what happens to them. The default (pickle_or_error) attempts to pickle each entry into the snapshot and raises a clear error if any entry can’t be serialised. The other policies (warn_drop, hard_error) let workflows opt into ephemeral or strict behaviour. The practical rule: put anything that must survive pause() / resume() in ctx.state, and everything else in ctx.session.
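
To make the default policy concrete, here is a small stand-alone model of the pickle_or_error behaviour as described above: pickle every entry, or fail and name the entries that cannot be serialised. It is a sketch only. The function name snapshot_session and the use of RuntimeError are hypothetical, the real error type is not stated here, and warn_drop and hard_error are not modelled.

```python
import pickle
import threading


def snapshot_session(session):
    """Model of pickle_or_error: pickle every entry or raise, naming the bad keys."""
    pickled, failed = {}, []
    for key, value in session.items():
        try:
            pickled[key] = pickle.dumps(value)
        except Exception:
            failed.append(key)
    if failed:
        # Hypothetical error type; the point is that the failure is explicit.
        raise RuntimeError(f"session entries cannot be pickled: {sorted(failed)}")
    return pickled


ok = snapshot_session({"cursor_pos": 42, "seen": {"a", "b"}})
print(sorted(ok))  # ['cursor_pos', 'seen']

try:
    snapshot_session({"cursor_pos": 42, "lock": threading.Lock()})
except RuntimeError as exc:
    print(exc)  # session entries cannot be pickled: ['lock']
```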

BlazenState

For most cases prefer ctx.state and ctx.session directly — they’re simpler and cover the common patterns. BlazenState is for codebases that want typed, structured state with per-field custom storage (e.g. mapping one field to a file on disk, another to a database row). If your state is just a bag of values, use the namespaces.

BlazenState is a base class for typed workflow state with per-field context storage. Instead of manually calling ctx.set() for each piece of data, you define a @dataclass subclass and let Blazen store each field individually using the optimal storage tier.

Basic Example

import sqlite3
from dataclasses import dataclass, field
from blazen import BlazenState, Context, Event, StartEvent, StopEvent, step, Workflow

@dataclass
class PipelineState(BlazenState):
    input_path: str = ""
    doc_count: int = 0
    conn: sqlite3.Connection | None = None

    class Meta:
        transient = {"conn"}
        store_by = {}

    def restore(self):
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)

class ProcessEvent(Event):
    pass

@step
async def setup(ctx: Context, ev: Event):
    state = PipelineState(input_path="/tmp/data.db", doc_count=0)
    state.restore()  # Opens the sqlite3 connection
    ctx.set("state", state)
    return ProcessEvent()

@step
async def process(ctx: Context, ev: ProcessEvent):
    state = ctx.get("state")          # PipelineState with all fields restored
    cursor = state.conn.cursor()      # Transient field recreated by restore()
    cursor.execute("SELECT count(*) FROM docs")
    state.doc_count = cursor.fetchone()[0]
    ctx.set("state", state)           # Persist updated state
    return StopEvent(result={"docs": state.doc_count})

When you call ctx.set("state", my_state) with a BlazenState subclass, Blazen stores each field individually under the hood. When you call ctx.get("state"), it reconstructs the object field-by-field and then calls restore() to recreate any transient fields.
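
The store-then-reconstruct cycle can be modelled without Blazen. The sketch below uses a plain dataclass as a stand-in for a BlazenState subclass. The names PipelineStateModel, TRANSIENT, dump_fields, and load_fields are hypothetical, and the real per-field storage layout is not shown in this document, so treat it as an illustration of the idea rather than the library's mechanism.

```python
import sqlite3
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class PipelineStateModel:
    """Plain dataclass standing in for a BlazenState subclass."""
    input_path: str = ""
    doc_count: int = 0
    conn: Optional[sqlite3.Connection] = None

    def restore(self):
        # Recreate the transient connection from the persisted path.
        if self.input_path:
            self.conn = sqlite3.connect(self.input_path)


TRANSIENT = {"conn"}  # plays the role of Meta.transient


def dump_fields(state):
    """Produce one entry per non-transient field."""
    return {
        f.name: getattr(state, f.name)
        for f in fields(state)
        if f.name not in TRANSIENT
    }


def load_fields(cls, stored):
    """Rebuild the object from its stored fields, then call restore()."""
    state = cls(**stored)
    state.restore()
    return state


original = PipelineStateModel(input_path=":memory:", doc_count=3)
original.restore()
stored = dump_fields(original)
rebuilt = load_fields(PipelineStateModel, stored)

print(stored)                         # conn is absent from the stored fields
print(rebuilt.conn is original.conn)  # False: restore() opened a new connection
```

The rebuilt object gets a fresh connection rather than the original one, which is why restore() needs everything it depends on (here, input_path) to live in a persisted field.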

Storage Tiers

Each field is automatically stored using the best tier based on its type:

Tier            When used                                                 Survives snapshots
JSON            str, int, float, bool, None, dict, list                   Yes
Bytes           bytes, bytearray                                          Yes
Pickle          Pydantic models, dataclasses, other serializable objects  Yes
Live reference  Objects listed in Meta.transient                          No

Transient fields (like database connections, file handles, sockets) are excluded from serialization entirely. They are set to None in the snapshot and recreated by your restore() method when the state is loaded back.

Custom Persistence with FieldStore

For fields that need custom storage logic (e.g., writing large blobs to S3 instead of the context), implement the FieldStore protocol or use CallbackFieldStore:

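from dataclasses import dataclass

import boto3  # assumption: the s3 object used below is a boto3 S3 client
from blazen import BlazenState, CallbackFieldStore

s3 = boto3.client("s3")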

def save_to_s3(key: str, value: bytes) -> None:
    s3.put_object(Bucket="my-bucket", Key=key, Body=value)

def load_from_s3(key: str) -> bytes:
    return s3.get_object(Bucket="my-bucket", Key=key)["Body"].read()

@dataclass
class ModelState(BlazenState):
    name: str = ""
    weights: bytes = b""

    class Meta:
        transient = set()
        store_by = {
            "weights": CallbackFieldStore(
                save_fn=save_to_s3,
                load_fn=load_from_s3,
            ),
        }

When Blazen stores ModelState, the weights field is routed through your CallbackFieldStore instead of the default context storage. All other fields use their automatic tier.
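
For local tests it can be convenient to swap the S3 callbacks for in-memory ones. The sketch below builds a pair of functions with the same signatures as save_to_s3 and load_from_s3. The helper name make_memory_callbacks is hypothetical, and the sketch assumes CallbackFieldStore needs nothing beyond these two callables. Which key Blazen passes to them is not specified here.

```python
def make_memory_callbacks():
    """Return (save_fn, load_fn) backed by a private dict instead of S3."""
    blobs = {}

    def save_fn(key: str, value: bytes) -> None:
        # Copy into immutable bytes so later mutation of a bytearray is not observed.
        blobs[key] = bytes(value)

    def load_fn(key: str) -> bytes:
        return blobs[key]

    return save_fn, load_fn


save_fn, load_fn = make_memory_callbacks()
save_fn("weights", b"\x00\x01\x02")
print(load_fn("weights"))  # b'\x00\x01\x02'
```

Each call to make_memory_callbacks() gets its own dict, so separate tests do not share stored blobs.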

Key Points

  • Transient fields are excluded from serialization. They are None after a snapshot restore until restore() recreates them.
  • restore() is called automatically by ctx.get() after all serializable fields are populated. Override it to reconnect databases, reopen files, or rebuild caches.
  • Per-field storage means each field is an independent context entry. Updating one field and calling ctx.set() again only overwrites the changed fields, not the entire object.