Перейти к содержанию

Контекст

Context — это неизменяемое состояние, которое проходит через все стейджи. Каждый стейдж получает контекст на вход, что-то добавляет и возвращает новый. Старый при этом не меняется — это ключевая идея, на которой держатся форки, мерджи и чекпоинты.

Чтение

ctx.get("user_id")           # вернёт None, если ключа нет
ctx.get("tier", "standard")  # с дефолтом
ctx["user_id"]               # KeyError, если ключа нет
"user_id" in ctx             # проверка наличия

Простое правило: ctx[key] — когда ключ обязан быть и его отсутствие — это баг. ctx.get(key) — когда «нет ключа» вполне допустимая ситуация.

Запись

Запись возвращает новый контекст, оригинал остаётся как был:

new_ctx = ctx.set("user", {"id": 1})
ctx.get("user")       # None — оригинал не изменён
new_ctx.get("user")   # {"id": 1}

Если нужно записать сразу несколько ключей, используйте update:

ctx = ctx.update({"priority": "high", "retries": 0})

Почему копии дешёвые

Под капотом Context — это обёртка над pyrsistent.PMap (HAMT). При set() создаётся новый контекст, но неизменённые ветки дерева переиспользуются — копия по времени и памяти стоит O(log₃₂ N), а не O(N).

fork() ещё дешевле — это просто новая обёртка над тем же деревом, отличающаяся только именем ветки. Поэтому параллельные ветки можно создавать сколько угодно.

Форк и мердж

Форк и мердж — это то, как fluxio внутри себя работает с Parallel и роутингом. Напрямую вы их обычно не вызываете, но видеть полезно:

base = Context.create({"user_id": 1})
branch_a = base.fork("a").set("profile", {...})
branch_b = base.fork("b").set("orders", [...])

merged = Context.merge(base, [branch_a, branch_b])
# В merged есть и "profile", и "orders"

Если две ветки случайно пишут в один и тот же ключ, мердж падает с MergeConflictError, в которой указаны конфликтующие ключи и имена веток — найти причину легко.

Снепшоты

Контекст можно превратить в обычный dict и обратно — это нужно для чекпоинтов и удобно для отладки:

data = ctx.snapshot()                              # plain dict[str, Any]
ctx_ = Context.from_snapshot(data, name="restored")

Именно такие снепшоты CheckpointStore сохраняет на диск или в Redis между стейджами.