Skip to content

redress usage patterns

Per-class strategies and limits

from redress.policy import RetryPolicy
from redress.errors import ErrorClass
from redress.classify import default_classifier
from redress.strategies import decorrelated_jitter, equal_jitter

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=10.0),  # default for most classes
    strategies={
        ErrorClass.CONCURRENCY: decorrelated_jitter(max_s=1.0),
        ErrorClass.RATE_LIMIT: decorrelated_jitter(max_s=60.0),
        ErrorClass.SERVER_ERROR: equal_jitter(max_s=30.0),
    },
    per_class_max_attempts={
        ErrorClass.RATE_LIMIT: 3,
        ErrorClass.SERVER_ERROR: 5,
    },
)

Using operation to distinguish call sites

def fetch_profile():
    ...

policy.call(fetch_profile, operation="fetch_profile")

Metrics/logs include operation=fetch_profile, letting you split dashboards per call site.

RetryConfig for shared settings

from redress.config import RetryConfig
from redress.policy import RetryPolicy
from redress.classify import default_classifier

cfg = RetryConfig(
    deadline_s=45.0,
    max_attempts=6,
    per_class_max_attempts={
        ErrorClass.RATE_LIMIT: 2,
        ErrorClass.SERVER_ERROR: 4,
    },
)

policy = RetryPolicy.from_config(cfg, classifier=default_classifier)

Async usage

AsyncRetryPolicy mirrors the sync API but awaits your callable and uses asyncio.sleep for backoff.

import asyncio
from redress import AsyncRetryPolicy, default_classifier
from redress.errors import ErrorClass
from redress.strategies import decorrelated_jitter

async_policy = AsyncRetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=2.0),
    strategies={ErrorClass.RATE_LIMIT: decorrelated_jitter(min_s=1.0, max_s=8.0)},
    deadline_s=10.0,
    max_attempts=5,
)

async def fetch_user() -> str:
    ...

asyncio.run(async_policy.call(fetch_user, operation="fetch_user"))

Observability hooks (on_metric, on_log), deadlines, and per-class limits behave the same as the sync policy.

Logging and metrics hooks together

from redress.metrics import prometheus_metric_hook

def log_hook(event: str, fields: dict) -> None:
    logger.info("retry_event", extra={"event": event, **fields})

policy.call(
    lambda: do_work(),
    on_metric=prometheus_metric_hook(counter),
    on_log=log_hook,
    operation="sync_account",
)

Decorator-based retries (sync + async)

The retry decorator wraps functions and chooses the right policy automatically based on whether the function is sync or async.

from redress import retry, default_classifier
from redress.strategies import decorrelated_jitter

@retry  # defaults to default_classifier + decorrelated_jitter(max_s=5.0)
def fetch_user():
    ...

@retry
async def fetch_user_async():
    ...

Hooks and operation can be set on the decorator. The operation defaults to the function name when omitted.

Context managers for repeated calls

You can bind hooks/operation once and reuse:

policy = RetryPolicy(..., classifier=default_classifier, strategy=decorrelated_jitter())

with policy.context(operation="batch") as retry:
    retry(do_thing)
    retry(do_other, arg1, arg2)

Async variant:

async with async_policy.context(operation="batch") as retry:
    await retry(do_async_work)

Helper classifiers

redress.extras provides domain-oriented classifiers:

  • http_classifier – maps HTTP status codes (e.g., 429→RATE_LIMIT, 500→SERVER_ERROR, 408→TRANSIENT).
  • sqlstate_classifier – maps SQLSTATE codes (e.g., 40001/40P01→CONCURRENCY, HYT00/08xxx→TRANSIENT, 28xxx→AUTH).

PyODBC classification example

redress stays dependency-free, so database-specific classifiers live in docs. See docs/snippets/pyodbc_classifier.py for a SQLSTATE-based mapper.

from redress import retry
from redress.strategies import decorrelated_jitter
from docs.snippets.pyodbc_classifier import pyodbc_classifier  # adjust import path as needed

@retry(
    classifier=pyodbc_classifier,
    strategy=decorrelated_jitter(max_s=3.0),
    strategies={},  # optional per-class overrides
)
def run_query():
    ...