feat: retry/backoff module (commons v0.2.0)
add commons.retry: exponential-backoff retry as sync `retry` and async `aretry`, each usable as a call form or a decorator. backoff is min(backoff*factor**n, max_backoff) with optional full jitter; the schedule is a pure generator so it tests without real sleeps (sleep + rand injectable). `on=` narrows retryable exception types, `give_up(exc)` stops early on a non-retryable error, and after attempts are exhausted the LAST exception is re-raised (fail loud, never swallowed). de-dups retry logic written 3x divergently (aiowebhooks 429/5xx, aioproxies burn/rotate, aiomail reconnect). Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
7437e1feb0
commit
b4a9e196cc
38
README.md
38
README.md
@ -5,15 +5,16 @@ Small sync helpers shared across projects. Base is stdlib only — **no dependen
|
|||||||
- `timing` — unix-timestamp deltas + timezone-aware datetime conversions
|
- `timing` — unix-timestamp deltas + timezone-aware datetime conversions
|
||||||
- `paths` — nested dict/list access by dotted path
|
- `paths` — nested dict/list access by dotted path
|
||||||
- `masking` — display masking for cards / cvv / tokens
|
- `masking` — display masking for cards / cvv / tokens
|
||||||
|
- `retry` — exponential-backoff retry, sync (`retry`) and async (`aretry`)
|
||||||
- `addr` — ip/address tooling: pure stdlib ip utils in base, async geo lookups
|
- `addr` — ip/address tooling: pure stdlib ip utils in base, async geo lookups
|
||||||
behind the `commons[addr]` extra
|
behind the `commons[addr]` extra
|
||||||
|
|
||||||
## Install
|
## Install
|
||||||
|
|
||||||
```
|
```
|
||||||
commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.1.0
|
commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0
|
||||||
# async address/geo lookups (fetch_ip / ip_location / fetch_location) need the extra:
|
# async address/geo lookups (fetch_ip / ip_location / fetch_location) need the extra:
|
||||||
commons[addr] @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.1.0
|
commons[addr] @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0
|
||||||
```
|
```
|
||||||
|
|
||||||
The base install pulls **nothing** (stdlib). Only `commons[addr]` adds `aiohttp`, and
|
The base install pulls **nothing** (stdlib). Only `commons[addr]` adds `aiohttp`, and
|
||||||
@ -112,6 +113,39 @@ phantom("abcdef1234567890") # "abcdef...7890"
|
|||||||
provider("4111111111111111") # "VISA" (VISA/MC/AMEX/UPAY/DISC/JCB/DNRS/UNKW)
|
provider("4111111111111111") # "VISA" (VISA/MC/AMEX/UPAY/DISC/JCB/DNRS/UNKW)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
## retry
|
||||||
|
|
||||||
|
Exponential-backoff retry, sync (`retry`) and async (`aretry`). Each works as a **call
|
||||||
|
form** or a **decorator**, with the same kwargs. After the attempts are exhausted the
|
||||||
|
**last exception is re-raised** — it never swallows or returns a default.
|
||||||
|
|
||||||
|
```python
|
||||||
|
from commons import retry, aretry
|
||||||
|
|
||||||
|
# call form
|
||||||
|
rows = retry(lambda: read_db(), attempts=5, on=(IOError,))
|
||||||
|
data = await aretry(lambda: fetch(url), attempts=3, backoff=0.5, on=(TimeoutError,))
|
||||||
|
|
||||||
|
# decorator form (same kwargs)
|
||||||
|
@aretry(attempts=4, backoff=0.5, factor=2.0, on=(ConnectionError,))
|
||||||
|
async def pull():
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
Knobs: `attempts` (total tries), `backoff` / `factor` / `max_backoff` (delay is
|
||||||
|
`min(backoff * factor**n, max_backoff)`), `jitter` (full jitter, on by default),
|
||||||
|
`on=` (tuple of retryable exception types), and `give_up=lambda exc: ...` to stop early
|
||||||
|
on a non-retryable error (e.g. a 400 vs a 429):
|
||||||
|
|
||||||
|
```python
|
||||||
|
# retry 429/5xx but give up immediately on a 4xx
|
||||||
|
await aretry(send, attempts=4, on=(HTTPError,),
|
||||||
|
give_up=lambda e: 400 <= e.status < 500 and e.status != 429)
|
||||||
|
```
|
||||||
|
|
||||||
|
Each retry is logged (emit-only). `sleep=` and `rand=` are injectable for deterministic
|
||||||
|
tests (no real waits).
|
||||||
|
|
||||||
## addr
|
## addr
|
||||||
|
|
||||||
IP/address tooling, exposed as a submodule. The pure `ip` utilities ship in the base
|
IP/address tooling, exposed as a submodule. The pure `ip` utilities ship in the base
|
||||||
|
|||||||
@ -4,8 +4,8 @@ build-backend = "hatchling.build"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "commons"
|
name = "commons"
|
||||||
version = "0.1.0"
|
version = "0.2.0"
|
||||||
description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, and ip/address tooling"
|
description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, ip/address tooling, and retry/backoff"
|
||||||
requires-python = ">=3.10"
|
requires-python = ">=3.10"
|
||||||
dependencies = []
|
dependencies = []
|
||||||
|
|
||||||
|
|||||||
@ -6,6 +6,8 @@ paths: nested dict/list access by dotted path (deep_get / deep_set).
|
|||||||
masking: display masking for cards / cvv / tokens (cosmetic, not a security control).
|
masking: display masking for cards / cvv / tokens (cosmetic, not a security control).
|
||||||
addr: ip/address tooling — pure stdlib ip utils (`commons.addr.ip`) in base,
|
addr: ip/address tooling — pure stdlib ip utils (`commons.addr.ip`) in base,
|
||||||
async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra.
|
async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra.
|
||||||
|
retry: exponential-backoff retry, sync (`retry`) and async (`aretry`), call or
|
||||||
|
decorator form; re-raises the last exception (fail loud), never swallows.
|
||||||
|
|
||||||
base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the
|
base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the
|
||||||
extra). to toggle the timing test mode for bare calls, set it on the module —
|
extra). to toggle the timing test mode for bare calls, set it on the module —
|
||||||
@ -16,6 +18,7 @@ addr`); its ip helpers live under `commons.addr.ip` to keep top-level uncluttere
|
|||||||
from . import addr, masking, paths, timing
|
from . import addr, masking, paths, timing
|
||||||
from .masking import credit, cvv, phantom, provider
|
from .masking import credit, cvv, phantom, provider
|
||||||
from .paths import deep_get, deep_set
|
from .paths import deep_get, deep_set
|
||||||
|
from .retry import aretry, retry
|
||||||
from .timing import (
|
from .timing import (
|
||||||
UTC,
|
UTC,
|
||||||
Clock,
|
Clock,
|
||||||
@ -56,6 +59,8 @@ __all__ = [
|
|||||||
"cvv",
|
"cvv",
|
||||||
"phantom",
|
"phantom",
|
||||||
"provider",
|
"provider",
|
||||||
|
"retry",
|
||||||
|
"aretry",
|
||||||
]
|
]
|
||||||
|
|
||||||
__version__ = "0.1.0"
|
__version__ = "0.2.0"
|
||||||
|
|||||||
161
src/commons/retry.py
Normal file
161
src/commons/retry.py
Normal file
@ -0,0 +1,161 @@
|
|||||||
|
"""retry with exponential backoff — sync and async, one backoff engine.
|
||||||
|
|
||||||
|
de-duplicates retry logic that was written divergently across several libs (HTTP
|
||||||
|
429/5xx caps, proxy burn/rotate caps, IMAP reconnects). both a call form and a
|
||||||
|
decorator form share one implementation per flavor; the backoff schedule is a pure
|
||||||
|
generator so it tests without real sleeps.
|
||||||
|
|
||||||
|
from commons import retry, aretry
|
||||||
|
|
||||||
|
# call form
|
||||||
|
rows = retry(lambda: db_read(), attempts=5, on=(IOError,))
|
||||||
|
data = await aretry(lambda: fetch(url), attempts=3, on=(TimeoutError,))
|
||||||
|
|
||||||
|
# decorator form (same kwargs)
|
||||||
|
@aretry(attempts=4, backoff=0.5, on=(ConnectionError,))
|
||||||
|
async def pull():
|
||||||
|
...
|
||||||
|
|
||||||
|
after `attempts` are exhausted the LAST exception is re-raised (fail loud, never
|
||||||
|
swallowed). `on` narrows which exceptions retry; `give_up(exc) -> bool` stops early
|
||||||
|
on a non-retryable error (e.g. a 400 vs a 429). each retry is logged (emit-only),
|
||||||
|
never printed.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import functools
|
||||||
|
import logging
|
||||||
|
import random
|
||||||
|
import time
|
||||||
|
from typing import Callable, Iterable, Optional, Tuple, Type
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
ExcTypes = Tuple[Type[BaseException], ...]
|
||||||
|
|
||||||
|
|
||||||
|
def _delays(attempts: int, backoff: float, factor: float, max_backoff: float):
|
||||||
|
"""yield the wait before each retry: min(backoff * factor**n, max_backoff)
|
||||||
|
|
||||||
|
yields `attempts - 1` delays (one before each retry after the first try). the
|
||||||
|
raw, un-jittered schedule — jitter is applied at call time so the schedule stays
|
||||||
|
pure and testable.
|
||||||
|
"""
|
||||||
|
for n in range(max(0, attempts - 1)):
|
||||||
|
yield min(backoff * (factor ** n), max_backoff)
|
||||||
|
|
||||||
|
|
||||||
|
def _jittered(delay: float, jitter: bool, rand: Callable[[], float]) -> float:
|
||||||
|
"""apply full jitter to a delay when enabled: uniform(0, delay)"""
|
||||||
|
if not jitter or delay <= 0:
|
||||||
|
return delay
|
||||||
|
return rand() * delay
|
||||||
|
|
||||||
|
|
||||||
|
def _as_types(on: Iterable[Type[BaseException]]) -> ExcTypes:
|
||||||
|
"""coerce the `on` argument into a tuple of exception types"""
|
||||||
|
if isinstance(on, type):
|
||||||
|
return (on,)
|
||||||
|
return tuple(on)
|
||||||
|
|
||||||
|
|
||||||
|
def retry(
|
||||||
|
fn: Optional[Callable] = None,
|
||||||
|
*,
|
||||||
|
attempts: int = 3,
|
||||||
|
backoff: float = 0.5,
|
||||||
|
factor: float = 2.0,
|
||||||
|
max_backoff: float = 30.0,
|
||||||
|
jitter: bool = True,
|
||||||
|
on: Iterable[Type[BaseException]] = (Exception,),
|
||||||
|
give_up: Optional[Callable[[BaseException], bool]] = None,
|
||||||
|
sleep: Callable[[float], None] = time.sleep,
|
||||||
|
rand: Callable[[], float] = random.random,
|
||||||
|
):
|
||||||
|
"""retry a sync callable with exponential backoff; call form or decorator
|
||||||
|
|
||||||
|
`retry(fn, ...)` runs immediately; `@retry(...)` wraps a function. retries on the
|
||||||
|
`on` exceptions, stops early if `give_up(exc)` is true, re-raises the last
|
||||||
|
exception once `attempts` are exhausted. `sleep`/`rand` are injectable for tests.
|
||||||
|
"""
|
||||||
|
types = _as_types(on)
|
||||||
|
|
||||||
|
def run(target: Callable, args, kwargs):
|
||||||
|
delays = list(_delays(attempts, backoff, factor, max_backoff))
|
||||||
|
last_index = attempts - 1
|
||||||
|
for index in range(attempts):
|
||||||
|
try:
|
||||||
|
return target(*args, **kwargs)
|
||||||
|
except types as exc:
|
||||||
|
if give_up is not None and give_up(exc):
|
||||||
|
raise
|
||||||
|
if index == last_index:
|
||||||
|
raise
|
||||||
|
wait = _jittered(delays[index], jitter, rand)
|
||||||
|
log.warning(
|
||||||
|
"retry %d/%d after %s: %s",
|
||||||
|
index + 1, last_index, type(exc).__name__, exc,
|
||||||
|
)
|
||||||
|
if wait > 0:
|
||||||
|
sleep(wait)
|
||||||
|
|
||||||
|
def decorator(target: Callable) -> Callable:
|
||||||
|
@functools.wraps(target)
|
||||||
|
def wrapper(*args, **kwargs):
|
||||||
|
return run(target, args, kwargs)
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
if fn is not None:
|
||||||
|
return run(fn, (), {})
|
||||||
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
|
def aretry(
|
||||||
|
fn: Optional[Callable] = None,
|
||||||
|
*,
|
||||||
|
attempts: int = 3,
|
||||||
|
backoff: float = 0.5,
|
||||||
|
factor: float = 2.0,
|
||||||
|
max_backoff: float = 30.0,
|
||||||
|
jitter: bool = True,
|
||||||
|
on: Iterable[Type[BaseException]] = (Exception,),
|
||||||
|
give_up: Optional[Callable[[BaseException], bool]] = None,
|
||||||
|
sleep: Callable[[float], "asyncio.Future"] = asyncio.sleep,
|
||||||
|
rand: Callable[[], float] = random.random,
|
||||||
|
):
|
||||||
|
"""retry an async callable with exponential backoff; call form or decorator
|
||||||
|
|
||||||
|
async twin of `retry`. `await aretry(coro_fn, ...)` runs immediately;
|
||||||
|
`@aretry(...)` wraps a coroutine function. same semantics: retry on `on`, stop on
|
||||||
|
`give_up`, re-raise the last exception after `attempts`. `sleep`/`rand` injectable.
|
||||||
|
"""
|
||||||
|
types = _as_types(on)
|
||||||
|
|
||||||
|
async def run(target: Callable, args, kwargs):
|
||||||
|
delays = list(_delays(attempts, backoff, factor, max_backoff))
|
||||||
|
last_index = attempts - 1
|
||||||
|
for index in range(attempts):
|
||||||
|
try:
|
||||||
|
return await target(*args, **kwargs)
|
||||||
|
except types as exc:
|
||||||
|
if give_up is not None and give_up(exc):
|
||||||
|
raise
|
||||||
|
if index == last_index:
|
||||||
|
raise
|
||||||
|
wait = _jittered(delays[index], jitter, rand)
|
||||||
|
log.warning(
|
||||||
|
"retry %d/%d after %s: %s",
|
||||||
|
index + 1, last_index, type(exc).__name__, exc,
|
||||||
|
)
|
||||||
|
if wait > 0:
|
||||||
|
await sleep(wait)
|
||||||
|
|
||||||
|
def decorator(target: Callable) -> Callable:
|
||||||
|
@functools.wraps(target)
|
||||||
|
async def wrapper(*args, **kwargs):
|
||||||
|
return await run(target, args, kwargs)
|
||||||
|
return wrapper
|
||||||
|
|
||||||
|
if fn is not None:
|
||||||
|
return run(fn, (), {})
|
||||||
|
return decorator
|
||||||
Loading…
Reference in New Issue
Block a user