Skip to content

Reference flows

This page collects runnable end-to-end patterns for common service integrations.

Policy reuse and per-operation contexts

Create a policy once and reuse it across calls. Use context() to bind hooks and an operation name for a batch of calls.

from redress import Policy, Retry, default_classifier
from redress.strategies import decorrelated_jitter

# Build the retry configuration once; the resulting policy is reused for every call.
retry = Retry(
    max_attempts=5,
    deadline_s=30.0,
    strategy=decorrelated_jitter(max_s=5.0),
    classifier=default_classifier,
)
policy = Policy(retry=retry)

# Every call made through `call` uses the operation name bound on this context.
with policy.context(operation="sync_user") as call:
    call(lambda: do_work())

HTTP downstream call (Retry-After + result-based retries)

This example shows:

- result-based retries (429/5xx) without raising exceptions
- Retry-After honored via Classification.retry_after_s
- per-call hooks via policy.context

Run:

    uv pip install httpx
    uv run python docs/snippets/httpx_retry_after.py

"""
HTTP retry example that honors Retry-After and uses result-based retries.

Run with:
    uv pip install httpx
    uv run python docs/snippets/httpx_retry_after.py
"""

import threading
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any

import httpx

from redress import Classification, ErrorClass, Policy, Retry
from redress.extras import http_retry_after_classifier
from redress.strategies import decorrelated_jitter, retry_after_or


def log_event(event: str, fields: dict[str, Any]) -> None:
    """Print one log-hook event and its fields to stdout.

    Args:
        event: Name of the event being reported.
        fields: Structured fields attached to the event.
    """
    # print() joins its arguments with single spaces, yielding one log line.
    print("[log]", f"event={event}", f"fields={fields}")


class DemoHandler(BaseHTTPRequestHandler):
    """In-process HTTP handler that fails a few times before succeeding.

    ``/rate`` answers 429 with ``Retry-After: 1`` for its first two hits and
    ``/flaky`` answers 503 for its first two hits. Every other request,
    including later hits on those two paths, gets ``200`` with body ``ok``.
    """

    # Hit counters live on the class, so all handler instances share them.
    rate_count = 0
    flaky_count = 0

    def do_GET(self) -> None:  # noqa: N802 - BaseHTTPRequestHandler hook name
        """Serve a GET request according to the path (query string ignored)."""
        route, _, _ = self.path.partition("?")
        if route == "/rate":
            DemoHandler.rate_count += 1
            if DemoHandler.rate_count <= 2:
                self.send_response(429)
                self.send_header("Retry-After", "1")
                self.end_headers()
                return
        elif route == "/flaky":
            DemoHandler.flaky_count += 1
            if DemoHandler.flaky_count <= 2:
                self.send_response(503)
                self.end_headers()
                return
        # Success path: unknown routes and exhausted failure budgets land here.
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"ok")

    def log_message(self, format: str, *args: object) -> None:
        """Discard the base class's per-request logging to keep demo output clean."""
        return None


def start_server() -> ThreadingHTTPServer:
    """Start the demo HTTP server on a background daemon thread.

    The server binds to 127.0.0.1 with port 0, so the OS chooses a free port;
    read it back from ``server_port`` on the returned object.

    Returns:
        The running server. The caller is responsible for shutting it down.
    """
    httpd = ThreadingHTTPServer(("127.0.0.1", 0), DemoHandler)
    # Daemon thread: it will not keep the interpreter alive on exit.
    threading.Thread(target=httpd.serve_forever, daemon=True).start()
    return httpd


def parse_retry_after(value: str | None) -> float | None:
    if value is None:
        return None
    raw = value.strip()
    if not raw:
        return None
    try:
        return max(0.0, float(raw))
    except ValueError:
        return None


def classify_result(resp: httpx.Response) -> ErrorClass | Classification | None:
    """Classify an HTTP response by status code, without raising.

    Args:
        resp: The response returned by the HTTP call.

    Returns:
        A ``RATE_LIMIT`` classification carrying the parsed ``Retry-After``
        hint for 429, ``ErrorClass.SERVER_ERROR`` for any status >= 500, and
        ``None`` for every other status.
    """
    code = resp.status_code
    if code != 429:
        return ErrorClass.SERVER_ERROR if code >= 500 else None
    # Rate limited: pass along the server's hint (None if absent or unparseable).
    hint = parse_retry_after(resp.headers.get("Retry-After"))
    return Classification(klass=ErrorClass.RATE_LIMIT, retry_after_s=hint)


def build_policy() -> Policy:
    """Assemble the retry policy used by the HTTP demo.

    Exceptions go through ``http_retry_after_classifier`` and returned
    responses through ``classify_result``. Retrying is capped at 5 attempts
    and an 8-second deadline.

    Returns:
        A ``Policy`` wrapping the configured ``Retry``.
    """
    # NOTE(review): presumably retry_after_or prefers a server-provided
    # Retry-After hint and otherwise uses this jittered backoff; confirm
    # against the redress strategy docs.
    fallback = decorrelated_jitter(max_s=3.0)
    retry = Retry(
        classifier=http_retry_after_classifier,
        result_classifier=classify_result,
        strategy=retry_after_or(fallback),
        deadline_s=8.0,
        max_attempts=5,
    )
    return Policy(retry=retry)


def demo(base_url: str) -> None:
    """Fetch the three demo endpoints through one shared policy context.

    Args:
        base_url: Root URL of the demo server, e.g. ``http://127.0.0.1:8000``.
    """
    policy = build_policy()
    with (
        httpx.Client(base_url=base_url) as client,
        policy.context(on_log=log_event, operation="http_demo") as call,
    ):
        # One context covers every request, so hooks and the operation name
        # are bound once for the whole batch.
        for route in ("/rate", "/flaky", "/ok"):
            resp = call(client.get, route, timeout=2.0)
            print(f"{route} -> {resp.status_code} {resp.text!r}")


if __name__ == "__main__":
    # Start the throwaway local server, run the demo against it, and always
    # tear the server down afterwards.
    server = start_server()
    try:
        base_url = f"http://127.0.0.1:{server.server_port}"
        demo(base_url)
    finally:
        # Stop serve_forever() first, then release the listening socket.
        server.shutdown()
        server.server_close()

Worker loop with cooperative abort

This example shows:

- abort_if for graceful shutdown
- durable attempt logging via on_log
- async policy context reuse

Run: uv run python docs/snippets/async_worker_abort.py

"""
Async worker with cooperative abort and durable attempt logging.

Run with:
    uv run python docs/snippets/async_worker_abort.py
"""

import asyncio
from typing import Any

from redress import AbortRetryError, AsyncPolicy, AsyncRetry, default_classifier
from redress.strategies import decorrelated_jitter


def record_attempt(event: str, fields: dict[str, Any]) -> None:
    """Record one attempt event; this demo version only prints it.

    Args:
        event: Name of the event being recorded.
        fields: Structured fields attached to the event.
    """
    # In production, swap this print for a durable store (DB, queue, log sink).
    print("[attempt]", f"event={event}", f"fields={fields}")


def log_event(event: str, fields: dict[str, Any]) -> None:
    """Log hook that forwards every event to ``record_attempt``.

    Args:
        event: Name of the event being reported.
        fields: Structured fields attached to the event.
    """
    record_attempt(event=event, fields=fields)


def build_policy() -> AsyncPolicy:
    """Assemble the async retry policy used by the worker demo.

    Uses the default classifier with decorrelated jitter capped at 1 second,
    limited to 4 attempts and a 6-second deadline.

    Returns:
        An ``AsyncPolicy`` wrapping the configured ``AsyncRetry``.
    """
    backoff = decorrelated_jitter(max_s=1.0)
    retry = AsyncRetry(
        classifier=default_classifier,
        strategy=backoff,
        deadline_s=6.0,
        max_attempts=4,
    )
    return AsyncPolicy(retry=retry)


async def process_message(message: str, seen: dict[str, int]) -> str:
    """Simulate handling one message, failing twice for ``"flaky"``.

    Args:
        message: The message payload to process.
        seen: Per-message attempt counter; updated in place on every call.

    Returns:
        The string ``"processed <message>"``.

    Raises:
        ConnectionError: On the first two attempts at the ``"flaky"`` message.
    """
    attempt = seen.get(message, 0) + 1
    seen[message] = attempt
    # Only the "flaky" message fails, and only until its third attempt.
    if attempt < 3 and message == "flaky":
        raise ConnectionError("transient network hiccup")
    await asyncio.sleep(0.05)
    return f"processed {message}"


async def worker_loop(messages: list[str], shutdown: asyncio.Event) -> None:
    """Process messages in order through one shared policy context.

    Args:
        messages: Messages to process sequentially.
        shutdown: Event whose ``is_set`` is passed to the policy as ``abort_if``.

    The loop stops at the first ``AbortRetryError``. Any other exception is
    reported for that message and the loop moves on to the next one.
    """
    policy = build_policy()
    attempts_by_message: dict[str, int] = {}
    context = policy.context(
        on_log=log_event,
        operation="worker.process",
        abort_if=shutdown.is_set,
    )
    async with context as call:
        for message in messages:
            try:
                outcome = await call(process_message, message, attempts_by_message)
                print(outcome)
            except AbortRetryError:
                # Shutdown was requested: leave the remaining messages untouched.
                print("shutdown requested, stopping worker")
                return
            except Exception as exc:  # noqa: BLE001 - demo code
                print(f"message {message!r} failed permanently: {exc}")


async def main() -> None:
    """Run the worker demo and request shutdown after 0.6 seconds.

    A background task sets the shutdown event while the worker loop runs over
    a fixed list of messages.
    """
    shutdown = asyncio.Event()

    async def trigger_shutdown() -> None:
        await asyncio.sleep(0.6)
        shutdown.set()

    # Keep a reference: the event loop holds only weak references to tasks,
    # so an unreferenced task may be garbage-collected before it finishes.
    shutdown_task = asyncio.create_task(trigger_shutdown())
    try:
        await worker_loop(["ok", "flaky", "ok", "flaky"], shutdown)
    finally:
        # Stop the timer if the worker finished first; this is a no-op when
        # the task has already completed.
        shutdown_task.cancel()


# Run the async demo only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())

Observability wiring

Event names and stop reasons are exported as redress.events.EventName and redress.StopReason. See Observability for the full contract.

Recommended low-cardinality tags:

- class
- operation
- err
- stop_reason (terminal events only)

Prometheus metric hook

from prometheus_client import Counter
from redress.metrics import prometheus_metric_hook

# Label names declared on the counter.
label_names = ["event", "class", "operation", "err"]
counter = Counter("redress_events", "Retry events", label_names)

# Wrap the counter as a metric hook and pass it on a single call.
hook = prometheus_metric_hook(counter)
policy.call(work, operation="sync_user", on_metric=hook)

OpenTelemetry hooks

from opentelemetry import metrics, trace
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.trace import TracerProvider
from redress.contrib.otel import otel_hooks

# Install the SDK providers before requesting a tracer or meter.
trace.set_tracer_provider(TracerProvider())
metrics.set_meter_provider(MeterProvider())

tracer = trace.get_tracer("redress")
meter = metrics.get_meter("redress")
hooks = otel_hooks(tracer=tracer, meter=meter)

# `hooks` is unpacked into keyword arguments alongside the operation name.
policy.call(work, operation="sync_user", **hooks)

Migration notes

  • Policy/Retry provide a unified container; RetryPolicy remains a compatible shortcut.
  • Classifiers may return Classification for hints like retry_after_s; ErrorClass return is still supported.
  • Strategies can be context-aware: (ctx: BackoffContext) -> float; legacy (attempt, klass, prev_sleep_s) is still accepted.
  • The @retry decorator only injects a default strategy when both strategy and strategies are omitted.
  • execute() returns a RetryOutcome with StopReason and attempt metadata (call still returns/raises).
  • EventName and StopReason are exported for stable observability contracts.