add package: pyproject + src

commons: stdlib-only sync helpers shared across projects.
- timing: unix-ts deltas + tz datetime conversions (one _delta_seconds
  engine), a configurable Clock, fast-mode for tests.
- paths: deep_get / deep_set by dotted path (numeric segments index lists).
- masking: display masking for cards/cvv/tokens (provider covers
  VISA/MC/AMEX/UPAY/DISC/JCB/DNRS).
- addr: ip/address tooling — pure stdlib ipaddress utils in base, async
  geo lookups (ipify/geo.ipify/nominatim) behind the [addr] aiohttp extra;
  geo.ipify api_key is injected, never hardcoded.
config-free, emit-only logging, base is zero-deps. src/ layout, hatchling.

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-24 22:18:37 -04:00
parent 5997e3f1a1
commit 7437e1feb0
8 changed files with 660 additions and 0 deletions

16
pyproject.toml Normal file
View File

@ -0,0 +1,16 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "commons"
version = "0.1.0"
description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, and ip/address tooling"
requires-python = ">=3.10"
dependencies = []
[project.optional-dependencies]
addr = ["aiohttp>=3.9"]
[tool.hatch.build.targets.wheel]
packages = ["src/commons"]

61
src/commons/__init__.py Normal file
View File

@ -0,0 +1,61 @@
"""commons — small sync helpers shared across projects.
timing: unix-timestamp deltas + timezone-aware datetime conversions, bare
functions and a configurable Clock.
paths: nested dict/list access by dotted path (deep_get / deep_set).
masking: display masking for cards / cvv / tokens (cosmetic, not a security control).
addr: ip/address tooling pure stdlib ip utils (`commons.addr.ip`) in base,
async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra.
base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the
extra). to toggle the timing test mode for bare calls, set it on the module
`from commons import timing; timing.FAST_MODE = True` or use `Clock(fast=True)`
for instance-scoped control. addr is exposed as a submodule (`from commons import
addr`); its ip helpers live under `commons.addr.ip` to keep top-level uncluttered.
"""
from . import addr, masking, paths, timing
from .masking import credit, cvv, phantom, provider
from .paths import deep_get, deep_set
from .timing import (
UTC,
Clock,
add,
ago,
ahead,
convert,
date,
fmt,
is_expired,
now,
now_dt,
to_dt,
to_unix,
)
__all__ = [
"timing",
"paths",
"masking",
"addr",
"now",
"add",
"ahead",
"ago",
"is_expired",
"to_dt",
"to_unix",
"now_dt",
"convert",
"fmt",
"date",
"UTC",
"Clock",
"deep_get",
"deep_set",
"credit",
"cvv",
"phantom",
"provider",
]
__version__ = "0.1.0"

View File

@ -0,0 +1,55 @@
"""addr — ip/address tooling for commons.
two concerns:
- `ip` pure stdlib ipaddress utilities (validation, membership, cidr shape,
conversions). ships in the base install, no dependencies.
- `geo` async network lookups (public ip, ip->geo, reverse geocode). needs
aiohttp, gated behind the `commons[addr]` extra; importing this package is fine
without it, but calling a geo function raises until it is installed.
the geo.ipify api_key is always injected by the caller nothing is hardcoded.
"""
from . import geo, ip
from .geo import fetch_ip, fetch_location, ip_location
from .ip import (
broadcast_address,
from_int,
host_bits,
hosts,
in_any,
in_network,
is_global,
is_private,
is_valid,
netmask,
network_address,
num_addresses,
prefix_bits,
set_bits,
to_int,
version,
)
__all__ = [
"ip",
"geo",
"is_valid",
"version",
"to_int",
"from_int",
"is_private",
"is_global",
"in_network",
"in_any",
"network_address",
"broadcast_address",
"netmask",
"prefix_bits",
"host_bits",
"num_addresses",
"set_bits",
"hosts",
"fetch_ip",
"ip_location",
"fetch_location",
]

133
src/commons/addr/geo.py Normal file
View File

@ -0,0 +1,133 @@
"""async ip/geo network lookups (aiohttp, gated behind the [addr] extra).
importing this module without aiohttp is fine; only calling a lookup without it
raises a clear RuntimeError. each call may reuse a caller-supplied aiohttp
ClientSession (`session=`) or create and close one internally.
url-building and json->dict parsing are pulled into pure helpers (`_*_url`,
`_parse_*`) so they unit-test without network. every lookup returns None on any
request/parse failure (logged, never raised, never printed).
security: the only secret is geo.ipify's `api_key`, which is a REQUIRED keyword on
`ip_location` the caller injects it. nothing is hardcoded here.
"""
import logging
from typing import Optional
from urllib.parse import urlencode
log = logging.getLogger(__name__)
try:
import aiohttp
_HAVE_AIOHTTP = True
except ImportError: # pragma: no cover
aiohttp = None # type: ignore[assignment]
_HAVE_AIOHTTP = False
_MISSING = "aiohttp is required for address lookups; install commons[addr]"
_IPIFY_URL = "https://api.ipify.org?format=json"
_GEO_IPIFY_URL = "https://geo.ipify.org/api/v2/country"
_NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse"
def _ip_location_url(ip: str, api_key: str) -> str:
"""build the geo.ipify lookup url for an ip (api_key injected by the caller)"""
return f"{_GEO_IPIFY_URL}?{urlencode({'apiKey': api_key, 'ipAddress': ip})}"
def _reverse_url(lat: float, lon: float) -> str:
"""build the nominatim reverse-geocode url for a lat/lon"""
params = {
"format": "json",
"lat": lat,
"lon": lon,
"zoom": 5,
"addressdetails": 1,
}
return f"{_NOMINATIM_URL}?{urlencode(params)}"
def _parse_ipify(data: dict) -> Optional[str]:
"""pull the ip string out of an ipify response"""
ip = data.get("ip")
return ip or None
def _parse_reverse(data: dict) -> Optional[dict]:
"""parse a nominatim reverse response into {country: iso2 lower, state: slug|None}"""
address = data.get("address")
if not isinstance(address, dict):
return None
country = address.get("country_code")
if not country:
return None
state = address.get("state")
return {
"country": str(country).lower(),
"state": state.lower().replace(" ", "-") if state else None,
}
async def _get_json(
url: str, *, session, timeout: float, headers: Optional[dict] = None
) -> Optional[dict]:
"""GET url and return parsed json, or None on any failure; reuses session if given"""
if not _HAVE_AIOHTTP:
raise RuntimeError(_MISSING)
owns = session is None
if owns:
session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout))
try:
async with session.get(url, headers=headers) as resp:
if resp.status != 200:
log.warning("address lookup %s -> %s", url, resp.status)
return None
return await resp.json()
except Exception as exc:
log.warning("address lookup failed (%s): %s", url, exc)
return None
finally:
if owns:
await session.close()
async def fetch_ip(*, session=None, timeout: float = 15) -> Optional[str]:
"""return the caller's public ip via ipify, or None on failure"""
data = await _get_json(_IPIFY_URL, session=session, timeout=timeout)
return _parse_ipify(data) if data else None
async def ip_location(
ip: str, *, api_key: str, session=None, timeout: float = 15
) -> Optional[dict]:
"""look up geo data for an ip via geo.ipify; api_key is required and injected
returns the raw geo.ipify json dict, or None on failure. raises ValueError if
no api_key is supplied there is no default and nothing hardcoded.
"""
if not api_key:
raise ValueError("ip_location requires an api_key (inject it; never hardcode)")
url = _ip_location_url(ip, api_key)
return await _get_json(url, session=session, timeout=timeout)
async def fetch_location(
lat: float,
lon: float,
*,
user_agent: str = "commons-addr/1.0",
session=None,
timeout: float = 15,
) -> Optional[dict]:
"""reverse-geocode a lat/lon via nominatim -> {country, state}, or None
nominatim's terms require a User-Agent; it is a parameter (not hidden) with a
sensible default the caller can override.
"""
url = _reverse_url(lat, lon)
data = await _get_json(
url, session=session, timeout=timeout, headers={"User-Agent": user_agent}
)
return _parse_reverse(data) if data else None

123
src/commons/addr/ip.py Normal file
View File

@ -0,0 +1,123 @@
"""pure ip/address utilities over stdlib ipaddress (no network, no deps).
every function accepts strings. validation/membership helpers
(`is_valid`/`version`/`in_network`/`in_any`) never raise on bad input they return
a falsy value. functions that require a valid address or cidr (`to_int`/`set_bits`/
the network-shape helpers) let `ValueError` propagate so misuse is visible.
cidr parsing uses `ip_network(cidr, strict=False)` throughout, so host bits set in
the network string are tolerated (e.g. "10.0.0.5/24").
"""
import ipaddress
from typing import List, Optional
def is_valid(addr: str) -> bool:
"""whether addr parses as an ipv4 or ipv6 address"""
try:
ipaddress.ip_address(addr)
return True
except ValueError:
return False
def version(addr: str) -> Optional[int]:
"""ip version (4 or 6) for addr, or None if it is not a valid address"""
try:
return ipaddress.ip_address(addr).version
except ValueError:
return None
def to_int(addr: str) -> int:
"""integer value of an ip address (raises ValueError on a bad address)"""
return int(ipaddress.ip_address(addr))
def from_int(n: int, *, version: int = 4) -> str:
"""ip address string for an integer (version 4 or 6)"""
if version == 4:
return str(ipaddress.IPv4Address(n))
if version == 6:
return str(ipaddress.IPv6Address(n))
raise ValueError("version must be 4 or 6")
def is_private(addr: str) -> bool:
"""whether addr is in private/reserved space (raises on a bad address)"""
return ipaddress.ip_address(addr).is_private
def is_global(addr: str) -> bool:
"""whether addr is a public, globally-routable address (raises on a bad address)"""
return ipaddress.ip_address(addr).is_global
def in_network(addr: str, cidr: str) -> bool:
"""whether addr falls inside a single cidr; False on any bad input"""
try:
return ipaddress.ip_address(addr) in ipaddress.ip_network(cidr, strict=False)
except ValueError:
return False
def in_any(addr: str, cidrs: List[str]) -> bool:
"""whether addr falls inside any of the cidrs (allow/blocklists); False on bad input"""
try:
ip = ipaddress.ip_address(addr)
except ValueError:
return False
for cidr in cidrs:
try:
if ip in ipaddress.ip_network(cidr, strict=False):
return True
except ValueError:
continue
return False
def network_address(cidr: str) -> str:
"""network address of a cidr (e.g. 10.0.0.0 for 10.0.0.5/24)"""
return str(ipaddress.ip_network(cidr, strict=False).network_address)
def broadcast_address(cidr: str) -> str:
"""broadcast address of a cidr"""
return str(ipaddress.ip_network(cidr, strict=False).broadcast_address)
def netmask(cidr: str) -> str:
"""netmask of a cidr (e.g. 255.255.255.0)"""
return str(ipaddress.ip_network(cidr, strict=False).netmask)
def prefix_bits(cidr: str) -> int:
"""prefix length of a cidr (the /N)"""
return ipaddress.ip_network(cidr, strict=False).prefixlen
def host_bits(cidr: str) -> int:
"""number of host bits in a cidr (max prefix minus prefix length)"""
net = ipaddress.ip_network(cidr, strict=False)
return net.max_prefixlen - net.prefixlen
def num_addresses(cidr: str) -> int:
"""total addresses in a cidr (network + hosts + broadcast)"""
return ipaddress.ip_network(cidr, strict=False).num_addresses
def set_bits(addr: str) -> int:
"""population count of an address's integer value (raises on a bad address)"""
return bin(int(ipaddress.ip_address(addr))).count("1")
def hosts(cidr: str, *, limit: Optional[int] = None) -> List[str]:
"""usable host addresses of a cidr as strings; limit caps how many are materialized"""
gen = ipaddress.ip_network(cidr, strict=False).hosts()
out: List[str] = []
for host in gen:
out.append(str(host))
if limit is not None and len(out) >= limit:
break
return out

53
src/commons/masking.py Normal file
View File

@ -0,0 +1,53 @@
"""display masking for sensitive-looking values.
these are DISPLAY helpers only they format a value for showing in a UI or log
(e.g. "•••• •••• •••• 1234"). they are not a security control: the underlying
value is unchanged and still needs proper handling (encryption at rest, etc.).
"""
def _digits(value: str) -> str:
"""keep only the digit characters of a string"""
return "".join(c for c in value if c.isdigit())
def credit(card_number: str) -> str:
"""mask a card number to bullets plus the last four digits"""
last4 = _digits(card_number)[-4:]
return "•••• •••• •••• " + last4
def cvv(value: str) -> str:
"""mask a cvv to bullets of the same length"""
return "" * len(value)
def phantom(value: str) -> str:
"""show a long token as first-six...last-four (e.g. a hash or id)"""
return f"{value[:6]}...{value[-4:]}"
def provider(card_number: str) -> str:
"""short card brand from the number's prefix, or UNKW if undetermined
a BIN-prefix heuristic for display/labeling not authoritative validation.
tolerates spaces/dashes and short or non-numeric input (returns UNKW).
"""
n = _digits(card_number)
if not n:
return "UNKW"
if n.startswith("4"):
return "VISA"
if n[:2] in {"51", "52", "53", "54", "55"} or (len(n) >= 4 and 2221 <= int(n[:4]) <= 2720):
return "MC"
if n[:2] in {"34", "37"}:
return "AMEX"
if n.startswith("62") or n.startswith("81"):
return "UPAY"
if n.startswith("6011") or n.startswith("65") or (len(n) >= 3 and 644 <= int(n[:3]) <= 649):
return "DISC"
if n.startswith("35"):
return "JCB"
if n.startswith(("30", "36", "38", "39")):
return "DNRS"
return "UNKW"

50
src/commons/paths.py Normal file
View File

@ -0,0 +1,50 @@
"""nested access by dotted path.
`deep_get(data, "in.this.old.notation")` walks dicts (and lists, when a segment is
a number) and returns a default instead of raising on a missing/!wrong path.
`deep_set` writes a nested value, creating intermediate dicts.
"""
from typing import Any
_MISSING = object()
def deep_get(data: Any, path: str, default: Any = None, *, sep: str = ".") -> Any:
"""get a nested value by dotted path, returning default if absent
dict keys are matched by name; a numeric segment indexes a list/tuple
(e.g. "items.0.id"). any missing key, out-of-range index, or non-container
along the way yields `default` rather than raising.
"""
cur = data
for seg in path.split(sep):
if isinstance(cur, dict):
cur = cur.get(seg, _MISSING)
if cur is _MISSING:
return default
elif isinstance(cur, (list, tuple)):
try:
cur = cur[int(seg)]
except (ValueError, IndexError):
return default
else:
return default
return cur
def deep_set(data: dict, path: str, value: Any, *, sep: str = ".") -> dict:
"""set a nested value by dotted path, creating intermediate dicts
returns the same top-level dict for chaining. overwrites any non-dict value
sitting where an intermediate dict is needed.
"""
segments = path.split(sep)
cur = data
for seg in segments[:-1]:
nxt = cur.get(seg)
if not isinstance(nxt, dict):
nxt = {}
cur[seg] = nxt
cur = nxt
cur[segments[-1]] = value
return data

169
src/commons/timing.py Normal file
View File

@ -0,0 +1,169 @@
"""time helpers built on unix timestamps with timezone-aware datetime support.
one engine, two ergonomics:
- bare module functions (now/add/ahead/ago/is_expired/to_dt/...) operate on unix
ints for quick, stateless use. all delta math routes through `_delta_seconds`.
- `Clock` holds a timezone + fast flag and delegates to the same functions, so a
configured clock and the bare functions never diverge.
unix ints stay the storable value; datetimes are produced on demand in whatever
timezone you ask for (stdlib zoneinfo, no pytz).
"""
import time as _time
from datetime import datetime, timezone
from typing import Optional, Union
from zoneinfo import ZoneInfo
# default fast flag for bare module calls. when True, every unit (day/hour/minute)
# counts as one second so time-based flows run fast in tests. set
# `commons.timing.FAST_MODE` from test setup; off by default. not a config import.
FAST_MODE = False
UTC = timezone.utc
# accepted timezone inputs: a tzinfo, an IANA name string, or None (-> UTC)
TZ = Union[str, timezone, ZoneInfo, None]
def _zone(tz: TZ):
"""resolve a tz input (name / tzinfo / None) to a tzinfo, defaulting to UTC"""
if tz is None:
return UTC
if isinstance(tz, str):
return ZoneInfo(tz)
return tz
def now() -> int:
"""current unix timestamp as an int"""
return int(_time.time())
def _delta_seconds(days: int, hours: int, minutes: int, seconds: int, fast: bool) -> int:
"""seconds for a unit combination; collapses to 1s/unit when fast is True
single source of truth for unit math every delta helper routes through here.
"""
if fast:
return days + hours + minutes + seconds
return days * 86400 + hours * 3600 + minutes * 60 + seconds
def add(
ts: int,
*,
days: int = 0,
hours: int = 0,
minutes: int = 0,
seconds: int = 0,
fast: Optional[bool] = None,
) -> int:
"""shift a unix timestamp by signed units (negatives subtract)"""
fast = FAST_MODE if fast is None else fast
return int(ts + _delta_seconds(days, hours, minutes, seconds, fast))
def ahead(*, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, fast: Optional[bool] = None) -> int:
"""unix timestamp the given units in the future (now + delta)"""
return add(now(), days=days, hours=hours, minutes=minutes, seconds=seconds, fast=fast)
def ago(*, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, fast: Optional[bool] = None) -> int:
"""unix timestamp the given units in the past (now - delta)"""
return add(now(), days=-days, hours=-hours, minutes=-minutes, seconds=-seconds, fast=fast)
def is_expired(deadline: int) -> bool:
"""True if a unix deadline has passed; a falsy/None deadline never expires"""
if not deadline:
return False
return now() > deadline
def to_dt(ts: int, tz: TZ = None) -> datetime:
"""convert a unix timestamp to an aware datetime in the given tz (default UTC)"""
return datetime.fromtimestamp(ts, _zone(tz))
def to_unix(dt: datetime, tz: TZ = None) -> int:
"""convert a datetime to a unix timestamp; naive inputs are read as tz/UTC"""
if dt.tzinfo is None:
dt = dt.replace(tzinfo=_zone(tz))
return int(dt.timestamp())
def now_dt(tz: TZ = None) -> datetime:
"""current time as an aware datetime in the given tz (default UTC)"""
return datetime.now(_zone(tz))
def convert(dt: datetime, tz: TZ) -> datetime:
"""re-express a datetime in another timezone (same instant, new wall clock)"""
return dt.astimezone(_zone(tz))
def fmt(ts: int, tz: TZ = None, pattern: str = "%Y-%m-%d %H:%M:%S") -> str:
"""format a unix timestamp in the given tz with an strftime pattern"""
return to_dt(ts, tz).strftime(pattern)
def date(ts: Optional[int] = None, tz: TZ = None, pattern: str = "%m/%d/%Y") -> str:
"""format the date portion of a timestamp (or now) in the given tz"""
return to_dt(now() if ts is None else ts, tz).strftime(pattern)
class Clock:
"""a timezone + fast-mode aware view over the same timing engine
construct with any IANA tz name (or tzinfo; default UTC) and an optional fast
flag. delta methods delegate to the module functions passing the clock's fast
setting; datetime methods produce values in the clock's timezone.
"""
def __init__(self, tz: TZ = "UTC", *, fast: bool = False):
self.tz = _zone(tz)
self.fast = fast
def now(self) -> int:
"""current unix timestamp (timezone-independent)"""
return now()
def add(self, ts: int, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int:
"""shift a timestamp by signed units, honoring this clock's fast flag"""
return add(ts, days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast)
def ahead(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int:
"""unix timestamp the given units in the future"""
return ahead(days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast)
def ago(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int:
"""unix timestamp the given units in the past"""
return ago(days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast)
def is_expired(self, deadline: int) -> bool:
"""True if a unix deadline has passed; falsy/None never expires"""
return is_expired(deadline)
def now_dt(self) -> datetime:
"""current time as an aware datetime in this clock's timezone"""
return now_dt(self.tz)
def to_dt(self, ts: int) -> datetime:
"""convert a unix timestamp to an aware datetime in this clock's timezone"""
return to_dt(ts, self.tz)
def to_unix(self, dt: datetime) -> int:
"""convert a datetime to unix; naive inputs are read as this clock's tz"""
return to_unix(dt, self.tz)
def convert(self, dt: datetime) -> datetime:
"""re-express any datetime in this clock's timezone (same instant)"""
return convert(dt, self.tz)
def fmt(self, ts: int, pattern: str = "%Y-%m-%d %H:%M:%S") -> str:
"""format a unix timestamp in this clock's timezone"""
return fmt(ts, self.tz, pattern)
def date(self, ts: Optional[int] = None, pattern: str = "%m/%d/%Y") -> str:
"""format the date of a timestamp (or now) in this clock's timezone"""
return date(ts, self.tz, pattern)