Middleware¶
Middleware — это обёртки вокруг каждого вызова стейджа. Через них удобно добавлять ретраи, кеширование, метрики и любую сквозную логику, которую неприятно дублировать в каждом стейдже.
Middleware применяются в порядке объявления: первый в списке — самый внешний, последний — ближе всего к стейджу.
Pipeline(
[...],
middleware=[
CircuitBreakerMiddleware(), # самый внешний
RetryMiddleware(), # внутри breaker
CacheMiddleware(), # ближе всех к стейджу
],
)
# Порядок вызова: CircuitBreaker → Retry → Cache → стейдж
Порядок имеет значение: например, retry обычно ставят снаружи кеша (чтобы повторять только реальные обращения, а не попадания в кеш) и внутри circuit breaker'а (чтобы breaker считал каждую попытку отдельно).
Встроенные middleware¶
RetryMiddleware¶
Повторяет вызов при исключении с экспоненциальной задержкой:
RetryMiddleware(
max_attempts=3,
backoff="exponential", # "fixed" | "exponential" | "jitter"
base_delay=0.5,
exceptions=(Exception,),
)
STREAM-стейджи retry автоматически пропускает — повтор продублировал бы уже отданные клиенту чанки.
CacheMiddleware¶
Кеширует результаты стейджей по ключу из node_id и снепшота контекста:
CacheMiddleware(
store=InMemoryCache(), # или ваш CacheStore
ttl=300,
key_fn=None, # по умолчанию sha256(node_id + ctx_snapshot)
)
Кеш и хранилище чекпоинтов — это разные интерфейсы (CacheStore и CheckpointStore). Сделано намеренно: цели и время жизни данных у них разные. STREAM-стейджи кеш тоже пропускает — кеширование живого генератора бессмысленно.
CircuitBreakerMiddleware¶
Стандартный circuit breaker. После N подряд идущих ошибок переходит в open и какое-то время сразу бросает CircuitOpenError, не доходя до стейджа:
Состояния: closed → open → half_open → closed.
RateLimitMiddleware¶
Sliding-window лимитер запросов в секунду:
Когда бюджет исчерпан — middleware ждёт, а не бросает исключение. То есть он сглаживает поток, а не отбрасывает запросы.
Свой middleware¶
Достаточно реализовать __call__. Вызов next(fn, ctx) запускает остаток цепочки и возвращает итоговый Context:
from fluxio.runtime.middleware import Middleware, Next
class MetricsMiddleware(Middleware):
async def __call__(self, fn, ctx, next: Next):
start = time.monotonic()
try:
return await next(fn, ctx)
finally:
metrics.histogram(
"stage.duration",
time.monotonic() - start,
tags={"stage": fn.__name__},
)
Внутри своего middleware можно: модифицировать ctx до вызова next, проверить или подменить результат после, или вообще не звать next и вернуть свой контекст (short-circuit) — всё это допустимо.