Перейти к содержанию

Параллельное выполнение

В fluxio есть два способа запустить стейджи одновременно: явный Parallel([...]) и автоматический параллелизм на этапе компиляции.

Явный Parallel

Когда вы точно знаете, что несколько стейджей независимы, и хотите это явно отразить в коде:

from fluxio import Parallel

Pipeline([
    fetch_user,
    Parallel([enrich_profile, fetch_orders]),
    format_response,
])

Внутри ветки запускаются через asyncio.gather. Каждая ветка стартует с форка контекста, а после завершения всех веток их записи мерджатся обратно в общий контекст.

Автоматический параллелизм

Если у стейджей объявлены reads и writes, fluxio сам видит, какие из них друг от друга не зависят, и оборачивает их в Parallel за вас:

@stage(reads=frozenset({"user_id"}), writes=frozenset({"user"}))
async def fetch_user(ctx): ...

@stage(reads=frozenset({"user_id"}), writes=frozenset({"orders"}))
async def fetch_orders(ctx): ...

# В коде вы пишете последовательно:
Pipeline([fetch_user, fetch_orders])
# А fluxio скомпилирует это как Parallel([fetch_user, fetch_orders]).

Чтобы вы видели, что произошло, в лог пишется warning при автоматическом сворачивании. Если поведение нежелательно, отключайте через auto_parallel=False.

Конфликты при мердже

Если две ветки записали значение в один и тот же ключ, мердж падает с MergeConflictError. Когда writes объявлены, компилятор поймает такой конфликт ещё на этапе компиляции — на проде он не выстрелит.

Fire-and-forget

Иногда параллельные ветки нужны не для получения результата, а просто чтобы что-то отправить в фон (метрики, аудит-лог) и идти дальше:

Parallel([audit_log, metrics_push], mode=ForkMode.FIRE_FORGET)

В этом режиме ветки запускаются параллельно, но их ошибки только логируются, а не пробрасываются. Мерджа результатов нет — выполнение продолжается с того контекста, который был до форка.

Полезные привычки

  • Объявляйте reads и writes. Это включает автопараллелизм и заставляет компилятор ловить конфликты записей до запуска.
  • Parallel можно класть внутрь dict-блока роутинга ({"premium": [Parallel([...])]}) — работает как ожидается.
  • Scheduler опирается на asyncio + thread pool, поэтому CPU-связанные SYNC-стейджи внутри Parallel действительно выполняются на разных потоках.