Compare commits

..

10 Commits
v0.1.0 ... main

Author SHA1 Message Date
eef4b25f07 chore: ignore .claude/ dir (CLAUDE.md now lives under .claude/)
Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 21:55:13 -04:00
a5e36544d4 docs: note url()/aiohttp() percent-encode proxy credentials
Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 21:41:36 -04:00
72c5342a6b fix: AP-1 percent-encode url() creds, AP-2 reject next(session=) collision
AP-1: url()/aiohttp() percent-encode user/password so reserved chars (/ # ? @) produce a
valid url, mirroring the parse-side unquote. AP-2: next(session=...) raises a clear
ValueError instead of an opaque str.format TypeError. net.current_ip uses
json(content_type=None) + dict guard.

Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 21:34:36 -04:00
932fb71c95 docs: pin install line to release, note unpinned-latest option
Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 18:13:33 -04:00
0abc071f14 docs: show unpinned install line; note tag-pinning for reproducibility
Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 18:07:17 -04:00
fc27d77000 fix: preserve URL-embedded proxy auth; clearer empty-list + malformed-template errors (v0.2.2)
- _proxy_from_dict server branch falls back to auth embedded in the server URL when no
  explicit username/password keys are given, so it isn't dropped and the proxy keys
  with its credentials instead of colliding auth-less (L5)
- AioProxies(proxies=[]) now raises a clear 'empty' error, not the misleading
  'provide exactly one of' (nit)
- a malformed template re-raises with a naming message instead of a bare str.format
  ValueError (nit).

Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 17:57:37 -04:00
260b92b66a docs: add v0.2.1 changelog entry (legible missing-template-field error)
the README changelog jumped from v0.2.0 to nothing; added the v0.2.1 entry documenting the legible ValueError for a template placeholder not supplied to next(**fields).

Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-29 01:39:21 -04:00
f618b6a6a1 style: drop inline comments in the template-field error path
the rationale they carried is already in the ValueError message; the lib convention is docstrings only.

Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-28 17:18:28 -04:00
aa661bd6de fix: legible error on missing template placeholder (v0.2.1)
next() on a template source did template.format(...) directly, so a placeholder not
supplied in next(**fields) leaked a bare KeyError. catch KeyError/IndexError and
raise a clear ValueError naming the missing placeholder.

verified: next() without a required field -> ValueError naming it; happy path intact.
Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-28 15:50:33 -04:00
ca708191c8 feat: proxy health & pool management (v0.2.0)
burn/timeout (dead -1 vs timed, lazy expiry), usage counters, reuse cooldown,
and live pool management (replace/add/remove) for the rotating list source.
template/static sources treat these as no-ops that log a warning.

- canonical key (host:port:user:pass, or host:port auth-less) identifies a proxy
  across every input shape (spec/Proxy/url/aiohttp/camoufox/socks5 dict); host is
  lowercased (DNS-caseless), password included, 4-part split on first 3 colons so
  colon passwords survive
- ProxiesExhaustedError when the whole pool is permanently dead
- cooldown soft (falls through to soonest-recovering, never raises); default 0 = off
- soonest-recovering fallthrough logs warning on a genuine burn, debug on cooldown
- {session} now 8-char alphanumeric (was 10-digit numeric); session_len default 8
- backward-compatible: a v0.1.0 manager (no burns, cooldown=0) is byte-for-byte
  identical — sequential round-robin, next()->Proxy, get()->aiohttp dict, never raises

Signed-off-by: disqualifier <dev@disqualifier.me>
2026-06-25 14:23:20 -04:00
7 changed files with 521 additions and 32 deletions

2
.gitignore vendored
View File

@ -1,5 +1,5 @@
# claude # claude
CLAUDE.md .claude/
# python # python
__pycache__/ __pycache__/

132
README.md
View File

@ -1,33 +1,43 @@
# aioproxies # aioproxies
Proxy parsing, formatting, and source management. Renders proxies for Proxy parsing, formatting, health, and pool management. Renders proxies for
aiohttp/aioweb, camoufox, and socks5; manages session templates (with aiohttp/aioweb, camoufox, and socks5; manages session templates (with
caller-supplied fields like country/ttl), rotating lists, or a static proxy. caller-supplied fields like country/ttl), rotating lists, or a static proxy; and
**Credentials are always injected — never hardcoded.** (for rotating lists) tracks burn/timeout, usage, reuse cooldown, and live pool
edits. **Credentials are always injected — never hardcoded.**
## Install ## Install
``` ```
aioproxies @ git+ssh://git@git.rethinkstudios.io/rethink-public/aioproxies.git@v0.1.0 aioproxies @ git+ssh://git@git.rethinkstudios.io/rethink-public/aioproxies.git@v0.2.2
# network helpers (current_ip / reset) need the extra: # network helpers (current_ip / reset) need the extra:
aioproxies[net] @ git+ssh://git@git.rethinkstudios.io/rethink-public/aioproxies.git@v0.1.0 aioproxies[net] @ git+ssh://git@git.rethinkstudios.io/rethink-public/aioproxies.git@v0.2.2
``` ```
The core has no dependencies. The `net` extra adds `aiohttp` for `current_ip` / The core has no dependencies. The `net` extra adds `aiohttp` for `current_ip` /
`reset`. `reset`.
Drop the `@v0.2.2` suffix from the line above to install the latest unpinned.
## Formatting ## Formatting
```python ```python
from aioproxies import parse from aioproxies import parse
p = parse("1.2.3.4:8080:user:pass") # or "host:port" p = parse("1.2.3.4:8080:user:pass") # or "host:port" for IP-authenticated proxies
p.aiohttp() # {"http": "...", "https": "..."} -> aioweb ExtendedSession(proxies=) p.aiohttp() # {"http": "...", "https": "..."} -> aioweb ExtendedSession(proxies=)
p.camoufox() # {"server": "...", "username": ..., "password": ...} p.camoufox() # {"server": "...", "username": ..., "password": ...}
p.socks5() # {"server": "socks5://...", ...} p.socks5() # {"server": "socks5://...", ...}
p.url() # "http://user:pass@host:port" p.url() # "http://user:pass@host:port"
p.key() # "1.2.3.4:8080:user:pass" (canonical identity; "host:port" if auth-less)
``` ```
Auth-less (IP-authenticated) proxies are first-class: `"host:port"` parses and
every render shape omits the credentials. The 4-part form splits on the first three
colons, so a password may itself contain colons (`host:port:user:pa:ss:word`).
`url()` / `aiohttp()` percent-encode the credentials, so reserved characters
(`/ # ? @`) in a user or password still produce a valid URL.
## Sources ## Sources
Construct with exactly one source: Construct with exactly one source:
@ -86,6 +96,92 @@ The credentials are baked in once with `.format()`; the per-call fields and `{se
are double-braced (`{{country}}`) so they pass through that `.format()` untouched and are double-braced (`{{country}}`) so they pass through that `.format()` untouched and
remain for `next(**fields)` / the lib to fill. remain for `next(**fields)` / the lib to fill.
## Proxy health & pool management (rotating list source)
For `proxies=` / `from_file` sources, the manager tracks each proxy's health and
usage and lets you edit the pool live. (On `template=` / `static=` these methods are
**no-ops that log a warning** and return cleanly — generic caller code can call them
regardless of source.)
```python
from aioproxies import AioProxies, ProxiesExhaustedError
pm = AioProxies(proxies=[...], cooldown=5) # 5s reuse spacing; cooldown defaults to 0 (off)
try:
proxy = pm.get() # next usable proxy, aiohttp dict
resp = await session.get(url, proxies=proxy)
if response_looks_blocked(resp):
pm.burn(proxy, 600) # time out 10 min ... or pm.burn(proxy) for dead
except ProxiesExhaustedError:
... # whole pool permanently dead — back off / refetch
pm.replace(fresh_batch) # swap in a new provider batch
pm.stats() # monitor uses + timeout state
```
### Selection
Rotation is **sequential round-robin over usable proxies**:
1. proxies that are fine (never burned, or a timed burn already expired) cycle in
order — same as v0.1.0.
2. if none are fine but some are merely timed, the manager **warns** and hands out
the one recovering soonest (still counts a use).
3. if every proxy is permanently dead (`-1`), `next()`/`get()` raise
`ProxiesExhaustedError`.
`next()` still returns a `Proxy`; `get()` still returns an aiohttp dict.
### Burn / restore
```python
pm.burn(proxy) # dead/permanent (-1) — only manual restore() brings it back
pm.burn(proxy, 600) # timed — usable again automatically after 600s (lazy, no timers)
pm.restore(proxy) # clear any burn/timeout, back to fine
pm.is_burned(proxy) # current state (expired timed burns read False)
```
`burn`/`restore`/`is_burned`/`remove` accept **any proxy shape** — a spec string, a
`Proxy`, an aiohttp/camoufox/socks5 dict, or a url — all resolve to the same canonical
key (`host:port:user:pass`, or `host:port` auth-less). The password is part of the key,
so two proxies differing only by password are distinct slots. `burn` on a proxy not in
the pool raises `ValueError`.
### Cooldown
`AioProxies(proxies=[...], cooldown=5)` spaces reuse: each handout times the proxy out
for `cooldown` seconds so it isn't reused if avoidable. It is **soft** — under load
(everything cooling) it falls through to the soonest-to-recover and never raises on
cooldown alone. Default `0` = off (exact v0.1.0 behavior).
### Stats
```python
pm.stats() # [{"proxy": "h:p:u:pw", "uses": int, "state": "active"|"timed"|"dead",
# "timeout": <ts | -1 | None>}, ...]
pm.reset_stats() # zero all use counters; leave timeouts untouched
```
`uses` is a pure counter (every handout, including forced ones); it never drives
selection and survives burns — a proxy can read "used 500× and dead". The `proxy`
field is the full canonical spec (passwords included).
### Live pool edits
```python
pm.replace(new_batch) # swap the whole list; wipes per-proxy state
pm.replace(new_batch, keep_state=True) # survivors keep uses/timeout; new ones start clean
pm.add("h:p:u:pw") # append (single or list); skip exact-duplicate keys
pm.remove(proxy) # drop a slot entirely (any shape) — distinct from burn
```
`replace` resets the rotation index and honors the manager's `shuffle` setting on the
incoming list. `remove` differs from `burn`: burn = unusable but still tracked; remove =
gone from the pool. Like the burn family, `add`/`replace` accept **any proxy shape**
(spec/`Proxy`/url/aiohttp dict/camoufox/socks5 dict). `canonical_key(shape)` and
`to_proxy(shape)` are exported if you need the key or a normalized `Proxy` yourself.
## Network helpers (optional) ## Network helpers (optional)
```python ```python
@ -103,3 +199,27 @@ await reset("https://provider/reset-url") # rotate upstream ip
- A missing proxy file raises, it does not exit the process. - A missing proxy file raises, it does not exit the process.
- Country/ASN tables, provider accounts, and reset URLs are project config — - Country/ASN tables, provider accounts, and reset URLs are project config —
inject them; do not hardcode credentials in shared code. inject them; do not hardcode credentials in shared code.
- aioweb integration is the manual loop shown above (get → use → burn on block).
A provider-protocol auto-rotation is a possible later enhancement, not in this lib.
## Changelog
### v0.2.1
- **Legible missing-template-field error:** a `template=` placeholder not supplied to
`next(**fields)` now raises a clear `ValueError` naming the field, instead of leaking
a bare `KeyError` from `str.format`.
### v0.2.0
- **Proxy health for rotating lists:** `burn`/`restore`/`is_burned` (dead `-1` vs
timed), `stats`/`reset_stats`, and the new `ProxiesExhaustedError` (all-dead pool).
- **Cooldown:** new `cooldown=` constructor arg spaces reuse; default `0` = off.
- **Live pool edits:** `replace` (with `keep_state=`), `add`, `remove`, keyed by a
canonical proxy key that accepts every input shape (incl. auth-less / IP-auth).
- **`{session}` default is now 8-char alphanumeric** (was 10-digit numeric);
`session_len` default is `8`. Templates that set `session_len` explicitly are
unaffected by the length change; the charset is now alphanumeric regardless.
- **Backward-compatible:** a v0.1.0-style manager (no burns, `cooldown=0`) behaves
byte-for-byte identically — sequential round-robin, `next()`→`Proxy`,
`get()`→aiohttp dict, never raises.

View File

@ -4,8 +4,8 @@ build-backend = "hatchling.build"
[project] [project]
name = "aioproxies" name = "aioproxies"
version = "0.1.0" version = "0.2.2"
description = "proxy parsing, formatting, and source management for aiohttp/aioweb, camoufox, and socks5" description = "proxy parsing, formatting, health, and pool management for aiohttp/aioweb, camoufox, and socks5"
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [] dependencies = []

View File

@ -4,9 +4,18 @@ renders proxies for aiohttp/aioweb, camoufox, and socks5; manages session
templates (with caller-supplied fields like country/ttl), rotating lists, or a templates (with caller-supplied fields like country/ttl), rotating lists, or a
static proxy. credentials are always injected, never hardcoded. static proxy. credentials are always injected, never hardcoded.
""" """
from .manager import AioProxies, ProxyManager, aioproxies from .manager import AioProxies, ProxiesExhaustedError, ProxyManager, aioproxies
from .proxy import Proxy, parse from .proxy import Proxy, canonical_key, parse, to_proxy
__all__ = ["AioProxies", "ProxyManager", "aioproxies", "Proxy", "parse"] __all__ = [
"AioProxies",
"ProxyManager",
"aioproxies",
"ProxiesExhaustedError",
"Proxy",
"parse",
"canonical_key",
"to_proxy",
]
__version__ = "0.1.0" __version__ = "0.2.2"

View File

@ -11,16 +11,39 @@ sources:
also accepted and treated as the session slot (back-compat with simple templates). also accepted and treated as the session slot (back-compat with simple templates).
- proxies: a list of specs cycled round-robin - proxies: a list of specs cycled round-robin
- static: one fixed proxy - static: one fixed proxy
v0.2.0 adds proxy health to the rotating list source only burn/timeout, usage
counters, reuse cooldown, and pool management (replace/add/remove). these are
keyed by each proxy's canonical key (`host:port:user:pass`, or `host:port` for
auth-less / IP-authenticated proxies). on template/static sources they are no-ops
that log a warning and return cleanly, so generic caller code can call them
regardless of source.
per-proxy state (keyed by canonical key):
- `uses`: pure counter, incremented on every handout. never drives selection;
survives burns (a proxy can read "used 500x and dead").
- `timeout`: availability state. `None`/`0` = fine; `-1` = dead/permanent (manual
restore only); a future unix ts = timed out until then (lazy, checked against
`time.time()`, no timers). cooldown and timed burns share this field. durations
(seconds) only ever exist as arguments converted to `now + seconds` and
discarded; the lib never stores a raw duration.
""" """
import logging import logging
import random import random
import string import string
import time
from typing import Dict, List, Optional, Union from typing import Dict, List, Optional, Union
from .proxy import Proxy, parse from .proxy import Proxy, canonical_key, parse, to_proxy
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
_DEAD = -1
class ProxiesExhaustedError(Exception):
"""raised by next() when every proxy in the pool is permanently dead (-1)"""
class AioProxies: class AioProxies:
"""hands out proxies from a template, a rotating list, or a static value""" """hands out proxies from a template, a rotating list, or a static value"""
@ -31,19 +54,26 @@ class AioProxies:
template: Optional[str] = None, template: Optional[str] = None,
proxies: Optional[List[Union[str, Proxy]]] = None, proxies: Optional[List[Union[str, Proxy]]] = None,
static: Optional[Union[str, Proxy]] = None, static: Optional[Union[str, Proxy]] = None,
session_len: int = 10, session_len: int = 8,
shuffle: bool = True, shuffle: bool = True,
cooldown: int = 0,
): ):
if proxies is not None and len(proxies) == 0:
raise ValueError("proxies list is empty; provide at least one proxy")
sources = [s for s in (template, proxies, static) if s] sources = [s for s in (template, proxies, static) if s]
if len(sources) != 1: if len(sources) != 1:
raise ValueError("provide exactly one of: template, proxies, static") raise ValueError("provide exactly one of: template, proxies, static")
# normalize a bare {} session slot to the named {session} form
self.template = template.replace("{}", "{session}") if template else None self.template = template.replace("{}", "{session}") if template else None
self.session_len = session_len self.session_len = session_len
self.cooldown = cooldown
self._shuffle = shuffle
self._static = parse(static) if static else None self._static = parse(static) if static else None
self._proxies = [parse(p) for p in proxies] if proxies else [] self._proxies = [parse(p) for p in proxies] if proxies else []
if self._proxies and shuffle: if self._proxies and shuffle:
random.shuffle(self._proxies) random.shuffle(self._proxies)
self._state: Dict[str, Dict[str, object]] = {}
for proxy in self._proxies:
self._state.setdefault(proxy.key(), {"uses": 0, "timeout": None})
self._index = 0 self._index = 0
@classmethod @classmethod
@ -59,8 +89,25 @@ class AioProxies:
return cls(proxies=lines, **kwargs) return cls(proxies=lines, **kwargs)
def session_id(self) -> str: def session_id(self) -> str:
"""generate a fresh numeric session id for template filling""" """generate a fresh alphanumeric session id for template filling"""
return "".join(random.choices(string.digits, k=self.session_len)) alphabet = string.ascii_letters + string.digits
return "".join(random.choices(alphabet, k=self.session_len))
def _is_list_source(self) -> bool:
"""whether this manager rotates a proxy list (vs template/static)"""
return self._static is None and self.template is None
def _warn_non_list(self, method: str) -> None:
"""log that a health/pool method only applies to list sources"""
log.warning("%s applies only to list sources; no-op on template/static", method)
def _available(self, timeout: object, now: float) -> bool:
"""whether a timeout value means the proxy is selectable right now"""
if timeout is None or timeout == 0:
return True
if timeout == _DEAD:
return False
return now >= timeout
def next(self, **fields: object) -> Proxy: def next(self, **fields: object) -> Proxy:
"""return the next proxy from the configured source """return the next proxy from the configured source
@ -68,19 +115,240 @@ class AioProxies:
for template sources, `{session}` is always filled with a fresh id and any for template sources, `{session}` is always filled with a fresh id and any
other named placeholder is filled from `fields` (e.g. next(country="ca", other named placeholder is filled from `fields` (e.g. next(country="ca",
ttl=30)). fields are ignored by list/static sources. ttl=30)). fields are ignored by list/static sources.
for list sources, skips proxies whose timeout is active (-1 dead, or a
future ts not yet passed), increments `uses` on handout, and applies the
manager's cooldown. if none are fine but some are merely timed, hands out
the one recovering soonest (with a warning); raises ProxiesExhaustedError
if every proxy is permanently dead.
""" """
if self._static is not None: if self._static is not None:
return self._static return self._static
if self.template is not None: if self.template is not None:
return parse(self.template.format(session=self.session_id(), **fields)) if "session" in fields:
proxy = self._proxies[self._index] # `session` is auto-filled with a fresh id; a caller-supplied one would
self._index = (self._index + 1) % len(self._proxies) # collide in str.format with an opaque TypeError — reject it clearly
raise ValueError("'session' is filled automatically; do not pass it to next()")
try:
filled = self.template.format(session=self.session_id(), **fields)
except (KeyError, IndexError) as exc:
raise ValueError(
f"template placeholder {exc} not provided; pass it to next(**fields)"
) from exc
except ValueError as exc:
# a malformed template (e.g. an unmatched '{') makes str.format raise a
# bare ValueError; re-raise naming the cause so it isn't cryptic
raise ValueError(f"malformed proxy template {self.template!r}: {exc}") from exc
return parse(filled)
return self._next_from_list()
def _next_from_list(self) -> Proxy:
"""rotation + health selection over the proxy list"""
if not self._proxies:
raise ProxiesExhaustedError("no proxies in pool")
now = time.time()
count = len(self._proxies)
fine: List[int] = []
for offset in range(count):
idx = (self._index + offset) % count
timeout = self._state[self._proxies[idx].key()]["timeout"]
if self._available(timeout, now):
fine.append(idx)
if fine:
chosen = fine[0]
else:
chosen = self._soonest_recovering()
if chosen is None:
raise ProxiesExhaustedError("all proxies are permanently dead (-1)")
if self._has_burned():
log.warning("no proxies available; handing out the one recovering soonest")
else:
log.debug("all proxies cooling down; handing out the one recovering soonest")
proxy = self._proxies[chosen]
self._index = (chosen + 1) % count
state = self._state[proxy.key()]
state["uses"] = int(state["uses"]) + 1
if self.cooldown > 0:
state["timeout"] = now + self.cooldown
return proxy return proxy
def _has_burned(self) -> bool:
"""whether the empty 'fine' tier reflects a genuine burn, not just cooldown
a dead (-1) proxy is always a real burn. a future-ts timeout is a real timed
burn only when cooldown is off; with cooldown on, future-ts entries are the
manager's own resting and not a pool-health signal. used to pick warning
(real trouble) vs debug (normal cooldown) on the soonest-recovering path.
"""
for state in self._state.values():
timeout = state["timeout"]
if timeout == _DEAD:
return True
if self.cooldown == 0 and timeout not in (None, 0):
return True
return False
def _soonest_recovering(self) -> Optional[int]:
"""index of the timed (non-dead) proxy recovering soonest, or None"""
best_idx: Optional[int] = None
best_ts: Optional[float] = None
for idx, proxy in enumerate(self._proxies):
timeout = self._state[proxy.key()]["timeout"]
if timeout is None or timeout == 0 or timeout == _DEAD:
continue
ts = float(timeout) # type: ignore[arg-type]
if best_ts is None or ts < best_ts:
best_ts = ts
best_idx = idx
return best_idx
def get(self, **fields: object) -> Dict[str, str]: def get(self, **fields: object) -> Dict[str, str]:
"""convenience: next proxy as an aiohttp / aioweb proxies dict""" """convenience: next proxy as an aiohttp / aioweb proxies dict"""
return self.next(**fields).aiohttp() return self.next(**fields).aiohttp()
def burn(self, proxy: Union[str, Proxy, Dict[str, str]], seconds: Optional[int] = None) -> None:
"""mark a proxy unusable: dead (-1) by default, or timed for `seconds`
accepts any supported proxy shape. raises ValueError (naming the key) if the
proxy is not in the pool. no-op + warning on template/static sources.
"""
if not self._is_list_source():
self._warn_non_list("burn()")
return
key = canonical_key(proxy)
if key not in self._state:
raise ValueError(f"proxy not in pool: {key}")
if seconds is None:
self._state[key]["timeout"] = _DEAD
else:
self._state[key]["timeout"] = time.time() + seconds
def restore(self, proxy: Union[str, Proxy, Dict[str, str]]) -> None:
"""clear any burn/timeout on a proxy (back to fine). no-op if already fine"""
if not self._is_list_source():
self._warn_non_list("restore()")
return
key = canonical_key(proxy)
if key in self._state:
self._state[key]["timeout"] = None
def is_burned(self, proxy: Union[str, Proxy, Dict[str, str]]) -> bool:
"""whether a proxy is currently unavailable (lazy expiry of timed burns)
-1 is always burned; an expired timed burn reads False. unknown proxies and
non-list sources read False.
"""
if not self._is_list_source():
return False
key = canonical_key(proxy)
if key not in self._state:
return False
timeout = self._state[key]["timeout"]
return not self._available(timeout, time.time())
def stats(self) -> List[Dict[str, object]]:
"""per-proxy usage + state snapshot (list sources only, else empty)
each entry: {"proxy": <spec>, "uses": int, "state": active|timed|dead,
"timeout": <ts | -1 | None>}. the spec is the full canonical key.
"""
if not self._is_list_source():
self._warn_non_list("stats()")
return []
now = time.time()
out: List[Dict[str, object]] = []
for proxy in self._proxies:
state = self._state[proxy.key()]
timeout = state["timeout"]
if timeout == _DEAD:
label = "dead"
elif self._available(timeout, now):
label = "active"
else:
label = "timed"
out.append({
"proxy": proxy.key(),
"uses": int(state["uses"]),
"state": label,
"timeout": timeout,
})
return out
def reset_stats(self) -> None:
"""zero every proxy's use counter; leave timeouts untouched"""
if not self._is_list_source():
self._warn_non_list("reset_stats()")
return
for state in self._state.values():
state["uses"] = 0
def replace(self, proxies: List[Union[str, Proxy]], *, keep_state: bool = False) -> None:
"""swap the entire proxy list
keep_state=False (default) wipes all per-proxy state (fresh batch).
keep_state=True preserves uses/timeout for proxies whose canonical key
survives the swap; new proxies start clean, dropped ones are forgotten.
resets the rotation index and honors the manager's shuffle setting.
"""
if not self._is_list_source():
self._warn_non_list("replace()")
return
old_state = self._state
incoming = [to_proxy(p) for p in proxies]
if self._shuffle:
random.shuffle(incoming)
new_proxies: List[Proxy] = []
new_state: Dict[str, Dict[str, object]] = {}
for proxy in incoming:
key = proxy.key()
if key in new_state:
continue
new_proxies.append(proxy)
if keep_state and key in old_state:
new_state[key] = old_state[key]
else:
new_state[key] = {"uses": 0, "timeout": None}
self._proxies = new_proxies
self._state = new_state
self._index = 0
def add(self, proxies: Union[str, Proxy, List[Union[str, Proxy]]]) -> None:
"""append proxies to the pool, keeping existing state; skip duplicate keys"""
if not self._is_list_source():
self._warn_non_list("add()")
return
items = proxies if isinstance(proxies, list) else [proxies]
for item in items:
proxy = to_proxy(item)
key = proxy.key()
if key in self._state:
continue
self._proxies.append(proxy)
self._state[key] = {"uses": 0, "timeout": None}
def remove(self, proxy: Union[str, Proxy, Dict[str, str]]) -> None:
"""drop a proxy from the pool entirely (by canonical key, any shape)
distinct from burn (burn = unusable but tracked; remove = gone). clamps the
rotation index if needed. no-op + warning if the proxy is not present.
"""
if not self._is_list_source():
self._warn_non_list("remove()")
return
key = canonical_key(proxy)
if key not in self._state:
log.warning("remove(): proxy not in pool: %s", key)
return
self._proxies = [p for p in self._proxies if p.key() != key]
del self._state[key]
if self._proxies:
self._index %= len(self._proxies)
else:
self._index = 0
# name aliases — same class, call it whichever reads best at your call site # name aliases — same class, call it whichever reads best at your call site
ProxyManager = AioProxies ProxyManager = AioProxies

View File

@ -37,8 +37,10 @@ async def current_ip(
try: try:
async with aiohttp.ClientSession(timeout=t) as session: async with aiohttp.ClientSession(timeout=t) as session:
async with session.get(test_url, proxy=p.url()) as resp: async with session.get(test_url, proxy=p.url()) as resp:
data = await resp.json() # content_type=None: an echo endpoint may return text/plain json; the
return data.get("ip") # default would raise ContentTypeError and silently return None
data = await resp.json(content_type=None)
return data.get("ip") if isinstance(data, dict) else None
except Exception as exc: except Exception as exc:
log.warning("ip check failed: %s", exc) log.warning("ip check failed: %s", exc)
return None return None

View File

@ -3,10 +3,17 @@
a `Proxy` holds host/port/optional-auth and renders the shapes different clients a `Proxy` holds host/port/optional-auth and renders the shapes different clients
want: an aiohttp/aioweb proxies dict, a camoufox proxy dict, a socks5 dict, or a want: an aiohttp/aioweb proxies dict, a camoufox proxy dict, a socks5 dict, or a
plain url. `parse` accepts the common "host:port" and "host:port:user:pass" string plain url. `parse` accepts the common "host:port" and "host:port:user:pass" string
forms (the user field may itself contain commas, e.g. session-param proxies). forms (the user field may itself contain commas, e.g. session-param proxies; the
password field may itself contain colons). auth-less (IP-authenticated) proxies
with no creds are first-class every render shape handles them via `has_auth`.
`key()` produces a stable canonical identity (`host:port:user:pass`, or
`host:port` when auth-less) so burn/remove/stats recognize the same proxy from any
input shape. `canonical_key()` extends that to dict/url forms.
""" """
from dataclasses import dataclass from dataclasses import dataclass
from typing import Dict, Optional, Union from typing import Dict, Optional, Union
from urllib.parse import quote, unquote, urlsplit
SCHEME_HTTP = "http" SCHEME_HTTP = "http"
SCHEME_SOCKS5 = "socks5" SCHEME_SOCKS5 = "socks5"
@ -26,10 +33,30 @@ class Proxy:
"""whether credentials are present""" """whether credentials are present"""
return bool(self.user and self.password) return bool(self.user and self.password)
def url(self, scheme: str = SCHEME_HTTP) -> str: def key(self) -> str:
"""render as a url, embedding auth when present""" """stable canonical identity: host:port:user:pass, or host:port if auth-less
the host is lowercased (hostnames are case-insensitive per DNS, so
PROXY.example.com and proxy.example.com are the same host and collapse to one
key); port and credentials are kept verbatim. the password is included in
full (two proxies differing only by password are distinct slots). auth-less
proxies collapse to host:port with no trailing colons.
"""
host = self.host.lower()
if self.has_auth: if self.has_auth:
return f"{scheme}://{self.user}:{self.password}@{self.host}:{self.port}" return f"{host}:{self.port}:{self.user}:{self.password}"
return f"{host}:{self.port}"
def url(self, scheme: str = SCHEME_HTTP) -> str:
"""render as a url, embedding auth when present
credentials are percent-encoded so reserved chars (/ # ? @ :) in a user or
password produce a valid url; this mirrors the unquote() on the parse side.
"""
if self.has_auth:
user = quote(str(self.user), safe="")
password = quote(str(self.password), safe="")
return f"{scheme}://{user}:{password}@{self.host}:{self.port}"
return f"{scheme}://{self.host}:{self.port}" return f"{scheme}://{self.host}:{self.port}"
def aiohttp(self) -> Dict[str, str]: def aiohttp(self) -> Dict[str, str]:
@ -62,16 +89,79 @@ def parse(spec: Union[str, Proxy]) -> Proxy:
"""parse a proxy spec into a Proxy """parse a proxy spec into a Proxy
accepts an existing Proxy (returned as-is) or a colon-delimited string in accepts an existing Proxy (returned as-is) or a colon-delimited string in
`host:port` or `host:port:user:pass` form. raises ValueError on anything else `host:port` or `host:port:user:pass` form. the 4-part form splits on the first
rather than guessing. three colons only, so a password may itself contain colons. raises ValueError
on anything else rather than guessing.
""" """
if isinstance(spec, Proxy): if isinstance(spec, Proxy):
return spec return spec
parts = spec.split(":") if spec.count(":") >= 3:
if len(parts) == 4: host, port, user, password = spec.split(":", 3)
host, port, user, password = parts
return Proxy(host, port, user, password) return Proxy(host, port, user, password)
parts = spec.split(":")
if len(parts) == 2: if len(parts) == 2:
host, port = parts host, port = parts
return Proxy(host, port) return Proxy(host, port)
raise ValueError("expected 'host:port' or 'host:port:user:pass'") raise ValueError("expected 'host:port' or 'host:port:user:pass'")
def canonical_key(spec: Union[str, Proxy, Dict[str, str]]) -> str:
"""canonical identity key for any supported proxy shape
accepts a spec string, a Proxy, a url string (`http://user:pass@host:port`,
`socks5://...`), an aiohttp dict (`{"http": url, "https": url}`), or a
camoufox/socks5 dict (`{"server": ..., "username": ..., "password": ...}`).
all forms collapse to the same `host:port:user:pass` (or `host:port` auth-less)
key, so burn/remove/stats recognize one proxy regardless of how it was passed.
"""
return to_proxy(spec).key()
def to_proxy(spec: Union[str, Proxy, Dict[str, str]]) -> Proxy:
"""normalize any supported proxy shape into a Proxy
accepts a spec string, a Proxy, a url string, an aiohttp dict, or a
camoufox/socks5 dict the same shapes `canonical_key` accepts. used both for
keying and for adding proxies to a pool from any shape.
"""
if isinstance(spec, Proxy):
return spec
if isinstance(spec, dict):
return _proxy_from_dict(spec)
if isinstance(spec, str):
if "://" in spec:
return _proxy_from_url(spec)
return parse(spec)
raise ValueError(f"unsupported proxy shape: {type(spec).__name__}")
def _proxy_from_dict(spec: Dict[str, str]) -> Proxy:
"""normalize an aiohttp / camoufox / socks5 dict into a Proxy"""
if "server" in spec:
proxy = _proxy_from_url(spec["server"])
# prefer explicit dict auth; otherwise fall back to auth embedded in the server
# URL (http://user:pass@host:port) so it isn't silently dropped — which would
# key the proxy auth-less and collide with a genuinely auth-less one
user = spec.get("username") or proxy.user
password = spec.get("password") or proxy.password
if user and password:
return Proxy(proxy.host, proxy.port, user, password)
return Proxy(proxy.host, proxy.port)
for field in ("http", "https"):
if field in spec:
return _proxy_from_url(spec[field])
raise ValueError("dict proxy must have 'server' or 'http'/'https'")
def _proxy_from_url(url: str) -> Proxy:
"""normalize a proxy url (any scheme, optional auth) into a Proxy"""
parts = urlsplit(url)
host = parts.hostname
port = str(parts.port) if parts.port is not None else ""
if host is None:
raise ValueError(f"could not parse proxy url: {url}")
if parts.username:
user = unquote(parts.username)
password = unquote(parts.password) if parts.password is not None else ""
return Proxy(host, port, user, password)
return Proxy(host, port)