Durable execution¶
fluxio can checkpoint state between stages and resume a crashed run from the last checkpoint.
Enabling durability¶
from fluxio import Pipeline, InMemoryStore

pipeline = Pipeline(
    nodes=[...],
    checkpoint_store=InMemoryStore(),
    durable=True,
)
Before every CALL instruction the interpreter saves ctx.snapshot() together with the instruction pointer. If the stage raises, a final checkpoint is written with error=True and the exception propagates.
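The checkpoint-before-call behaviour can be pictured with a small toy loop. This is only a sketch of the idea, not fluxio's interpreter: `ToyCheckpoint`, `run_durable`, and the plain dict used as a store are hypothetical names, and a shallow `dict(state)` copy stands in for `ctx.snapshot()`. Storing the pre-call snapshot in the error checkpoint is also an assumption.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


# Hypothetical stand-in, NOT fluxio's real checkpoint class.
@dataclass
class ToyCheckpoint:
    run_id: str
    ip: int                 # instruction pointer of the stage about to run
    state: dict[str, Any]   # copy of the context taken before the call
    error: bool = False


Stage = Callable[[dict[str, Any]], Awaitable[None]]


async def run_durable(
    stages: list[Stage],
    state: dict[str, Any],
    run_id: str,
    saved: dict[str, ToyCheckpoint],
    start_ip: int = 0,
) -> dict[str, Any]:
    for ip in range(start_ip, len(stages)):
        # Checkpoint BEFORE the call, so a resume would re-run this stage.
        snapshot = dict(state)  # shallow copy; a real snapshot may go deeper
        saved[run_id] = ToyCheckpoint(run_id, ip, snapshot)
        try:
            await stages[ip](state)
        except Exception:
            # Final checkpoint marks the failure, then the error propagates.
            saved[run_id] = ToyCheckpoint(run_id, ip, snapshot, error=True)
            raise
    return state


async def add_user(state: dict[str, Any]) -> None:
    state["user"] = "u42"


async def flaky(state: dict[str, Any]) -> None:
    raise RuntimeError("simulated stage failure")


async def demo() -> ToyCheckpoint:
    saved: dict[str, ToyCheckpoint] = {}
    try:
        await run_durable([add_user, flaky], {}, "req-1", saved)
    except RuntimeError:
        pass  # the exception propagated, as described above
    return saved["req-1"]


checkpoint = asyncio.run(demo())
```

After the demo, `checkpoint` points at the failing stage (index 1), carries the state as it was before that stage ran, and has `error=True`.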
Fresh runs vs resume¶
By default, every run starts fresh: if a checkpoint already exists for the given run_id, it is deleted and the run starts from scratch.
To continue from the last checkpoint, pass resume=True:
# First attempt: starts fresh and crashes partway through
await pipeline.invoke({"user_id": 42}, run_id="req-1")

# Later, or in a retry loop: continue from the last checkpoint
await pipeline.invoke({}, run_id="req-1", resume=True)
If resume=True is passed and no checkpoint exists for that run_id, KeyError is raised.
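The retry loop mentioned in the comment above might look roughly like the following. It is a minimal sketch: `invoke_with_resume` and `fake_invoke` are hypothetical helpers, not part of fluxio, and the wrapper only assumes that the callable accepts the `run_id` and `resume` keywords shown above. A production loop would probably add a backoff and avoid retrying errors that cannot succeed, such as a missing checkpoint.

```python
import asyncio
from typing import Any, Awaitable, Callable

InvokeFn = Callable[..., Awaitable[Any]]


async def invoke_with_resume(
    invoke: InvokeFn,
    payload: dict[str, Any],
    run_id: str,
    max_attempts: int = 3,
) -> Any:
    """Run fresh once, then retry with resume=True until attempts run out."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    last_error: Exception | None = None
    for attempt in range(max_attempts):
        try:
            if attempt == 0:
                # First attempt: a fresh run with the real input.
                return await invoke(payload, run_id=run_id)
            # Retries: continue from the last checkpoint for this run_id.
            return await invoke({}, run_id=run_id, resume=True)
        except Exception as exc:  # deliberately broad for the sketch
            last_error = exc
    assert last_error is not None
    raise last_error


# Hypothetical stand-in for pipeline.invoke that fails on its first call.
calls: list[tuple[dict[str, Any], str, bool]] = []


async def fake_invoke(payload: dict[str, Any], *, run_id: str, resume: bool = False) -> str:
    calls.append((payload, run_id, resume))
    if len(calls) == 1:
        raise RuntimeError("simulated crash")
    return "done"


result = asyncio.run(invoke_with_resume(fake_invoke, {"user_id": 42}, "req-1"))
```

With a real pipeline, `pipeline.invoke` would be passed in place of `fake_invoke`.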
Pipeline versioning¶
Each CompiledPipeline has a short version string derived from a SHA-256 hash. A checkpoint stores the version it was taken against. Resuming on a pipeline with a different version raises CheckpointVersionError, so that changed code never silently runs against stale state.
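To illustrate the idea of a version guard, here is a sketch that derives a short hash from a pipeline description and refuses to resume on a mismatch. What fluxio actually hashes, and how many characters it keeps, is not stated here; hashing the stage names and truncating to 12 hex characters are assumptions made purely for illustration. `pipeline_version` and `check_version` are hypothetical helpers, and `CheckpointVersionError` is a local stand-in for the library's exception.

```python
import hashlib


class CheckpointVersionError(Exception):
    """Local stand-in for the exception named above."""


def pipeline_version(stage_names: list[str], length: int = 12) -> str:
    # Assumption: the version is a truncated SHA-256 over the stage names.
    canonical = "\n".join(stage_names).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()[:length]


def check_version(checkpoint_version: str, current_version: str) -> None:
    # Refuse to resume when the checkpoint was taken against other code.
    if checkpoint_version != current_version:
        raise CheckpointVersionError(
            f"checkpoint version {checkpoint_version!r} does not match "
            f"pipeline version {current_version!r}"
        )


v_old = pipeline_version(["fetch", "score"])
v_same = pipeline_version(["fetch", "score"])
v_new = pipeline_version(["fetch", "rank"])

check_version(v_old, v_same)  # same definition: resume is allowed

try:
    check_version(v_old, v_new)
    mismatch_detected = False
except CheckpointVersionError:
    mismatch_detected = True
```

The same definition always yields the same version, while any change to the hashed description produces a different one and blocks the resume.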
Concurrency¶
invoke(run_id="x") with durable=True takes a lock for that run_id. A second concurrent call with the same id raises RunIDInUseError. Calls with different run_ids do not block each other.
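The per-run locking described above can be sketched for a single process as follows. `RunLocks` and `guarded_run` are hypothetical names, `RunIDInUseError` is a local stand-in, and how fluxio implements the lock (especially across processes) is not covered by this sketch.

```python
import asyncio
import contextlib
from typing import Iterator


class RunIDInUseError(Exception):
    """Local stand-in for the exception named above."""


class RunLocks:
    """Tracks which run_ids are active inside one event loop."""

    def __init__(self) -> None:
        self._active: set[str] = set()

    @contextlib.contextmanager
    def hold(self, run_id: str) -> Iterator[None]:
        # No await between the check and the add, so this is atomic
        # with respect to other tasks on the same event loop.
        if run_id in self._active:
            raise RunIDInUseError(run_id)
        self._active.add(run_id)
        try:
            yield
        finally:
            self._active.discard(run_id)


async def guarded_run(locks: RunLocks, run_id: str, log: list[str]) -> None:
    with locks.hold(run_id):
        await asyncio.sleep(0.01)  # stands in for the pipeline's work
        log.append(run_id)


async def demo() -> tuple[list[object], list[str]]:
    locks = RunLocks()
    log: list[str] = []
    results = await asyncio.gather(
        guarded_run(locks, "x", log),
        guarded_run(locks, "x", log),  # same id while the first is running
        guarded_run(locks, "y", log),  # different id, runs concurrently
        return_exceptions=True,
    )
    return list(results), log


results, log = asyncio.run(demo())
```

In the demo, the second call with id "x" fails fast instead of waiting, while "y" proceeds alongside the first "x".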
Stores¶
InMemoryStore (default)¶
Process-local dict guarded by an asyncio.Lock. Well suited to tests and single-process services. Because the checkpoints live in process memory, they are lost when the process exits.
RedisStore¶
from fluxio import RedisStore

pipeline = Pipeline(
    ...,
    checkpoint_store=RedisStore(
        url="redis://localhost:6379",
        ttl=86400,
        key_prefix="fluxio:checkpoint",
    ),
    durable=True,
)
Install the Redis extra with pip install "fluxio[redis]" (the quotes keep shells such as zsh from interpreting the square brackets).
Custom backends¶
Implement CheckpointStore:
from fluxio.store.base import CheckpointStore, Checkpoint


class PostgresStore(CheckpointStore):
    async def save(self, checkpoint: Checkpoint) -> None: ...
    async def load(self, run_id: str) -> Checkpoint | None: ...
    async def delete(self, run_id: str) -> None: ...
    async def exists(self, run_id: str) -> bool: ...
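As a fuller illustration of the four methods, the sketch below backs them with SQLite from the standard library. To stay self-contained it does not import fluxio: the `Checkpoint` dataclass is a local stand-in whose fields (`run_id`, `ip`, `state`, `version`, `error`) are guesses based on this page, and `SQLiteStore` is a hypothetical name. A real backend would subclass `CheckpointStore` and use the library's own `Checkpoint` type and serialization.

```python
import asyncio
import json
import os
import sqlite3
import tempfile
from dataclasses import asdict, dataclass
from typing import Any, Optional


@dataclass
class Checkpoint:
    # Hypothetical fields; the real Checkpoint class may differ.
    run_id: str
    ip: int
    state: dict[str, Any]
    version: str
    error: bool = False


class SQLiteStore:
    """Stores one JSON-encoded checkpoint row per run_id."""

    def __init__(self, path: str) -> None:
        self._path = path
        self._execute(
            "CREATE TABLE IF NOT EXISTS checkpoints "
            "(run_id TEXT PRIMARY KEY, body TEXT NOT NULL)"
        )

    def _execute(self, sql: str, params: tuple = ()) -> list[tuple]:
        # One connection per call, so it is safe to run in worker threads.
        conn = sqlite3.connect(self._path)
        try:
            with conn:  # commits on success, rolls back on error
                return conn.execute(sql, params).fetchall()
        finally:
            conn.close()

    async def save(self, checkpoint: Checkpoint) -> None:
        body = json.dumps(asdict(checkpoint))
        await asyncio.to_thread(
            self._execute,
            "INSERT OR REPLACE INTO checkpoints (run_id, body) VALUES (?, ?)",
            (checkpoint.run_id, body),
        )

    async def load(self, run_id: str) -> Optional[Checkpoint]:
        rows = await asyncio.to_thread(
            self._execute,
            "SELECT body FROM checkpoints WHERE run_id = ?",
            (run_id,),
        )
        return Checkpoint(**json.loads(rows[0][0])) if rows else None

    async def delete(self, run_id: str) -> None:
        await asyncio.to_thread(
            self._execute, "DELETE FROM checkpoints WHERE run_id = ?", (run_id,)
        )

    async def exists(self, run_id: str) -> bool:
        rows = await asyncio.to_thread(
            self._execute,
            "SELECT 1 FROM checkpoints WHERE run_id = ? LIMIT 1",
            (run_id,),
        )
        return bool(rows)


async def demo() -> tuple[bool, bool, bool]:
    path = os.path.join(tempfile.mkdtemp(), "checkpoints.db")
    store = SQLiteStore(path)
    original = Checkpoint("req-1", 2, {"user_id": 42}, "abc123")
    await store.save(original)
    existed = await store.exists("req-1")
    loaded = await store.load("req-1")
    await store.delete("req-1")
    return existed, loaded == original, await store.exists("req-1")


existed, round_trip_ok, still_exists = asyncio.run(demo())
```

The blocking SQLite calls are pushed to worker threads with `asyncio.to_thread` so the event loop is not stalled. JSON serialization only works while the state is JSON-compatible; richer state would need a different encoding.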