# redress usage patterns
## Unified Policy model

`Policy` is the unified resilience container. Configure retries via `Retry`, or use `RetryPolicy` as a convenient shortcut.
Practical choice guide:

| Use this | When you need |
|---|---|
| `Policy` | Combine retries with a circuit breaker (or other policy components). |
| `RetryPolicy` | Retry-only behavior without composing a full `Policy`. |
| `Retry` (component) | Embed retry settings inside `Policy` or custom containers. |
```python
from redress import Policy, Retry, default_classifier
from redress.strategies import decorrelated_jitter

policy = Policy(
    retry=Retry(
        classifier=default_classifier,
        strategy=decorrelated_jitter(max_s=5.0),
        deadline_s=30.0,
        max_attempts=5,
    )
)
```
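For retry-only behavior, the `RetryPolicy` shortcut takes the same retry settings directly. A minimal sketch mirroring the `Policy` example above (`deadline_s` and `max_attempts` on `RetryPolicy` follow the usage shown elsewhere on this page):

```python
from redress import RetryPolicy, default_classifier
from redress.strategies import decorrelated_jitter

# Equivalent retry-only configuration without composing a full Policy.
policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
    deadline_s=30.0,
    max_attempts=5,
)
```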
## Circuit breakers

Use `CircuitBreaker` with the unified policy to fail fast when a downstream is unhealthy.
```python
from redress import CircuitBreaker, ErrorClass, Policy, Retry, default_classifier
from redress.strategies import decorrelated_jitter

breaker = CircuitBreaker(
    failure_threshold=5,
    window_s=60.0,
    recovery_timeout_s=30.0,
    trip_on={ErrorClass.TRANSIENT, ErrorClass.SERVER_ERROR},
)

policy = Policy(
    retry=Retry(
        classifier=default_classifier,
        strategy=decorrelated_jitter(max_s=5.0),
    ),
    circuit_breaker=breaker,
)
```
## Per-class strategies and limits
```python
from redress import ErrorClass, RetryPolicy, default_classifier
from redress.strategies import decorrelated_jitter, equal_jitter

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=10.0),  # default for most classes
    strategies={
        ErrorClass.CONCURRENCY: decorrelated_jitter(max_s=1.0),
        ErrorClass.RATE_LIMIT: decorrelated_jitter(max_s=60.0),
        ErrorClass.SERVER_ERROR: equal_jitter(max_s=30.0),
    },
    per_class_max_attempts={
        ErrorClass.RATE_LIMIT: 3,
        ErrorClass.SERVER_ERROR: 5,
    },
)
```
Per-class limit semantics: `per_class_max_attempts` caps the *total* attempts for that class (including the initial attempt, not just retries):

- `0` = no attempts for that class
- `1` = one total attempt for that class (only the first attempt is allowed; retries will not occur)
- `2` = two total attempts for that class
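One consequence of the total-attempt semantics, sketched below: capping a class at `1` effectively disables retries for it (`ErrorClass.AUTH` here is just an illustrative choice of class):

```python
from redress import ErrorClass, RetryPolicy, default_classifier
from redress.strategies import decorrelated_jitter

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
    per_class_max_attempts={
        # 1 = the initial attempt only, so AUTH failures are never retried.
        ErrorClass.AUTH: 1,
    },
)
```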
## Retry budgets

Use a shared budget to cap retry volume across operations or policies.
```python
from redress import Budget, RetryPolicy, default_classifier
from redress.strategies import decorrelated_jitter

budget = Budget(max_retries=100, window_s=60.0)

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
    budget=budget,
)
```
When the budget is exhausted, retries stop with `StopReason.BUDGET_EXHAUSTED`.
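You can detect this through the structured outcome from `execute()` (covered in more detail later on this page); a sketch, where `handle_overload` is a hypothetical application hook:

```python
from redress import StopReason

outcome = policy.execute(do_work)
if outcome.stop_reason is StopReason.BUDGET_EXHAUSTED:
    # Shed load (drop, queue, or degrade) rather than keep hammering the upstream.
    handle_overload()
```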
## Backpressure with budgets

Budgets are a coarse backpressure mechanism that limits aggregate retry work.

- Start with `max_retries` near your peak concurrency.
- Set `window_s` to match the upstream recovery window (e.g., 30–120s).
- Alert on `BUDGET_EXHAUSTED` and consider dropping concurrency or widening the window.
## Per-attempt timeouts

Set a per-attempt timeout to bound each call, independently of the overall deadline.
```python
from redress import RetryPolicy, default_classifier
from redress.strategies import decorrelated_jitter

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
    attempt_timeout_s=2.0,  # bound each individual attempt
    deadline_s=30.0,        # bound the operation overall
)
```
## Pluggable sleep handler (defer instead of sleeping)

Use a sleep handler to persist retry timing and exit the loop without blocking. Returning `SleepDecision.ABORT` immediately ends retry execution and yields an `ABORTED` stop reason/event.
```python
from redress import RetryPolicy, SleepDecision, StopReason, default_classifier
from redress.strategies import decorrelated_jitter

def schedule(ctx, sleep_s: float) -> SleepDecision:
    # Persist the pending retry; save_next_attempt is your application's hook.
    save_next_attempt(ctx.attempt, sleep_s, ctx.classification.klass)
    return SleepDecision.DEFER

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
)

outcome = policy.execute(do_work, sleep=schedule)
if outcome.stop_reason is StopReason.SCHEDULED:
    ...  # the retry was deferred; resume later from persisted state
```
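To stop outright instead of deferring (for example during shutdown), return `SleepDecision.ABORT` from the same handler; a sketch reusing the `threading.Event` pattern from the cooperative-abort section below:

```python
import threading

shutdown = threading.Event()

def schedule_or_abort(ctx, sleep_s: float) -> SleepDecision:
    if shutdown.is_set():
        return SleepDecision.ABORT  # ends execution with an ABORTED stop reason
    save_next_attempt(ctx.attempt, sleep_s, ctx.classification.klass)
    return SleepDecision.DEFER
```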
If you only need a side-effect right before sleeping (metrics/logging), use `before_sleep` instead of a custom sleep handler.

```python
def before_sleep(ctx, sleep_s: float) -> None:
    # log_retry_delay is your application's logging helper.
    log_retry_delay(ctx.attempt, ctx.classification.klass, sleep_s)

policy.execute(do_work, before_sleep=before_sleep)
```
If you want to keep the built-in retry decisions but replace the actual sleep (e.g., deterministic tests or a custom scheduler), pass a `sleeper`.

```python
def fake_sleep(seconds: float) -> None:
    record_sleep(seconds)

policy.execute(do_work, sleeper=fake_sleep)
```

## Testing utilities

For deterministic tests and lightweight policy stubs, see docs/testing.md (covers `DeterministicStrategy`, `instant_retries`, `RecordingPolicy`, and `FakePolicy`).
## Classification context & context-aware strategies

Classifiers may return `Classification` to pass hints like Retry-After. The retry loop normalizes all classifier outputs to `Classification`, and context-aware strategies receive a `BackoffContext`.
```python
from redress import RetryPolicy
from redress.extras import http_retry_after_classifier
from redress.strategies import decorrelated_jitter, retry_after_or

policy = RetryPolicy(
    classifier=http_retry_after_classifier,
    strategy=retry_after_or(decorrelated_jitter(max_s=10.0)),
)
```
Legacy strategies with the `(attempt, klass, prev_sleep_s)` signature are still supported. Strategies must accept either exactly one required positional argument (`ctx`) or three required positional arguments (`attempt`, `klass`, `prev_sleep_s`).
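As an illustration of the two accepted shapes, here is a hand-rolled strategy in each style; a sketch, where the `BackoffContext` attribute names (`attempt`, `classification`) are assumptions carried over from the hook contexts shown elsewhere on this page:

```python
# Context-aware shape: one required positional argument.
def capped_linear(ctx) -> float:
    hint = getattr(ctx.classification, "retry_after_s", None)
    if hint is not None:
        return hint  # honor a server-provided Retry-After hint
    return min(0.5 * ctx.attempt, 5.0)  # otherwise back off linearly, capped at 5s

# Legacy shape: three required positional arguments.
def doubling(attempt, klass, prev_sleep_s) -> float:
    return min((prev_sleep_s or 0.25) * 2, 10.0)
```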
## Adaptive backoff

Use `adaptive()` to scale a fallback strategy based on the recent success/failure rate. It only increases backoff (never below the fallback), and returns to baseline as successes dominate.
```python
from redress.strategies import adaptive, decorrelated_jitter

strategy = adaptive(
    decorrelated_jitter(max_s=10.0),
    window_s=60.0,
    target_success=0.9,
    max_multiplier=5.0,
)
```
If you also use Retry-After, wrap `adaptive()` inside `retry_after_or` so Retry-After values are left untouched:

```python
from redress.strategies import adaptive, decorrelated_jitter, retry_after_or

strategy = retry_after_or(adaptive(decorrelated_jitter(max_s=10.0)))
```
`adaptive()` is thread-safe and safe to share across policies. For most services, `window_s=60` and `target_success=0.9` are good starting points.
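Because the instance carries the shared success/failure state, you can construct one `adaptive()` strategy and hand it to several policies; a sketch:

```python
from redress import RetryPolicy, default_classifier
from redress.strategies import adaptive, decorrelated_jitter

# One shared adaptive strategy: all policies using it see the same failure window.
shared = adaptive(decorrelated_jitter(max_s=10.0), window_s=60.0, target_success=0.9)

reads = RetryPolicy(classifier=default_classifier, strategy=shared)
writes = RetryPolicy(classifier=default_classifier, strategy=shared)
```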
## Result-based retries

Use `result_classifier` to retry on return values instead of exceptions.
```python
from redress import (
    Classification,
    ErrorClass,
    RetryExhaustedError,
    RetryPolicy,
    default_classifier,
)
from redress.strategies import decorrelated_jitter, retry_after_or

def result_classifier(resp) -> ErrorClass | Classification | None:
    status = getattr(resp, "status", None) or getattr(resp, "status_code", None)
    if status == 429:
        retry_after = None
        header = getattr(resp, "headers", {}).get("Retry-After")
        if isinstance(header, str) and header.isdigit():
            retry_after = float(header)
        return Classification(klass=ErrorClass.RATE_LIMIT, retry_after_s=retry_after)
    if status is not None and status >= 500:
        return ErrorClass.SERVER_ERROR
    return None

policy = RetryPolicy(
    classifier=default_classifier,
    result_classifier=result_classifier,
    strategy=retry_after_or(decorrelated_jitter(max_s=10.0)),
)

try:
    policy.call(fetch_response)
except RetryExhaustedError as err:
    ...
```
## Using `operation` to distinguish call sites

```python
def fetch_profile():
    ...

policy.call(fetch_profile, operation="fetch_profile")
```

Metrics/logs include `operation=fetch_profile`, letting you split dashboards per call site.
## `RetryConfig` for shared settings

```python
from redress import ErrorClass, RetryConfig, RetryPolicy, default_classifier

cfg = RetryConfig(
    deadline_s=45.0,
    max_attempts=6,
    per_class_max_attempts={
        ErrorClass.RATE_LIMIT: 2,
        ErrorClass.SERVER_ERROR: 4,
    },
)

policy = RetryPolicy.from_config(cfg, classifier=default_classifier)
```
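The point of a shared config is reuse, for example pairing the same settings with different classifiers; a sketch using the extras classifiers described later on this page:

```python
from redress.extras import http_classifier, sqlstate_classifier

# Same deadlines and attempt limits, different error-classification domains.
http_policy = RetryPolicy.from_config(cfg, classifier=http_classifier)
db_policy = RetryPolicy.from_config(cfg, classifier=sqlstate_classifier)
```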
## Async usage

`AsyncRetryPolicy` mirrors the sync API but awaits your callable and uses `asyncio.sleep` for backoff.
```python
import asyncio

from redress import AsyncRetryPolicy, ErrorClass, default_classifier
from redress.strategies import decorrelated_jitter

async_policy = AsyncRetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=2.0),
    strategies={ErrorClass.RATE_LIMIT: decorrelated_jitter(min_s=1.0, max_s=8.0)},
    deadline_s=10.0,
    max_attempts=5,
)

async def fetch_user() -> str:
    ...

asyncio.run(async_policy.call(fetch_user, operation="fetch_user"))
```
Observability hooks (`on_metric`, `on_log`), deadlines, and per-class limits behave the same as in the sync policy.
## Logging and metrics hooks together

```python
import logging

from redress.metrics import prometheus_metric_hook

logger = logging.getLogger(__name__)

def log_hook(event: str, fields: dict) -> None:
    logger.info("retry_event", extra={"event": event, **fields})

policy.call(
    lambda: do_work(),
    on_metric=prometheus_metric_hook(counter),  # counter: your Prometheus counter
    on_log=log_hook,
    operation="sync_account",
)
```
## Attempt lifecycle hooks

```python
from redress import AttemptDecision

def on_attempt_end(ctx) -> None:
    if ctx.decision is AttemptDecision.RETRY:
        # record_retry is your application's metrics helper.
        record_retry(ctx.attempt, ctx.classification.klass, ctx.sleep_s)

policy.call(
    lambda: do_work(),
    on_attempt_end=on_attempt_end,
    operation="sync_account",
)
```
## Decorator-based retries (sync + async)

The `retry` decorator wraps functions and chooses the right policy automatically based on whether the function is sync or async.

```python
from redress import retry, default_classifier
from redress.strategies import decorrelated_jitter

@retry  # defaults to default_classifier + decorrelated_jitter(max_s=5.0)
def fetch_user():
    ...

@retry
async def fetch_user_async():
    ...
```
If you omit both `strategy` and `strategies`, the decorator injects `decorrelated_jitter(max_s=5.0)` as a default. If you provide a per-class `strategies` mapping without a default, the decorator will not add one.

Hooks and `operation` can be set on the decorator; `operation` defaults to the function name when omitted.
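For example, a configured decorator (a sketch: the `classifier`/`strategy` keywords follow the pyodbc example later on this page, and `operation` follows the note above):

```python
@retry(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
    operation="fetch_user",  # omit to default to the function name
)
def fetch_user():
    ...
```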
## Context managers for repeated calls

You can bind hooks/`operation` once and reuse:

```python
# Plus any other settings your policy needs.
policy = RetryPolicy(classifier=default_classifier, strategy=decorrelated_jitter())

with policy.context(operation="batch") as retry:
    retry(do_thing)
    retry(do_other, arg1, arg2)
```

Async variant:

```python
async with async_policy.context(operation="batch") as retry:
    await retry(do_async_work)
```
## Structured outcomes

Use `execute()` when you want metadata without parsing hooks:

```python
outcome = policy.execute(do_work, operation="sync_task")
if outcome.ok:
    print(outcome.value, outcome.attempts)
else:
    print(outcome.stop_reason, outcome.last_class)
```

Capture an opt-in per-attempt timeline for debugging:

```python
outcome = policy.execute(do_work, capture_timeline=True)
if outcome.timeline is not None:
    for event in outcome.timeline.events:
        print(event.attempt, event.event, event.stop_reason)
```

You can also pass an explicit collector:

```python
from redress import RetryTimeline

timeline = RetryTimeline()
outcome = policy.execute(do_work, capture_timeline=timeline)
```
## Cooperative abort (shutdown/drain)

Worker loops often need to stop retries when shutting down. Use `abort_if` or raise `AbortRetryError` inside your callable:

```python
import threading

from redress import AbortRetryError, RetryPolicy, default_classifier
from redress.strategies import decorrelated_jitter

shutdown = threading.Event()

policy = RetryPolicy(
    classifier=default_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
)

def abort_if() -> bool:
    return shutdown.is_set()

try:
    policy.call(do_work, abort_if=abort_if)
except AbortRetryError:
    pass  # shutting down; drop or requeue the work
```
## Helper classifiers

`redress.extras` provides domain-oriented classifiers:

- `http_classifier` – maps HTTP status codes (e.g., 429 → `RATE_LIMIT`, 500 → `SERVER_ERROR`, 408 → `TRANSIENT`).
- `sqlstate_classifier` – maps SQLSTATE codes (e.g., 40001/40P01 → `CONCURRENCY`, HYT00/08xxx → `TRANSIENT`, 28xxx → `AUTH`).
- `urllib3_classifier` – maps urllib3 transport errors and HTTP status codes.
- `redis_classifier` – maps redis-py connection/auth errors.
- `aiohttp_classifier` – maps aiohttp client errors and HTTP status codes.
- `grpc_classifier` – maps gRPC status codes to `ErrorClass`.
- `boto3_classifier` – maps botocore `ClientError` codes and AWS transport errors.
Optional classifiers are shipped as extras:

- `redress[urllib3]`
- `redress[redis]`
- `redress[aiohttp]`
- `redress[grpc]`
- `redress[boto3]`
## Optional classifier example

Available classifiers (extras): `urllib3_classifier`, `redis_classifier`, `aiohttp_classifier`, `grpc_classifier`, `boto3_classifier`, `pyodbc_classifier`. All are used the same way: pass them as the policy classifier.
```python
from redress import RetryPolicy
from redress.extras import urllib3_classifier
from redress.strategies import decorrelated_jitter

policy = RetryPolicy(
    classifier=urllib3_classifier,
    strategy=decorrelated_jitter(max_s=3.0),
)
```
Swap in any of the other optional classifiers above as needed.

`default_classifier` includes name-based heuristics for convenience. If you want more predictable behavior, use `strict_classifier` (the same logic without name heuristics) or supply your own domain-specific classifier.
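For example (a sketch; this assumes `strict_classifier` is importable from the top-level `redress` package alongside `default_classifier`):

```python
from redress import RetryPolicy, strict_classifier
from redress.strategies import decorrelated_jitter

# Same classification logic as default_classifier, minus name-based heuristics.
policy = RetryPolicy(
    classifier=strict_classifier,
    strategy=decorrelated_jitter(max_s=5.0),
)
```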
## PyODBC classification example

redress stays dependency-free, so database-specific classifiers live in extras. Use `pyodbc_classifier` for a SQLSTATE-based mapper.
```python
from redress import retry
from redress.extras import pyodbc_classifier
from redress.strategies import decorrelated_jitter

@retry(
    classifier=pyodbc_classifier,
    strategy=decorrelated_jitter(max_s=3.0),
    strategies={},  # optional per-class overrides
)
def run_query():
    ...
```