Параллельное выполнение¶
В fluxio есть два способа запустить стейджи одновременно: явный Parallel([...]) и автоматический параллелизм на этапе компиляции.
Явный Parallel¶
Когда вы точно знаете, что несколько стейджей независимы, и хотите это явно отразить в коде:
from fluxio import Parallel
Pipeline([
fetch_user,
Parallel([enrich_profile, fetch_orders]),
format_response,
])
Внутри ветки запускаются через asyncio.gather. Каждая ветка стартует с форка контекста, а после завершения всех веток их записи мерджатся обратно в общий контекст.
Автоматический параллелизм¶
Если у стейджей объявлены reads и writes, fluxio сам видит, какие из них друг от друга не зависят, и оборачивает их в Parallel за вас:
@stage(reads=frozenset({"user_id"}), writes=frozenset({"user"}))
async def fetch_user(ctx): ...
@stage(reads=frozenset({"user_id"}), writes=frozenset({"orders"}))
async def fetch_orders(ctx): ...
# В коде вы пишете последовательно:
Pipeline([fetch_user, fetch_orders])
# А fluxio скомпилирует это как Parallel([fetch_user, fetch_orders]).
Чтобы вы видели, что произошло, в лог пишется warning при автоматическом сворачивании. Если поведение нежелательно, отключайте через auto_parallel=False.
Конфликты при мердже¶
Если две ветки записали значение в один и тот же ключ, мердж падает с MergeConflictError. Когда writes объявлены, компилятор поймает такой конфликт ещё на этапе компиляции — на проде он не выстрелит.
Fire-and-forget¶
Иногда параллельные ветки нужны не для получения результата, а просто чтобы что-то отправить в фон (метрики, аудит-лог) и идти дальше:
В этом режиме ветки запускаются параллельно, но их ошибки только логируются, а не пробрасываются. Мерджа результатов нет — выполнение продолжается с того контекста, который был до форка.
Полезные привычки¶
- Объявляйте
readsиwrites. Это включает автопараллелизм и заставляет компилятор ловить конфликты записей до запуска. Parallelможно класть внутрь dict-блока роутинга ({"premium": [Parallel([...])]}) — работает как ожидается.- Scheduler опирается на asyncio + thread pool, поэтому CPU-связанные
SYNC-стейджи внутриParallelдействительно выполняются на разных потоках.