Быстрый старт¶
Ниже — полный рабочий пример со всеми ключевыми частями fluxio. Дальше по документации каждый кусочек разобран подробно.
import asyncio
from fluxio import Pipeline, Parallel, stage
# 1. Стейдж — обычная функция, помеченная @stage.
# Принимает контекст, возвращает контекст.
@stage
async def fetch_user(ctx):
return ctx.set("user", {"id": ctx["user_id"], "name": "Alice"})
# 2. Если объявить, что стейдж читает и что пишет,
# fluxio сам найдёт независимые стейджи и распараллелит их.
@stage(reads=frozenset({"user"}), writes=frozenset({"profile"}))
async def enrich(ctx):
return ctx.set("profile", {"bio": "..."})
@stage(reads=frozenset({"user"}), writes=frozenset({"orders"}))
async def fetch_orders(ctx):
return ctx.set("orders", [1, 2, 3])
@stage
async def finalize(ctx):
return ctx.set(
"summary",
f"У {ctx['user']['name']} {len(ctx['orders'])} заказов",
)
# 3. Pipeline компилируется один раз; invoke можно вызывать
# сколько угодно. async with закрывает внутренний пул потоков.
async def main() -> None:
async with Pipeline(
[
fetch_user,
Parallel([enrich, fetch_orders]),
finalize,
],
) as pipe:
result = await pipe.invoke({"user_id": 42})
print(result["summary"]) # У Alice 3 заказов
asyncio.run(main())
Что здесь произошло¶
@stageпревратил обычную функцию в стейдж. Тип (ASYNC/SYNC/STREAM) определился автоматически по сигнатуре.Pipeline(...)прошёлся по списку стейджей и скомпилировал из них линейную программу — это происходит один раз, на старте.pipe.invoke({...})запустил эту программу: интерпретатор по очереди вызывает стейджи, передавая между ними неизменяемыйContext.Parallel([...])запустилenrichиfetch_ordersодновременно, а после завершения слил их записи обратно в общий контекст.async withв конце корректно закрыл пул потоков, в котором выполняются синхронные стейджи.