Stage
A stage is the unit of composition in fluxio. It is any function decorated with @stage that takes a Context and returns a Context (or Send, for routing).
Three flavours, one decorator
Stage type is auto-detected from the function signature:
SYNC stages run in a thread pool, so CPU-bound work doesn't block the event loop.
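Signature-based detection of this kind can be sketched with the standard inspect module. This is an illustration of the idea, not fluxio's own code: detect_kind, run_stage, and the "ASYNC" label are hypothetical names, a plain dict stands in for Context, and only two flavours are modelled.

```python
import asyncio
import inspect
import threading


def detect_kind(fn) -> str:
    """Classify a callable by its signature (labels are illustrative)."""
    if inspect.iscoroutinefunction(fn):
        return "ASYNC"
    return "SYNC"


async def run_stage(fn, ctx):
    """Await async stages directly; push sync stages onto a worker thread."""
    if detect_kind(fn) == "ASYNC":
        return await fn(ctx)
    # asyncio.to_thread runs fn in the loop's default thread pool executor.
    return await asyncio.to_thread(fn, ctx)


def slow_sync(ctx):
    # Record which thread ran the stage, to show it left the loop thread.
    return {**ctx, "thread": threading.current_thread().name}


async def fast_async(ctx):
    return {**ctx, "seen": True}


async def main():
    sync_result = await run_stage(slow_sync, {"user_id": 1})
    async_result = await run_stage(fast_async, {"user_id": 1})
    return sync_result, async_result


sync_result, async_result = asyncio.run(main())
```

On a standard CPython build, a worker thread keeps the event loop responsive, but the global interpreter lock still prevents CPU-bound Python code from running in parallel with other Python code.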
Declared reads and writes
Declaring reads / writes unlocks two features:
- Auto-parallelism. Consecutive stages with disjoint dependencies are wrapped in an implicit Parallel([...]) block at compile time.
- Write-conflict detection. If two branches of a Parallel write the same key, compilation fails with CompilationError.
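Both checks reduce to set operations on the declared keys. The sketch below shows one plausible reading of "disjoint dependencies" and of the conflict check; StageSpec, independent, and check_write_conflicts are hypothetical helpers, and CompilationError is redefined locally as a stand-in rather than imported from fluxio.

```python
from dataclasses import dataclass


class CompilationError(Exception):
    """Local stand-in for the library's error type."""


@dataclass(frozen=True)
class StageSpec:
    """Hypothetical record of a stage's declared reads and writes."""
    name: str
    reads: frozenset = frozenset()
    writes: frozenset = frozenset()


def independent(a: StageSpec, b: StageSpec) -> bool:
    """True when neither stage touches a key the other writes."""
    return not (a.writes & (b.reads | b.writes)) and not (b.writes & a.reads)


def check_write_conflicts(branches) -> None:
    """Raise if two branches declare the same written key."""
    owner = {}
    for spec in branches:
        for key in sorted(spec.writes):
            if key in owner:
                raise CompilationError(
                    f"{owner[key]!r} and {spec.name!r} both write {key!r}"
                )
            owner[key] = spec.name


fetch_user = StageSpec("fetch_user", frozenset({"user_id"}), frozenset({"user"}))
fetch_prices = StageSpec("fetch_prices", frozenset({"sku"}), frozenset({"price"}))
render = StageSpec("render", frozenset({"user", "price"}), frozenset({"html"}))

# fetch_user and fetch_prices share no keys, so they could run side by side.
can_run_together = independent(fetch_user, fetch_prices)
# render reads what fetch_user writes, so their order has to be preserved.
must_stay_ordered = not independent(fetch_user, render)
```

Under this reading, two stages that only read the same key still count as independent, since neither can change what the other sees.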
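Per-stage options
from fluxio import stage  # assumed to be exported alongside Send
from pydantic import BaseModel

class UserInput(BaseModel):
    user_id: int

class UserOutput(BaseModel):
    name: str  # illustrative field; the original leaves UserOutput undefined

@stage(
    input_schema=UserInput,    # validated before the call
    output_schema=UserOutput,  # validated after the call
    timeout=5.0,               # asyncio.timeout around the call
)
async def fetch_user(ctx): ...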
Schemas are validated against ctx.snapshot(). Extra keys are ignored: a schema describes what a stage requires or produces, not the entire world.
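The "extra keys are ignored" behaviour matches pydantic's default handling of unknown fields. A minimal sketch, assuming ctx.snapshot() returns something dict-like (the snapshot dict and the is_valid helper below are hypothetical stand-ins, not fluxio API):

```python
from pydantic import BaseModel, ValidationError


class UserInput(BaseModel):
    user_id: int


# Hypothetical stand-in for what ctx.snapshot() might return.
snapshot = {"user_id": 42, "tier": "premium", "request_id": "abc"}

# By default pydantic drops keys the model does not declare instead of rejecting them.
validated = UserInput(**snapshot)


def is_valid(data: dict) -> bool:
    """Report whether data satisfies UserInput."""
    try:
        UserInput(**data)
        return True
    except ValidationError:
        return False
```

Here is_valid(snapshot) passes despite the two extra keys, while a snapshot without user_id fails validation.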
Returning Send for routing
from fluxio import Send, stage

@stage
async def route(ctx):
    # Choose the next branch from a value in the context.
    if ctx["tier"] == "premium":
        return Send("premium", {"priority": "high"})
    return Send("standard")
See Conditional routing.
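To make the routing idea concrete, here is a toy dispatcher written without fluxio. Everything in it is an assumption for illustration: Send is a local stand-in, its second argument is treated as keys to merge into the context, and dispatch and branches are hypothetical names. The library's real semantics may differ.

```python
from dataclasses import dataclass, field


@dataclass
class Send:
    """Local stand-in: a target name plus optional keys to merge into the context."""
    target: str
    update: dict = field(default_factory=dict)


def route(ctx: dict) -> Send:
    if ctx["tier"] == "premium":
        return Send("premium", {"priority": "high"})
    return Send("standard")


def dispatch(ctx: dict, branches: dict) -> dict:
    """Pick the branch named by the Send and call it with the merged context."""
    send = route(ctx)
    merged = {**ctx, **send.update}
    return branches[send.target](merged)


branches = {
    "premium": lambda ctx: {**ctx, "handled_by": "premium"},
    "standard": lambda ctx: {**ctx, "handled_by": "standard"},
}

premium_result = dispatch({"tier": "premium"}, branches)
standard_result = dispatch({"tier": "free"}, branches)
```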