Skip to content

Worker / queue retries

This recipe shows a simple worker loop with retries, backpressure via budgets, and cooperative shutdown.

import signal
import time

from redress import Budget, RetryPolicy, default_classifier
from redress import StopReason
from redress.strategies import decorrelated_jitter

# Shutdown flag shared between the signal handler and the worker loop.
# It is a mutable dict so the handler can flip it in place, with no `global`.
shutdown = dict(now=False)


def handle_signal(*_args):
    """Request shutdown by setting ``shutdown["now"]`` to True.

    Any positional arguments are accepted and ignored.
    """
    shutdown.update(now=True)

# Send SIGTERM and SIGINT to the same handler, registered in that order.
for _signum in (signal.SIGTERM, signal.SIGINT):
    signal.signal(_signum, handle_signal)

# Budget object handed to the retry policy below.
budget = Budget(max_retries=200, window_s=60.0)

# One policy instance, used by the worker loop for both fetching and
# processing jobs.
policy = RetryPolicy(
    budget=budget,
    max_attempts=5,
    deadline_s=10.0,
    strategy=decorrelated_jitter(max_s=2.0),
    classifier=default_classifier,
)


def fetch_job():
    """Placeholder job fetcher; swap in a real queue client.

    Raises:
        RuntimeError: Always, with the message "transient".
    """
    # Replace with your queue client
    failure = RuntimeError("transient")
    raise failure


def process_job(job):
    """Placeholder for the business logic applied to a single job.

    Args:
        job: The job to handle; this stub does not use it.

    Returns:
        None.
    """
    # Your business logic
    return


# Worker loop: keeps fetching and processing jobs until the shutdown flag is set.
while True:
    if shutdown["now"]:
        break

    try:
        job = policy.call(
            fetch_job,
            operation="queue_fetch",
            abort_if=lambda: shutdown["now"],
        )
    except Exception:
        # Backoff outside of redress to avoid tight loops when no work is available.
        time.sleep(0.1)
    else:
        # Only reached when the fetch succeeded; exceptions raised from here on
        # are not caught by the handler above.
        # NOTE(review): no abort_if is passed for processing, so the shutdown
        # flag is only re-checked by this loop between jobs; confirm intended.
        outcome = policy.execute(lambda: process_job(job), operation="job_process")
        if outcome.stop_reason is StopReason.BUDGET_EXHAUSTED:
            # Optionally stop consuming or shed load until budget recovers.
            time.sleep(1.0)