Пайплайн¶
Pipeline — это то, что собирает список стейджей, Parallel-блоков и dict-роутингов в готовую к запуску программу. На входе — декларативное описание, на выходе — объект, который умеет запускаться, стримить и возобновляться после падения.
Создание¶
async with Pipeline(
nodes=[stage_a, stage_b, Parallel([c, d]), router, {"x": [e], "y": [f]}],
middleware=[RetryMiddleware(), CacheMiddleware()],
callbacks=[LoggingCallback()],
checkpoint_store=InMemoryStore(),
durable=True,
max_workers=8,
auto_parallel=True,
) as pipe:
...
Несколько важных деталей:
- Компиляция происходит один раз, в
__init__. Готовый байткод доступен поpipe.version(короткий sha256-хеш — пригодится для durable execution). async with— рекомендуемая форма создания. На выходе она корректно закроет внутренний пул потоков, в котором выполняютсяSYNC-стейджи.
Способы запуска¶
invoke — обычный запуск¶
Если включён durable=True, к нему добавляется run_id для идентификации конкретного прогона:
# первый раз — упало посреди пайплайна
await pipe.invoke({"user_id": 42}, run_id="req-1")
# продолжаем с последнего чекпоинта
await pipe.invoke({}, run_id="req-1", resume=True)
stream — стриминг чанков¶
Отдаёт чанки из STREAM-стейджей по мере их появления — пригодится для SSE, WebSocket'ов и подобного:
async with Pipeline([ingest, llm_stream]) as pipe:
async for chunk in pipe.stream({"prompt": "привет"}):
await websocket.send(chunk)
Не-STREAM стейджи всё равно выполняются в обычном порядке — стриминг не отменяет остальную работу.
run_step — один стейдж изолированно¶
Удобно в тестах, когда нужно проверить ровно одну функцию без сборки всего пайплайна:
replay — повтор по чекпоинту¶
Перезапуск пайплайна с сохранённого состояния. Требует durable=True:
explain — что получилось после компиляции¶
Печатает читаемую структуру скомпилированного пайплайна — со всеми маршрутами и параллельными блоками. Полезно, когда вы используете автоматический параллелизм и хотите посмотреть, что компилятор сделал из вашего списка:
Что важно знать¶
- Если запустить
invoke(run_id="x")дважды параллельно сdurable=True, второй вызов получитRunIDInUseError— для одногоrun_idза раз может выполняться только один прогон. RetryMiddlewareиCacheMiddlewareсами пропускают STREAM-стейджи: повтор дублировал бы уже отданные чанки, а кеширование живого генератора бессмысленно.async withможно не использовать, если у вас только async-стейджи и пул потоков не нужен. В этом случае вызывайтеpipe.shutdown()вручную, когда пайплайн больше не нужен.