diff --git a/pyproject.toml b/pyproject.toml index 34a6d7d..9f55fc6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,11 +4,12 @@ build-backend = "hatchling.build" [project] name = "aiowebhooks" -version = "0.1.0" +version = "0.1.1" description = "async webhook sender (aiohttp) with round-robin urls, retry, and proxy rotation; optional discord.py embeds" requires-python = ">=3.10" dependencies = [ "aiohttp>=3.9", + "commons @ git+ssh://git@git.rethinkstudios.io/rethink-public/commons.git@v0.2.0", ] [project.optional-dependencies] @@ -16,5 +17,8 @@ discord = [ "discord.py>=2.3", ] +[tool.hatch.metadata] +allow-direct-references = true + [tool.hatch.build.targets.wheel] packages = ["src/aiowebhooks"] diff --git a/src/aiowebhooks/__init__.py b/src/aiowebhooks/__init__.py index 785ce02..e00ce4b 100644 --- a/src/aiowebhooks/__init__.py +++ b/src/aiowebhooks/__init__.py @@ -21,4 +21,4 @@ from .sender import Webhook __all__ = ["Webhook", "WebhookResult", "WebhookError", "NoUrlsError"] -__version__ = "0.1.0" +__version__ = "0.1.1" diff --git a/src/aiowebhooks/sender.py b/src/aiowebhooks/sender.py index 53899d4..9cc873d 100644 --- a/src/aiowebhooks/sender.py +++ b/src/aiowebhooks/sender.py @@ -13,6 +13,7 @@ from typing import Dict, List, Optional, Union from urllib.parse import unquote, urlsplit import aiohttp +from commons import aretry from .errors import NoUrlsError from .result import WebhookResult @@ -20,6 +21,18 @@ from .result import WebhookResult log = logging.getLogger(__name__) +class _Retryable(Exception): + """internal signal: a retryable HTTP status (429/5xx); carries the response + + raised inside an attempt so commons.aretry drives the backoff + cap; the loop + catches the final one to return the REAL last response, not a synthetic result. + """ + + def __init__(self, result: WebhookResult): + super().__init__(f"retryable status {result.status}") + self.result = result + + def _proxy_string(proxies_dict: Optional[Dict[str, str]]) -> Optional[str]: """canonical host:port:user:pass (or host:port) from an aiohttp proxies dict @@ -118,29 +131,57 @@ class Webhook: async def _send_loop( self, session: aiohttp.ClientSession, url: str, payload: Dict ) -> WebhookResult: - """retry/rotation loop for a single send""" - attempts = 0 - retries = 0 - proxy_tries = 0 - last_proxy: Optional[str] = None + """status-retry (via commons.aretry) wrapping proxy rotation; never raises + + commons.aretry owns the 429/5xx backoff schedule + retry cap (max_retries), + retrying on the internal _Retryable signal. on exhaustion it re-raises the + last _Retryable, whose carried result is the REAL last response (not a + synthetic status-0). proxy rotation on connection errors lives inside the + attempt and is capped separately. + """ + self._attempt_no = 0 + try: + return await aretry( + lambda: self._attempt(session, url, payload), + attempts=self.max_retries + 1, + on=(_Retryable,), + ) + except _Retryable as exhausted: + return exhausted.result + + async def _attempt( + self, session: aiohttp.ClientSession, url: str, payload: Dict + ) -> WebhookResult: + """one logical send: proxy rotation + a single POST; may raise _Retryable + + raises _Retryable (carrying the real response) on a 429/5xx so the caller's + aretry applies backoff; honors an explicit 429 retry_after by sleeping it + before signalling. returns a final WebhookResult on success or a terminal + (non-retryable) failure — never lets a provider/connection error escape. + """ timeout = aiohttp.ClientTimeout(total=self.timeout) + last_proxy: Optional[str] = None + proxy_tries = 0 while True: + self._attempt_no += 1 + attempts = self._attempt_no proxy_url = None if self._proxies is not None: try: proxy_dict = self._proxies.get() - except Exception as error: - if type(error).__name__ == "ProxiesExhaustedError": - return WebhookResult( - ok=False, status=None, url=url, attempts=attempts, - error="proxies exhausted", proxy=last_proxy, - ) - raise + except Exception: + # duck-typed provider; any error from get() means no proxy is + # available — fail cleanly rather than escaping send(). + log.warning("webhook: proxy get() failed; no proxy available", + exc_info=True) + return WebhookResult( + ok=False, status=None, url=url, attempts=attempts, + error="proxies unavailable", proxy=last_proxy, + ) last_proxy = _proxy_string(proxy_dict) proxy_url = (proxy_dict or {}).get("http") or (proxy_dict or {}).get("https") - attempts += 1 try: async with session.post( url, json=payload, proxy=proxy_url, timeout=timeout @@ -154,25 +195,20 @@ class Webhook: response=body, proxy=last_proxy, ) - wait = self._retry_after(status, resp.headers, body) - if wait is not None and retries < self.max_retries: - retries += 1 - log.warning("webhook 429 on %s; waiting %.3fs (retry %d/%d)", - url, wait, retries, self.max_retries) - await asyncio.sleep(wait) - continue - - if status >= 500 and retries < self.max_retries: - retries += 1 - log.warning("webhook %d on %s; retry %d/%d", - status, url, retries, self.max_retries) - continue - - return WebhookResult( + result = WebhookResult( ok=False, status=status, url=url, attempts=attempts, error=f"http {status}", response=body, proxy=last_proxy, ) + wait = self._retry_after(status, resp.headers, body) + if wait is not None: + log.warning("webhook 429 on %s; honoring retry_after %.3fs", url, wait) + await asyncio.sleep(wait) + raise _Retryable(result) + if status >= 500: + raise _Retryable(result) + return result + except (aiohttp.ClientError, asyncio.TimeoutError) as error: if self._proxies is not None and proxy_tries < self.max_proxy_retries: if self._burn(last_proxy): @@ -188,18 +224,20 @@ class Webhook: ) def _burn(self, proxy: Optional[str]) -> bool: - """burn the current proxy; return False if the provider is exhausted + """burn the current proxy; return False if it can't be rotated - catches a provider ProxiesExhaustedError duck-typed by class name (the - provider is never imported), so a dying pool ends the loop cleanly. + the provider is duck-typed and never imported, so we cannot catch its + exception types by class. ANY exception from burn (a ProxiesExhaustedError + on a dead pool, a ValueError when the proxy isn't in the pool, etc.) means + we can't rotate — return False so the caller ends the loop with a failed + result rather than letting it escape send() (which must never raise). """ try: self._proxies.burn(proxy) return True - except Exception as error: - if type(error).__name__ == "ProxiesExhaustedError": - return False - raise + except Exception: + log.warning("webhook: proxy burn failed; ending rotation", exc_info=True) + return False @staticmethod async def _read_body(resp) -> Optional[Union[str, Dict]]: