fluxio¶
Библиотека для построения пайплайнов на Python: вы пишете обычные функции, а fluxio собирает из них программу, которую можно параллелить, стримить и возобновлять после падения.
Что это даёт¶
- Пайплайн собирается из стейджей. Стейдж — это любая функция с декоратором
@stage. Синхронная, асинхронная или генератор — fluxio сам разберётся. - Контекст неизменяемый.
ctx.set("ключ", значение)возвращает новый контекст, старый остаётся как есть. Внутри — HAMT с разделяемой структурой, поэтому копии дешёвые. - Параллелизм из коробки. Если у стейджа объявлены
reads/writes, компилятор сам найдёт независимые стейджи и запустит их параллельно. Конфликты записей ловятся при компиляции, а не в продакшне. - Возобновление после падения. Между стейджами пишутся чекпоинты. Упало — перезапустили с
resume=Trueи пайплайн продолжит с того места, где остановился. - Условные ветки. Стейдж возвращает
Send("имя_ветки"), и следующий dict-блок выбирает, какую под-пайплайн запустить. - Стриминг. Если стейдж — async-генератор, его чанки можно отдавать клиенту по мере появления через
pipe.stream()или callback. - Стандартные обёртки. Retry, кеш, circuit breaker, rate limit, логирование, интеграция с Langfuse, in-memory и Redis-стор — всё уже есть.
Минимальный пример¶
from fluxio import Pipeline, stage
@stage
async def fetch_user(ctx):
return ctx.set("user", {"id": ctx["user_id"], "name": "Alice"})
@stage
async def greet(ctx):
return ctx.set("greeting", f"Привет, {ctx['user']['name']}")
async with Pipeline([fetch_user, greet]) as pipe:
result = await pipe.invoke({"user_id": 1})
print(result["greeting"])
Что читать дальше¶
- Установка — как поставить пакет и какие есть extras.
- Быстрый старт — все основные элементы в одном файле.
- Стейдж — единица композиции, из которой собирается всё остальное.
- Durable execution — как писать пайплайны, которые переживают падения.
- Справочник API — полный список классов и функций.