Reference flows
This page collects runnable end-to-end patterns for common service integrations.
Policy reuse and per-operation contexts
Create a policy once and reuse it across calls. Use context() to bind hooks
and an operation name for a batch of calls.
from redress import Policy, Retry, default_classifier
from redress.strategies import decorrelated_jitter

policy = Policy(
    retry=Retry(
        classifier=default_classifier,
        strategy=decorrelated_jitter(max_s=5.0),
        deadline_s=30.0,
        max_attempts=5,
    )
)

with policy.context(operation="sync_user") as call:
    call(lambda: do_work())
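Arguments after the callable are forwarded to it on every attempt, and for a one-off invocation you can skip the context manager and call the policy directly (the observability examples further down use that form). A minimal sketch; user_id and the timeout keyword are placeholder arguments for your own do_work:

with policy.context(operation="sync_user") as call:
    # Positional and keyword arguments are passed through to do_work on each attempt.
    call(do_work, user_id, timeout=2.0)

# One-off call without a context; the operation tag goes on the call itself.
policy.call(do_work, operation="sync_user")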
HTTP downstream call (Retry-After + result-based retries)
This example shows:
- result-based retries (429/5xx) without raising exceptions
- Retry-After honored via Classification.retry_after_s
- per-call hooks via policy.context
Run:
uv pip install httpx
uv run python docs/snippets/httpx_retry_after.py
"""
HTTP retry example that honors Retry-After and uses result-based retries.
Run with:
uv pip install httpx
uv run python docs/snippets/httpx_retry_after.py
"""
import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any
import httpx
from redress import Classification, ErrorClass, Policy, Retry
from redress.extras import http_retry_after_classifier
from redress.strategies import decorrelated_jitter, retry_after_or
def log_event(event: str, fields: dict[str, Any]) -> None:
print(f"[log] event={event} fields={fields}")
class DemoHandler(BaseHTTPRequestHandler):
rate_count = 0
flaky_count = 0
def do_GET(self) -> None: # noqa: N802 - BaseHTTPRequestHandler hook name
path = self.path.split("?", 1)[0]
if path == "/rate":
DemoHandler.rate_count += 1
if DemoHandler.rate_count < 3:
self.send_response(429)
self.send_header("Retry-After", "1")
self.end_headers()
return
if path == "/flaky":
DemoHandler.flaky_count += 1
if DemoHandler.flaky_count < 3:
self.send_response(503)
self.end_headers()
return
self.send_response(200)
self.end_headers()
self.wfile.write(b"ok")
def log_message(self, format: str, *args: object) -> None:
return
def start_server() -> ThreadingHTTPServer:
server = ThreadingHTTPServer(("127.0.0.1", 0), DemoHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server
def parse_retry_after(value: str | None) -> float | None:
if value is None:
return None
raw = value.strip()
if not raw:
return None
try:
return max(0.0, float(raw))
except ValueError:
return None
def classify_result(resp: httpx.Response) -> ErrorClass | Classification | None:
status = resp.status_code
if status == 429:
retry_after_s = parse_retry_after(resp.headers.get("Retry-After"))
return Classification(klass=ErrorClass.RATE_LIMIT, retry_after_s=retry_after_s)
if status >= 500:
return ErrorClass.SERVER_ERROR
return None
def build_policy() -> Policy:
return Policy(
retry=Retry(
classifier=http_retry_after_classifier,
result_classifier=classify_result,
strategy=retry_after_or(decorrelated_jitter(max_s=3.0)),
deadline_s=8.0,
max_attempts=5,
)
)
def demo(base_url: str) -> None:
policy = build_policy()
with httpx.Client(base_url=base_url) as client:
with policy.context(on_log=log_event, operation="http_demo") as call:
for path in ("/rate", "/flaky", "/ok"):
response = call(client.get, path, timeout=2.0)
print(f"{path} -> {response.status_code} {response.text!r}")
if __name__ == "__main__":
server = start_server()
try:
demo(f"http://127.0.0.1:{server.server_port}")
finally:
server.shutdown()
server.server_close()
Worker loop with cooperative abort
This example shows:
- abort_if for graceful shutdown
- durable attempt logging via on_log
- async policy context reuse
Run:
uv run python docs/snippets/async_worker_abort.py
"""
Async worker with cooperative abort and durable attempt logging.
Run with:
uv run python docs/snippets/async_worker_abort.py
"""
import asyncio
from typing import Any
from redress import AbortRetryError, AsyncPolicy, AsyncRetry, default_classifier
from redress.strategies import decorrelated_jitter
def record_attempt(event: str, fields: dict[str, Any]) -> None:
# Replace this with a durable store (DB, queue, log sink) in production.
print(f"[attempt] event={event} fields={fields}")
def log_event(event: str, fields: dict[str, Any]) -> None:
record_attempt(event, fields)
def build_policy() -> AsyncPolicy:
return AsyncPolicy(
retry=AsyncRetry(
classifier=default_classifier,
strategy=decorrelated_jitter(max_s=1.0),
deadline_s=6.0,
max_attempts=4,
)
)
async def process_message(message: str, seen: dict[str, int]) -> str:
seen[message] = seen.get(message, 0) + 1
if message == "flaky" and seen[message] < 3:
raise ConnectionError("transient network hiccup")
await asyncio.sleep(0.05)
return f"processed {message}"
async def worker_loop(messages: list[str], shutdown: asyncio.Event) -> None:
policy = build_policy()
seen: dict[str, int] = {}
async with policy.context(
on_log=log_event,
operation="worker.process",
abort_if=shutdown.is_set,
) as call:
for msg in messages:
try:
result = await call(process_message, msg, seen)
print(result)
except AbortRetryError:
print("shutdown requested, stopping worker")
return
except Exception as exc: # noqa: BLE001 - demo code
print(f"message {msg!r} failed permanently: {exc}")
async def main() -> None:
shutdown = asyncio.Event()
async def trigger_shutdown() -> None:
await asyncio.sleep(0.6)
shutdown.set()
asyncio.create_task(trigger_shutdown())
await worker_loop(["ok", "flaky", "ok", "flaky"], shutdown)
if __name__ == "__main__":
asyncio.run(main())
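In a real worker the shutdown event would usually be set from a signal handler rather than a timer. A minimal sketch of main() wired to SIGINT/SIGTERM (Unix event loops only), reusing worker_loop from the snippet above:

import asyncio
import signal

async def main() -> None:
    shutdown = asyncio.Event()
    loop = asyncio.get_running_loop()
    # Route SIGINT/SIGTERM into the same cooperative abort path as abort_if.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, shutdown.set)
    await worker_loop(["ok", "flaky", "ok", "flaky"], shutdown)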
Observability wiring
Event names and stop reasons are exported as redress.events.EventName and
redress.StopReason. See Observability for the full contract.
Recommended low-cardinality tags (see the hook sketch after this list):
- class
- operation
- err
- stop_reason (terminal events only)
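If you emit metrics with a hand-rolled hook instead of the helpers below, restricting the emitted fields to these tags keeps cardinality bounded. A minimal sketch, assuming the metric hook receives the same (event, fields) pair as the on_log hooks in the snippets above:

from typing import Any

ALLOWED_TAGS = ("class", "operation", "err", "stop_reason")

def metric_hook(event: str, fields: dict[str, Any]) -> None:
    # Drop everything except the recommended low-cardinality tags.
    tags = {k: v for k, v in fields.items() if k in ALLOWED_TAGS and v is not None}
    print(f"[metric] event={event} tags={tags}")  # swap the print for your metrics client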
Prometheus metric hook
from prometheus_client import Counter
from redress.metrics import prometheus_metric_hook
counter = Counter("redress_events", "Retry events", ["event", "class", "operation", "err"])
hook = prometheus_metric_hook(counter)
policy.call(work, on_metric=hook, operation="sync_user")
OpenTelemetry hooks
from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from redress.contrib.otel import otel_hooks
trace.set_tracer_provider(TracerProvider())
metrics.set_meter_provider(MeterProvider())
hooks = otel_hooks(
    tracer=trace.get_tracer("redress"),
    meter=metrics.get_meter("redress"),
)
policy.call(work, **hooks, operation="sync_user")
Migration notes
- Policy/Retry provide a unified container; RetryPolicy remains a compatible shortcut.
- Classifiers may return Classification for hints like retry_after_s; returning an ErrorClass is still supported.
- Strategies can be context-aware: (ctx: BackoffContext) -> float; the legacy (attempt, klass, prev_sleep_s) signature is still accepted (sketched after these notes).
- The @retry decorator only injects a default strategy when both strategy and strategies are omitted.
- execute() returns a RetryOutcome with StopReason and attempt metadata (call still returns/raises).
- EventName and StopReason are exported for stable observability contracts.
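As a concrete illustration of the legacy strategy signature above, a capped-doubling strategy might look like the sketch below; this is an assumption-level example, and the built-ins in redress.strategies are usually the better choice:

from redress import ErrorClass

def capped_doubling(attempt: int, klass: ErrorClass, prev_sleep_s: float) -> float:
    # Legacy signature: (attempt, klass, prev_sleep_s) -> sleep seconds.
    # Give rate limits a higher floor, double the previous sleep, cap at 5s.
    floor = 1.0 if klass is ErrorClass.RATE_LIMIT else 0.1
    return min(5.0, max(floor, prev_sleep_s * 2))

Per the note above, passing strategy=capped_doubling to Retry should behave the same as the built-in strategies.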