From 5f7ed74306c53b9e13b92fa03d39af1e23f96785 Mon Sep 17 00:00:00 2001
From: disqualifier
Date: Wed, 24 Jun 2026 20:10:03 -0400
Subject: [PATCH] add package: pyproject + src

async IMAP one-time-code retrieval consolidated from 5 forks. three injected
layers: auth (PasswordAuth/OAuth2Auth XOAUTH2), client (IMAPClient
connect/retry/folders/search/fetch/mark_seen), retrieve (retrieve_otp
newest-first with sender/subject/code matching via substring/regex/callable).
config-free, emit-only logging. optional [oauth] extra adds aiohttp
refresh-token providers (Microsoft/Google).

fixed vs forks: dropped nonexistent uid_fetch/uid_store for aioimaplib's uid()
dispatcher, xoauth2 token now bytes, guarded XOAUTH2 fallback, use_uid
decoupled from use_ssl. src/ multi-module, hatchling.

Signed-off-by: disqualifier
---
 pyproject.toml          |  19 ++++
 src/aiomail/__init__.py |  32 +++++++
 src/aiomail/auth.py     |  94 ++++++++++++++++++++
 src/aiomail/client.py   | 190 ++++++++++++++++++++++++++++++++++++++++
 src/aiomail/extract.py  | 126 ++++++++++++++++++++++++++
 src/aiomail/oauth.py    | 122 ++++++++++++++++++++++++++
 src/aiomail/retrieve.py | 118 +++++++++++++++++++++++++
 7 files changed, 701 insertions(+)
 create mode 100644 pyproject.toml
 create mode 100644 src/aiomail/__init__.py
 create mode 100644 src/aiomail/auth.py
 create mode 100644 src/aiomail/client.py
 create mode 100644 src/aiomail/extract.py
 create mode 100644 src/aiomail/oauth.py
 create mode 100644 src/aiomail/retrieve.py

diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..e47c598
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,19 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "aiomail"
+version = "0.1.0"
+description = "async IMAP one-time-code retrieval with password/OAuth2 auth and dynamic matching"
+requires-python = ">=3.10"
+dependencies = [
+    "aioimaplib>=1.0",
+    "beautifulsoup4>=4.11",
+]
+
+[project.optional-dependencies]
+oauth = ["aiohttp>=3.9"]
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/aiomail"]
diff --git a/src/aiomail/__init__.py b/src/aiomail/__init__.py
new file mode 100644
index 0000000..1a66e99
--- /dev/null
+++ b/src/aiomail/__init__.py
@@ -0,0 +1,32 @@
+"""aiomail — async IMAP one-time-code retrieval.
+
+reads OTP / login codes out of IMAP mailboxes (accounts you own). supports plain
+password and OAuth2 (XOAUTH2) auth, and dynamic sender/subject/code matching via
+substrings, regexes, or callables.
+"""
+from .auth import Auth, OAuth2Auth, PasswordAuth
+from .client import IMAPClient
+from .extract import (
+    DEFAULT_LENGTHS,
+    DEFAULT_PATTERNS,
+    MatchSpec,
+    as_predicate,
+    extract_code,
+)
+from .retrieve import DEFAULT_FOLDERS, retrieve_otp
+
+__all__ = [
+    "Auth",
+    "PasswordAuth",
+    "OAuth2Auth",
+    "IMAPClient",
+    "extract_code",
+    "as_predicate",
+    "retrieve_otp",
+    "MatchSpec",
+    "DEFAULT_PATTERNS",
+    "DEFAULT_LENGTHS",
+    "DEFAULT_FOLDERS",
+]
+
+__version__ = "0.1.0"
diff --git a/src/aiomail/auth.py b/src/aiomail/auth.py
new file mode 100644
index 0000000..4c725b8
--- /dev/null
+++ b/src/aiomail/auth.py
@@ -0,0 +1,94 @@
+"""authentication mechanisms for the IMAP client.
+
+an `Auth` is anything that can authenticate an already-connected aioimaplib
+client. two ship: `PasswordAuth` (plain LOGIN) and `OAuth2Auth` (XOAUTH2 using an
+access token, either passed directly or pulled from a token provider callable).
+credentials are always injected by the caller, never hardcoded here.
+"""
+import base64
+import logging
+from typing import Awaitable, Callable, Optional, Protocol, Union, runtime_checkable
+
+log = logging.getLogger(__name__)
+
+# a token provider is any (optionally async) callable returning a fresh access
+# token string; see aiomail.oauth for ready-made Microsoft / Google providers
+TokenProvider = Callable[[], Union[str, Awaitable[str]]]
+
+
+@runtime_checkable
+class Auth(Protocol):
+    """contract every auth mechanism satisfies"""
+
+    async def authenticate(self, mail) -> None:
+        """authenticate the connected aioimaplib client, raising on failure"""
+        ...
+
+
+class PasswordAuth:
+    """plain IMAP LOGIN with a username and password"""
+
+    def __init__(self, user: str, password: str):
+        self.user = user
+        self.password = password
+
+    async def authenticate(self, mail) -> None:
+        result, data = await mail.login(self.user, self.password)
+        if result != "OK":
+            raise RuntimeError(f"login failed: {result} {data}")
+
+
+def _sasl_xoauth2(user: str, token: str) -> str:
+    """build the base64 XOAUTH2 SASL initial-response string"""
+    raw = f"user={user}\x01auth=Bearer {token}\x01\x01".encode()
+    return base64.b64encode(raw).decode()
+
+
+class OAuth2Auth:
+    """XOAUTH2 auth using an access token or a token provider
+
+    pass `token` for a static token, or `token_provider` (sync or async callable)
+    to fetch a fresh one at connect time — the provider path is what makes refresh
+    flows "easy": hand it a provider from aiomail.oauth and forget about it.
+    """
+
+    def __init__(
+        self,
+        user: str,
+        *,
+        token: Optional[str] = None,
+        token_provider: Optional[TokenProvider] = None,
+    ):
+        if not token and not token_provider:
+            raise ValueError("OAuth2Auth needs either token or token_provider")
+        self.user = user
+        self._token = token
+        self._token_provider = token_provider
+
+    async def _resolve_token(self) -> str:
+        if self._token_provider is not None:
+            result = self._token_provider()
+            token = await result if hasattr(result, "__await__") else result
+            if not token:
+                raise RuntimeError("token provider returned an empty token")
+            return token
+        return self._token  # type: ignore[return-value]
+
+    async def authenticate(self, mail) -> None:
+        token = await self._resolve_token()
+        # aioimaplib exposes mail.xoauth2(user, token: bytes) — note the token must
+        # be bytes, not str. older/other clients that lack it but expose a generic
+        # authenticate() are driven via the SASL string from _sasl_xoauth2.
+        xoauth2 = getattr(mail, "xoauth2", None)
+        if xoauth2 is not None:
+            result, data = await xoauth2(self.user, token.encode())
+        elif hasattr(mail, "authenticate"):
+            result, data = await mail.authenticate(
+                "XOAUTH2", lambda _: _sasl_xoauth2(self.user, token)
+            )
+        else:
+            raise RuntimeError(
+                "IMAP client exposes no XOAUTH2 entrypoint (no .xoauth2 or .authenticate)"
+            )
+        if result != "OK":
+            raise RuntimeError(f"xoauth2 auth failed: {result} {data}")
diff --git a/src/aiomail/client.py b/src/aiomail/client.py
new file mode 100644
index 0000000..0228ba2
--- /dev/null
+++ b/src/aiomail/client.py
@@ -0,0 +1,190 @@
+"""async IMAP client wrapping aioimaplib.
+
+a thin, provider-agnostic client: it owns the connection lifecycle (connect with
+retries, reconnect-on-stale, close) and exposes the handful of operations the OTP
+flow needs (folders, search, fetch, mark-seen). auth is injected, so the same
+client serves password and OAuth accounts.
+"""
+import asyncio
+import email
+import email.message
+import logging
+from typing import List, Optional
+
+from aioimaplib import IMAP4, IMAP4_SSL
+
+from .auth import Auth
+
+log = logging.getLogger(__name__)
+
+
+class IMAPClient:
+    """connection-managing IMAP client driven by an injected auth mechanism
+
+    note: `use_uid` selects UID vs sequence-number addressing and is independent
+    of `use_ssl` — the two were conflated in an earlier draft (`use_uid = use_ssl`),
+    which is a bug; they are unrelated concerns.
+    """
+
+    def __init__(
+        self,
+        auth: Auth,
+        host: str,
+        port: int = 993,
+        *,
+        use_ssl: bool = True,
+        use_uid: bool = False,
+        timeout: int = 10,
+        max_retries: int = 5,
+    ):
+        self.auth = auth
+        self.host = host
+        self.port = port
+        self.use_ssl = use_ssl
+        self.use_uid = use_uid
+        self.timeout = timeout
+        self.max_retries = max_retries
+        self._mail = None
+
+    async def __aenter__(self) -> "IMAPClient":
+        await self.ensure_connection()
+        return self
+
+    async def __aexit__(self, *exc) -> None:
+        await self.close()
+
+    async def connect(self) -> bool:
+        """open a connection and authenticate, retrying with linear backoff"""
+        await self.close()
+        for attempt in range(self.max_retries):
+            try:
+                if self.use_ssl:
+                    self._mail = IMAP4_SSL(self.host, port=self.port, timeout=self.timeout)
+                else:
+                    self._mail = IMAP4(self.host, port=self.port, timeout=self.timeout)
+                await self._mail.wait_hello_from_server()
+                await self.auth.authenticate(self._mail)
+                return True
+            except Exception as exc:
+                log.warning("connect attempt %d/%d failed: %s", attempt + 1, self.max_retries, exc)
+                self._mail = None
+                # back off only when another attempt follows; sleeping after the
+                # last failure would just delay reporting it to the caller
+                if attempt < self.max_retries - 1:
+                    await asyncio.sleep(2 * (attempt + 1))
+        return False
+
+    async def close(self) -> None:
+        """log out and drop the connection, swallowing teardown errors"""
+        if self._mail is not None:
+            try:
+                await self._mail.logout()
+            except Exception as exc:
+                log.debug("logout error ignored: %s", exc)
+            self._mail = None
+
+    async def ensure_connection(self) -> bool:
+        """return a live connection, reconnecting if the link is stale"""
+        if self._mail is None:
+            return await self.connect()
+        try:
+            await self._mail.noop()
+            return True
+        except Exception:
+            return await self.connect()
+
+    def is_throttled(self) -> bool:
+        """best-effort detection of a provider throttling response"""
+        return bool(
+            self._mail is not None
+            and getattr(self._mail, "resp", None)
+            and "THROTTLED" in str(self._mail.resp)
+        )
+
+    async def get_folders(self) -> List[str]:
+        """list mailbox folder names"""
+        if not await self.ensure_connection():
+            return []
+        try:
+            _, folder_list = await self._mail.list('""', "*")
+        except Exception as exc:
+            log.debug("list folders failed: %s", exc)
+            return []
+        folders: List[str] = []
+        for folder in folder_list or []:
+            try:
+                folders.append(folder.decode().split(' "/" ')[-1].strip('"'))
+            except Exception:
+                continue
+        return folders
+
+    async def select(self, folder: str) -> bool:
+        """select a folder, returning whether it succeeded"""
+        if not await self.ensure_connection():
+            return False
+        try:
+            result, _ = await self._mail.select(f'"{folder}"')
+            return result == "OK"
+        except Exception as exc:
+            log.debug("select %s failed: %s", folder, exc)
+            return False
+
+    async def search(self, query: str) -> List[int]:
+        """search the selected folder, returning ids newest-first"""
+        if not await self.ensure_connection():
+            return []
+        try:
+            search = self._mail.uid_search if self.use_uid else self._mail.search
+            result, data = await search(query)
+        except Exception as exc:
+            log.debug("search failed (%s): %s", query, exc)
+            return []
+        if result != "OK" or not data or not data[0]:
+            return []
+        # ignore non-numeric tokens so stray status text in the response cannot
+        # raise ValueError out of int() and abort the caller's whole scan
+        ids = [int(x) for x in data[0].split() if x.isdigit()]
+        return sorted(set(ids), reverse=True)
+
+    async def fetch(self, email_id: int, *, icloud: bool = False) -> Optional[email.message.Message]:
+        """fetch and parse a full message by id"""
+        if not await self.ensure_connection():
+            return None
+        part = "(BODY[])" if icloud else "(RFC822)"
+        try:
+            if self.use_uid:
+                result, data = await self._mail.uid("fetch", str(email_id), part)
+            else:
+                result, data = await self._mail.fetch(str(email_id), part)
+        except Exception as exc:
+            log.debug("fetch %s failed: %s", email_id, exc)
+            return None
+        if result != "OK" or not data:
+            return None
+        # a bytearray item is taken to be the message literal (aioimaplib appears
+        # to deliver literals that way - confirm); checking it first keeps a long
+        # protocol line such as b"12 FETCH (RFC822 {20480}" from being parsed
+        for item in data:
+            if isinstance(item, bytearray):
+                return email.message_from_bytes(bytes(item))
+        # fallback heuristics for clients that hand back plain bytes or tuples
+        for item in data:
+            if isinstance(item, (bytes, bytearray)) and len(item) > 20:
+                return email.message_from_bytes(bytes(item))
+            if isinstance(item, tuple) and len(item) > 1:
+                return email.message_from_bytes(item[1])
+        return None
+
+    async def mark_seen(self, email_id: int) -> bool:
+        """flag a message as read without deleting it"""
+        if not await self.ensure_connection():
+            return False
+        try:
+            if self.use_uid:
+                result, _ = await self._mail.uid("store", str(email_id), "+FLAGS", "(\\Seen)")
+            else:
+                result, _ = await self._mail.store(str(email_id), "+FLAGS", "(\\Seen)")
+            return result == "OK"
+        except Exception as exc:
+            log.debug("mark_seen %s failed: %s", email_id, exc)
+            return False
diff --git a/src/aiomail/extract.py b/src/aiomail/extract.py
new file mode 100644
index 0000000..56d5a35
--- /dev/null
+++ b/src/aiomail/extract.py
@@ -0,0 +1,126 @@
+"""code extraction and dynamic matching for email messages.
+
+this module is pure logic with no network IO, so it is the easiest part to
+unit-test. `extract_code` pulls a one-time code out of a message; `as_predicate`
+turns a string / compiled regex / callable into a uniform match function used to
+filter senders and subjects.
+"""
+import email.message
+import logging
+import re
+from typing import Callable, Iterable, Iterator, Optional, Pattern, Sequence, Union
+
+from bs4 import BeautifulSoup
+
+log = logging.getLogger(__name__)
+
+# spec accepted anywhere a "match this string" is wanted: a plain substring
+# (case-insensitive), a precompiled regex, or a predicate callable
+MatchSpec = Union[str, Pattern, Callable[[Optional[str]], bool], None]
+
+# patterns tried (in order) against the subject then each body part; the first
+# capturing group is the code, or the whole match if there are no groups
+DEFAULT_PATTERNS: Sequence[Union[str, Pattern]] = (
+    r"(\d{4,8})\s+is\s+your",
+    r"your.{0,20}?code(?:\s+is)?[:\s]+(\d{4,8})",
+    r"code(?:\s+is)?[:\s]+(\d{4,8})",
+    r"\b(\d{4,8})\b",
+)
+
+# default digit-run lengths to treat as a standalone code when no pattern hits
+DEFAULT_LENGTHS: Sequence[int] = (4, 6, 8)
+
+
+def _compile(patterns: Sequence[Union[str, Pattern]]) -> list[Pattern]:
+    """compile any raw-string patterns once, leaving precompiled ones as-is"""
+    out: list[Pattern] = []
+    for p in patterns:
+        out.append(re.compile(p, re.IGNORECASE) if isinstance(p, str) else p)
+    return out
+
+
+def _decode_part(part: email.message.Message) -> Optional[str]:
+    """decode a single message part to text, tolerating bad charsets"""
+    payload = part.get_payload(decode=True)
+    if not payload:
+        return None
+    charset = part.get_content_charset() or "utf-8"
+    try:
+        return payload.decode(charset, errors="replace")
+    except (LookupError, TypeError):
+        return payload.decode("utf-8", errors="replace")
+
+
+def _iter_text(message: email.message.Message) -> Iterator[str]:
+    """yield plain-text views of every text/* part (html flattened to text)"""
+    for part in message.walk():
+        ctype = part.get_content_type()
+        if ctype not in ("text/plain", "text/html"):
+            continue
+        text = _decode_part(part)
+        if not text:
+            continue
+        if ctype == "text/html":
+            try:
+                text = BeautifulSoup(text, "html.parser").get_text(separator=" ", strip=True)
+            except Exception as exc:  # malformed html should not abort extraction
+                log.debug("html parse failed, using raw text: %s", exc)
+        yield text
+
+
+def _scan(text: str, patterns: list[Pattern], lengths: set[int]) -> Optional[str]:
+    """run patterns then a standalone digit-run fallback over one block of text"""
+    for pat in patterns:
+        m = pat.search(text)
+        if m:
+            return m.group(1) if m.groups() else m.group(0)
+    for token in re.split(r"\s+", text):
+        digits = "".join(c for c in token if c.isdigit())
+        if digits and len(digits) in lengths and digits.isdigit():
+            return digits
+    return None
+
+
+def extract_code(
+    message: email.message.Message,
+    *,
+    patterns: Sequence[Union[str, Pattern]] = DEFAULT_PATTERNS,
+    lengths: Iterable[int] = DEFAULT_LENGTHS,
+) -> Optional[str]:
+    """extract a one-time code from a message, subject first then body parts
+
+    `patterns` are regexes tried in order; the first capturing group wins, or the
+    whole match if a pattern has no groups. when no pattern matches a block, any
+    standalone digit run whose length is in `lengths` is returned. both knobs are
+    parameters so callers can tune per provider without forking this function.
+    """
+    compiled = _compile(patterns)
+    length_set = set(lengths)
+
+    subject = message.get("Subject", "") or ""
+    hit = _scan(subject, compiled, length_set)
+    if hit:
+        return hit
+
+    for text in _iter_text(message):
+        hit = _scan(text, compiled, length_set)
+        if hit:
+            return hit
+    return None
+
+
+def as_predicate(spec: MatchSpec) -> Callable[[Optional[str]], bool]:
+    """normalize a match spec into a predicate over an optional string
+
+    accepts None (matches everything), a precompiled regex (search), a callable
+    (used directly), or a plain string (case-insensitive substring). this is what
+    gives sender/subject filtering its "string or regex, caller's choice" behavior.
+    """
+    if spec is None:
+        return lambda value: True
+    if isinstance(spec, re.Pattern):
+        return lambda value: bool(spec.search(value or ""))
+    if callable(spec):
+        return spec
+    needle = str(spec).lower()
+    return lambda value: needle in (value or "").lower()
diff --git a/src/aiomail/oauth.py b/src/aiomail/oauth.py
new file mode 100644
index 0000000..3b7c18b
--- /dev/null
+++ b/src/aiomail/oauth.py
@@ -0,0 +1,122 @@
+"""optional OAuth2 token providers (refresh-token -> access-token).
+
+these turn a refresh token into a fresh access token for `OAuth2Auth`. they need
+aiohttp, kept as an optional extra so the core stays light; importing this module
+without aiohttp raises a clear error only when a provider is instantiated.
+
+credentials (client_id, refresh_token) are always supplied by the caller.
+"""
+import asyncio
+import logging
+import time
+from typing import Optional, Sequence
+
+log = logging.getLogger(__name__)
+
+try:
+    import aiohttp
+
+    _HAVE_AIOHTTP = True
+except ImportError:  # pragma: no cover
+    aiohttp = None  # type: ignore[assignment]
+    _HAVE_AIOHTTP = False
+
+_MISSING = "aiohttp is required for OAuth token providers; install aiomail[oauth]"
+
+
+class _RefreshTokenProvider:
+    """base refresh-token grant client with a small failure circuit breaker"""
+
+    endpoints: Sequence[str] = ()
+
+    def __init__(
+        self,
+        client_id: str,
+        refresh_token: str,
+        *,
+        endpoints: Optional[Sequence[str]] = None,
+        timeout: float = 30.0,
+        max_retries: int = 3,
+        breaker_threshold: int = 3,
+        breaker_cooldown: float = 300.0,
+    ):
+        if not _HAVE_AIOHTTP:
+            raise RuntimeError(_MISSING)
+        self.client_id = client_id
+        self.refresh_token = refresh_token
+        self.endpoints = tuple(endpoints) if endpoints else self.endpoints
+        self.timeout = timeout
+        self.max_retries = max_retries
+        self._breaker_threshold = breaker_threshold
+        self._breaker_cooldown = breaker_cooldown
+        self._failures = 0
+        self._opened_at = 0.0
+
+    def _extra_data(self) -> dict:
+        """provider-specific token-request fields (override per provider)"""
+        return {}
+
+    async def __call__(self) -> str:
+        """return a fresh access token, raising RuntimeError if every endpoint fails"""
+        if self._failures >= self._breaker_threshold:
+            # monotonic clock: a wall-clock adjustment must not stretch or cut the cooldown
+            if time.monotonic() - self._opened_at < self._breaker_cooldown:
+                raise RuntimeError("token provider circuit breaker open")
+            self._failures = 0  # cooldown elapsed, allow a probe
+
+        data = {
+            "client_id": self.client_id,
+            "grant_type": "refresh_token",
+            "refresh_token": self.refresh_token,
+            **self._extra_data(),
+        }
+        timeout = aiohttp.ClientTimeout(total=self.timeout)
+
+        for attempt in range(self.max_retries):
+            for endpoint in self.endpoints:
+                try:
+                    async with aiohttp.ClientSession(timeout=timeout) as session:
+                        async with session.post(endpoint, data=data) as resp:
+                            if resp.status == 200:
+                                token = (await resp.json()).get("access_token")
+                                if token:
+                                    self._failures = 0
+                                    return token
+                                log.warning("token endpoint %s returned 200 without access_token", endpoint)
+                            else:
+                                body = await resp.text()
+                                log.warning("token endpoint %s -> %s: %s", endpoint, resp.status, body)
+                except Exception as exc:
+                    log.warning("token request to %s failed: %s", endpoint, exc)
+            if attempt < self.max_retries - 1:
+                await asyncio.sleep(2 ** attempt)
+
+        self._failures += 1
+        self._opened_at = time.monotonic()
+        raise RuntimeError("all token endpoints failed")
+
+
+class MicrosoftTokenProvider(_RefreshTokenProvider):
+    """access tokens for Outlook / Office365 IMAP via refresh token"""
+
+    endpoints = (
+        "https://login.microsoftonline.com/common/oauth2/v2.0/token",
+        "https://login.live.com/oauth20_token.srf",
+    )
+
+    def _extra_data(self) -> dict:
+        # IMAP/SMTP scope for Outlook; adjust if your app registration differs
+        return {"scope": "https://outlook.office.com/IMAP.AccessAsUser.All offline_access"}
+
+
+class GoogleTokenProvider(_RefreshTokenProvider):
+    """access tokens for Gmail IMAP via refresh token (needs client_secret)"""
+
+    endpoints = ("https://oauth2.googleapis.com/token",)
+
+    def __init__(self, client_id: str, refresh_token: str, client_secret: str, **kwargs):
+        super().__init__(client_id, refresh_token, **kwargs)
+        self.client_secret = client_secret
+
+    def _extra_data(self) -> dict:
+        return {"client_secret": self.client_secret}
diff --git a/src/aiomail/retrieve.py b/src/aiomail/retrieve.py
new file mode 100644
index 0000000..58b943a
--- /dev/null
+++ b/src/aiomail/retrieve.py
@@ -0,0 +1,118 @@
+"""orchestration: find the most recent valid OTP across folders.
+
+`retrieve_otp` ties the client and extractor together. sender/subject accept the
+flexible match specs from `extract`, folders/age/patterns are all parameters with
+sane defaults, and provider quirks live in the arguments rather than hardcoded
+branches inside the function.
+"""
+import asyncio
+import logging
+import time
+from email.utils import parsedate_to_datetime
+from typing import Iterable, List, Optional, Pattern, Sequence, Union
+
+from .client import IMAPClient
+from .extract import DEFAULT_LENGTHS, DEFAULT_PATTERNS, MatchSpec, as_predicate, extract_code
+
+log = logging.getLogger(__name__)
+
+# reasonable cross-provider folder set; override per account as needed
+DEFAULT_FOLDERS: Sequence[str] = ("INBOX", "Junk", "Spam", "Archive", "All Mail")
+
+
+def _server_query(sender: MatchSpec, subject: MatchSpec) -> str:
+    """build a narrowing IMAP query from plain-string specs only
+
+    only plain strings translate to server-side FROM/SUBJECT filters; regex and
+    callable specs fall back to ALL and are filtered client-side, so dynamic
+    matching always works even when the server cannot express it.
+    """
+    parts: List[str] = []
+    if isinstance(sender, str):
+        parts.append(f'FROM "{sender}"')
+    if isinstance(subject, str):
+        parts.append(f'SUBJECT "{subject}"')
+    return f"({' '.join(parts)})" if parts else "ALL"
+
+
+def _age_seconds(message) -> Optional[float]:
+    """age of a message in seconds from its Date header, or None if unparseable"""
+    raw = message.get("Date")
+    if not raw:
+        return None
+    try:
+        return time.time() - parsedate_to_datetime(raw).timestamp()
+    except (TypeError, ValueError) as exc:
+        log.debug("date parse failed (%s): %s", raw, exc)
+        return None
+
+
+async def retrieve_otp(
+    client: IMAPClient,
+    *,
+    sender: MatchSpec = None,
+    subject: MatchSpec = None,
+    folders: Optional[Iterable[str]] = None,
+    patterns: Sequence[Union[str, Pattern]] = DEFAULT_PATTERNS,
+    lengths: Iterable[int] = DEFAULT_LENGTHS,
+    max_age: Optional[float] = 1800,
+    per_folder: int = 5,
+    retries: int = 3,
+    retry_delay: float = 5.0,
+    icloud: Optional[bool] = None,
+    mark_seen: bool = True,
+) -> Optional[str]:
+    """return the newest OTP matching the filters, or None
+
+    sender/subject accept a substring, a compiled regex, or a callable. folders,
+    patterns, code lengths, max age and retry behavior are all tunable. set
+    `max_age=None` to disable the freshness check.
+    """
+    folders = list(folders) if folders is not None else list(DEFAULT_FOLDERS)
+    sender_ok = as_predicate(sender)
+    subject_ok = as_predicate(subject)
+    query = _server_query(sender, subject)
+
+    for attempt in range(retries + 1):
+        for folder in folders:
+            if not await client.select(folder):
+                continue
+            ids = await client.search(query)
+            if not ids and query != "ALL":
+                ids = await client.search("ALL")
+            if not ids:
+                continue
+
+            log.info("scanning %d message(s) in %s", min(len(ids), per_folder), folder)
+            for email_id in ids[:per_folder]:
+                message = await client.fetch(email_id, icloud=bool(icloud))
+                if message is None:
+                    continue
+
+                from_hdr = message.get("From", "")
+                subj_hdr = message.get("Subject", "")
+                if not sender_ok(from_hdr) or not subject_ok(subj_hdr):
+                    continue
+
+                code = extract_code(message, patterns=patterns, lengths=lengths)
+                if not code:
+                    continue
+
+                if max_age is not None:
+                    age = _age_seconds(message)
+                    if age is not None and age > max_age:
+                        log.info("code %s skipped, too old (%.0fs > %.0fs)", code, age, max_age)
+                        continue
+
+                if mark_seen:
+                    await client.mark_seen(email_id)
+                log.info("found code %s in %s", code, folder)
+                return code
+
+        if attempt < retries:
+            wait = retry_delay * (attempt + 1)
+            log.info("no code yet, retrying in %.0fs (%d/%d)", wait, attempt + 1, retries)
+            await asyncio.sleep(wait)
+
+    log.info("no matching OTP found")
+    return None