Quickstart¶
A walk-through of the core moving parts in one file.
```python
import asyncio

from fluxio import Pipeline, Parallel, stage


# 1. Stages — any function decorated with @stage.
@stage
async def fetch_user(ctx):
    return ctx.set("user", {"id": ctx["user_id"], "name": "Alice"})


# 2. Declared reads/writes enable auto-parallelism and conflict detection.
@stage(reads=frozenset({"user"}), writes=frozenset({"profile"}))
async def enrich(ctx):
    return ctx.set("profile", {"bio": "..."})


@stage(reads=frozenset({"user"}), writes=frozenset({"orders"}))
async def fetch_orders(ctx):
    return ctx.set("orders", [1, 2, 3])


@stage
async def finalize(ctx):
    return ctx.set(
        "summary",
        f"{ctx['user']['name']} has {len(ctx['orders'])} orders",
    )


# 3. Compile once, invoke many. async with ensures clean shutdown.
async def main() -> None:
    async with Pipeline(
        [
            fetch_user,
            Parallel([enrich, fetch_orders]),
            finalize,
        ],
    ) as pipe:
        result = await pipe.invoke({"user_id": 42})
        print(result["summary"])  # Alice has 3 orders


asyncio.run(main())
```
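The declared `writes` sets are what make conflict detection possible. As a rough sketch (not fluxio's actual implementation), a scheduler could verify that stages grouped in a `Parallel` have pairwise-disjoint write sets before running any of them; the `check_parallel` helper and the dict-based stage records below are illustrative only:

```python
# Hypothetical sketch: how declared `writes` sets let a scheduler reject
# conflicting parallel stages up front. Stage metadata is modeled as dicts.
from itertools import combinations


def check_parallel(stages):
    """Raise if any two stages in a Parallel group write the same key."""
    for a, b in combinations(stages, 2):
        clash = a["writes"] & b["writes"]
        if clash:
            raise ValueError(
                f"{a['name']} and {b['name']} both write {sorted(clash)}"
            )


enrich = {"name": "enrich", "writes": frozenset({"profile"})}
fetch_orders = {"name": "fetch_orders", "writes": frozenset({"orders"})}

check_parallel([enrich, fetch_orders])  # disjoint writes: no error

bad = {"name": "bad", "writes": frozenset({"profile"})}
try:
    check_parallel([enrich, bad])
except ValueError as e:
    print(e)  # enrich and bad both write ['profile']
```

Because the check runs against static metadata, a conflicting pipeline can fail at compile time rather than mid-run.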
What just happened¶
- `@stage` attached metadata to each function. The stage type (`ASYNC`/`SYNC`/`STREAM`) was auto-detected from the signature.
- `Pipeline(...)` compiled the nodes into a linear bytecode once.
- `pipe.invoke({...})` ran the program, passing an immutable `Context` from stage to stage.
- `Parallel([...])` ran `enrich` and `fetch_orders` concurrently and merged their writes.
- `async with` closed the thread pool used for `SYNC` stages.
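The immutable `Context` is what makes the parallel merge safe: each `set` returns a fresh context, so concurrent branches never mutate shared state. A minimal sketch of that idea in plain Python (the `Context` class and `merge` method here are illustrative, not fluxio's real API):

```python
# Hypothetical copy-on-write Context: `set` returns a *new* mapping and
# leaves the original untouched, so parallel branches cannot race.
class Context:
    def __init__(self, data=None):
        self._data = dict(data or {})

    def __getitem__(self, key):
        return self._data[key]

    def set(self, key, value):
        # Copy-on-write: build a new Context instead of mutating in place.
        return Context({**self._data, key: value})

    def merge(self, *others):
        # One way Parallel could fold each branch's writes back together.
        merged = dict(self._data)
        for other in others:
            merged.update(other._data)
        return Context(merged)


base = Context({"user_id": 42})
a = base.set("profile", {"bio": "..."})   # branch 1's writes
b = base.set("orders", [1, 2, 3])         # branch 2's writes

print("profile" in base._data)  # False: base is unchanged
joined = a.merge(b)
print(joined["profile"], joined["orders"])
```

With disjoint write sets (enforced as above), the merge order between branches cannot change the result.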
Next: learn the Stage, Context, and Pipeline primitives in detail.