Стейдж¶
Стейдж — это базовый кирпичик, из которого собираются пайплайны. По сути это обычная Python-функция, помеченная @stage: она принимает Context, что-то с ним делает и возвращает новый Context (или Send, если хочет переключить ветку — об этом ниже).
Три типа стейджей, один декоратор¶
@stage сам определяет тип по сигнатуре функции — отдельных декораторов под каждый тип нет.
Обычный async-стейдж. Подходит для всего, что делает await: сетевые запросы, БД, обращения к LLM.
Обычная синхронная функция. fluxio запустит её в пуле потоков, чтобы CPU-нагрузка не блокировала event loop. Удобно для хешей, парсинга, сериализации.
Async-генератор. Любой async def с yield становится стримом — чанки доставляются по мере появления через pipe.stream() или callback. Дополнительно весь собранный список чанков пишется в ctx[f"{node_id}_stream_result"], чтобы следующие стейджи могли его прочитать.
Объявление reads и writes¶
Если указать, какие ключи стейдж читает и какие пишет, fluxio получает две вещи бесплатно:
- Автоматический параллелизм. Если два соседних стейджа не зависят друг от друга по данным, компилятор сам обернёт их в
Parallel([...]). Никаких изменений в коде не нужно. - Раннее обнаружение конфликтов. Если две ветки
Parallelпишут в один и тот же ключ, компиляция упадёт сCompilationError— вы узнаете об этом до первого запуска, а не на проде.
Параметры стейджа¶
from pydantic import BaseModel
class UserInput(BaseModel):
user_id: int
@stage(
input_schema=UserInput, # валидация входа перед вызовом
output_schema=UserOutput, # валидация выхода после вызова
timeout=5.0, # asyncio.timeout вокруг вызова
)
async def fetch_user(ctx): ...
Схемы проверяются против ctx.snapshot(). Лишние ключи в контексте схему не ломают — она описывает только то, что нужно конкретному стейджу, а не весь набор данных.
Возврат Send для роутинга¶
Если стейдж — это «роутер», который выбирает дальнейшую ветку, он возвращает Send:
from fluxio import Send
@stage
async def route(ctx):
if ctx["tier"] == "premium":
return Send("premium", {"priority": "high"})
return Send("standard")
Подробнее — в разделе Условный роутинг.