diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6008471 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,16 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "commons" +version = "0.1.0" +description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, and ip/address tooling" +requires-python = ">=3.10" +dependencies = [] + +[project.optional-dependencies] +addr = ["aiohttp>=3.9"] + +[tool.hatch.build.targets.wheel] +packages = ["src/commons"] diff --git a/src/commons/__init__.py b/src/commons/__init__.py new file mode 100644 index 0000000..a95cd90 --- /dev/null +++ b/src/commons/__init__.py @@ -0,0 +1,61 @@ +"""commons — small sync helpers shared across projects. + +timing: unix-timestamp deltas + timezone-aware datetime conversions, bare + functions and a configurable Clock. +paths: nested dict/list access by dotted path (deep_get / deep_set). +masking: display masking for cards / cvv / tokens (cosmetic, not a security control). +addr: ip/address tooling — pure stdlib ip utils (`commons.addr.ip`) in base, + async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra. + +base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the +extra). to toggle the timing test mode for bare calls, set it on the module — +`from commons import timing; timing.FAST_MODE = True` — or use `Clock(fast=True)` +for instance-scoped control. addr is exposed as a submodule (`from commons import +addr`); its ip helpers live under `commons.addr.ip` to keep top-level uncluttered. +""" +from . import addr, masking, paths, timing +from .masking import credit, cvv, phantom, provider +from .paths import deep_get, deep_set +from .timing import ( + UTC, + Clock, + add, + ago, + ahead, + convert, + date, + fmt, + is_expired, + now, + now_dt, + to_dt, + to_unix, +) + +__all__ = [ + "timing", + "paths", + "masking", + "addr", + "now", + "add", + "ahead", + "ago", + "is_expired", + "to_dt", + "to_unix", + "now_dt", + "convert", + "fmt", + "date", + "UTC", + "Clock", + "deep_get", + "deep_set", + "credit", + "cvv", + "phantom", + "provider", +] + +__version__ = "0.1.0" diff --git a/src/commons/addr/__init__.py b/src/commons/addr/__init__.py new file mode 100644 index 0000000..f61ef80 --- /dev/null +++ b/src/commons/addr/__init__.py @@ -0,0 +1,55 @@ +"""addr — ip/address tooling for commons. + +two concerns: +- `ip` — pure stdlib ipaddress utilities (validation, membership, cidr shape, + conversions). ships in the base install, no dependencies. +- `geo` — async network lookups (public ip, ip->geo, reverse geocode). needs + aiohttp, gated behind the `commons[addr]` extra; importing this package is fine + without it, but calling a geo function raises until it is installed. + +the geo.ipify api_key is always injected by the caller — nothing is hardcoded. +""" +from . import geo, ip +from .geo import fetch_ip, fetch_location, ip_location +from .ip import ( + broadcast_address, + from_int, + host_bits, + hosts, + in_any, + in_network, + is_global, + is_private, + is_valid, + netmask, + network_address, + num_addresses, + prefix_bits, + set_bits, + to_int, + version, +) + +__all__ = [ + "ip", + "geo", + "is_valid", + "version", + "to_int", + "from_int", + "is_private", + "is_global", + "in_network", + "in_any", + "network_address", + "broadcast_address", + "netmask", + "prefix_bits", + "host_bits", + "num_addresses", + "set_bits", + "hosts", + "fetch_ip", + "ip_location", + "fetch_location", +] diff --git a/src/commons/addr/geo.py b/src/commons/addr/geo.py new file mode 100644 index 0000000..4755a9e --- /dev/null +++ b/src/commons/addr/geo.py @@ -0,0 +1,133 @@ +"""async ip/geo network lookups (aiohttp, gated behind the [addr] extra). + +importing this module without aiohttp is fine; only calling a lookup without it +raises a clear RuntimeError. each call may reuse a caller-supplied aiohttp +ClientSession (`session=`) or create and close one internally. + +url-building and json->dict parsing are pulled into pure helpers (`_*_url`, +`_parse_*`) so they unit-test without network. every lookup returns None on any +request/parse failure (logged, never raised, never printed). + +security: the only secret is geo.ipify's `api_key`, which is a REQUIRED keyword on +`ip_location` — the caller injects it. nothing is hardcoded here. +""" +import logging +from typing import Optional +from urllib.parse import urlencode + +log = logging.getLogger(__name__) + +try: + import aiohttp + + _HAVE_AIOHTTP = True +except ImportError: # pragma: no cover + aiohttp = None # type: ignore[assignment] + _HAVE_AIOHTTP = False + +_MISSING = "aiohttp is required for address lookups; install commons[addr]" + +_IPIFY_URL = "https://api.ipify.org?format=json" +_GEO_IPIFY_URL = "https://geo.ipify.org/api/v2/country" +_NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse" + + +def _ip_location_url(ip: str, api_key: str) -> str: + """build the geo.ipify lookup url for an ip (api_key injected by the caller)""" + return f"{_GEO_IPIFY_URL}?{urlencode({'apiKey': api_key, 'ipAddress': ip})}" + + +def _reverse_url(lat: float, lon: float) -> str: + """build the nominatim reverse-geocode url for a lat/lon""" + params = { + "format": "json", + "lat": lat, + "lon": lon, + "zoom": 5, + "addressdetails": 1, + } + return f"{_NOMINATIM_URL}?{urlencode(params)}" + + +def _parse_ipify(data: dict) -> Optional[str]: + """pull the ip string out of an ipify response""" + ip = data.get("ip") + return ip or None + + +def _parse_reverse(data: dict) -> Optional[dict]: + """parse a nominatim reverse response into {country: iso2 lower, state: slug|None}""" + address = data.get("address") + if not isinstance(address, dict): + return None + country = address.get("country_code") + if not country: + return None + state = address.get("state") + return { + "country": str(country).lower(), + "state": state.lower().replace(" ", "-") if state else None, + } + + +async def _get_json( + url: str, *, session, timeout: float, headers: Optional[dict] = None +) -> Optional[dict]: + """GET url and return parsed json, or None on any failure; reuses session if given""" + if not _HAVE_AIOHTTP: + raise RuntimeError(_MISSING) + owns = session is None + if owns: + session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout)) + try: + async with session.get(url, headers=headers) as resp: + if resp.status != 200: + log.warning("address lookup %s -> %s", url, resp.status) + return None + return await resp.json() + except Exception as exc: + log.warning("address lookup failed (%s): %s", url, exc) + return None + finally: + if owns: + await session.close() + + +async def fetch_ip(*, session=None, timeout: float = 15) -> Optional[str]: + """return the caller's public ip via ipify, or None on failure""" + data = await _get_json(_IPIFY_URL, session=session, timeout=timeout) + return _parse_ipify(data) if data else None + + +async def ip_location( + ip: str, *, api_key: str, session=None, timeout: float = 15 +) -> Optional[dict]: + """look up geo data for an ip via geo.ipify; api_key is required and injected + + returns the raw geo.ipify json dict, or None on failure. raises ValueError if + no api_key is supplied — there is no default and nothing hardcoded. + """ + if not api_key: + raise ValueError("ip_location requires an api_key (inject it; never hardcode)") + url = _ip_location_url(ip, api_key) + return await _get_json(url, session=session, timeout=timeout) + + +async def fetch_location( + lat: float, + lon: float, + *, + user_agent: str = "commons-addr/1.0", + session=None, + timeout: float = 15, +) -> Optional[dict]: + """reverse-geocode a lat/lon via nominatim -> {country, state}, or None + + nominatim's terms require a User-Agent; it is a parameter (not hidden) with a + sensible default the caller can override. + """ + url = _reverse_url(lat, lon) + data = await _get_json( + url, session=session, timeout=timeout, headers={"User-Agent": user_agent} + ) + return _parse_reverse(data) if data else None diff --git a/src/commons/addr/ip.py b/src/commons/addr/ip.py new file mode 100644 index 0000000..4b5abfd --- /dev/null +++ b/src/commons/addr/ip.py @@ -0,0 +1,123 @@ +"""pure ip/address utilities over stdlib ipaddress (no network, no deps). + +every function accepts strings. validation/membership helpers +(`is_valid`/`version`/`in_network`/`in_any`) never raise on bad input — they return +a falsy value. functions that require a valid address or cidr (`to_int`/`set_bits`/ +the network-shape helpers) let `ValueError` propagate so misuse is visible. + +cidr parsing uses `ip_network(cidr, strict=False)` throughout, so host bits set in +the network string are tolerated (e.g. "10.0.0.5/24"). +""" +import ipaddress +from typing import List, Optional + + +def is_valid(addr: str) -> bool: + """whether addr parses as an ipv4 or ipv6 address""" + try: + ipaddress.ip_address(addr) + return True + except ValueError: + return False + + +def version(addr: str) -> Optional[int]: + """ip version (4 or 6) for addr, or None if it is not a valid address""" + try: + return ipaddress.ip_address(addr).version + except ValueError: + return None + + +def to_int(addr: str) -> int: + """integer value of an ip address (raises ValueError on a bad address)""" + return int(ipaddress.ip_address(addr)) + + +def from_int(n: int, *, version: int = 4) -> str: + """ip address string for an integer (version 4 or 6)""" + if version == 4: + return str(ipaddress.IPv4Address(n)) + if version == 6: + return str(ipaddress.IPv6Address(n)) + raise ValueError("version must be 4 or 6") + + +def is_private(addr: str) -> bool: + """whether addr is in private/reserved space (raises on a bad address)""" + return ipaddress.ip_address(addr).is_private + + +def is_global(addr: str) -> bool: + """whether addr is a public, globally-routable address (raises on a bad address)""" + return ipaddress.ip_address(addr).is_global + + +def in_network(addr: str, cidr: str) -> bool: + """whether addr falls inside a single cidr; False on any bad input""" + try: + return ipaddress.ip_address(addr) in ipaddress.ip_network(cidr, strict=False) + except ValueError: + return False + + +def in_any(addr: str, cidrs: List[str]) -> bool: + """whether addr falls inside any of the cidrs (allow/blocklists); False on bad input""" + try: + ip = ipaddress.ip_address(addr) + except ValueError: + return False + for cidr in cidrs: + try: + if ip in ipaddress.ip_network(cidr, strict=False): + return True + except ValueError: + continue + return False + + +def network_address(cidr: str) -> str: + """network address of a cidr (e.g. 10.0.0.0 for 10.0.0.5/24)""" + return str(ipaddress.ip_network(cidr, strict=False).network_address) + + +def broadcast_address(cidr: str) -> str: + """broadcast address of a cidr""" + return str(ipaddress.ip_network(cidr, strict=False).broadcast_address) + + +def netmask(cidr: str) -> str: + """netmask of a cidr (e.g. 255.255.255.0)""" + return str(ipaddress.ip_network(cidr, strict=False).netmask) + + +def prefix_bits(cidr: str) -> int: + """prefix length of a cidr (the /N)""" + return ipaddress.ip_network(cidr, strict=False).prefixlen + + +def host_bits(cidr: str) -> int: + """number of host bits in a cidr (max prefix minus prefix length)""" + net = ipaddress.ip_network(cidr, strict=False) + return net.max_prefixlen - net.prefixlen + + +def num_addresses(cidr: str) -> int: + """total addresses in a cidr (network + hosts + broadcast)""" + return ipaddress.ip_network(cidr, strict=False).num_addresses + + +def set_bits(addr: str) -> int: + """population count of an address's integer value (raises on a bad address)""" + return bin(int(ipaddress.ip_address(addr))).count("1") + + +def hosts(cidr: str, *, limit: Optional[int] = None) -> List[str]: + """usable host addresses of a cidr as strings; limit caps how many are materialized""" + gen = ipaddress.ip_network(cidr, strict=False).hosts() + out: List[str] = [] + for host in gen: + out.append(str(host)) + if limit is not None and len(out) >= limit: + break + return out diff --git a/src/commons/masking.py b/src/commons/masking.py new file mode 100644 index 0000000..eefd824 --- /dev/null +++ b/src/commons/masking.py @@ -0,0 +1,53 @@ +"""display masking for sensitive-looking values. + +these are DISPLAY helpers only — they format a value for showing in a UI or log +(e.g. "•••• •••• •••• 1234"). they are not a security control: the underlying +value is unchanged and still needs proper handling (encryption at rest, etc.). +""" + + +def _digits(value: str) -> str: + """keep only the digit characters of a string""" + return "".join(c for c in value if c.isdigit()) + + +def credit(card_number: str) -> str: + """mask a card number to bullets plus the last four digits""" + last4 = _digits(card_number)[-4:] + return "•••• •••• •••• " + last4 + + +def cvv(value: str) -> str: + """mask a cvv to bullets of the same length""" + return "•" * len(value) + + +def phantom(value: str) -> str: + """show a long token as first-six...last-four (e.g. a hash or id)""" + return f"{value[:6]}...{value[-4:]}" + + +def provider(card_number: str) -> str: + """short card brand from the number's prefix, or UNKW if undetermined + + a BIN-prefix heuristic for display/labeling — not authoritative validation. + tolerates spaces/dashes and short or non-numeric input (returns UNKW). + """ + n = _digits(card_number) + if not n: + return "UNKW" + if n.startswith("4"): + return "VISA" + if n[:2] in {"51", "52", "53", "54", "55"} or (len(n) >= 4 and 2221 <= int(n[:4]) <= 2720): + return "MC" + if n[:2] in {"34", "37"}: + return "AMEX" + if n.startswith("62") or n.startswith("81"): + return "UPAY" + if n.startswith("6011") or n.startswith("65") or (len(n) >= 3 and 644 <= int(n[:3]) <= 649): + return "DISC" + if n.startswith("35"): + return "JCB" + if n.startswith(("30", "36", "38", "39")): + return "DNRS" + return "UNKW" diff --git a/src/commons/paths.py b/src/commons/paths.py new file mode 100644 index 0000000..ff0c612 --- /dev/null +++ b/src/commons/paths.py @@ -0,0 +1,50 @@ +"""nested access by dotted path. + +`deep_get(data, "in.this.old.notation")` walks dicts (and lists, when a segment is +a number) and returns a default instead of raising on a missing/!wrong path. +`deep_set` writes a nested value, creating intermediate dicts. +""" +from typing import Any + +_MISSING = object() + + +def deep_get(data: Any, path: str, default: Any = None, *, sep: str = ".") -> Any: + """get a nested value by dotted path, returning default if absent + + dict keys are matched by name; a numeric segment indexes a list/tuple + (e.g. "items.0.id"). any missing key, out-of-range index, or non-container + along the way yields `default` rather than raising. + """ + cur = data + for seg in path.split(sep): + if isinstance(cur, dict): + cur = cur.get(seg, _MISSING) + if cur is _MISSING: + return default + elif isinstance(cur, (list, tuple)): + try: + cur = cur[int(seg)] + except (ValueError, IndexError): + return default + else: + return default + return cur + + +def deep_set(data: dict, path: str, value: Any, *, sep: str = ".") -> dict: + """set a nested value by dotted path, creating intermediate dicts + + returns the same top-level dict for chaining. overwrites any non-dict value + sitting where an intermediate dict is needed. + """ + segments = path.split(sep) + cur = data + for seg in segments[:-1]: + nxt = cur.get(seg) + if not isinstance(nxt, dict): + nxt = {} + cur[seg] = nxt + cur = nxt + cur[segments[-1]] = value + return data diff --git a/src/commons/timing.py b/src/commons/timing.py new file mode 100644 index 0000000..fe486ed --- /dev/null +++ b/src/commons/timing.py @@ -0,0 +1,169 @@ +"""time helpers built on unix timestamps with timezone-aware datetime support. + +one engine, two ergonomics: +- bare module functions (now/add/ahead/ago/is_expired/to_dt/...) operate on unix + ints for quick, stateless use. all delta math routes through `_delta_seconds`. +- `Clock` holds a timezone + fast flag and delegates to the same functions, so a + configured clock and the bare functions never diverge. + +unix ints stay the storable value; datetimes are produced on demand in whatever +timezone you ask for (stdlib zoneinfo, no pytz). +""" +import time as _time +from datetime import datetime, timezone +from typing import Optional, Union +from zoneinfo import ZoneInfo + +# default fast flag for bare module calls. when True, every unit (day/hour/minute) +# counts as one second so time-based flows run fast in tests. set +# `commons.timing.FAST_MODE` from test setup; off by default. not a config import. +FAST_MODE = False + +UTC = timezone.utc + +# accepted timezone inputs: a tzinfo, an IANA name string, or None (-> UTC) +TZ = Union[str, timezone, ZoneInfo, None] + + +def _zone(tz: TZ): + """resolve a tz input (name / tzinfo / None) to a tzinfo, defaulting to UTC""" + if tz is None: + return UTC + if isinstance(tz, str): + return ZoneInfo(tz) + return tz + + +def now() -> int: + """current unix timestamp as an int""" + return int(_time.time()) + + +def _delta_seconds(days: int, hours: int, minutes: int, seconds: int, fast: bool) -> int: + """seconds for a unit combination; collapses to 1s/unit when fast is True + + single source of truth for unit math — every delta helper routes through here. + """ + if fast: + return days + hours + minutes + seconds + return days * 86400 + hours * 3600 + minutes * 60 + seconds + + +def add( + ts: int, + *, + days: int = 0, + hours: int = 0, + minutes: int = 0, + seconds: int = 0, + fast: Optional[bool] = None, +) -> int: + """shift a unix timestamp by signed units (negatives subtract)""" + fast = FAST_MODE if fast is None else fast + return int(ts + _delta_seconds(days, hours, minutes, seconds, fast)) + + +def ahead(*, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, fast: Optional[bool] = None) -> int: + """unix timestamp the given units in the future (now + delta)""" + return add(now(), days=days, hours=hours, minutes=minutes, seconds=seconds, fast=fast) + + +def ago(*, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, fast: Optional[bool] = None) -> int: + """unix timestamp the given units in the past (now - delta)""" + return add(now(), days=-days, hours=-hours, minutes=-minutes, seconds=-seconds, fast=fast) + + +def is_expired(deadline: int) -> bool: + """True if a unix deadline has passed; a falsy/None deadline never expires""" + if not deadline: + return False + return now() > deadline + + +def to_dt(ts: int, tz: TZ = None) -> datetime: + """convert a unix timestamp to an aware datetime in the given tz (default UTC)""" + return datetime.fromtimestamp(ts, _zone(tz)) + + +def to_unix(dt: datetime, tz: TZ = None) -> int: + """convert a datetime to a unix timestamp; naive inputs are read as tz/UTC""" + if dt.tzinfo is None: + dt = dt.replace(tzinfo=_zone(tz)) + return int(dt.timestamp()) + + +def now_dt(tz: TZ = None) -> datetime: + """current time as an aware datetime in the given tz (default UTC)""" + return datetime.now(_zone(tz)) + + +def convert(dt: datetime, tz: TZ) -> datetime: + """re-express a datetime in another timezone (same instant, new wall clock)""" + return dt.astimezone(_zone(tz)) + + +def fmt(ts: int, tz: TZ = None, pattern: str = "%Y-%m-%d %H:%M:%S") -> str: + """format a unix timestamp in the given tz with an strftime pattern""" + return to_dt(ts, tz).strftime(pattern) + + +def date(ts: Optional[int] = None, tz: TZ = None, pattern: str = "%m/%d/%Y") -> str: + """format the date portion of a timestamp (or now) in the given tz""" + return to_dt(now() if ts is None else ts, tz).strftime(pattern) + + +class Clock: + """a timezone + fast-mode aware view over the same timing engine + + construct with any IANA tz name (or tzinfo; default UTC) and an optional fast + flag. delta methods delegate to the module functions passing the clock's fast + setting; datetime methods produce values in the clock's timezone. + """ + + def __init__(self, tz: TZ = "UTC", *, fast: bool = False): + self.tz = _zone(tz) + self.fast = fast + + def now(self) -> int: + """current unix timestamp (timezone-independent)""" + return now() + + def add(self, ts: int, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int: + """shift a timestamp by signed units, honoring this clock's fast flag""" + return add(ts, days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast) + + def ahead(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int: + """unix timestamp the given units in the future""" + return ahead(days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast) + + def ago(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int: + """unix timestamp the given units in the past""" + return ago(days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast) + + def is_expired(self, deadline: int) -> bool: + """True if a unix deadline has passed; falsy/None never expires""" + return is_expired(deadline) + + def now_dt(self) -> datetime: + """current time as an aware datetime in this clock's timezone""" + return now_dt(self.tz) + + def to_dt(self, ts: int) -> datetime: + """convert a unix timestamp to an aware datetime in this clock's timezone""" + return to_dt(ts, self.tz) + + def to_unix(self, dt: datetime) -> int: + """convert a datetime to unix; naive inputs are read as this clock's tz""" + return to_unix(dt, self.tz) + + def convert(self, dt: datetime) -> datetime: + """re-express any datetime in this clock's timezone (same instant)""" + return convert(dt, self.tz) + + def fmt(self, ts: int, pattern: str = "%Y-%m-%d %H:%M:%S") -> str: + """format a unix timestamp in this clock's timezone""" + return fmt(ts, self.tz, pattern) + + def date(self, ts: Optional[int] = None, pattern: str = "%m/%d/%Y") -> str: + """format the date of a timestamp (or now) in this clock's timezone""" + return date(ts, self.tz, pattern)