feat: retry/backoff module (commons v0.2.0)
add commons.retry: exponential-backoff retry as sync `retry` and async `aretry`, each usable as a call form or a decorator. backoff is min(backoff*factor**n, max_backoff) with optional full jitter; the schedule is a pure generator so it tests without real sleeps (sleep + rand injectable). `on=` narrows retryable exception types, `give_up(exc)` stops early on a non-retryable error, and after attempts are exhausted the LAST exception is re-raised (fail loud, never swallowed). de-dups retry logic written 3x divergently (aiowebhooks 429/5xx, aioproxies burn/rotate, aiomail reconnect). Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
7437e1feb0
commit
0939917172
38
README.md
38
README.md
@ -5,15 +5,16 @@ Small sync helpers shared across projects. Base is stdlib only — **no dependen
|
||||
- `timing` — unix-timestamp deltas + timezone-aware datetime conversions
|
||||
- `paths` — nested dict/list access by dotted path
|
||||
- `masking` — display masking for cards / cvv / tokens
|
||||
- `retry` — exponential-backoff retry, sync (`retry`) and async (`aretry`)
|
||||
- `addr` — ip/address tooling: pure stdlib ip utils in base, async geo lookups
|
||||
behind the `commons[addr]` extra
|
||||
|
||||
## Install
|
||||
|
||||
```
|
||||
commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.1.0
|
||||
commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0
|
||||
# async address/geo lookups (fetch_ip / ip_location / fetch_location) need the extra:
|
||||
commons[addr] @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.1.0
|
||||
commons[addr] @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0
|
||||
```
|
||||
|
||||
The base install pulls **nothing** (stdlib). Only `commons[addr]` adds `aiohttp`, and
|
||||
@ -112,6 +113,39 @@ phantom("abcdef1234567890") # "abcdef...7890"
|
||||
provider("4111111111111111") # "VISA" (VISA/MC/AMEX/UPAY/DISC/JCB/DNRS/UNKW)
|
||||
```
|
||||
|
||||
## retry
|
||||
|
||||
Exponential-backoff retry, sync (`retry`) and async (`aretry`). Each works as a **call
|
||||
form** or a **decorator**, with the same kwargs. After the attempts are exhausted the
|
||||
**last exception is re-raised** — it never swallows or returns a default.
|
||||
|
||||
```python
|
||||
from commons import retry, aretry
|
||||
|
||||
# call form
|
||||
rows = retry(lambda: read_db(), attempts=5, on=(IOError,))
|
||||
data = await aretry(lambda: fetch(url), attempts=3, backoff=0.5, on=(TimeoutError,))
|
||||
|
||||
# decorator form (same kwargs)
|
||||
@aretry(attempts=4, backoff=0.5, factor=2.0, on=(ConnectionError,))
|
||||
async def pull():
|
||||
...
|
||||
```
|
||||
|
||||
Knobs: `attempts` (total tries), `backoff` / `factor` / `max_backoff` (delay is
|
||||
`min(backoff * factor**n, max_backoff)`), `jitter` (full jitter, on by default),
|
||||
`on=` (tuple of retryable exception types), and `give_up=lambda exc: ...` to stop early
|
||||
on a non-retryable error (e.g. a 400 vs a 429):
|
||||
|
||||
```python
|
||||
# retry 429/5xx but give up immediately on a 4xx
|
||||
await aretry(send, attempts=4, on=(HTTPError,),
|
||||
give_up=lambda e: 400 <= e.status < 500 and e.status != 429)
|
||||
```
|
||||
|
||||
Each retry is logged (emit-only). `sleep=` and `rand=` are injectable for deterministic
|
||||
tests (no real waits).
|
||||
|
||||
## addr
|
||||
|
||||
IP/address tooling, exposed as a submodule. The pure `ip` utilities ship in the base
|
||||
|
||||
@ -4,8 +4,8 @@ build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "commons"
|
||||
version = "0.1.0"
|
||||
description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, and ip/address tooling"
|
||||
version = "0.2.0"
|
||||
description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, ip/address tooling, and retry/backoff"
|
||||
requires-python = ">=3.10"
|
||||
dependencies = []
|
||||
|
||||
|
||||
@ -6,6 +6,8 @@ paths: nested dict/list access by dotted path (deep_get / deep_set).
|
||||
masking: display masking for cards / cvv / tokens (cosmetic, not a security control).
|
||||
addr: ip/address tooling — pure stdlib ip utils (`commons.addr.ip`) in base,
|
||||
async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra.
|
||||
retry: exponential-backoff retry, sync (`retry`) and async (`aretry`), call or
|
||||
decorator form; re-raises the last exception (fail loud), never swallows.
|
||||
|
||||
base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the
|
||||
extra). to toggle the timing test mode for bare calls, set it on the module —
|
||||
@ -16,6 +18,7 @@ addr`); its ip helpers live under `commons.addr.ip` to keep top-level uncluttere
|
||||
from . import addr, masking, paths, timing
|
||||
from .masking import credit, cvv, phantom, provider
|
||||
from .paths import deep_get, deep_set
|
||||
from .retry import aretry, retry
|
||||
from .timing import (
|
||||
UTC,
|
||||
Clock,
|
||||
@ -56,6 +59,8 @@ __all__ = [
|
||||
"cvv",
|
||||
"phantom",
|
||||
"provider",
|
||||
"retry",
|
||||
"aretry",
|
||||
]
|
||||
|
||||
__version__ = "0.1.0"
|
||||
__version__ = "0.2.0"
|
||||
|
||||
161
src/commons/retry.py
Normal file
161
src/commons/retry.py
Normal file
@ -0,0 +1,161 @@
|
||||
"""retry with exponential backoff — sync and async, one backoff engine.
|
||||
|
||||
de-duplicates retry logic that was written divergently across several libs (HTTP
|
||||
429/5xx caps, proxy burn/rotate caps, IMAP reconnects). both a call form and a
|
||||
decorator form share one implementation per flavor; the backoff schedule is a pure
|
||||
generator so it tests without real sleeps.
|
||||
|
||||
from commons import retry, aretry
|
||||
|
||||
# call form
|
||||
rows = retry(lambda: db_read(), attempts=5, on=(IOError,))
|
||||
data = await aretry(lambda: fetch(url), attempts=3, on=(TimeoutError,))
|
||||
|
||||
# decorator form (same kwargs)
|
||||
@aretry(attempts=4, backoff=0.5, on=(ConnectionError,))
|
||||
async def pull():
|
||||
...
|
||||
|
||||
after `attempts` are exhausted the LAST exception is re-raised (fail loud, never
|
||||
swallowed). `on` narrows which exceptions retry; `give_up(exc) -> bool` stops early
|
||||
on a non-retryable error (e.g. a 400 vs a 429). each retry is logged (emit-only),
|
||||
never printed.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import functools
|
||||
import logging
|
||||
import random
|
||||
import time
|
||||
from typing import Callable, Iterable, Optional, Tuple, Type
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
|
||||
ExcTypes = Tuple[Type[BaseException], ...]
|
||||
|
||||
|
||||
def _delays(attempts: int, backoff: float, factor: float, max_backoff: float):
|
||||
"""yield the wait before each retry: min(backoff * factor**n, max_backoff)
|
||||
|
||||
yields `attempts - 1` delays (one before each retry after the first try). the
|
||||
raw, un-jittered schedule — jitter is applied at call time so the schedule stays
|
||||
pure and testable.
|
||||
"""
|
||||
for n in range(max(0, attempts - 1)):
|
||||
yield min(backoff * (factor ** n), max_backoff)
|
||||
|
||||
|
||||
def _jittered(delay: float, jitter: bool, rand: Callable[[], float]) -> float:
|
||||
"""apply full jitter to a delay when enabled: uniform(0, delay)"""
|
||||
if not jitter or delay <= 0:
|
||||
return delay
|
||||
return rand() * delay
|
||||
|
||||
|
||||
def _as_types(on: Iterable[Type[BaseException]]) -> ExcTypes:
|
||||
"""coerce the `on` argument into a tuple of exception types"""
|
||||
if isinstance(on, type):
|
||||
return (on,)
|
||||
return tuple(on)
|
||||
|
||||
|
||||
def retry(
|
||||
fn: Optional[Callable] = None,
|
||||
*,
|
||||
attempts: int = 3,
|
||||
backoff: float = 0.5,
|
||||
factor: float = 2.0,
|
||||
max_backoff: float = 30.0,
|
||||
jitter: bool = True,
|
||||
on: Iterable[Type[BaseException]] = (Exception,),
|
||||
give_up: Optional[Callable[[BaseException], bool]] = None,
|
||||
sleep: Callable[[float], None] = time.sleep,
|
||||
rand: Callable[[], float] = random.random,
|
||||
):
|
||||
"""retry a sync callable with exponential backoff; call form or decorator
|
||||
|
||||
`retry(fn, ...)` runs immediately; `@retry(...)` wraps a function. retries on the
|
||||
`on` exceptions, stops early if `give_up(exc)` is true, re-raises the last
|
||||
exception once `attempts` are exhausted. `sleep`/`rand` are injectable for tests.
|
||||
"""
|
||||
types = _as_types(on)
|
||||
|
||||
def run(target: Callable, args, kwargs):
|
||||
delays = list(_delays(attempts, backoff, factor, max_backoff))
|
||||
last_index = attempts - 1
|
||||
for index in range(attempts):
|
||||
try:
|
||||
return target(*args, **kwargs)
|
||||
except types as exc:
|
||||
if give_up is not None and give_up(exc):
|
||||
raise
|
||||
if index == last_index:
|
||||
raise
|
||||
wait = _jittered(delays[index], jitter, rand)
|
||||
log.warning(
|
||||
"retry %d/%d after %s: %s",
|
||||
index + 1, last_index, type(exc).__name__, exc,
|
||||
)
|
||||
if wait > 0:
|
||||
sleep(wait)
|
||||
|
||||
def decorator(target: Callable) -> Callable:
|
||||
@functools.wraps(target)
|
||||
def wrapper(*args, **kwargs):
|
||||
return run(target, args, kwargs)
|
||||
return wrapper
|
||||
|
||||
if fn is not None:
|
||||
return run(fn, (), {})
|
||||
return decorator
|
||||
|
||||
|
||||
def aretry(
|
||||
fn: Optional[Callable] = None,
|
||||
*,
|
||||
attempts: int = 3,
|
||||
backoff: float = 0.5,
|
||||
factor: float = 2.0,
|
||||
max_backoff: float = 30.0,
|
||||
jitter: bool = True,
|
||||
on: Iterable[Type[BaseException]] = (Exception,),
|
||||
give_up: Optional[Callable[[BaseException], bool]] = None,
|
||||
sleep: Callable[[float], "asyncio.Future"] = asyncio.sleep,
|
||||
rand: Callable[[], float] = random.random,
|
||||
):
|
||||
"""retry an async callable with exponential backoff; call form or decorator
|
||||
|
||||
async twin of `retry`. `await aretry(coro_fn, ...)` runs immediately;
|
||||
`@aretry(...)` wraps a coroutine function. same semantics: retry on `on`, stop on
|
||||
`give_up`, re-raise the last exception after `attempts`. `sleep`/`rand` injectable.
|
||||
"""
|
||||
types = _as_types(on)
|
||||
|
||||
async def run(target: Callable, args, kwargs):
|
||||
delays = list(_delays(attempts, backoff, factor, max_backoff))
|
||||
last_index = attempts - 1
|
||||
for index in range(attempts):
|
||||
try:
|
||||
return await target(*args, **kwargs)
|
||||
except types as exc:
|
||||
if give_up is not None and give_up(exc):
|
||||
raise
|
||||
if index == last_index:
|
||||
raise
|
||||
wait = _jittered(delays[index], jitter, rand)
|
||||
log.warning(
|
||||
"retry %d/%d after %s: %s",
|
||||
index + 1, last_index, type(exc).__name__, exc,
|
||||
)
|
||||
if wait > 0:
|
||||
await sleep(wait)
|
||||
|
||||
def decorator(target: Callable) -> Callable:
|
||||
@functools.wraps(target)
|
||||
async def wrapper(*args, **kwargs):
|
||||
return await run(target, args, kwargs)
|
||||
return wrapper
|
||||
|
||||
if fn is not None:
|
||||
return run(fn, (), {})
|
||||
return decorator
|
||||
Loading…
Reference in New Issue
Block a user