add package: pyproject + src

async IMAP one-time-code retrieval consolidated from 5 forks. three
injected layers: auth (PasswordAuth/OAuth2Auth XOAUTH2), client
(IMAPClient connect/retry/folders/search/fetch/mark_seen), retrieve
(retrieve_otp newest-first with sender/subject/code matching via
substring/regex/callable). config-free, emit-only logging. optional
[oauth] extra adds aiohttp refresh-token providers (Microsoft/Google).

fixed vs forks: dropped nonexistent uid_fetch/uid_store for aioimaplib's
uid() dispatcher, xoauth2 token now bytes, guarded XOAUTH2 fallback,
use_uid decoupled from use_ssl. src/ multi-module, hatchling.

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-24 20:10:03 -04:00
parent c08d13c02a
commit 5f7ed74306
7 changed files with 687 additions and 0 deletions

19
pyproject.toml Normal file
View File

@ -0,0 +1,19 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "aiomail"
version = "0.1.0"
description = "async IMAP one-time-code retrieval with password/OAuth2 auth and dynamic matching"
requires-python = ">=3.10"
dependencies = [
"aioimaplib>=1.0",
"beautifulsoup4>=4.11",
]
[project.optional-dependencies]
oauth = ["aiohttp>=3.9"]
[tool.hatch.build.targets.wheel]
packages = ["src/aiomail"]

32
src/aiomail/__init__.py Normal file
View File

@ -0,0 +1,32 @@
"""aiomail — async IMAP one-time-code retrieval.
reads OTP / login codes out of IMAP mailboxes (accounts you own). supports plain
password and OAuth2 (XOAUTH2) auth, and dynamic sender/subject/code matching via
substrings, regexes, or callables.
"""
from .auth import Auth, OAuth2Auth, PasswordAuth
from .client import IMAPClient
from .extract import (
DEFAULT_LENGTHS,
DEFAULT_PATTERNS,
MatchSpec,
as_predicate,
extract_code,
)
from .retrieve import DEFAULT_FOLDERS, retrieve_otp
__all__ = [
"Auth",
"PasswordAuth",
"OAuth2Auth",
"IMAPClient",
"extract_code",
"as_predicate",
"retrieve_otp",
"MatchSpec",
"DEFAULT_PATTERNS",
"DEFAULT_LENGTHS",
"DEFAULT_FOLDERS",
]
__version__ = "0.1.0"

94
src/aiomail/auth.py Normal file
View File

@ -0,0 +1,94 @@
"""authentication mechanisms for the IMAP client.
an `Auth` is anything that can authenticate an already-connected aioimaplib
client. two ship: `PasswordAuth` (plain LOGIN) and `OAuth2Auth` (XOAUTH2 using an
access token, either passed directly or pulled from a token provider callable).
credentials are always injected by the caller, never hardcoded here.
"""
import base64
import logging
from typing import Awaitable, Callable, Optional, Protocol, Union, runtime_checkable
log = logging.getLogger(__name__)
# a token provider is any (optionally async) callable returning a fresh access
# token string; see aiomail.oauth for ready-made Microsoft / Google providers
TokenProvider = Callable[[], Union[str, Awaitable[str]]]
@runtime_checkable
class Auth(Protocol):
"""contract every auth mechanism satisfies"""
async def authenticate(self, mail) -> None:
"""authenticate the connected aioimaplib client, raising on failure"""
...
class PasswordAuth:
"""plain IMAP LOGIN with a username and password"""
def __init__(self, user: str, password: str):
self.user = user
self.password = password
async def authenticate(self, mail) -> None:
result, data = await mail.login(self.user, self.password)
if result != "OK":
raise RuntimeError(f"login failed: {result} {data}")
def _sasl_xoauth2(user: str, token: str) -> str:
"""build the base64 XOAUTH2 SASL initial-response string"""
raw = f"user={user}\x01auth=Bearer {token}\x01\x01".encode()
return base64.b64encode(raw).decode()
class OAuth2Auth:
"""XOAUTH2 auth using an access token or a token provider
pass `token` for a static token, or `token_provider` (sync or async callable)
to fetch a fresh one at connect time the provider path is what makes refresh
flows "easy": hand it a provider from aiomail.oauth and forget about it.
"""
def __init__(
self,
user: str,
*,
token: Optional[str] = None,
token_provider: Optional[TokenProvider] = None,
):
if not token and not token_provider:
raise ValueError("OAuth2Auth needs either token or token_provider")
self.user = user
self._token = token
self._token_provider = token_provider
async def _resolve_token(self) -> str:
if self._token_provider is not None:
result = self._token_provider()
token = await result if hasattr(result, "__await__") else result
if not token:
raise RuntimeError("token provider returned an empty token")
return token
return self._token # type: ignore[return-value]
async def authenticate(self, mail) -> None:
token = await self._resolve_token()
# aioimaplib exposes mail.xoauth2(user, token: bytes) — note the token must
# be bytes, not str. older/other clients that lack it but expose a generic
# authenticate() are driven via the SASL string from _sasl_xoauth2.
xoauth2 = getattr(mail, "xoauth2", None)
if xoauth2 is not None:
result, data = await xoauth2(self.user, token.encode())
elif hasattr(mail, "authenticate"):
result, data = await mail.authenticate(
"XOAUTH2", lambda _: _sasl_xoauth2(self.user, token)
)
else:
raise RuntimeError(
"IMAP client exposes no XOAUTH2 entrypoint (no .xoauth2 or .authenticate)"
)
if result != "OK":
raise RuntimeError(f"xoauth2 auth failed: {result} {data}")

178
src/aiomail/client.py Normal file
View File

@ -0,0 +1,178 @@
"""async IMAP client wrapping aioimaplib.
a thin, provider-agnostic client: it owns the connection lifecycle (connect with
retries, reconnect-on-stale, close) and exposes the handful of operations the OTP
flow needs (folders, search, fetch, mark-seen). auth is injected, so the same
client serves password and OAuth accounts.
"""
import asyncio
import email
import email.message
import logging
from typing import List, Optional
from aioimaplib import IMAP4, IMAP4_SSL
from .auth import Auth
log = logging.getLogger(__name__)
class IMAPClient:
"""connection-managing IMAP client driven by an injected auth mechanism
note: `use_uid` selects UID vs sequence-number addressing and is independent
of `use_ssl` the two were conflated in an earlier draft (`use_uid = use_ssl`),
which is a bug; they are unrelated concerns.
"""
def __init__(
self,
auth: Auth,
host: str,
port: int = 993,
*,
use_ssl: bool = True,
use_uid: bool = False,
timeout: int = 10,
max_retries: int = 5,
):
self.auth = auth
self.host = host
self.port = port
self.use_ssl = use_ssl
self.use_uid = use_uid
self.timeout = timeout
self.max_retries = max_retries
self._mail = None
async def __aenter__(self) -> "IMAPClient":
await self.ensure_connection()
return self
async def __aexit__(self, *exc) -> None:
await self.close()
async def connect(self) -> bool:
"""open a connection and authenticate, retrying with linear backoff"""
await self.close()
for attempt in range(self.max_retries):
try:
if self.use_ssl:
self._mail = IMAP4_SSL(self.host, port=self.port, timeout=self.timeout)
else:
self._mail = IMAP4(self.host, port=self.port, timeout=self.timeout)
await self._mail.wait_hello_from_server()
await self.auth.authenticate(self._mail)
return True
except Exception as exc:
log.warning("connect attempt %d/%d failed: %s", attempt + 1, self.max_retries, exc)
self._mail = None
await asyncio.sleep(2 * (attempt + 1))
return False
async def close(self) -> None:
"""log out and drop the connection, swallowing teardown errors"""
if self._mail is not None:
try:
await self._mail.logout()
except Exception as exc:
log.debug("logout error ignored: %s", exc)
self._mail = None
async def ensure_connection(self) -> bool:
"""return a live connection, reconnecting if the link is stale"""
if self._mail is None:
return await self.connect()
try:
await self._mail.noop()
return True
except Exception:
return await self.connect()
def is_throttled(self) -> bool:
"""best-effort detection of a provider throttling response"""
return bool(
self._mail is not None
and getattr(self._mail, "resp", None)
and "THROTTLED" in str(self._mail.resp)
)
async def get_folders(self) -> List[str]:
"""list mailbox folder names"""
if not await self.ensure_connection():
return []
try:
_, folder_list = await self._mail.list('""', "*")
except Exception as exc:
log.debug("list folders failed: %s", exc)
return []
folders: List[str] = []
for folder in folder_list or []:
try:
folders.append(folder.decode().split(' "/" ')[-1].strip('"'))
except Exception:
continue
return folders
async def select(self, folder: str) -> bool:
"""select a folder, returning whether it succeeded"""
if not await self.ensure_connection():
return False
try:
result, _ = await self._mail.select(f'"{folder}"')
return result == "OK"
except Exception as exc:
log.debug("select %s failed: %s", folder, exc)
return False
async def search(self, query: str) -> List[int]:
"""search the selected folder, returning ids newest-first"""
if not await self.ensure_connection():
return []
try:
search = self._mail.uid_search if self.use_uid else self._mail.search
result, data = await search(query)
except Exception as exc:
log.debug("search failed (%s): %s", query, exc)
return []
if result != "OK" or not data or not data[0]:
return []
ids = [int(x) for x in data[0].split()]
return sorted(set(ids), reverse=True)
async def fetch(self, email_id: int, *, icloud: bool = False) -> Optional[email.message.Message]:
"""fetch and parse a full message by id"""
if not await self.ensure_connection():
return None
part = "(BODY[])" if icloud else "(RFC822)"
try:
if self.use_uid:
result, data = await self._mail.uid("fetch", str(email_id), part)
else:
result, data = await self._mail.fetch(str(email_id), part)
except Exception as exc:
log.debug("fetch %s failed: %s", email_id, exc)
return None
if result != "OK" or not data:
return None
for item in data:
if isinstance(item, (bytes, bytearray)) and len(item) > 20:
return email.message_from_bytes(bytes(item))
if isinstance(item, tuple) and len(item) > 1:
return email.message_from_bytes(item[1])
return None
async def mark_seen(self, email_id: int) -> bool:
"""flag a message as read without deleting it"""
if not await self.ensure_connection():
return False
try:
if self.use_uid:
result, _ = await self._mail.uid("store", str(email_id), "+FLAGS", "(\\Seen)")
else:
result, _ = await self._mail.store(str(email_id), "+FLAGS", "(\\Seen)")
return result == "OK"
except Exception as exc:
log.debug("mark_seen %s failed: %s", email_id, exc)
return False

126
src/aiomail/extract.py Normal file
View File

@ -0,0 +1,126 @@
"""code extraction and dynamic matching for email messages.
this module is pure logic with no network IO, so it is the easiest part to
unit-test. `extract_code` pulls a one-time code out of a message; `as_predicate`
turns a string / compiled regex / callable into a uniform match function used to
filter senders and subjects.
"""
import email.message
import logging
import re
from typing import Callable, Iterable, Iterator, Optional, Pattern, Sequence, Union
from bs4 import BeautifulSoup
log = logging.getLogger(__name__)
# spec accepted anywhere a "match this string" is wanted: a plain substring
# (case-insensitive), a precompiled regex, or a predicate callable
MatchSpec = Union[str, Pattern, Callable[[Optional[str]], bool], None]
# patterns tried (in order) against the subject then each body part; the first
# capturing group is the code, or the whole match if there are no groups
DEFAULT_PATTERNS: Sequence[Union[str, Pattern]] = (
r"(\d{4,8})\s+is\s+your",
r"your.{0,20}?code(?:\s+is)?[:\s]+(\d{4,8})",
r"code(?:\s+is)?[:\s]+(\d{4,8})",
r"\b(\d{4,8})\b",
)
# default digit-run lengths to treat as a standalone code when no pattern hits
DEFAULT_LENGTHS: Sequence[int] = (4, 6, 8)
def _compile(patterns: Sequence[Union[str, Pattern]]) -> list[Pattern]:
"""compile any raw-string patterns once, leaving precompiled ones as-is"""
out: list[Pattern] = []
for p in patterns:
out.append(re.compile(p, re.IGNORECASE) if isinstance(p, str) else p)
return out
def _decode_part(part: email.message.Message) -> Optional[str]:
"""decode a single message part to text, tolerating bad charsets"""
payload = part.get_payload(decode=True)
if not payload:
return None
charset = part.get_content_charset() or "utf-8"
try:
return payload.decode(charset, errors="replace")
except (LookupError, TypeError):
return payload.decode("utf-8", errors="replace")
def _iter_text(message: email.message.Message) -> Iterator[str]:
"""yield plain-text views of every text/* part (html flattened to text)"""
for part in message.walk():
ctype = part.get_content_type()
if ctype not in ("text/plain", "text/html"):
continue
text = _decode_part(part)
if not text:
continue
if ctype == "text/html":
try:
text = BeautifulSoup(text, "html.parser").get_text(separator=" ", strip=True)
except Exception as exc: # malformed html should not abort extraction
log.debug("html parse failed, using raw text: %s", exc)
yield text
def _scan(text: str, patterns: list[Pattern], lengths: set[int]) -> Optional[str]:
"""run patterns then a standalone digit-run fallback over one block of text"""
for pat in patterns:
m = pat.search(text)
if m:
return m.group(1) if m.groups() else m.group(0)
for token in re.split(r"\s+", text):
digits = "".join(c for c in token if c.isdigit())
if digits and len(digits) in lengths and digits.isdigit():
return digits
return None
def extract_code(
message: email.message.Message,
*,
patterns: Sequence[Union[str, Pattern]] = DEFAULT_PATTERNS,
lengths: Iterable[int] = DEFAULT_LENGTHS,
) -> Optional[str]:
"""extract a one-time code from a message, subject first then body parts
`patterns` are regexes tried in order; the first capturing group wins, or the
whole match if a pattern has no groups. when no pattern matches a block, any
standalone digit run whose length is in `lengths` is returned. both knobs are
parameters so callers can tune per provider without forking this function.
"""
compiled = _compile(patterns)
length_set = set(lengths)
subject = message.get("Subject", "") or ""
hit = _scan(subject, compiled, length_set)
if hit:
return hit
for text in _iter_text(message):
hit = _scan(text, compiled, length_set)
if hit:
return hit
return None
def as_predicate(spec: MatchSpec) -> Callable[[Optional[str]], bool]:
"""normalize a match spec into a predicate over an optional string
accepts None (matches everything), a precompiled regex (search), a callable
(used directly), or a plain string (case-insensitive substring). this is what
gives sender/subject filtering its "string or regex, caller's choice" behavior.
"""
if spec is None:
return lambda value: True
if isinstance(spec, re.Pattern):
return lambda value: bool(spec.search(value or ""))
if callable(spec):
return spec
needle = str(spec).lower()
return lambda value: needle in (value or "").lower()

120
src/aiomail/oauth.py Normal file
View File

@ -0,0 +1,120 @@
"""optional OAuth2 token providers (refresh-token -> access-token).
these turn a refresh token into a fresh access token for `OAuth2Auth`. they need
aiohttp, kept as an optional extra so the core stays light; importing this module
without aiohttp raises a clear error only when a provider is instantiated.
credentials (client_id, refresh_token) are always supplied by the caller.
"""
import logging
import time
from typing import Optional, Sequence
log = logging.getLogger(__name__)
try:
import aiohttp
_HAVE_AIOHTTP = True
except ImportError: # pragma: no cover
aiohttp = None # type: ignore[assignment]
_HAVE_AIOHTTP = False
_MISSING = "aiohttp is required for OAuth token providers; install aiomail[oauth]"
class _RefreshTokenProvider:
"""base refresh-token grant client with a small failure circuit breaker"""
endpoints: Sequence[str] = ()
def __init__(
self,
client_id: str,
refresh_token: str,
*,
endpoints: Optional[Sequence[str]] = None,
timeout: float = 30.0,
max_retries: int = 3,
breaker_threshold: int = 3,
breaker_cooldown: float = 300.0,
):
if not _HAVE_AIOHTTP:
raise RuntimeError(_MISSING)
self.client_id = client_id
self.refresh_token = refresh_token
self.endpoints = tuple(endpoints) if endpoints else self.endpoints
self.timeout = timeout
self.max_retries = max_retries
self._breaker_threshold = breaker_threshold
self._breaker_cooldown = breaker_cooldown
self._failures = 0
self._opened_at = 0.0
def _extra_data(self) -> dict:
"""provider-specific token-request fields (override per provider)"""
return {}
async def __call__(self) -> str:
if self._failures >= self._breaker_threshold:
if time.time() - self._opened_at < self._breaker_cooldown:
raise RuntimeError("token provider circuit breaker open")
self._failures = 0 # cooldown elapsed, allow a probe
data = {
"client_id": self.client_id,
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
**self._extra_data(),
}
timeout = aiohttp.ClientTimeout(total=self.timeout)
for attempt in range(self.max_retries):
for endpoint in self.endpoints:
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(endpoint, data=data) as resp:
if resp.status == 200:
token = (await resp.json()).get("access_token")
if token:
self._failures = 0
return token
else:
body = await resp.text()
log.warning("token endpoint %s -> %s: %s", endpoint, resp.status, body)
except Exception as exc:
log.warning("token request to %s failed: %s", endpoint, exc)
if attempt < self.max_retries - 1:
import asyncio
await asyncio.sleep(2 ** attempt)
self._failures += 1
self._opened_at = time.time()
raise RuntimeError("all token endpoints failed")
class MicrosoftTokenProvider(_RefreshTokenProvider):
"""access tokens for Outlook / Office365 IMAP via refresh token"""
endpoints = (
"https://login.microsoftonline.com/common/oauth2/v2.0/token",
"https://login.live.com/oauth20_token.srf",
)
def _extra_data(self) -> dict:
# IMAP/SMTP scope for Outlook; adjust if your app registration differs
return {"scope": "https://outlook.office.com/IMAP.AccessAsUser.All offline_access"}
class GoogleTokenProvider(_RefreshTokenProvider):
"""access tokens for Gmail IMAP via refresh token (needs client_secret)"""
endpoints = ("https://oauth2.googleapis.com/token",)
def __init__(self, client_id: str, refresh_token: str, client_secret: str, **kwargs):
super().__init__(client_id, refresh_token, **kwargs)
self.client_secret = client_secret
def _extra_data(self) -> dict:
return {"client_secret": self.client_secret}

118
src/aiomail/retrieve.py Normal file
View File

@ -0,0 +1,118 @@
"""orchestration: find the most recent valid OTP across folders.
`retrieve_otp` ties the client and extractor together. sender/subject accept the
flexible match specs from `extract`, folders/age/patterns are all parameters with
sane defaults, and provider quirks live in the arguments rather than hardcoded
branches inside the function.
"""
import asyncio
import logging
import time
from email.utils import parsedate_to_datetime
from typing import Iterable, List, Optional, Pattern, Sequence, Union
from .client import IMAPClient
from .extract import DEFAULT_LENGTHS, DEFAULT_PATTERNS, MatchSpec, as_predicate, extract_code
log = logging.getLogger(__name__)
# reasonable cross-provider folder set; override per account as needed
DEFAULT_FOLDERS: Sequence[str] = ("INBOX", "Junk", "Spam", "Archive", "All Mail")
def _server_query(sender: MatchSpec, subject: MatchSpec) -> str:
"""build a narrowing IMAP query from plain-string specs only
only plain strings translate to server-side FROM/SUBJECT filters; regex and
callable specs fall back to ALL and are filtered client-side, so dynamic
matching always works even when the server cannot express it.
"""
parts: List[str] = []
if isinstance(sender, str):
parts.append(f'FROM "{sender}"')
if isinstance(subject, str):
parts.append(f'SUBJECT "{subject}"')
return f"({' '.join(parts)})" if parts else "ALL"
def _age_seconds(message) -> Optional[float]:
"""age of a message in seconds from its Date header, or None if unparseable"""
raw = message.get("Date")
if not raw:
return None
try:
return time.time() - parsedate_to_datetime(raw).timestamp()
except (TypeError, ValueError) as exc:
log.debug("date parse failed (%s): %s", raw, exc)
return None
async def retrieve_otp(
client: IMAPClient,
*,
sender: MatchSpec = None,
subject: MatchSpec = None,
folders: Optional[Iterable[str]] = None,
patterns: Sequence[Union[str, Pattern]] = DEFAULT_PATTERNS,
lengths: Iterable[int] = DEFAULT_LENGTHS,
max_age: Optional[float] = 1800,
per_folder: int = 5,
retries: int = 3,
retry_delay: float = 5.0,
icloud: Optional[bool] = None,
mark_seen: bool = True,
) -> Optional[str]:
"""return the newest OTP matching the filters, or None
sender/subject accept a substring, a compiled regex, or a callable. folders,
patterns, code lengths, max age and retry behavior are all tunable. set
`max_age=None` to disable the freshness check.
"""
folders = list(folders) if folders is not None else list(DEFAULT_FOLDERS)
sender_ok = as_predicate(sender)
subject_ok = as_predicate(subject)
query = _server_query(sender, subject)
for attempt in range(retries + 1):
for folder in folders:
if not await client.select(folder):
continue
ids = await client.search(query)
if not ids and query != "ALL":
ids = await client.search("ALL")
if not ids:
continue
log.info("scanning %d message(s) in %s", min(len(ids), per_folder), folder)
for email_id in ids[:per_folder]:
message = await client.fetch(email_id, icloud=bool(icloud))
if message is None:
continue
from_hdr = message.get("From", "")
subj_hdr = message.get("Subject", "")
if not sender_ok(from_hdr) or not subject_ok(subj_hdr):
continue
code = extract_code(message, patterns=patterns, lengths=lengths)
if not code:
continue
if max_age is not None:
age = _age_seconds(message)
if age is not None and age > max_age:
log.info("code %s skipped, too old (%.0fs > %.0fs)", code, age, max_age)
continue
if mark_seen:
await client.mark_seen(email_id)
log.info("found code %s in %s", code, folder)
return code
if attempt < retries:
wait = retry_delay * (attempt + 1)
log.info("no code yet, retrying in %.0fs (%d/%d)", wait, attempt + 1, retries)
await asyncio.sleep(wait)
log.info("no matching OTP found")
return None