add package: pyproject + src

ExtendedSession: aiohttp session wrapper with proxies, header overwrites,
ephemeral headers, domain rewriting, request previews/cURL export, and
retry-with-backoff. byte-sending isolated behind one overridable
_raw_request seam so a TLS-fingerprinting backend can subclass and swap
the client. backend-agnostic Response/FailureResponse (same surface,
falsy on failure). config-free, object-only, explicit lifecycle.
src/ multi-module layout, hatchling build.

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-24 18:01:39 -04:00
parent 6a3c4347ec
commit 182cfb5fe3
5 changed files with 628 additions and 0 deletions

16
pyproject.toml Normal file
View File

@ -0,0 +1,16 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "aioweb"
version = "0.1.0"
description = "Async HTTP session wrapper over aiohttp — proxies, header overwrites, retries, previews. Config-free, installable."
requires-python = ">=3.10"
dependencies = [
"aiohttp>=3.9",
"yarl>=1.9",
]
[tool.hatch.build.targets.wheel]
packages = ["src/aioweb"]

33
src/aioweb/__init__.py Normal file
View File

@ -0,0 +1,33 @@
from .session import ExtendedSession, DEFAULT_ATTEMPTS, RETRY_STATUSES
from .responses import Response, FailureResponse, AiowebError, aiowebResponse
from .preview import RequestPreview
__all__ = [
"ExtendedSession",
"Response",
"aiowebResponse",
"FailureResponse",
"AiowebError",
"RequestPreview",
"request_retries",
"test_proxies",
"DEFAULT_ATTEMPTS",
"RETRY_STATUSES",
]
async def request_retries(session, method, url, **kwargs):
"""back-compat wrapper for session.request_with_retries(...)
prefer calling session.request_with_retries(...) directly.
"""
return await session.request_with_retries(method, url, **kwargs)
async def test_proxies(session, url="https://api.ipify.org?format=json"):
"""fetch the public IP via the session (verifies proxy config)
url defaults to ipify; pass another IP-echo endpoint to point elsewhere.
"""
response = await session.request("GET", url)
return response.json()

39
src/aioweb/preview.py Normal file
View File

@ -0,0 +1,39 @@
"""
request preview for aioweb format or export a request without sending it
"""
import json as _json
class RequestPreview:
"""a formatted preview of an HTTP request (does not send)"""
def __init__(self, method, url, headers, proxy, data=None, json=None, params=None, timeout=None):
self.details = {
"method": method.upper(),
"url": url,
"headers": headers,
"proxy": proxy,
"data": data,
"json": json,
"params": params,
"timeout": timeout,
}
def __str__(self):
"""key: value lines for each detail"""
return "\n".join(f"{key}: {value}" for key, value in self.details.items())
def as_curl(self):
"""equivalent cURL command for the request"""
parts = [f"curl -X {self.details['method']}"]
for header, value in (self.details["headers"] or {}).items():
parts.append(f"-H '{header}: {value}'")
if self.details["data"]:
parts.append(f"--data '{self.details['data']}'")
elif self.details["json"]:
parts.append(f"--data '{_json.dumps(self.details['json'])}'")
parts.append(f"'{self.details['url']}'")
if self.details["proxy"]:
parts.append(f"--proxy '{self.details['proxy']}'")
return " \\\n ".join(parts)

175
src/aioweb/responses.py Normal file
View File

@ -0,0 +1,175 @@
"""
backend-agnostic response objects for aioweb
Response is built from primitives (status, headers, content, url, history) rather
than holding a raw aiohttp object, so any backend can produce one. FailureResponse
mirrors the same surface so callers can branch uniformly every status/predicate is
a property on both.
"""
import json as _json
from typing import Optional
class AiowebError(Exception):
"""raised by raise_for_status() on a non-2xx response"""
class Response:
"""a wrapped HTTP response with cached content, built from primitives"""
def __init__(
self, status_code: int, headers, content: bytes, url: str,
*, reason: Optional[str] = None, history=None, cookies=None, encoding: str = "utf-8",
):
self._status_code = status_code
self._headers = headers or {}
self._content = content
self._url = url
self._reason = reason
self._history = history or []
self._cookies = cookies
self._encoding = encoding
self._text = None
@property
def status_code(self) -> int:
"""HTTP status code"""
return self._status_code
@property
def headers(self):
"""response headers"""
return self._headers
@property
def url(self) -> str:
"""final URL after redirects"""
return self._url
@property
def reason(self):
"""reason phrase for the status code"""
return self._reason
@property
def cookies(self):
"""cookies set in the response"""
return self._cookies
@property
def history(self):
"""redirect history (list of (status, url) tuples)"""
return self._history
@property
def redirect_chain(self):
"""list of (status, url) tuples for all redirects"""
return list(self._history)
@property
def is_redirect(self) -> bool:
"""whether the status is a redirect"""
return self._status_code in (301, 302, 303, 307, 308)
@property
def is_success(self) -> bool:
"""whether the status indicates success (2xx)"""
return 200 <= self._status_code < 300
@property
def content(self) -> bytes:
"""raw response bytes"""
return self._content
def text(self, encoding: Optional[str] = None) -> str:
"""decoded text content (cached)"""
if self._text is None:
self._text = self._content.decode(encoding or self._encoding)
return self._text
def json(self):
"""parsed JSON content, or None if not valid JSON"""
try:
return _json.loads(self.text())
except _json.JSONDecodeError:
return None
def raise_for_status(self):
"""raise AiowebError if the status is not 2xx"""
if not self.is_success:
raise AiowebError(f"HTTP {self._status_code}: {self._reason}")
def __bool__(self) -> bool:
return self.is_success
def __repr__(self) -> str:
return f"<Response [{self._status_code}]>"
class FailureResponse:
"""response-like object returned when all retries fail; falsy, same surface as Response"""
def __init__(self, reason, status_code: int = 0, url: Optional[str] = None):
self._reason = reason
self._status_code = status_code
self._url = url
@property
def status_code(self) -> int:
return self._status_code
@property
def reason(self):
return self._reason
@property
def url(self):
return self._url
@property
def headers(self):
return {}
@property
def cookies(self):
return None
@property
def history(self):
return []
@property
def redirect_chain(self):
return []
@property
def is_redirect(self) -> bool:
return False
@property
def is_success(self) -> bool:
return False
@property
def content(self):
return None
def text(self, encoding=None):
return None
def json(self):
return None
def raise_for_status(self):
raise AiowebError(self._reason)
def __bool__(self) -> bool:
return False
def __repr__(self) -> str:
return f"<FailureResponse [{self._status_code}] {self._reason}>"
# back-compat alias — the response class was renamed Response
aiowebResponse = Response

365
src/aioweb/session.py Normal file
View File

@ -0,0 +1,365 @@
"""
async HTTP session wrapper over aiohttp
ExtendedSession adds session-level proxies, header overwrites, ephemeral
(per-request generated) headers, domain rewriting, request previews, and
retry/backoff on top of aiohttp. The actual byte-sending is isolated in
_raw_request() so a different backend (e.g. a TLS-fingerprinting client) can
subclass and override just that one method, inheriting everything else.
async with ExtendedSession(proxies={"https": "http://..."}) as s:
resp = await s.request_with_retries("GET", url)
if resp: # FailureResponse is falsy
data = resp.json()
config-free: proxies/headers/timeouts are passed at construction or per call.
sessions must be closed explicitly (async with, or await s.close()); there is no
__del__ auto-close (that pattern is unsafe for async resources).
"""
import asyncio
import logging
import warnings
import aiohttp
from yarl import URL
from .preview import RequestPreview
from .responses import Response, FailureResponse
log = logging.getLogger(__name__)
DEFAULT_ATTEMPTS = 3
DEFAULT_BACKOFF_BASE = 2.0
# statuses worth retrying: rate limit + transient server errors
RETRY_STATUSES = frozenset({429, 500, 502, 503, 504})
class ExtendedSession:
"""aiohttp session wrapper with proxies, header overwrites, and retries"""
def __init__(
self, proxies=None, headers=None, overwrites=None, domain_overwrites=None,
inject=False, timeout=10, **kwargs,
):
"""
args:
proxies: dict of scheme -> proxy url (e.g. {"https": "http://..."})
headers: default session headers
overwrites: header overwrites applied per request (see inject)
domain_overwrites: {target: replacement} host substring rewrites
inject: if True, overwrites/ephemerals are added even when absent from
the request headers; if False, they only replace existing ones
timeout: total timeout seconds (connect/read get half)
kwargs: passed through to aiohttp.ClientSession
"""
self.inject = inject
self.header_overwrites = overwrites or {}
self.domain_overwrites = domain_overwrites or {}
self.ephemeral_headers = {}
self.proxies = proxies or {}
# track our own default headers instead of touching aiohttp privates
self._default_headers = dict(headers or {})
self.session = self._create_session(self._default_headers, timeout, **kwargs)
def _create_session(self, headers, timeout, **kwargs):
"""create the backend HTTP session — override to use a different client
a subclass swapping the HTTP backend (e.g. a TLS-fingerprinting client)
overrides this to return its own session object. the overwrite/domain/
proxy/retry/preview logic in this class never touches the session object
directly (only _raw_request, the cookie methods, and close do), so those
features work unchanged on any backend.
"""
return aiohttp.ClientSession(
headers=headers,
timeout=aiohttp.ClientTimeout(
total=timeout,
connect=timeout / 2,
sock_read=timeout / 2,
sock_connect=timeout / 2,
),
**kwargs,
)
# -------------------------------------------------------------------------
# header overwrites
def overwrite_header(self, header, value):
"""set a header overwrite applied to requests (per `inject` rules)"""
self.header_overwrites[header] = value
def _apply_overwrites(self, request_headers):
"""apply static overwrites and ephemeral headers to a request's headers"""
request_headers = dict(request_headers or {})
for header, value in self.header_overwrites.items():
if self.inject or header in request_headers:
request_headers[header] = value
for header, value_callable in self.ephemeral_headers.items():
if self.inject or header in request_headers:
value = value_callable()
if isinstance(value, dict):
request_headers.update(value)
else:
request_headers[header] = value
return request_headers
def set_ephemeral(self, header, value_callable):
"""register a header generated fresh (via callable) for each request"""
if not callable(value_callable):
raise ValueError("value_callable must be callable")
self.ephemeral_headers[header] = value_callable
def clear_ephemeral(self, header=None):
"""clear one or all ephemeral headers"""
if header is None:
self.ephemeral_headers.clear()
else:
self.ephemeral_headers.pop(header, None)
def overwrite_inject(self, value: bool):
"""enable/disable injecting overwrites into requests that lack the header"""
if not isinstance(value, bool):
raise ValueError("overwrite_inject expects a bool")
self.inject = value
def clear_overwrites(self):
"""clear header and domain overwrites"""
self.header_overwrites.clear()
self.domain_overwrites.clear()
def update_headers(self, new_headers):
"""merge new headers (with overwrites applied) into the session defaults"""
if not new_headers:
return
self._default_headers.update(self._apply_overwrites(new_headers))
def get_headers(self):
"""current session headers with overwrites applied"""
return self._apply_overwrites(dict(self._default_headers))
def clear_headers(self):
"""clear session-level default headers"""
self._default_headers.clear()
# -------------------------------------------------------------------------
# domain overwrites
def overwrite_domain(self, target, replacement):
"""register a host-substring rewrite (target -> replacement)"""
self.domain_overwrites[target] = replacement
def _apply_domain_overwrites(self, url: str) -> str:
"""apply any host-substring rewrites to a url"""
parsed = URL(url)
if not parsed.host:
return url
for target, replacement in self.domain_overwrites.items():
if target in parsed.host:
return str(parsed.with_host(parsed.host.replace(target, replacement)))
return url
# -------------------------------------------------------------------------
# proxies
def update_proxies(self, proxies):
"""merge proxies into the session set"""
if not isinstance(proxies, dict):
raise ValueError("proxies must be a dict keyed by scheme (http/https)")
self.proxies.update(proxies)
def clear_proxies(self):
"""clear all proxies"""
self.proxies.clear()
def _get_proxy(self, url, proxies=None):
"""resolve the proxy for a url's scheme"""
proxies = proxies or self.proxies
scheme = url.split("://")[0]
return proxies.get(scheme)
# -------------------------------------------------------------------------
# cookies
def get_cookies(self):
"""cookies stored in the session jar"""
return self.session.cookie_jar.filter_cookies()
def set_cookie(self, name, value, domain=None, path="/"):
"""set a cookie in the session jar"""
response_url = URL(domain or "http://localhost")
self.session.cookie_jar.update_cookies({name: value}, response_url=response_url)
def clear_cookies(self):
"""clear the session cookie jar"""
self.session.cookie_jar.clear()
def _cookies_for_url(self, url):
"""dict of cookies the backend would send for url — override per backend
used by preview(). non-aiohttp backends override this (or return {}); the
rest of preview (domain rewrites, header overwrites) is backend-agnostic.
"""
return {k: v.value for k, v in self.session.cookie_jar.filter_cookies(URL(url)).items()}
# -------------------------------------------------------------------------
# preview
def preview(self, method, url, **kwargs):
"""build a RequestPreview for a request without sending it"""
proxy = self._get_proxy(url, kwargs.pop("proxies", None))
if kwargs.get("headers"):
headers = self._apply_overwrites(kwargs.pop("headers"))
else:
headers = dict(self.get_headers())
timeout = kwargs.get("timeout")
timeout_total = timeout if isinstance(timeout, (int, float)) else None
cookies = kwargs.pop("cookies", None)
if cookies is None:
cookies = self._cookies_for_url(url)
if cookies:
cookie_header = "; ".join(f"{k}={v}" for k, v in cookies.items())
headers["Cookie"] = (
f"{headers['Cookie']}; {cookie_header}" if "Cookie" in headers else cookie_header
)
return RequestPreview(
method=method,
url=self._apply_domain_overwrites(url),
headers=headers,
proxy=proxy,
data=kwargs.get("data"),
json=kwargs.get("json"),
params=kwargs.get("params"),
timeout=timeout_total,
)
# -------------------------------------------------------------------------
# request
async def _raw_request(self, method, url, **kwargs) -> Response:
"""send one request with aiohttp and adapt it into a Response
this is the backend seam: a subclass can override only this method (using a
different HTTP client, e.g. a TLS-fingerprinting one), building a Response
from that backend's primitives. everything else (overwrites, retries,
preview) is inherited.
"""
response = await self.session.request(method, url, **kwargs)
async with response:
content = await response.read()
return Response(
status_code=response.status,
headers=response.headers,
content=content,
url=str(response.url),
reason=response.reason,
history=[(r.status, str(r.url)) for r in response.history],
cookies=response.cookies,
encoding=response.get_encoding(),
)
async def request(self, method, url, **kwargs) -> Response:
"""make a request, applying overwrites/domain rewrites/proxy resolution"""
kwargs["proxy"] = self._get_proxy(url, kwargs.pop("proxies", None))
debug = kwargs.pop("debug", False)
kwargs["headers"] = self._apply_overwrites(kwargs.get("headers"))
kwargs["headers"] = {str(k): str(v) for k, v in kwargs["headers"].items()}
timeout = kwargs.get("timeout")
if isinstance(timeout, (int, float)):
kwargs["timeout"] = aiohttp.ClientTimeout(total=timeout)
url = self._apply_domain_overwrites(url)
if debug:
log.info("sending request to: %s", url)
try:
result = await self._raw_request(method, url, **kwargs)
if debug and result.redirect_chain:
log.info("redirect chain: %s", result.redirect_chain)
return result
except aiohttp.ClientError as error:
raise aiohttp.ClientError(f"client error for {url}: {error}") from error
async def request_with_retries(
self, method, url, *, data=None, proxies=None, timeout=None, attempts=None,
headers=None, params=None, debug=False, retry_statuses=RETRY_STATUSES,
backoff_base=DEFAULT_BACKOFF_BASE,
):
"""request with retries on exceptions AND retryable statuses (429/5xx)
returns a Response on success (or non-retryable status), or a falsy
FailureResponse if every attempt fails. backoff is exponential
(backoff_base ** attempt).
"""
attempts = attempts or DEFAULT_ATTEMPTS
last_error = None
if debug:
preview = self.preview(
method=method, url=url, params=params,
data=None if isinstance(data, dict) else data,
json=data if isinstance(data, dict) else None,
headers=headers, proxies=proxies, timeout=timeout,
).as_curl()
log.info("[aioweb.debug]\n%s\nproxies: %s inject: %s", preview, self.proxies, self.inject)
for attempt in range(attempts):
try:
response = await self.request(
method=method, url=url, params=params,
data=None if isinstance(data, dict) else data,
json=data if isinstance(data, dict) else None,
headers=headers, proxies=proxies, timeout=timeout, debug=debug,
)
if response.status_code in retry_statuses:
last_error = f"retryable status {response.status_code}"
log.warning("attempt %d: %s for %s", attempt + 1, last_error, url)
else:
return response
except aiohttp.ClientError as error:
last_error = f"client error: {error}"
log.warning("attempt %d: %s, retrying", attempt + 1, last_error)
except Exception as error:
last_error = f"unexpected error: {error}"
log.exception("attempt %d: %s, retrying", attempt + 1, last_error)
if attempt < attempts - 1:
await asyncio.sleep(backoff_base ** attempt)
log.error("all %d attempts failed for %s", attempts, url)
return FailureResponse(reason=last_error, url=url)
# -------------------------------------------------------------------------
# lifecycle
async def close(self):
"""close the backend session — override if the backend's close differs"""
await self.session.close()
def _is_closed(self) -> bool:
"""whether the backend session is closed — override for non-aiohttp backends"""
return self.session.closed
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
def __del__(self):
# do NOT attempt async cleanup here — spinning event loops in a finalizer
# is unsafe. just warn so the leak is visible; callers must close explicitly.
try:
closed = self._is_closed()
except Exception:
return
if not closed:
warnings.warn(
f"{type(self).__name__} was not closed; use 'async with' or await .close()",
ResourceWarning, stacklevel=2,
)