diff --git a/README.md b/README.md index 2889d98..ff11391 100644 --- a/README.md +++ b/README.md @@ -5,15 +5,16 @@ Small sync helpers shared across projects. Base is stdlib only — **no dependen - `timing` — unix-timestamp deltas + timezone-aware datetime conversions - `paths` — nested dict/list access by dotted path - `masking` — display masking for cards / cvv / tokens +- `retry` — exponential-backoff retry, sync (`retry`) and async (`aretry`) - `addr` — ip/address tooling: pure stdlib ip utils in base, async geo lookups behind the `commons[addr]` extra ## Install ``` -commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.1.0 +commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0 # async address/geo lookups (fetch_ip / ip_location / fetch_location) need the extra: -commons[addr] @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.1.0 +commons[addr] @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0 ``` The base install pulls **nothing** (stdlib). Only `commons[addr]` adds `aiohttp`, and @@ -112,6 +113,39 @@ phantom("abcdef1234567890") # "abcdef...7890" provider("4111111111111111") # "VISA" (VISA/MC/AMEX/UPAY/DISC/JCB/DNRS/UNKW) ``` +## retry + +Exponential-backoff retry, sync (`retry`) and async (`aretry`). Each works as a **call +form** or a **decorator**, with the same kwargs. After the attempts are exhausted the +**last exception is re-raised** — it never swallows or returns a default. + +```python +from commons import retry, aretry + +# call form +rows = retry(lambda: read_db(), attempts=5, on=(IOError,)) +data = await aretry(lambda: fetch(url), attempts=3, backoff=0.5, on=(TimeoutError,)) + +# decorator form (same kwargs) +@aretry(attempts=4, backoff=0.5, factor=2.0, on=(ConnectionError,)) +async def pull(): + ... 
+``` + +Knobs: `attempts` (total tries), `backoff` / `factor` / `max_backoff` (delay is +`min(backoff * factor**n, max_backoff)`), `jitter` (full jitter, on by default), +`on=` (tuple of retryable exception types), and `give_up=lambda exc: ...` to stop early +on a non-retryable error (e.g. a 400 vs a 429): + +```python +# retry 429/5xx but give up immediately on a 4xx +await aretry(send, attempts=4, on=(HTTPError,), + give_up=lambda e: 400 <= e.status < 500 and e.status != 429) +``` + +Each retry is logged (emit-only). `sleep=` and `rand=` are injectable for deterministic +tests (no real waits). + ## addr IP/address tooling, exposed as a submodule. The pure `ip` utilities ship in the base diff --git a/pyproject.toml b/pyproject.toml index 6008471..a847907 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "hatchling.build" [project] name = "commons" -version = "0.1.0" -description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, and ip/address tooling" +version = "0.2.0" +description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, ip/address tooling, and retry/backoff" requires-python = ">=3.10" dependencies = [] diff --git a/src/commons/__init__.py b/src/commons/__init__.py index a95cd90..2358c94 100644 --- a/src/commons/__init__.py +++ b/src/commons/__init__.py @@ -6,6 +6,8 @@ paths: nested dict/list access by dotted path (deep_get / deep_set). masking: display masking for cards / cvv / tokens (cosmetic, not a security control). addr: ip/address tooling — pure stdlib ip utils (`commons.addr.ip`) in base, async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra. +retry: exponential-backoff retry, sync (`retry`) and async (`aretry`), call or + decorator form; re-raises the last exception (fail loud), never swallows. base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the extra). 
to toggle the timing test mode for bare calls, set it on the module — @@ -16,6 +18,7 @@ addr`); its ip helpers live under `commons.addr.ip` to keep top-level uncluttere from . import addr, masking, paths, timing from .masking import credit, cvv, phantom, provider from .paths import deep_get, deep_set +from .retry import aretry, retry from .timing import ( UTC, Clock, @@ -56,6 +59,8 @@ __all__ = [ "cvv", "phantom", "provider", + "retry", + "aretry", ] -__version__ = "0.1.0" +__version__ = "0.2.0" diff --git a/src/commons/retry.py b/src/commons/retry.py new file mode 100644 index 0000000..e5d4e92 --- /dev/null +++ b/src/commons/retry.py @@ -0,0 +1,161 @@ +"""retry with exponential backoff — sync and async, one backoff engine. + +de-duplicates retry logic that was written divergently across several libs (HTTP +429/5xx caps, proxy burn/rotate caps, IMAP reconnects). both a call form and a +decorator form share one implementation per flavor; the backoff schedule is a pure +generator so it tests without real sleeps. + + from commons import retry, aretry + + # call form + rows = retry(lambda: db_read(), attempts=5, on=(IOError,)) + data = await aretry(lambda: fetch(url), attempts=3, on=(TimeoutError,)) + + # decorator form (same kwargs) + @aretry(attempts=4, backoff=0.5, on=(ConnectionError,)) + async def pull(): + ... + +after `attempts` are exhausted the LAST exception is re-raised (fail loud, never +swallowed). `on` narrows which exceptions retry; `give_up(exc) -> bool` stops early +on a non-retryable error (e.g. a 400 vs a 429). each retry is logged (emit-only), +never printed. +""" + +import asyncio +import functools +import logging +import random +import time +from typing import Callable, Iterable, Optional, Tuple, Type + +log = logging.getLogger(__name__) + +ExcTypes = Tuple[Type[BaseException], ...] 
def _delays(attempts: int, backoff: float, factor: float, max_backoff: float):
    """yield the wait before each retry: min(backoff * factor**n, max_backoff)

    yields `attempts - 1` delays (one before each retry after the first try; none
    when `attempts` is 1 or less). the raw, un-jittered schedule — jitter is applied
    at call time so the schedule stays pure and testable.
    """
    for n in range(max(0, attempts - 1)):
        try:
            raw = backoff * (factor ** n)
        except OverflowError:
            # factor**n no longer fits a float (e.g. 2.0**1024 on a long schedule).
            # a growing positive delay is already past any finite cap, so the cap
            # applies; a non-positive backoff never waits anyway.
            raw = max_backoff if backoff > 0 else 0.0
        yield min(raw, max_backoff)


def _jittered(delay: float, jitter: bool, rand: Callable[[], float]) -> float:
    """apply full jitter to a delay when enabled: rand() * delay

    returns `delay` untouched when jitter is off or the delay is not positive
    (`rand` is not called in that case).
    """
    if not jitter or delay <= 0:
        return delay
    return rand() * delay


def _as_types(on: Iterable[Type[BaseException]]) -> "ExcTypes":
    """coerce the `on` argument into a tuple of exception types

    a single exception class is wrapped into a 1-tuple; any other iterable is
    materialized with `tuple(...)` so it can be used in an `except` clause.
    """
    if isinstance(on, type):
        return (on,)
    return tuple(on)


def _check_attempts(attempts: int) -> None:
    """reject `attempts < 1` — zero tries would return None without ever calling"""
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts!r}")


def _wait_or_stop(
    exc: BaseException,
    index: int,
    delays,
    jitter: bool,
    rand: Callable[[], float],
    give_up: Optional[Callable[[BaseException], bool]],
) -> Optional[float]:
    """decide what follows a failed try: the wait before the next one, or None

    shared by the sync and async loops. returns None when the caller must re-raise
    (`give_up(exc)` is true, or `index` was the last try); otherwise logs the retry
    and returns the (possibly jittered) delay, which may be <= 0 meaning "no sleep".
    """
    if give_up is not None and give_up(exc):
        return None
    retries = len(delays)  # == attempts - 1
    if index >= retries:
        return None
    wait = _jittered(delays[index], jitter, rand)
    log.warning(
        "retry %d/%d after %s: %s",
        index + 1, retries, type(exc).__name__, exc,
    )
    return wait


def retry(
    fn: Optional[Callable] = None,
    *,
    attempts: int = 3,
    backoff: float = 0.5,
    factor: float = 2.0,
    max_backoff: float = 30.0,
    jitter: bool = True,
    on: Iterable[Type[BaseException]] = (Exception,),
    give_up: Optional[Callable[[BaseException], bool]] = None,
    sleep: Callable[[float], None] = time.sleep,
    rand: Callable[[], float] = random.random,
):
    """retry a sync callable with exponential backoff; call form or decorator

    `retry(fn, ...)` runs `fn()` immediately and returns its result;
    `@retry(...)` (with parentheses) returns a decorator. a bare `@retry` is the
    call form: it would run the function once at decoration time.

    attempts: total tries, including the first; must be >= 1.
    backoff / factor / max_backoff: wait before retry n (0-based) is
        min(backoff * factor**n, max_backoff).
    jitter: when true, each wait is scaled by `rand()` (full jitter).
    on: exception type, or iterable of types, that trigger a retry; anything else
        propagates immediately.
    give_up: optional predicate on the caught exception; true re-raises at once.
    sleep / rand: injectable for tests (no real waits, deterministic jitter).

    raises ValueError if `attempts < 1`; otherwise re-raises the last exception
    once `attempts` are exhausted — never swallows, never returns a default.
    """
    _check_attempts(attempts)
    types = _as_types(on)
    # the schedule depends only on the arguments, so build it once
    delays = tuple(_delays(attempts, backoff, factor, max_backoff))

    def run(target: Callable, args, kwargs):
        for index in range(attempts):
            try:
                return target(*args, **kwargs)
            except types as exc:
                wait = _wait_or_stop(exc, index, delays, jitter, rand, give_up)
                if wait is None:
                    raise
                if wait > 0:
                    sleep(wait)

    def decorator(target: Callable) -> Callable:
        @functools.wraps(target)
        def wrapper(*args, **kwargs):
            return run(target, args, kwargs)
        return wrapper

    if fn is not None:
        return run(fn, (), {})
    return decorator


def aretry(
    fn: Optional[Callable] = None,
    *,
    attempts: int = 3,
    backoff: float = 0.5,
    factor: float = 2.0,
    max_backoff: float = 30.0,
    jitter: bool = True,
    on: Iterable[Type[BaseException]] = (Exception,),
    give_up: Optional[Callable[[BaseException], bool]] = None,
    sleep: Callable[[float], "asyncio.Future"] = asyncio.sleep,
    rand: Callable[[], float] = random.random,
):
    """retry an async callable with exponential backoff; call form or decorator

    async twin of `retry`, same keyword arguments. `aretry(coro_fn, ...)` returns a
    coroutine to await; `@aretry(...)` (with parentheses) wraps a coroutine
    function. `target(...)` and `sleep(wait)` are both awaited, so `sleep` must
    return an awaitable.

    raises ValueError if `attempts < 1` (at call time, before anything is awaited);
    otherwise retries on `on`, stops early when `give_up(exc)` is true, and
    re-raises the last exception once `attempts` are exhausted.
    """
    _check_attempts(attempts)
    types = _as_types(on)
    # the schedule depends only on the arguments, so build it once
    delays = tuple(_delays(attempts, backoff, factor, max_backoff))

    async def run(target: Callable, args, kwargs):
        for index in range(attempts):
            try:
                return await target(*args, **kwargs)
            except types as exc:
                wait = _wait_or_stop(exc, index, delays, jitter, rand, give_up)
                if wait is None:
                    raise
                if wait > 0:
                    await sleep(wait)

    def decorator(target: Callable) -> Callable:
        @functools.wraps(target)
        async def wrapper(*args, **kwargs):
            return await run(target, args, kwargs)
        return wrapper

    if fn is not None:
        return run(fn, (), {})
    return decorator