Перейти к содержанию

Пайплайн

Pipeline — это то, что собирает список стейджей, Parallel-блоков и dict-роутингов в готовую к запуску программу. На входе — декларативное описание, на выходе — объект, который умеет запускаться, стримить и возобновляться после падения.

Создание

async with Pipeline(
    nodes=[stage_a, stage_b, Parallel([c, d]), router, {"x": [e], "y": [f]}],
    middleware=[RetryMiddleware(), CacheMiddleware()],
    callbacks=[LoggingCallback()],
    checkpoint_store=InMemoryStore(),
    durable=True,
    max_workers=8,
    auto_parallel=True,
) as pipe:
    ...

Несколько важных деталей:

  • Компиляция происходит один раз, в __init__. Готовый байткод доступен по pipe.version (короткий sha256-хеш — пригодится для durable execution).
  • async with — рекомендуемая форма создания. На выходе она корректно закроет внутренний пул потоков, в котором выполняются SYNC-стейджи.

Способы запуска

invoke — обычный запуск

result: Context = await pipe.invoke({"user_id": 42})

Если включён durable=True, к нему добавляется run_id для идентификации конкретного прогона:

# первый раз — упало посреди пайплайна
await pipe.invoke({"user_id": 42}, run_id="req-1")

# продолжаем с последнего чекпоинта
await pipe.invoke({}, run_id="req-1", resume=True)

stream — стриминг чанков

Отдаёт чанки из STREAM-стейджей по мере их появления — пригодится для SSE, WebSocket'ов и подобного:

async with Pipeline([ingest, llm_stream]) as pipe:
    async for chunk in pipe.stream({"prompt": "привет"}):
        await websocket.send(chunk)

Не-STREAM стейджи всё равно выполняются в обычном порядке — стриминг не отменяет остальную работу.

run_step — один стейдж изолированно

Удобно в тестах, когда нужно проверить ровно одну функцию без сборки всего пайплайна:

out = await pipe.run_step("fetch_user", {"user_id": 1})

replay — повтор по чекпоинту

Перезапуск пайплайна с сохранённого состояния. Требует durable=True:

await pipe.replay("req-1")

explain — что получилось после компиляции

Печатает читаемую структуру скомпилированного пайплайна — со всеми маршрутами и параллельными блоками. Полезно, когда вы используете автоматический параллелизм и хотите посмотреть, что компилятор сделал из вашего списка:

print(pipe.explain())

Что важно знать

  • Если запустить invoke(run_id="x") дважды параллельно с durable=True, второй вызов получит RunIDInUseError — для одного run_id за раз может выполняться только один прогон.
  • RetryMiddleware и CacheMiddleware сами пропускают STREAM-стейджи: повтор дублировал бы уже отданные чанки, а кеширование живого генератора бессмысленно.
  • async with можно не использовать, если у вас только async-стейджи и пул потоков не нужен. В этом случае вызывайте pipe.shutdown() вручную, когда пайплайн больше не нужен.