Pipeline¶
Pipeline compiles a list of stages, Parallel blocks, and routing dicts into executable bytecode and orchestrates runs.
Construction¶
async with Pipeline(
nodes=[stage_a, stage_b, Parallel([c, d]), router, {"x": [e], "y": [f]}],
middleware=[RetryMiddleware(), CacheMiddleware()],
callbacks=[LoggingCallback()],
checkpoint_store=InMemoryStore(),
durable=True,
max_workers=8,
auto_parallel=True,
) as pipe:
...
- Compilation happens once, in
__init__. The result is cached inpipe.version(a short sha256). async withis the recommended form — it closes the internal thread pool on exit.
Running¶
invoke¶
With durable execution:
# first attempt crashes somewhere mid-pipeline
await pipe.invoke({"user_id": 42}, run_id="req-1")
# resume from last checkpoint
await pipe.invoke({}, run_id="req-1", resume=True)
stream¶
Yields chunks from STREAM stages as they arrive:
async with Pipeline([ingest, llm_stream]) as pipe:
async for chunk in pipe.stream({"prompt": "hi"}):
await websocket.send(chunk)
run_step¶
Runs a single stage in isolation — useful for tests:
replay¶
Re-runs a pipeline from a stored checkpoint. Requires durable=True:
explain¶
Human-readable structure of the compiled pipeline, including routes and parallel blocks:
Key guarantees¶
- Concurrent
invoke(run_id="x")withdurable=Trueon the same pipeline raisesRunIDInUseError. STREAMstages bypassRetryMiddlewareandCacheMiddlewareautomatically — partial chunks have already been delivered.async withis safe to omit for purely-async pipelines, but you must then callpipe.shutdown()manually.