API Reference¶
This page is generated automatically from the docstrings in the source code and reflects the current signatures and descriptions of all public classes and functions.
Pipeline¶
fluxio.Pipeline ¶
Pipeline(
    nodes: list[StageFunc | Parallel | dict[str, Any]],
    *,
    middleware: list[Middleware] | None = None,
    callbacks: list[BaseCallback] | None = None,
    checkpoint_store: CheckpointStore | None = None,
    durable: bool = False,
    max_workers: int | None = None,
    auto_parallel: bool = True,
)
Compiled, executable sequence of stages.
Nodes are stages decorated with @stage, Parallel(...) blocks, or
dict[str, ...] routing blocks (following a stage that returns Send).
The pipeline is compiled once on construction. Use async with Pipeline(...)
to ensure the thread pool shuts down cleanly.
Example::

    async with Pipeline([fetch_user, send_email]) as pipe:
        result = await pipe.invoke({"user_id": 42})
invoke async ¶
invoke(
    initial_ctx: dict[str, Any] | Context,
    *,
    run_id: str | None = None,
    resume: bool = False,
) -> Context
Run the pipeline to completion and return the final context.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `initial_ctx` | `dict[str, Any] \| Context` | Initial values as a dict or a `Context`. | *required* |
| `run_id` | `str \| None` | Identifier for this run. Auto-generated if not provided. | `None` |
| `resume` | `bool` | If `True`, load the checkpoint for `run_id`. | `False` |
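A usage sketch of a durable run that is retried with `resume=True`. This assumes `fluxio` is importable and that `durable=True` works with the default checkpoint configuration; `fetch_user` is a hypothetical stage::

    import asyncio
    from fluxio import Pipeline, stage

    @stage
    async def fetch_user(ctx):
        # Hypothetical stage: reads user_id, writes user.
        return ctx.set("user", {"id": ctx["user_id"]})

    async def main():
        async with Pipeline([fetch_user], durable=True) as pipe:
            try:
                await pipe.invoke({"user_id": 42}, run_id="run-42")
            except Exception:
                # A later attempt with the same run_id and resume=True
                # loads the saved checkpoint instead of starting over.
                await pipe.invoke({"user_id": 42}, run_id="run-42", resume=True)

    asyncio.run(main())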
stream async ¶
stream(
    initial_ctx: dict[str, Any] | Context,
    *,
    run_id: str | None = None,
) -> AsyncGenerator[Any, None]
Run the pipeline and yield chunks from STREAM stages as they arrive.
Chunks from multiple STREAM stages are yielded in the order produced.
Use an on_step_stream callback if you need to tag chunks by stage.
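A minimal consumption sketch, assuming `fluxio` is importable; `generate` is a hypothetical STREAM stage (an async generator is auto-detected as STREAM, per `stage` below)::

    import asyncio
    from fluxio import Pipeline, stage

    @stage
    async def generate(ctx):
        # yield inside an async def makes this a STREAM stage.
        for word in ("hello", "world"):
            yield word

    async def main():
        async with Pipeline([generate]) as pipe:
            async for chunk in pipe.stream({"prompt": "hi"}):
                print(chunk)

    asyncio.run(main())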
Parallel¶
fluxio.Parallel dataclass ¶
stage¶
fluxio.stage ¶
stage(
    fn: Callable[..., Any] | None = None,
    *,
    node_type: NodeType = NodeType.ASYNC,
    reads: frozenset[str] | None = None,
    writes: frozenset[str] | None = None,
    input_schema: type[BaseModel] | None = None,
    output_schema: type[BaseModel] | None = None,
    timeout: float | None = None,
) -> Any
Mark a function as a pipeline stage.
The stage type is auto-detected from the signature:
- `async def` → `ASYNC`
- `async def` generator (`yield` inside) → `STREAM`
- plain `def` → `SYNC` (runs in the pipeline thread pool)
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `node_type` | `NodeType` | Override the auto-detected node type. | `ASYNC` |
| `reads` | `frozenset[str] \| None` | Declared context keys the stage reads. | `None` |
| `writes` | `frozenset[str] \| None` | Declared context keys the stage writes. | `None` |
| `input_schema` | `type[BaseModel] \| None` | Pydantic model validated against the context before the stage runs. Extra fields are ignored. | `None` |
| `output_schema` | `type[BaseModel] \| None` | Pydantic model validated against the context after the stage runs. Extra fields are ignored. | `None` |
| `timeout` | `float \| None` | Per-stage timeout in seconds. | `None` |
Usage::

    @stage
    async def fetch_user(ctx):
        return ctx.set("user", await db.get(ctx["user_id"]))

    @stage(reads=frozenset({"user"}), writes=frozenset({"profile"}))
    async def enrich(ctx): ...
Context¶
fluxio.Context dataclass ¶
Immutable, copy-on-write state passed between stages.
Backed by a HAMT (pyrsistent.PMap) — set() returns a new Context
with structural sharing, so forks are O(1). Writes are tracked per fork
and reconciled at merge time; two branches writing the same key raise
MergeConflictError.
Access:

- `ctx.get(key)` — permissive (returns `None` if missing).
- `ctx[key]` — strict (raises `KeyError`).
- `ctx.set(k, v)` — returns a new `Context`.
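The access rules above can be illustrated with a toy stand-in. This is a plain-dict copy, not fluxio's HAMT-backed `Context`, so it mirrors only the semantics (immutability, permissive vs. strict reads), not the O(1) structural sharing:

```python
class ToyContext:
    """Illustrative stand-in for fluxio.Context: set() returns a new object."""

    def __init__(self, data=None):
        self._data = dict(data or {})

    def get(self, key):
        return self._data.get(key)  # permissive: None if missing

    def __getitem__(self, key):
        return self._data[key]      # strict: raises KeyError

    def set(self, key, value):
        # Copy-on-write: the original context is never mutated.
        return ToyContext({**self._data, key: value})


a = ToyContext({"user_id": 42})
b = a.set("user", "alice")
print(a.get("user"))  # None — `a` is unchanged
print(b["user"])      # alice
```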
Send¶
Middleware¶
fluxio.Middleware ¶
Bases: ABC
fluxio.RetryMiddleware ¶
RetryMiddleware(
    max_attempts: int = 3,
    backoff: Literal["fixed", "exponential", "jitter"] = "exponential",
    base_delay: float = 0.5,
    exceptions: tuple[type[Exception], ...] = (Exception,),
)
Bases: Middleware
Retries the stage up to max_attempts times on matching exceptions.
Bypassed for STREAM stages — partial chunks are already delivered on the first attempt, so retrying would produce duplicate output.
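The exact delay schedule is not documented here; a common convention, assumed for illustration only, is `base_delay * 2**(attempt - 1)` for `backoff="exponential"`:

```python
def exponential_delays(max_attempts=3, base_delay=0.5):
    # Delay waited before each retry under the assumed
    # base_delay * 2**(attempt - 1) schedule; the first attempt waits nothing,
    # so max_attempts attempts produce max_attempts - 1 delays.
    return [base_delay * 2 ** (attempt - 1) for attempt in range(1, max_attempts)]


print(exponential_delays())  # [0.5, 1.0]
```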
fluxio.CacheMiddleware ¶
CacheMiddleware(
    store: CacheStore | None = None,
    ttl: int = 300,
    key_fn: Callable[[StageFunc, Context], str] | None = None,
)
Bases: Middleware
Memoizes stage output in a CacheStore keyed by stage name + ctx hash.
Defaults to an in-process InMemoryCache. Bypassed for STREAM stages.
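`key_fn` receives the stage function and the current context and returns the cache key string. A sketch of a key function that scopes entries by a single context key instead of the full context hash (the `user_id` key is a hypothetical example; a plain dict stands in for `Context`, which exposes the same `.get()`):

```python
def user_scoped_key(stage_fn, ctx):
    # Cache per stage name and per user, ignoring the rest of the context.
    return f"{stage_fn.__name__}:{ctx.get('user_id')}"


def fetch_profile(ctx): ...  # hypothetical stage function

print(user_scoped_key(fetch_profile, {"user_id": 42}))  # fetch_profile:42
```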
fluxio.CircuitBreakerMiddleware ¶
CircuitBreakerMiddleware(
    failure_threshold: int = 5,
    recovery_timeout: float = 60.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
)
Bases: Middleware
Opens the circuit after failure_threshold consecutive failures.
In the OPEN state CircuitOpenError is raised immediately without
calling the stage, until recovery_timeout elapses.
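A toy model of the state machine described above, not fluxio's implementation: count consecutive failures, open at the threshold, fail fast while open, and let one call through once `recovery_timeout` has elapsed:

```python
import time


class ToyBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # None means CLOSED

    def call(self, fn):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                # Stands in for fluxio's CircuitOpenError.
                raise RuntimeError("circuit open")
            self.opened_at = None  # half-open: allow one trial call
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0  # success resets the consecutive-failure count
        return result
```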
fluxio.RateLimitMiddleware ¶
Observability¶
fluxio.BaseCallback ¶
Hook interface for pipeline observability.
Subclass and override the events you care about; all hooks default to no-op.
Attach via Pipeline(callbacks=[MyCallback()]).
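A subclass sketch. `on_step_stream` is the only hook name mentioned in these docs (under `stream()`); its exact signature and whether hooks are async are assumptions here::

    from fluxio import BaseCallback

    class ChunkLogger(BaseCallback):
        # Assumed signature: stage name plus the chunk it produced.
        async def on_step_stream(self, stage_name, chunk):
            print(f"[{stage_name}] {chunk}")

    # Attach as described above: Pipeline([...], callbacks=[ChunkLogger()])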
Stores¶
fluxio.InMemoryCache ¶
fluxio.CacheStore ¶
Bases: ABC
Storage backend for CacheMiddleware. Separate from CheckpointStore.
Errors¶
fluxio.CompilationError ¶
Bases: Exception
fluxio.MergeConflictError ¶
Bases: Exception
fluxio.CircuitOpenError ¶
Bases: Exception
fluxio.RunIDInUseError ¶
Bases: RuntimeError
Raised when a durable invoke is attempted with a run_id already in progress.
fluxio.CheckpointVersionError ¶
Bases: Exception