fix: never-raises contract + retry migration to commons.aretry (v0.1.1)
- seam bug: _burn/get() only caught ProxiesExhaustedError, but aioproxies.burn()
raises ValueError ('proxy not in pool') which escaped send() and broke the
'never raises on send failure' contract. catch ANY exception across the
duck-typed provider seam and convert to a failed WebhookResult.
- 5xx hot loop: 5xx retries had no backoff (immediate retry, hammering the
endpoint). migrate 429/5xx retry onto commons.aretry (>=0.2.0) for correct
exponential backoff + cap.
- lost response: exhausted retries returned a synthetic status-0 result; now the
real last 4xx/5xx status + body is returned (aretry re-raises the carried
_Retryable, the loop unwraps it).
verified by execution: burn/get ValueError no longer escapes, 5xx backs off
(~1.9s over 3 retries vs ~0s hot loop), exhausted 5xx returns real 503 + body,
429 retry_after honored, 4xx/rotation/round-robin intact.
Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
7a0c8abf30
commit
40f8cc5b5f
@ -4,11 +4,12 @@ build-backend = "hatchling.build"
|
|||||||
|
|
||||||
[project]
|
[project]
|
||||||
name = "aiowebhooks"
|
name = "aiowebhooks"
|
||||||
version = "0.1.0"
|
version = "0.1.1"
|
||||||
description = "async webhook sender (aiohttp) with round-robin urls, retry, and proxy rotation; optional discord.py embeds"
|
description = "async webhook sender (aiohttp) with round-robin urls, retry, and proxy rotation; optional discord.py embeds"
|
||||||
requires-python = ">=3.10"
|
requires-python = ">=3.10"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aiohttp>=3.9",
|
"aiohttp>=3.9",
|
||||||
|
"commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
@ -16,5 +17,8 @@ discord = [
|
|||||||
"discord.py>=2.3",
|
"discord.py>=2.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[tool.hatch.metadata]
|
||||||
|
allow-direct-references = true
|
||||||
|
|
||||||
[tool.hatch.build.targets.wheel]
|
[tool.hatch.build.targets.wheel]
|
||||||
packages = ["src/aiowebhooks"]
|
packages = ["src/aiowebhooks"]
|
||||||
|
|||||||
@ -21,4 +21,4 @@ from .sender import Webhook
|
|||||||
|
|
||||||
__all__ = ["Webhook", "WebhookResult", "WebhookError", "NoUrlsError"]
|
__all__ = ["Webhook", "WebhookResult", "WebhookError", "NoUrlsError"]
|
||||||
|
|
||||||
__version__ = "0.1.0"
|
__version__ = "0.1.1"
|
||||||
|
|||||||
@ -13,6 +13,7 @@ from typing import Dict, List, Optional, Union
|
|||||||
from urllib.parse import unquote, urlsplit
|
from urllib.parse import unquote, urlsplit
|
||||||
|
|
||||||
import aiohttp
|
import aiohttp
|
||||||
|
from commons import aretry
|
||||||
|
|
||||||
from .errors import NoUrlsError
|
from .errors import NoUrlsError
|
||||||
from .result import WebhookResult
|
from .result import WebhookResult
|
||||||
@ -20,6 +21,18 @@ from .result import WebhookResult
|
|||||||
log = logging.getLogger(__name__)
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class _Retryable(Exception):
|
||||||
|
"""internal signal: a retryable HTTP status (429/5xx); carries the response
|
||||||
|
|
||||||
|
raised inside an attempt so commons.aretry drives the backoff + cap; the loop
|
||||||
|
catches the final one to return the REAL last response, not a synthetic result.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, result: WebhookResult):
|
||||||
|
super().__init__(f"retryable status {result.status}")
|
||||||
|
self.result = result
|
||||||
|
|
||||||
|
|
||||||
def _proxy_string(proxies_dict: Optional[Dict[str, str]]) -> Optional[str]:
|
def _proxy_string(proxies_dict: Optional[Dict[str, str]]) -> Optional[str]:
|
||||||
"""canonical host:port:user:pass (or host:port) from an aiohttp proxies dict
|
"""canonical host:port:user:pass (or host:port) from an aiohttp proxies dict
|
||||||
|
|
||||||
@ -118,29 +131,57 @@ class Webhook:
|
|||||||
async def _send_loop(
|
async def _send_loop(
|
||||||
self, session: aiohttp.ClientSession, url: str, payload: Dict
|
self, session: aiohttp.ClientSession, url: str, payload: Dict
|
||||||
) -> WebhookResult:
|
) -> WebhookResult:
|
||||||
"""retry/rotation loop for a single send"""
|
"""status-retry (via commons.aretry) wrapping proxy rotation; never raises
|
||||||
attempts = 0
|
|
||||||
retries = 0
|
commons.aretry owns the 429/5xx backoff schedule + retry cap (max_retries),
|
||||||
proxy_tries = 0
|
retrying on the internal _Retryable signal. on exhaustion it re-raises the
|
||||||
last_proxy: Optional[str] = None
|
last _Retryable, whose carried result is the REAL last response (not a
|
||||||
|
synthetic status-0). proxy rotation on connection errors lives inside the
|
||||||
|
attempt and is capped separately.
|
||||||
|
"""
|
||||||
|
self._attempt_no = 0
|
||||||
|
try:
|
||||||
|
return await aretry(
|
||||||
|
lambda: self._attempt(session, url, payload),
|
||||||
|
attempts=self.max_retries + 1,
|
||||||
|
on=(_Retryable,),
|
||||||
|
)
|
||||||
|
except _Retryable as exhausted:
|
||||||
|
return exhausted.result
|
||||||
|
|
||||||
|
async def _attempt(
|
||||||
|
self, session: aiohttp.ClientSession, url: str, payload: Dict
|
||||||
|
) -> WebhookResult:
|
||||||
|
"""one logical send: proxy rotation + a single POST; may raise _Retryable
|
||||||
|
|
||||||
|
raises _Retryable (carrying the real response) on a 429/5xx so the caller's
|
||||||
|
aretry applies backoff; honors an explicit 429 retry_after by sleeping it
|
||||||
|
before signalling. returns a final WebhookResult on success or a terminal
|
||||||
|
(non-retryable) failure — never lets a provider/connection error escape.
|
||||||
|
"""
|
||||||
timeout = aiohttp.ClientTimeout(total=self.timeout)
|
timeout = aiohttp.ClientTimeout(total=self.timeout)
|
||||||
|
last_proxy: Optional[str] = None
|
||||||
|
proxy_tries = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
self._attempt_no += 1
|
||||||
|
attempts = self._attempt_no
|
||||||
proxy_url = None
|
proxy_url = None
|
||||||
if self._proxies is not None:
|
if self._proxies is not None:
|
||||||
try:
|
try:
|
||||||
proxy_dict = self._proxies.get()
|
proxy_dict = self._proxies.get()
|
||||||
except Exception as error:
|
except Exception:
|
||||||
if type(error).__name__ == "ProxiesExhaustedError":
|
# duck-typed provider; any error from get() means no proxy is
|
||||||
return WebhookResult(
|
# available — fail cleanly rather than escaping send().
|
||||||
ok=False, status=None, url=url, attempts=attempts,
|
log.warning("webhook: proxy get() failed; no proxy available",
|
||||||
error="proxies exhausted", proxy=last_proxy,
|
exc_info=True)
|
||||||
)
|
return WebhookResult(
|
||||||
raise
|
ok=False, status=None, url=url, attempts=attempts,
|
||||||
|
error="proxies unavailable", proxy=last_proxy,
|
||||||
|
)
|
||||||
last_proxy = _proxy_string(proxy_dict)
|
last_proxy = _proxy_string(proxy_dict)
|
||||||
proxy_url = (proxy_dict or {}).get("http") or (proxy_dict or {}).get("https")
|
proxy_url = (proxy_dict or {}).get("http") or (proxy_dict or {}).get("https")
|
||||||
|
|
||||||
attempts += 1
|
|
||||||
try:
|
try:
|
||||||
async with session.post(
|
async with session.post(
|
||||||
url, json=payload, proxy=proxy_url, timeout=timeout
|
url, json=payload, proxy=proxy_url, timeout=timeout
|
||||||
@ -154,25 +195,20 @@ class Webhook:
|
|||||||
response=body, proxy=last_proxy,
|
response=body, proxy=last_proxy,
|
||||||
)
|
)
|
||||||
|
|
||||||
wait = self._retry_after(status, resp.headers, body)
|
result = WebhookResult(
|
||||||
if wait is not None and retries < self.max_retries:
|
|
||||||
retries += 1
|
|
||||||
log.warning("webhook 429 on %s; waiting %.3fs (retry %d/%d)",
|
|
||||||
url, wait, retries, self.max_retries)
|
|
||||||
await asyncio.sleep(wait)
|
|
||||||
continue
|
|
||||||
|
|
||||||
if status >= 500 and retries < self.max_retries:
|
|
||||||
retries += 1
|
|
||||||
log.warning("webhook %d on %s; retry %d/%d",
|
|
||||||
status, url, retries, self.max_retries)
|
|
||||||
continue
|
|
||||||
|
|
||||||
return WebhookResult(
|
|
||||||
ok=False, status=status, url=url, attempts=attempts,
|
ok=False, status=status, url=url, attempts=attempts,
|
||||||
error=f"http {status}", response=body, proxy=last_proxy,
|
error=f"http {status}", response=body, proxy=last_proxy,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
wait = self._retry_after(status, resp.headers, body)
|
||||||
|
if wait is not None:
|
||||||
|
log.warning("webhook 429 on %s; honoring retry_after %.3fs", url, wait)
|
||||||
|
await asyncio.sleep(wait)
|
||||||
|
raise _Retryable(result)
|
||||||
|
if status >= 500:
|
||||||
|
raise _Retryable(result)
|
||||||
|
return result
|
||||||
|
|
||||||
except (aiohttp.ClientError, asyncio.TimeoutError) as error:
|
except (aiohttp.ClientError, asyncio.TimeoutError) as error:
|
||||||
if self._proxies is not None and proxy_tries < self.max_proxy_retries:
|
if self._proxies is not None and proxy_tries < self.max_proxy_retries:
|
||||||
if self._burn(last_proxy):
|
if self._burn(last_proxy):
|
||||||
@ -188,18 +224,20 @@ class Webhook:
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _burn(self, proxy: Optional[str]) -> bool:
|
def _burn(self, proxy: Optional[str]) -> bool:
|
||||||
"""burn the current proxy; return False if the provider is exhausted
|
"""burn the current proxy; return False if it can't be rotated
|
||||||
|
|
||||||
catches a provider ProxiesExhaustedError duck-typed by class name (the
|
the provider is duck-typed and never imported, so we cannot catch its
|
||||||
provider is never imported), so a dying pool ends the loop cleanly.
|
exception types by class. ANY exception from burn (a ProxiesExhaustedError
|
||||||
|
on a dead pool, a ValueError when the proxy isn't in the pool, etc.) means
|
||||||
|
we can't rotate — return False so the caller ends the loop with a failed
|
||||||
|
result rather than letting it escape send() (which must never raise).
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
self._proxies.burn(proxy)
|
self._proxies.burn(proxy)
|
||||||
return True
|
return True
|
||||||
except Exception as error:
|
except Exception:
|
||||||
if type(error).__name__ == "ProxiesExhaustedError":
|
log.warning("webhook: proxy burn failed; ending rotation", exc_info=True)
|
||||||
return False
|
return False
|
||||||
raise
|
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _read_body(resp) -> Optional[Union[str, Dict]]:
|
async def _read_body(resp) -> Optional[Union[str, Dict]]:
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user