API Reference

This page is generated automatically from the docstrings in the source code, so it reflects the current signatures and descriptions of all public classes and functions.

Pipeline

fluxio.Pipeline

Pipeline(
    nodes: list[StageFunc | Parallel | dict[str, Any]],
    *,
    middleware: list[Middleware] | None = None,
    callbacks: list[BaseCallback] | None = None,
    checkpoint_store: CheckpointStore | None = None,
    durable: bool = False,
    max_workers: int | None = None,
    auto_parallel: bool = True,
)

Compiled, executable sequence of stages.

Nodes are stages decorated with @stage, Parallel(...) blocks, or dict[str, ...] routing blocks (following a stage that returns Send).

The pipeline is compiled once on construction. Use async with Pipeline(...) to ensure the thread pool shuts down cleanly.

Example::

    async with Pipeline([fetch_user, send_email]) as pipe:
        result = await pipe.invoke({"user_id": 42})

invoke async

invoke(
    initial_ctx: dict[str, Any] | Context,
    *,
    run_id: str | None = None,
    resume: bool = False,
) -> Context

Run the pipeline to completion and return the final context.

Parameters:

  • initial_ctx (dict[str, Any] | Context, required): Initial values as a dict or a Context instance.
  • run_id (str | None, default None): Identifier for this run. Auto-generated if not provided. With durable=True + resume=True, must match an existing checkpoint.
  • resume (bool, default False): If True, load the checkpoint for run_id and continue. Raises KeyError if no checkpoint exists. Requires durable=True and a checkpoint_store.

stream async

stream(
    initial_ctx: dict[str, Any] | Context,
    *,
    run_id: str | None = None,
) -> AsyncGenerator[Any, None]

Run the pipeline and yield chunks from STREAM stages as they arrive.

Chunks from multiple STREAM stages are yielded in the order produced. Use an on_step_stream callback if you need to tag chunks by stage.
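The interleaving described above (chunks from several STREAM stages, in arrival order) can be sketched with a shared queue over plain async generators. This is an illustrative stand-in, not Pipeline.stream's actual implementation:

```python
import asyncio

async def merge_streams(*gens):
    """Yield items from several async generators in arrival order,
    roughly how chunks from parallel STREAM stages interleave (sketch)."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()  # sentinel marking one generator's exhaustion

    async def pump(gen):
        async for chunk in gen:
            await queue.put(chunk)
        await queue.put(DONE)

    tasks = [asyncio.create_task(pump(g)) for g in gens]
    finished = 0
    while finished < len(tasks):
        item = await queue.get()
        if item is DONE:
            finished += 1
        else:
            yield item
```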

Parallel

fluxio.Parallel dataclass

Parallel(
    branches: list[StageFunc],
    mode: ForkMode = ForkMode.PARALLEL,
    name: str | None = None,
)

stage

fluxio.stage

stage(fn: Callable[..., Any]) -> StageFunc
stage(
    *,
    node_type: NodeType = NodeType.ASYNC,
    reads: frozenset[str] | None = None,
    writes: frozenset[str] | None = None,
    input_schema: type[BaseModel] | None = None,
    output_schema: type[BaseModel] | None = None,
    timeout: float | None = None,
) -> Callable[[Callable[..., Any]], StageFunc]
stage(
    fn: Callable[..., Any] | None = None,
    *,
    node_type: NodeType = NodeType.ASYNC,
    reads: frozenset[str] | None = None,
    writes: frozenset[str] | None = None,
    input_schema: type[BaseModel] | None = None,
    output_schema: type[BaseModel] | None = None,
    timeout: float | None = None,
) -> Any

Mark a function as a pipeline stage.

The stage type is auto-detected from the signature:

  • async def → ASYNC
  • async def generator (yield inside) → STREAM
  • plain def → SYNC (runs in the pipeline thread pool)
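The detection rules above can be sketched with the standard inspect module. A simplified illustration, not fluxio's actual code (note that iscoroutinefunction returns False for async generator functions, so the STREAM check is unambiguous):

```python
import inspect

def detect_node_type(fn) -> str:
    """Classify a stage function per the rules listed above (sketch)."""
    if inspect.isasyncgenfunction(fn):    # async def with yield -> STREAM
        return "STREAM"
    if inspect.iscoroutinefunction(fn):   # plain async def -> ASYNC
        return "ASYNC"
    return "SYNC"                         # plain def -> thread pool

# Hypothetical stage functions for illustration:
async def fetch(ctx): ...

async def tokens(ctx):
    yield "chunk"

def hash_password(ctx): ...
```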

Parameters:

  • node_type (NodeType, default ASYNC): Override the auto-detected node type.
  • reads (frozenset[str] | None, default None): Declared context keys the stage reads.
  • writes (frozenset[str] | None, default None): Declared context keys the stage writes. Combined with reads, this enables auto-parallelism and write-conflict detection at compile time.
  • input_schema (type[BaseModel] | None, default None): Pydantic model validated against the context before the stage runs. Extra fields are ignored.
  • output_schema (type[BaseModel] | None, default None): Pydantic model validated against the context after the stage runs. Extra fields are ignored.
  • timeout (float | None, default None): Per-stage timeout in seconds, enforced via asyncio.timeout.

Usage::

    @stage
    async def fetch_user(ctx):
        return ctx.set("user", await db.get(ctx["user_id"]))


    @stage(reads=frozenset({"user"}), writes=frozenset({"profile"}))
    async def enrich(ctx): ...

Context

fluxio.Context dataclass

Context(
    _data: PMap = pmap(),
    _written: frozenset[str] = frozenset(),
    name: str = "root",
)

Immutable, copy-on-write state passed between stages.

Backed by a HAMT (pyrsistent.PMap) — set() returns a new Context with structural sharing, so forks are O(1). Writes are tracked per fork and reconciled at merge time; two branches writing the same key raise MergeConflictError.

Access:

  • ctx.get(key) — permissive (returns None if missing).
  • ctx[key] — strict (raises KeyError).
  • ctx.set(k, v) — returns a new Context.
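The copy-on-write and merge semantics can be illustrated with a toy stand-in. This sketch uses a plain dict copy instead of the HAMT (so forks are not O(1) here), and a ValueError in place of MergeConflictError:

```python
from dataclasses import dataclass, field
from typing import Any

@dataclass(frozen=True)
class MiniContext:
    """Toy stand-in for fluxio.Context: immutable set(), per-fork write tracking."""
    _data: dict[str, Any] = field(default_factory=dict)
    _written: frozenset = frozenset()

    def get(self, key, default=None):
        return self._data.get(key, default)   # permissive access

    def __getitem__(self, key):
        return self._data[key]                # strict: raises KeyError

    def set(self, key, value) -> "MiniContext":
        # Copy-on-write: the original context is left untouched.
        return MiniContext({**self._data, key: value}, self._written | {key})

def merge(a: MiniContext, b: MiniContext) -> MiniContext:
    """Reconcile two forks; same-key writes conflict (MergeConflictError in fluxio)."""
    conflicts = a._written & b._written
    if conflicts:
        raise ValueError(f"merge conflict on keys: {sorted(conflicts)}")
    return MiniContext({**a._data, **b._data}, a._written | b._written)
```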

Send

fluxio.Send dataclass

Send(route: str, patch: dict[str, Any] = dict())
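Send pairs with the dict routing block described under Pipeline: the stage returns Send, and the router runs the stage registered under that route key. A minimal sketch of the dispatch; merging patch into the context first is an assumption about the semantics, and the handlers are hypothetical:

```python
from dataclasses import dataclass, field
from typing import Any, Callable

@dataclass
class Send:
    route: str
    patch: dict[str, Any] = field(default_factory=dict)

def dispatch(send: Send, routes: dict[str, Callable[[dict], dict]], ctx: dict) -> dict:
    # Assumed semantics: apply the patch to the context, then run the
    # stage registered under send.route in the routing block.
    handler = routes[send.route]
    return handler({**ctx, **send.patch})

# Hypothetical route handlers standing in for @stage functions:
def premium(ctx): return {**ctx, "tier": "premium"}
def free(ctx):    return {**ctx, "tier": "free"}
```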

Middleware

fluxio.Middleware

Bases: ABC

fluxio.RetryMiddleware

RetryMiddleware(
    max_attempts: int = 3,
    backoff: Literal[
        "fixed", "exponential", "jitter"
    ] = "exponential",
    base_delay: float = 0.5,
    exceptions: tuple[type[Exception], ...] = (Exception,),
)

Bases: Middleware

Retries the stage up to max_attempts times on matching exceptions.

Bypassed for STREAM stages — partial chunks are already delivered on the first attempt, so retrying would produce duplicate output.
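The three backoff modes map naturally onto the usual formulas. An illustrative guess at the math, not fluxio's exact implementation:

```python
import random

def backoff_delay(attempt: int, mode: str = "exponential", base_delay: float = 0.5) -> float:
    """Delay in seconds before retry `attempt` (1-based), one per backoff mode.

    Sketch of the conventional formulas; RetryMiddleware's exact math may differ.
    """
    if mode == "fixed":
        return base_delay                     # constant delay every attempt
    exp = base_delay * (2 ** (attempt - 1))   # 0.5, 1.0, 2.0, ...
    if mode == "jitter":
        return random.uniform(0, exp)         # full jitter over the exponential cap
    return exp
```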

fluxio.CacheMiddleware

CacheMiddleware(
    store: CacheStore | None = None,
    ttl: int = 300,
    key_fn: Callable[[StageFunc, Context], str]
    | None = None,
)

Bases: Middleware

Memoizes stage output in a CacheStore keyed by stage name + ctx hash.

Defaults to an in-process InMemoryCache. Bypassed for STREAM stages.
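A custom key_fn must produce the same key for equivalent contexts. A sketch of what a "stage name + ctx hash" key could look like; fluxio hashes its immutable Context, while this stand-in hashes a sorted-JSON dict:

```python
import hashlib
import json

def default_cache_key(stage_name: str, ctx: dict) -> str:
    """Illustrative key_fn: stage name plus a stable digest of the context.

    sort_keys=True makes the digest independent of dict insertion order.
    """
    digest = hashlib.sha256(json.dumps(ctx, sort_keys=True).encode()).hexdigest()
    return f"{stage_name}:{digest}"
```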

fluxio.CircuitBreakerMiddleware

CircuitBreakerMiddleware(
    failure_threshold: int = 5,
    recovery_timeout: float = 60.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
)

Bases: Middleware

Opens the circuit after failure_threshold consecutive failures.

In the OPEN state CircuitOpenError is raised immediately without calling the stage, until recovery_timeout elapses.
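The state machine described above (consecutive-failure counter, OPEN state, recovery probe) can be sketched in a few lines. A synchronous toy, with RuntimeError standing in for CircuitOpenError:

```python
import time

class MiniBreaker:
    """Toy circuit breaker mirroring the behavior described above (sketch)."""

    def __init__(self, failure_threshold: int = 5, recovery_timeout: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failures = 0
        self.opened_at = None  # monotonic timestamp while OPEN, else None

    def call(self, fn):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.recovery_timeout:
                raise RuntimeError("circuit open")  # CircuitOpenError in fluxio
            self.opened_at = None                   # timeout elapsed: allow a probe
        try:
            result = fn()
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()   # trip the circuit
            raise
        self.failures = 0                           # a success resets the streak
        return result
```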

fluxio.RateLimitMiddleware

RateLimitMiddleware(rps: float)

Bases: Middleware

Token-bucket style sliding-window rate limit. Waits, never raises.
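The "waits, never raises" contract means acquire blocks until the sliding one-second window has room. A synchronous sketch of that idea, not fluxio's async implementation:

```python
import time
from collections import deque

class MiniRateLimiter:
    """Sliding-window limiter in the spirit of RateLimitMiddleware (sketch)."""

    def __init__(self, rps: float):
        self.rps = rps
        self.window: deque = deque()  # timestamps of recent acquisitions

    def acquire(self) -> None:
        now = time.monotonic()
        # Drop timestamps that have fallen out of the 1-second window.
        while self.window and now - self.window[0] >= 1.0:
            self.window.popleft()
        if len(self.window) >= self.rps:
            # Wait until the oldest entry expires, then retry; never raise.
            time.sleep(1.0 - (now - self.window[0]))
            return self.acquire()
        self.window.append(now)
```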

Observability

fluxio.BaseCallback

Hook interface for pipeline observability.

Subclass and override the events you care about; all hooks default to no-op. Attach via Pipeline(callbacks=[MyCallback()]).

fluxio.LoggingCallback

LoggingCallback(chunk_logging: bool = False)

Bases: BaseCallback

Stores

fluxio.InMemoryStore

InMemoryStore()

Bases: CheckpointStore

fluxio.InMemoryCache

InMemoryCache()

Bases: CacheStore

Process-local cache with TTL-aware retrieval handled by CacheMiddleware.

fluxio.CacheStore

Bases: ABC

Storage backend for CacheMiddleware. Separate from CheckpointStore.

Errors

fluxio.CompilationError

CompilationError(
    message: str, node_ids: list[str] | None = None
)

Bases: Exception

fluxio.MergeConflictError

MergeConflictError(
    conflicting_keys: list[str], branch_names: list[str]
)

Bases: Exception

fluxio.CircuitOpenError

Bases: Exception

fluxio.RunIDInUseError

Bases: RuntimeError

Raised when a durable invoke is attempted with a run_id already in progress.

fluxio.CheckpointVersionError

Bases: Exception