Перейти к содержанию

fluxio

Библиотека для построения пайплайнов на Python: вы пишете обычные функции, а fluxio собирает из них программу, которую можно параллелить, стримить и возобновлять после падения.

Что это даёт

  • Пайплайн собирается из стейджей. Стейдж — это любая функция с декоратором @stage. Синхронная, асинхронная или генератор — fluxio сам разберётся.
  • Контекст неизменяемый. ctx.set("ключ", значение) возвращает новый контекст, старый остаётся как есть. Внутри — HAMT с разделяемой структурой, поэтому копии дешёвые.
  • Параллелизм из коробки. Если у стейджа объявлены reads / writes, компилятор сам найдёт независимые стейджи и запустит их параллельно. Конфликты записей ловятся при компиляции, а не в продакшне.
  • Возобновление после падения. Между стейджами пишутся чекпоинты. Упало — перезапустили с resume=True и пайплайн продолжит с того места, где остановился.
  • Условные ветки. Стейдж возвращает Send("имя_ветки"), и следующий dict-блок выбирает, какую под-пайплайн запустить.
  • Стриминг. Если стейдж — async-генератор, его чанки можно отдавать клиенту по мере появления через pipe.stream() или callback.
  • Стандартные обёртки. Retry, кеш, circuit breaker, rate limit, логирование, интеграция с Langfuse, in-memory и Redis-стор — всё уже есть.

Минимальный пример

from fluxio import Pipeline, stage

@stage
async def fetch_user(ctx):
    return ctx.set("user", {"id": ctx["user_id"], "name": "Alice"})

@stage
async def greet(ctx):
    return ctx.set("greeting", f"Привет, {ctx['user']['name']}")

async with Pipeline([fetch_user, greet]) as pipe:
    result = await pipe.invoke({"user_id": 1})
    print(result["greeting"])

Что читать дальше

  • Установка — как поставить пакет и какие есть extras.
  • Быстрый старт — все основные элементы в одном файле.
  • Стейдж — единица композиции, из которой собирается всё остальное.
  • Durable execution — как писать пайплайны, которые переживают падения.
  • Справочник API — полный список классов и функций.