add package: pyproject + src
commons: stdlib-only sync helpers shared across projects. - timing: unix-ts deltas + tz datetime conversions (one _delta_seconds engine), a configurable Clock, fast-mode for tests. - paths: deep_get / deep_set by dotted path (numeric segments index lists). - masking: display masking for cards/cvv/tokens (provider covers VISA/MC/AMEX/UPAY/DISC/JCB/DNRS). - addr: ip/address tooling — pure stdlib ipaddress utils in base, async geo lookups (ipify/geo.ipify/nominatim) behind the [addr] aiohttp extra; geo.ipify api_key is injected, never hardcoded. config-free, emit-only logging, base is zero-deps. src/ layout, hatchling. Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
5997e3f1a1
commit
5ffecaf594
16
pyproject.toml
Normal file
16
pyproject.toml
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "commons"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "small stdlib-only sync helpers: time/timezone deltas, dotted-path dict access, display masking, and ip/address tooling"
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
dependencies = []
|
||||||
|
|
||||||
|
[project.optional-dependencies]
|
||||||
|
addr = ["aiohttp>=3.9"]
|
||||||
|
|
||||||
|
[tool.hatch.build.targets.wheel]
|
||||||
|
packages = ["src/commons"]
|
||||||
61
src/commons/__init__.py
Normal file
61
src/commons/__init__.py
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
"""commons — small sync helpers shared across projects.
|
||||||
|
|
||||||
|
timing: unix-timestamp deltas + timezone-aware datetime conversions, bare
|
||||||
|
functions and a configurable Clock.
|
||||||
|
paths: nested dict/list access by dotted path (deep_get / deep_set).
|
||||||
|
masking: display masking for cards / cvv / tokens (cosmetic, not a security control).
|
||||||
|
addr: ip/address tooling — pure stdlib ip utils (`commons.addr.ip`) in base,
|
||||||
|
async geo lookups (`commons.addr.geo`) behind the `commons[addr]` extra.
|
||||||
|
|
||||||
|
base is stdlib only, no dependencies (the addr geo lookups add aiohttp via the
|
||||||
|
extra). to toggle the timing test mode for bare calls, set it on the module —
|
||||||
|
`from commons import timing; timing.FAST_MODE = True` — or use `Clock(fast=True)`
|
||||||
|
for instance-scoped control. addr is exposed as a submodule (`from commons import
|
||||||
|
addr`); its ip helpers live under `commons.addr.ip` to keep top-level uncluttered.
|
||||||
|
"""
|
||||||
|
from . import addr, masking, paths, timing
|
||||||
|
from .masking import credit, cvv, phantom, provider
|
||||||
|
from .paths import deep_get, deep_set
|
||||||
|
from .timing import (
|
||||||
|
UTC,
|
||||||
|
Clock,
|
||||||
|
add,
|
||||||
|
ago,
|
||||||
|
ahead,
|
||||||
|
convert,
|
||||||
|
date,
|
||||||
|
fmt,
|
||||||
|
is_expired,
|
||||||
|
now,
|
||||||
|
now_dt,
|
||||||
|
to_dt,
|
||||||
|
to_unix,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"timing",
|
||||||
|
"paths",
|
||||||
|
"masking",
|
||||||
|
"addr",
|
||||||
|
"now",
|
||||||
|
"add",
|
||||||
|
"ahead",
|
||||||
|
"ago",
|
||||||
|
"is_expired",
|
||||||
|
"to_dt",
|
||||||
|
"to_unix",
|
||||||
|
"now_dt",
|
||||||
|
"convert",
|
||||||
|
"fmt",
|
||||||
|
"date",
|
||||||
|
"UTC",
|
||||||
|
"Clock",
|
||||||
|
"deep_get",
|
||||||
|
"deep_set",
|
||||||
|
"credit",
|
||||||
|
"cvv",
|
||||||
|
"phantom",
|
||||||
|
"provider",
|
||||||
|
]
|
||||||
|
|
||||||
|
__version__ = "0.1.0"
|
||||||
55
src/commons/addr/__init__.py
Normal file
55
src/commons/addr/__init__.py
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
"""addr — ip/address tooling for commons.
|
||||||
|
|
||||||
|
two concerns:
|
||||||
|
- `ip` — pure stdlib ipaddress utilities (validation, membership, cidr shape,
|
||||||
|
conversions). ships in the base install, no dependencies.
|
||||||
|
- `geo` — async network lookups (public ip, ip->geo, reverse geocode). needs
|
||||||
|
aiohttp, gated behind the `commons[addr]` extra; importing this package is fine
|
||||||
|
without it, but calling a geo function raises until it is installed.
|
||||||
|
|
||||||
|
the geo.ipify api_key is always injected by the caller — nothing is hardcoded.
|
||||||
|
"""
|
||||||
|
from . import geo, ip
|
||||||
|
from .geo import fetch_ip, fetch_location, ip_location
|
||||||
|
from .ip import (
|
||||||
|
broadcast_address,
|
||||||
|
from_int,
|
||||||
|
host_bits,
|
||||||
|
hosts,
|
||||||
|
in_any,
|
||||||
|
in_network,
|
||||||
|
is_global,
|
||||||
|
is_private,
|
||||||
|
is_valid,
|
||||||
|
netmask,
|
||||||
|
network_address,
|
||||||
|
num_addresses,
|
||||||
|
prefix_bits,
|
||||||
|
set_bits,
|
||||||
|
to_int,
|
||||||
|
version,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"ip",
|
||||||
|
"geo",
|
||||||
|
"is_valid",
|
||||||
|
"version",
|
||||||
|
"to_int",
|
||||||
|
"from_int",
|
||||||
|
"is_private",
|
||||||
|
"is_global",
|
||||||
|
"in_network",
|
||||||
|
"in_any",
|
||||||
|
"network_address",
|
||||||
|
"broadcast_address",
|
||||||
|
"netmask",
|
||||||
|
"prefix_bits",
|
||||||
|
"host_bits",
|
||||||
|
"num_addresses",
|
||||||
|
"set_bits",
|
||||||
|
"hosts",
|
||||||
|
"fetch_ip",
|
||||||
|
"ip_location",
|
||||||
|
"fetch_location",
|
||||||
|
]
|
||||||
133
src/commons/addr/geo.py
Normal file
133
src/commons/addr/geo.py
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
"""async ip/geo network lookups (aiohttp, gated behind the [addr] extra).
|
||||||
|
|
||||||
|
importing this module without aiohttp is fine; only calling a lookup without it
|
||||||
|
raises a clear RuntimeError. each call may reuse a caller-supplied aiohttp
|
||||||
|
ClientSession (`session=`) or create and close one internally.
|
||||||
|
|
||||||
|
url-building and json->dict parsing are pulled into pure helpers (`_*_url`,
|
||||||
|
`_parse_*`) so they unit-test without network. every lookup returns None on any
|
||||||
|
request/parse failure (logged, never raised, never printed).
|
||||||
|
|
||||||
|
security: the only secret is geo.ipify's `api_key`, which is a REQUIRED keyword on
|
||||||
|
`ip_location` — the caller injects it. nothing is hardcoded here.
|
||||||
|
"""
|
||||||
|
import logging
|
||||||
|
from typing import Optional
|
||||||
|
from urllib.parse import urlencode
|
||||||
|
|
||||||
|
log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
try:
|
||||||
|
import aiohttp
|
||||||
|
|
||||||
|
_HAVE_AIOHTTP = True
|
||||||
|
except ImportError: # pragma: no cover
|
||||||
|
aiohttp = None # type: ignore[assignment]
|
||||||
|
_HAVE_AIOHTTP = False
|
||||||
|
|
||||||
|
_MISSING = "aiohttp is required for address lookups; install commons[addr]"
|
||||||
|
|
||||||
|
_IPIFY_URL = "https://api.ipify.org?format=json"
|
||||||
|
_GEO_IPIFY_URL = "https://geo.ipify.org/api/v2/country"
|
||||||
|
_NOMINATIM_URL = "https://nominatim.openstreetmap.org/reverse"
|
||||||
|
|
||||||
|
|
||||||
|
def _ip_location_url(ip: str, api_key: str) -> str:
|
||||||
|
"""build the geo.ipify lookup url for an ip (api_key injected by the caller)"""
|
||||||
|
return f"{_GEO_IPIFY_URL}?{urlencode({'apiKey': api_key, 'ipAddress': ip})}"
|
||||||
|
|
||||||
|
|
||||||
|
def _reverse_url(lat: float, lon: float) -> str:
|
||||||
|
"""build the nominatim reverse-geocode url for a lat/lon"""
|
||||||
|
params = {
|
||||||
|
"format": "json",
|
||||||
|
"lat": lat,
|
||||||
|
"lon": lon,
|
||||||
|
"zoom": 5,
|
||||||
|
"addressdetails": 1,
|
||||||
|
}
|
||||||
|
return f"{_NOMINATIM_URL}?{urlencode(params)}"
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_ipify(data: dict) -> Optional[str]:
|
||||||
|
"""pull the ip string out of an ipify response"""
|
||||||
|
ip = data.get("ip")
|
||||||
|
return ip or None
|
||||||
|
|
||||||
|
|
||||||
|
def _parse_reverse(data: dict) -> Optional[dict]:
|
||||||
|
"""parse a nominatim reverse response into {country: iso2 lower, state: slug|None}"""
|
||||||
|
address = data.get("address")
|
||||||
|
if not isinstance(address, dict):
|
||||||
|
return None
|
||||||
|
country = address.get("country_code")
|
||||||
|
if not country:
|
||||||
|
return None
|
||||||
|
state = address.get("state")
|
||||||
|
return {
|
||||||
|
"country": str(country).lower(),
|
||||||
|
"state": state.lower().replace(" ", "-") if state else None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def _get_json(
|
||||||
|
url: str, *, session, timeout: float, headers: Optional[dict] = None
|
||||||
|
) -> Optional[dict]:
|
||||||
|
"""GET url and return parsed json, or None on any failure; reuses session if given"""
|
||||||
|
if not _HAVE_AIOHTTP:
|
||||||
|
raise RuntimeError(_MISSING)
|
||||||
|
owns = session is None
|
||||||
|
if owns:
|
||||||
|
session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=timeout))
|
||||||
|
try:
|
||||||
|
async with session.get(url, headers=headers) as resp:
|
||||||
|
if resp.status != 200:
|
||||||
|
log.warning("address lookup %s -> %s", url, resp.status)
|
||||||
|
return None
|
||||||
|
return await resp.json()
|
||||||
|
except Exception as exc:
|
||||||
|
log.warning("address lookup failed (%s): %s", url, exc)
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
if owns:
|
||||||
|
await session.close()
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_ip(*, session=None, timeout: float = 15) -> Optional[str]:
|
||||||
|
"""return the caller's public ip via ipify, or None on failure"""
|
||||||
|
data = await _get_json(_IPIFY_URL, session=session, timeout=timeout)
|
||||||
|
return _parse_ipify(data) if data else None
|
||||||
|
|
||||||
|
|
||||||
|
async def ip_location(
|
||||||
|
ip: str, *, api_key: str, session=None, timeout: float = 15
|
||||||
|
) -> Optional[dict]:
|
||||||
|
"""look up geo data for an ip via geo.ipify; api_key is required and injected
|
||||||
|
|
||||||
|
returns the raw geo.ipify json dict, or None on failure. raises ValueError if
|
||||||
|
no api_key is supplied — there is no default and nothing hardcoded.
|
||||||
|
"""
|
||||||
|
if not api_key:
|
||||||
|
raise ValueError("ip_location requires an api_key (inject it; never hardcode)")
|
||||||
|
url = _ip_location_url(ip, api_key)
|
||||||
|
return await _get_json(url, session=session, timeout=timeout)
|
||||||
|
|
||||||
|
|
||||||
|
async def fetch_location(
|
||||||
|
lat: float,
|
||||||
|
lon: float,
|
||||||
|
*,
|
||||||
|
user_agent: str = "commons-addr/1.0",
|
||||||
|
session=None,
|
||||||
|
timeout: float = 15,
|
||||||
|
) -> Optional[dict]:
|
||||||
|
"""reverse-geocode a lat/lon via nominatim -> {country, state}, or None
|
||||||
|
|
||||||
|
nominatim's terms require a User-Agent; it is a parameter (not hidden) with a
|
||||||
|
sensible default the caller can override.
|
||||||
|
"""
|
||||||
|
url = _reverse_url(lat, lon)
|
||||||
|
data = await _get_json(
|
||||||
|
url, session=session, timeout=timeout, headers={"User-Agent": user_agent}
|
||||||
|
)
|
||||||
|
return _parse_reverse(data) if data else None
|
||||||
123
src/commons/addr/ip.py
Normal file
123
src/commons/addr/ip.py
Normal file
@ -0,0 +1,123 @@
|
|||||||
|
"""pure ip/address utilities over stdlib ipaddress (no network, no deps).
|
||||||
|
|
||||||
|
every function accepts strings. validation/membership helpers
|
||||||
|
(`is_valid`/`version`/`in_network`/`in_any`) never raise on bad input — they return
|
||||||
|
a falsy value. functions that require a valid address or cidr (`to_int`/`set_bits`/
|
||||||
|
the network-shape helpers) let `ValueError` propagate so misuse is visible.
|
||||||
|
|
||||||
|
cidr parsing uses `ip_network(cidr, strict=False)` throughout, so host bits set in
|
||||||
|
the network string are tolerated (e.g. "10.0.0.5/24").
|
||||||
|
"""
|
||||||
|
import ipaddress
|
||||||
|
from typing import List, Optional
|
||||||
|
|
||||||
|
|
||||||
|
def is_valid(addr: str) -> bool:
|
||||||
|
"""whether addr parses as an ipv4 or ipv6 address"""
|
||||||
|
try:
|
||||||
|
ipaddress.ip_address(addr)
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def version(addr: str) -> Optional[int]:
|
||||||
|
"""ip version (4 or 6) for addr, or None if it is not a valid address"""
|
||||||
|
try:
|
||||||
|
return ipaddress.ip_address(addr).version
|
||||||
|
except ValueError:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def to_int(addr: str) -> int:
|
||||||
|
"""integer value of an ip address (raises ValueError on a bad address)"""
|
||||||
|
return int(ipaddress.ip_address(addr))
|
||||||
|
|
||||||
|
|
||||||
|
def from_int(n: int, *, version: int = 4) -> str:
|
||||||
|
"""ip address string for an integer (version 4 or 6)"""
|
||||||
|
if version == 4:
|
||||||
|
return str(ipaddress.IPv4Address(n))
|
||||||
|
if version == 6:
|
||||||
|
return str(ipaddress.IPv6Address(n))
|
||||||
|
raise ValueError("version must be 4 or 6")
|
||||||
|
|
||||||
|
|
||||||
|
def is_private(addr: str) -> bool:
|
||||||
|
"""whether addr is in private/reserved space (raises on a bad address)"""
|
||||||
|
return ipaddress.ip_address(addr).is_private
|
||||||
|
|
||||||
|
|
||||||
|
def is_global(addr: str) -> bool:
|
||||||
|
"""whether addr is a public, globally-routable address (raises on a bad address)"""
|
||||||
|
return ipaddress.ip_address(addr).is_global
|
||||||
|
|
||||||
|
|
||||||
|
def in_network(addr: str, cidr: str) -> bool:
|
||||||
|
"""whether addr falls inside a single cidr; False on any bad input"""
|
||||||
|
try:
|
||||||
|
return ipaddress.ip_address(addr) in ipaddress.ip_network(cidr, strict=False)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def in_any(addr: str, cidrs: List[str]) -> bool:
|
||||||
|
"""whether addr falls inside any of the cidrs (allow/blocklists); False on bad input"""
|
||||||
|
try:
|
||||||
|
ip = ipaddress.ip_address(addr)
|
||||||
|
except ValueError:
|
||||||
|
return False
|
||||||
|
for cidr in cidrs:
|
||||||
|
try:
|
||||||
|
if ip in ipaddress.ip_network(cidr, strict=False):
|
||||||
|
return True
|
||||||
|
except ValueError:
|
||||||
|
continue
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def network_address(cidr: str) -> str:
|
||||||
|
"""network address of a cidr (e.g. 10.0.0.0 for 10.0.0.5/24)"""
|
||||||
|
return str(ipaddress.ip_network(cidr, strict=False).network_address)
|
||||||
|
|
||||||
|
|
||||||
|
def broadcast_address(cidr: str) -> str:
|
||||||
|
"""broadcast address of a cidr"""
|
||||||
|
return str(ipaddress.ip_network(cidr, strict=False).broadcast_address)
|
||||||
|
|
||||||
|
|
||||||
|
def netmask(cidr: str) -> str:
|
||||||
|
"""netmask of a cidr (e.g. 255.255.255.0)"""
|
||||||
|
return str(ipaddress.ip_network(cidr, strict=False).netmask)
|
||||||
|
|
||||||
|
|
||||||
|
def prefix_bits(cidr: str) -> int:
|
||||||
|
"""prefix length of a cidr (the /N)"""
|
||||||
|
return ipaddress.ip_network(cidr, strict=False).prefixlen
|
||||||
|
|
||||||
|
|
||||||
|
def host_bits(cidr: str) -> int:
|
||||||
|
"""number of host bits in a cidr (max prefix minus prefix length)"""
|
||||||
|
net = ipaddress.ip_network(cidr, strict=False)
|
||||||
|
return net.max_prefixlen - net.prefixlen
|
||||||
|
|
||||||
|
|
||||||
|
def num_addresses(cidr: str) -> int:
|
||||||
|
"""total addresses in a cidr (network + hosts + broadcast)"""
|
||||||
|
return ipaddress.ip_network(cidr, strict=False).num_addresses
|
||||||
|
|
||||||
|
|
||||||
|
def set_bits(addr: str) -> int:
|
||||||
|
"""population count of an address's integer value (raises on a bad address)"""
|
||||||
|
return bin(int(ipaddress.ip_address(addr))).count("1")
|
||||||
|
|
||||||
|
|
||||||
|
def hosts(cidr: str, *, limit: Optional[int] = None) -> List[str]:
|
||||||
|
"""usable host addresses of a cidr as strings; limit caps how many are materialized"""
|
||||||
|
gen = ipaddress.ip_network(cidr, strict=False).hosts()
|
||||||
|
out: List[str] = []
|
||||||
|
for host in gen:
|
||||||
|
out.append(str(host))
|
||||||
|
if limit is not None and len(out) >= limit:
|
||||||
|
break
|
||||||
|
return out
|
||||||
53
src/commons/masking.py
Normal file
53
src/commons/masking.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
"""display masking for sensitive-looking values.
|
||||||
|
|
||||||
|
these are DISPLAY helpers only — they format a value for showing in a UI or log
|
||||||
|
(e.g. "•••• •••• •••• 1234"). they are not a security control: the underlying
|
||||||
|
value is unchanged and still needs proper handling (encryption at rest, etc.).
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def _digits(value: str) -> str:
|
||||||
|
"""keep only the digit characters of a string"""
|
||||||
|
return "".join(c for c in value if c.isdigit())
|
||||||
|
|
||||||
|
|
||||||
|
def credit(card_number: str) -> str:
|
||||||
|
"""mask a card number to bullets plus the last four digits"""
|
||||||
|
last4 = _digits(card_number)[-4:]
|
||||||
|
return "•••• •••• •••• " + last4
|
||||||
|
|
||||||
|
|
||||||
|
def cvv(value: str) -> str:
|
||||||
|
"""mask a cvv to bullets of the same length"""
|
||||||
|
return "•" * len(value)
|
||||||
|
|
||||||
|
|
||||||
|
def phantom(value: str) -> str:
|
||||||
|
"""show a long token as first-six...last-four (e.g. a hash or id)"""
|
||||||
|
return f"{value[:6]}...{value[-4:]}"
|
||||||
|
|
||||||
|
|
||||||
|
def provider(card_number: str) -> str:
|
||||||
|
"""short card brand from the number's prefix, or UNKW if undetermined
|
||||||
|
|
||||||
|
a BIN-prefix heuristic for display/labeling — not authoritative validation.
|
||||||
|
tolerates spaces/dashes and short or non-numeric input (returns UNKW).
|
||||||
|
"""
|
||||||
|
n = _digits(card_number)
|
||||||
|
if not n:
|
||||||
|
return "UNKW"
|
||||||
|
if n.startswith("4"):
|
||||||
|
return "VISA"
|
||||||
|
if n[:2] in {"51", "52", "53", "54", "55"} or (len(n) >= 4 and 2221 <= int(n[:4]) <= 2720):
|
||||||
|
return "MC"
|
||||||
|
if n[:2] in {"34", "37"}:
|
||||||
|
return "AMEX"
|
||||||
|
if n.startswith("62") or n.startswith("81"):
|
||||||
|
return "UPAY"
|
||||||
|
if n.startswith("6011") or n.startswith("65") or (len(n) >= 3 and 644 <= int(n[:3]) <= 649):
|
||||||
|
return "DISC"
|
||||||
|
if n.startswith("35"):
|
||||||
|
return "JCB"
|
||||||
|
if n.startswith(("30", "36", "38", "39")):
|
||||||
|
return "DNRS"
|
||||||
|
return "UNKW"
|
||||||
50
src/commons/paths.py
Normal file
50
src/commons/paths.py
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
"""nested access by dotted path.
|
||||||
|
|
||||||
|
`deep_get(data, "in.this.old.notation")` walks dicts (and lists, when a segment is
|
||||||
|
a number) and returns a default instead of raising on a missing/!wrong path.
|
||||||
|
`deep_set` writes a nested value, creating intermediate dicts.
|
||||||
|
"""
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
_MISSING = object()
|
||||||
|
|
||||||
|
|
||||||
|
def deep_get(data: Any, path: str, default: Any = None, *, sep: str = ".") -> Any:
|
||||||
|
"""get a nested value by dotted path, returning default if absent
|
||||||
|
|
||||||
|
dict keys are matched by name; a numeric segment indexes a list/tuple
|
||||||
|
(e.g. "items.0.id"). any missing key, out-of-range index, or non-container
|
||||||
|
along the way yields `default` rather than raising.
|
||||||
|
"""
|
||||||
|
cur = data
|
||||||
|
for seg in path.split(sep):
|
||||||
|
if isinstance(cur, dict):
|
||||||
|
cur = cur.get(seg, _MISSING)
|
||||||
|
if cur is _MISSING:
|
||||||
|
return default
|
||||||
|
elif isinstance(cur, (list, tuple)):
|
||||||
|
try:
|
||||||
|
cur = cur[int(seg)]
|
||||||
|
except (ValueError, IndexError):
|
||||||
|
return default
|
||||||
|
else:
|
||||||
|
return default
|
||||||
|
return cur
|
||||||
|
|
||||||
|
|
||||||
|
def deep_set(data: dict, path: str, value: Any, *, sep: str = ".") -> dict:
|
||||||
|
"""set a nested value by dotted path, creating intermediate dicts
|
||||||
|
|
||||||
|
returns the same top-level dict for chaining. overwrites any non-dict value
|
||||||
|
sitting where an intermediate dict is needed.
|
||||||
|
"""
|
||||||
|
segments = path.split(sep)
|
||||||
|
cur = data
|
||||||
|
for seg in segments[:-1]:
|
||||||
|
nxt = cur.get(seg)
|
||||||
|
if not isinstance(nxt, dict):
|
||||||
|
nxt = {}
|
||||||
|
cur[seg] = nxt
|
||||||
|
cur = nxt
|
||||||
|
cur[segments[-1]] = value
|
||||||
|
return data
|
||||||
169
src/commons/timing.py
Normal file
169
src/commons/timing.py
Normal file
@ -0,0 +1,169 @@
|
|||||||
|
"""time helpers built on unix timestamps with timezone-aware datetime support.
|
||||||
|
|
||||||
|
one engine, two ergonomics:
|
||||||
|
- bare module functions (now/add/ahead/ago/is_expired/to_dt/...) operate on unix
|
||||||
|
ints for quick, stateless use. all delta math routes through `_delta_seconds`.
|
||||||
|
- `Clock` holds a timezone + fast flag and delegates to the same functions, so a
|
||||||
|
configured clock and the bare functions never diverge.
|
||||||
|
|
||||||
|
unix ints stay the storable value; datetimes are produced on demand in whatever
|
||||||
|
timezone you ask for (stdlib zoneinfo, no pytz).
|
||||||
|
"""
|
||||||
|
import time as _time
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
from typing import Optional, Union
|
||||||
|
from zoneinfo import ZoneInfo
|
||||||
|
|
||||||
|
# default fast flag for bare module calls. when True, every unit (day/hour/minute)
|
||||||
|
# counts as one second so time-based flows run fast in tests. set
|
||||||
|
# `commons.timing.FAST_MODE` from test setup; off by default. not a config import.
|
||||||
|
FAST_MODE = False
|
||||||
|
|
||||||
|
UTC = timezone.utc
|
||||||
|
|
||||||
|
# accepted timezone inputs: a tzinfo, an IANA name string, or None (-> UTC)
|
||||||
|
TZ = Union[str, timezone, ZoneInfo, None]
|
||||||
|
|
||||||
|
|
||||||
|
def _zone(tz: TZ):
|
||||||
|
"""resolve a tz input (name / tzinfo / None) to a tzinfo, defaulting to UTC"""
|
||||||
|
if tz is None:
|
||||||
|
return UTC
|
||||||
|
if isinstance(tz, str):
|
||||||
|
return ZoneInfo(tz)
|
||||||
|
return tz
|
||||||
|
|
||||||
|
|
||||||
|
def now() -> int:
|
||||||
|
"""current unix timestamp as an int"""
|
||||||
|
return int(_time.time())
|
||||||
|
|
||||||
|
|
||||||
|
def _delta_seconds(days: int, hours: int, minutes: int, seconds: int, fast: bool) -> int:
|
||||||
|
"""seconds for a unit combination; collapses to 1s/unit when fast is True
|
||||||
|
|
||||||
|
single source of truth for unit math — every delta helper routes through here.
|
||||||
|
"""
|
||||||
|
if fast:
|
||||||
|
return days + hours + minutes + seconds
|
||||||
|
return days * 86400 + hours * 3600 + minutes * 60 + seconds
|
||||||
|
|
||||||
|
|
||||||
|
def add(
|
||||||
|
ts: int,
|
||||||
|
*,
|
||||||
|
days: int = 0,
|
||||||
|
hours: int = 0,
|
||||||
|
minutes: int = 0,
|
||||||
|
seconds: int = 0,
|
||||||
|
fast: Optional[bool] = None,
|
||||||
|
) -> int:
|
||||||
|
"""shift a unix timestamp by signed units (negatives subtract)"""
|
||||||
|
fast = FAST_MODE if fast is None else fast
|
||||||
|
return int(ts + _delta_seconds(days, hours, minutes, seconds, fast))
|
||||||
|
|
||||||
|
|
||||||
|
def ahead(*, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, fast: Optional[bool] = None) -> int:
|
||||||
|
"""unix timestamp the given units in the future (now + delta)"""
|
||||||
|
return add(now(), days=days, hours=hours, minutes=minutes, seconds=seconds, fast=fast)
|
||||||
|
|
||||||
|
|
||||||
|
def ago(*, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0, fast: Optional[bool] = None) -> int:
|
||||||
|
"""unix timestamp the given units in the past (now - delta)"""
|
||||||
|
return add(now(), days=-days, hours=-hours, minutes=-minutes, seconds=-seconds, fast=fast)
|
||||||
|
|
||||||
|
|
||||||
|
def is_expired(deadline: int) -> bool:
|
||||||
|
"""True if a unix deadline has passed; a falsy/None deadline never expires"""
|
||||||
|
if not deadline:
|
||||||
|
return False
|
||||||
|
return now() > deadline
|
||||||
|
|
||||||
|
|
||||||
|
def to_dt(ts: int, tz: TZ = None) -> datetime:
|
||||||
|
"""convert a unix timestamp to an aware datetime in the given tz (default UTC)"""
|
||||||
|
return datetime.fromtimestamp(ts, _zone(tz))
|
||||||
|
|
||||||
|
|
||||||
|
def to_unix(dt: datetime, tz: TZ = None) -> int:
|
||||||
|
"""convert a datetime to a unix timestamp; naive inputs are read as tz/UTC"""
|
||||||
|
if dt.tzinfo is None:
|
||||||
|
dt = dt.replace(tzinfo=_zone(tz))
|
||||||
|
return int(dt.timestamp())
|
||||||
|
|
||||||
|
|
||||||
|
def now_dt(tz: TZ = None) -> datetime:
|
||||||
|
"""current time as an aware datetime in the given tz (default UTC)"""
|
||||||
|
return datetime.now(_zone(tz))
|
||||||
|
|
||||||
|
|
||||||
|
def convert(dt: datetime, tz: TZ) -> datetime:
|
||||||
|
"""re-express a datetime in another timezone (same instant, new wall clock)"""
|
||||||
|
return dt.astimezone(_zone(tz))
|
||||||
|
|
||||||
|
|
||||||
|
def fmt(ts: int, tz: TZ = None, pattern: str = "%Y-%m-%d %H:%M:%S") -> str:
|
||||||
|
"""format a unix timestamp in the given tz with an strftime pattern"""
|
||||||
|
return to_dt(ts, tz).strftime(pattern)
|
||||||
|
|
||||||
|
|
||||||
|
def date(ts: Optional[int] = None, tz: TZ = None, pattern: str = "%m/%d/%Y") -> str:
|
||||||
|
"""format the date portion of a timestamp (or now) in the given tz"""
|
||||||
|
return to_dt(now() if ts is None else ts, tz).strftime(pattern)
|
||||||
|
|
||||||
|
|
||||||
|
class Clock:
|
||||||
|
"""a timezone + fast-mode aware view over the same timing engine
|
||||||
|
|
||||||
|
construct with any IANA tz name (or tzinfo; default UTC) and an optional fast
|
||||||
|
flag. delta methods delegate to the module functions passing the clock's fast
|
||||||
|
setting; datetime methods produce values in the clock's timezone.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, tz: TZ = "UTC", *, fast: bool = False):
|
||||||
|
self.tz = _zone(tz)
|
||||||
|
self.fast = fast
|
||||||
|
|
||||||
|
def now(self) -> int:
|
||||||
|
"""current unix timestamp (timezone-independent)"""
|
||||||
|
return now()
|
||||||
|
|
||||||
|
def add(self, ts: int, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int:
|
||||||
|
"""shift a timestamp by signed units, honoring this clock's fast flag"""
|
||||||
|
return add(ts, days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast)
|
||||||
|
|
||||||
|
def ahead(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int:
|
||||||
|
"""unix timestamp the given units in the future"""
|
||||||
|
return ahead(days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast)
|
||||||
|
|
||||||
|
def ago(self, *, days: int = 0, hours: int = 0, minutes: int = 0, seconds: int = 0) -> int:
|
||||||
|
"""unix timestamp the given units in the past"""
|
||||||
|
return ago(days=days, hours=hours, minutes=minutes, seconds=seconds, fast=self.fast)
|
||||||
|
|
||||||
|
def is_expired(self, deadline: int) -> bool:
|
||||||
|
"""True if a unix deadline has passed; falsy/None never expires"""
|
||||||
|
return is_expired(deadline)
|
||||||
|
|
||||||
|
def now_dt(self) -> datetime:
|
||||||
|
"""current time as an aware datetime in this clock's timezone"""
|
||||||
|
return now_dt(self.tz)
|
||||||
|
|
||||||
|
def to_dt(self, ts: int) -> datetime:
|
||||||
|
"""convert a unix timestamp to an aware datetime in this clock's timezone"""
|
||||||
|
return to_dt(ts, self.tz)
|
||||||
|
|
||||||
|
def to_unix(self, dt: datetime) -> int:
|
||||||
|
"""convert a datetime to unix; naive inputs are read as this clock's tz"""
|
||||||
|
return to_unix(dt, self.tz)
|
||||||
|
|
||||||
|
def convert(self, dt: datetime) -> datetime:
|
||||||
|
"""re-express any datetime in this clock's timezone (same instant)"""
|
||||||
|
return convert(dt, self.tz)
|
||||||
|
|
||||||
|
def fmt(self, ts: int, pattern: str = "%Y-%m-%d %H:%M:%S") -> str:
|
||||||
|
"""format a unix timestamp in this clock's timezone"""
|
||||||
|
return fmt(ts, self.tz, pattern)
|
||||||
|
|
||||||
|
def date(self, ts: Optional[int] = None, pattern: str = "%m/%d/%Y") -> str:
|
||||||
|
"""format the date of a timestamp (or now) in this clock's timezone"""
|
||||||
|
return date(ts, self.tz, pattern)
|
||||||
Loading…
Reference in New Issue
Block a user