Smarter retry policies for real systems
from reflexio.policy import RetryPolicy
from reflexio.errors import ErrorClass
from reflexio.classify import default_classifier
from reflexio.strategies import decorrelated_jitter, equal_jitter
policy = RetryPolicy(
classifier=default_classifier,
strategy=decorrelated_jitter(max_s=10.0), # default for most classes
strategies={
ErrorClass.CONCURRENCY: decorrelated_jitter(max_s=1.0),
ErrorClass.RATE_LIMIT: decorrelated_jitter(max_s=60.0),
ErrorClass.SERVER_ERROR: equal_jitter(max_s=30.0),
},
per_class_max_attempts={
ErrorClass.RATE_LIMIT: 3,
ErrorClass.SERVER_ERROR: 5,
},
)
operation to distinguish call sitesdef fetch_profile():
...
policy.call(fetch_profile, operation="fetch_profile")
Metrics/logs include operation=fetch_profile, letting you split dashboards per call site.
from reflexio.config import RetryConfig
from reflexio.policy import RetryPolicy
from reflexio.classify import default_classifier
cfg = RetryConfig(
deadline_s=45.0,
max_attempts=6,
per_class_max_attempts={
ErrorClass.RATE_LIMIT: 2,
ErrorClass.SERVER_ERROR: 4,
},
)
policy = RetryPolicy.from_config(cfg, classifier=default_classifier)
AsyncRetryPolicy mirrors the sync API but awaits your callable and uses asyncio.sleep for backoff.
import asyncio
from reflexio import AsyncRetryPolicy, default_classifier
from reflexio.errors import ErrorClass
from reflexio.strategies import decorrelated_jitter
async_policy = AsyncRetryPolicy(
classifier=default_classifier,
strategy=decorrelated_jitter(max_s=2.0),
strategies={ErrorClass.RATE_LIMIT: decorrelated_jitter(min_s=1.0, max_s=8.0)},
deadline_s=10.0,
max_attempts=5,
)
async def fetch_user() -> str:
...
asyncio.run(async_policy.call(fetch_user, operation="fetch_user"))
Observability hooks (on_metric, on_log), deadlines, and per-class limits behave the same as the sync policy.
from reflexio.metrics import prometheus_metric_hook
def log_hook(event: str, fields: dict) -> None:
logger.info("retry_event", extra={"event": event, **fields})
policy.call(
lambda: do_work(),
on_metric=prometheus_metric_hook(counter),
on_log=log_hook,
operation="sync_account",
)
The retry decorator wraps functions and chooses the right policy automatically based on whether the function is sync or async.
from reflexio import retry, default_classifier
from reflexio.strategies import decorrelated_jitter
@retry(classifier=default_classifier, strategy=decorrelated_jitter(max_s=5.0))
def fetch_user():
...
@retry(classifier=default_classifier, strategy=decorrelated_jitter(max_s=5.0))
async def fetch_user_async():
...
Hooks and operation can be set on the decorator. The operation defaults to the function name when omitted.
reflexio stays dependency-free, so database-specific classifiers live in docs. See docs/snippets/pyodbc_classifier.py for a SQLSTATE-based mapper.
from reflexio import retry
from reflexio.strategies import decorrelated_jitter
from docs.snippets.pyodbc_classifier import pyodbc_classifier # adjust import path as needed
@retry(
classifier=pyodbc_classifier,
strategy=decorrelated_jitter(max_s=3.0),
strategies={}, # optional per-class overrides
)
def run_query():
...