Контекст¶
Context — это неизменяемое состояние, которое проходит через все стейджи. Каждый стейдж получает контекст на вход, что-то добавляет и возвращает новый. Старый при этом не меняется — это ключевая идея, на которой держатся форки, мерджи и чекпоинты.
Чтение¶
ctx.get("user_id") # вернёт None, если ключа нет
ctx.get("tier", "standard") # с дефолтом
ctx["user_id"] # KeyError, если ключа нет
"user_id" in ctx # проверка наличия
Простое правило: ctx[key] — когда ключ обязан быть и его отсутствие — это баг. ctx.get(key) — когда «нет ключа» вполне допустимая ситуация.
Запись¶
Запись возвращает новый контекст, оригинал остаётся как был:
new_ctx = ctx.set("user", {"id": 1})
ctx.get("user") # None — оригинал не изменён
new_ctx.get("user") # {"id": 1}
Если нужно записать сразу несколько ключей, используйте update:
Почему копии дешёвые¶
Под капотом Context — это обёртка над pyrsistent.PMap (HAMT). При set() создаётся новый контекст, но неизменённые ветки дерева переиспользуются — копия по времени и памяти стоит O(log₃₂ N), а не O(N).
fork() ещё дешевле — это просто новая обёртка над тем же деревом, отличающаяся только именем ветки. Поэтому параллельные ветки можно создавать сколько угодно.
Форк и мердж¶
Форк и мердж — это то, как fluxio внутри себя работает с Parallel и роутингом. Напрямую вы их обычно не вызываете, но видеть полезно:
base = Context.create({"user_id": 1})
branch_a = base.fork("a").set("profile", {...})
branch_b = base.fork("b").set("orders", [...])
merged = Context.merge(base, [branch_a, branch_b])
# В merged есть и "profile", и "orders"
Если две ветки случайно пишут в один и тот же ключ, мердж падает с MergeConflictError, в которой указаны конфликтующие ключи и имена веток — найти причину легко.
Снепшоты¶
Контекст можно превратить в обычный dict и обратно — это нужно для чекпоинтов и удобно для отладки:
Именно такие снепшоты CheckpointStore сохраняет на диск или в Redis между стейджами.