Compare commits

..

No commits in common. "main" and "v0.1.0" have entirely different histories.
main ... v0.1.0

7 changed files with 55 additions and 498 deletions

2
.gitignore vendored
View File

@ -1,5 +1,5 @@
# claude # claude
.claude/ CLAUDE.md
# python # python
__pycache__/ __pycache__/

136
README.md
View File

@ -13,13 +13,11 @@ and emit; their records flow into the handlers `log_setup` wired.
## Install ## Install
``` ```
log_setup @ git+ssh://git@git.rethinkstudios.io/rethink-public/log_setup.git@v0.4.1 log_setup @ git+ssh://git@git.rethinkstudios.io/rethink-public/log_setup.git@v0.1.0
``` ```
No dependencies — stdlib only. No dependencies — stdlib only.
Drop the `@v0.4.1` suffix from the line above to install the latest unpinned.
## Quick start ## Quick start
```python ```python
@ -50,143 +48,28 @@ emits; the records land in the configured root.
(`run.<timestamp>.log[.gz]`) and starts fresh; prunes to `backup_count`. (`run.<timestamp>.log[.gz]`) and starts fresh; prunes to `backup_count`.
- `None` — single file, no rotation. - `None` — single file, no rotation.
- **compress=True** (default) gzips each rolled file (`run.log.2026-06-27.gz`). - **compress=True** (default) gzips each rolled file (`run.log.2026-06-27.gz`).
- **Retention** = `backup_count` (default 14) for every mode — unless tiered retention is - **Retention** = `backup_count` (default 14) for every mode.
enabled (below).
- **console=True** (off by default) also logs to stdout in the same format — opt in when - **console=True** (off by default) also logs to stdout in the same format — opt in when
you want live terminal output alongside the file. you want live terminal output alongside the file.
The `name` you pass is normalized so it produces exactly one `.log`: `name="latest"` and
`name="latest.log"` both yield the live file `latest.log` (never `latest.log.log`).
## Tiered retention (`keep_uncompressed` / `keep_compressed`)
The default is a flat `backup_count`: every rolled file is gzipped on roll and the oldest
are deleted past the count. If instead you want the recent logs **uncompressed** (read them
without `zcat`) and older ones **gzipped**, pass the two tier knobs:
```python
setup_logging(
name="latest",
rotate="on_start", # works for on_start, daily, and size
keep_uncompressed=3, # newest 3 rolled logs kept PLAIN
keep_compressed=7, # next 7 kept GZIPPED; total retained = 10
)
```
Result in `log_dir` (newest → oldest):
```
latest.log <- live "latest" (stable, tail -f)
latest.<t1>.log latest.<t2>.log latest.<t3>.log <- 3 newest: plain
latest.<t4>.log.gz ... latest.<t10>.log.gz <- next 7: gzipped
(anything past 10 deleted)
```
- Each restart (`on_start`) or roll (`daily`/`size`) moves the live file into `log_dir`,
then re-tiers: newest `keep_uncompressed` stay plain, the next `keep_compressed` are
gzipped in place, the rest deleted. Total kept = `keep_uncompressed + keep_compressed`.
- **Opt-in by presence** — pass either knob to enable tiering. Pass **neither** and
rotation behaves exactly as before (`backup_count` + gzip-on-roll), so existing callers
are unaffected.
- In tiered mode `backup_count` and the gzip-on-roll behavior of `compress` are **ignored**
— the tier counts bound retention instead.
- `keep_uncompressed=0` → everything gzipped; `keep_compressed=0` → only the plain tier.
Retention is count-based (not time-based).
## Output format (`output=`)
Two formats, two needs. Default is `"text"`; the live-file name is the same either way
(`run.log`, never auto-renamed), so a service can switch text↔json without breaking the
Promtail glob, bind-mount path, or your `tail` command.
- **`output="text"`** (default) — human-readable
`2026-06-27 19:55:05 | module.name | INFO | message`, **local time**. The
single-machine `tail -f` path. `fmt`/`datefmt` override it. Unchanged from v0.1.x.
- **`output="json"`** — structured **one JSON object per line** (JSON Lines) for the
Grafana/Loki pipeline (Promtail → Loki → Grafana); Loki parses JSON fields into labels
natively, no regex.
```python
setup_logging(name="run", output="json")
logging.getLogger("bot.core").info("ready", extra={"monitor": "heartbeat"})
# -> {"time": "2026-06-28T14:03:11Z", "level": "INFO", "module": "bot.core",
# "message": "ready", "monitor": "heartbeat"}
```
- **Fields:** `time`, `level`, `module`, `message` always; any `extra={...}` keys land
as **top-level** fields (stamp `monitor`/`service`/request-id for Loki labels — the lib
stays domain-agnostic); error records carry the traceback in `exc_info` (never dropped).
- **Time is UTC ISO-8601 with a `Z`** (`2026-06-28T14:03:11Z`), not local. json is the
aggregation path — logs from many servers/containers sort unambiguously only in UTC;
Grafana converts to local for display. (Text mode stays local — that's a human on one
box.)
- Both file and console use the chosen format. `fmt`/`datefmt` apply to text only (json
builds fields, not a format string). An unknown `output` falls back to text + warns,
never crashes. **Zero new deps** — stdlib `json` only.
## Signature ## Signature
```python ```python
setup_logging( setup_logging(
name="run", # base -> run.log (the live file at cwd) name="run", # base -> run.log (the live file at cwd)
log_dir="logs", # rotated/compressed copies live here (created if absent) log_dir="logs", # rotated/compressed copies live here (created if absent)
level="INFO", # root level everything inherits (str name or logging constant) level="INFO", # root level (str name or logging constant)
module_levels=None, # {logger_name: level} per-logger overrides (exact name match)
rotate="daily", # "daily" | "size" | "on_start" | None rotate="daily", # "daily" | "size" | "on_start" | None
backup_count=14, # rotated files to keep (flat retention; ignored if tiered) backup_count=14, # rotated files to keep (older auto-deleted)
keep_uncompressed=None, # tiered: newest N rolled logs kept PLAIN (opt-in)
keep_compressed=None, # tiered: next M rolled logs kept GZIPPED (opt-in)
max_bytes=10_000_000, # only for rotate="size" max_bytes=10_000_000, # only for rotate="size"
compress=True, # gzip rolled files compress=True, # gzip rolled files
console=False, # also log to stdout (off by default; opt in) console=False, # also log to stdout (off by default; opt in)
queue=False, # route through a background QueueListener (async-friendly) queue=False, # route through a background QueueListener (async-friendly)
output="text", # "text" (human, local time) | "json" (structured, UTC) fmt=None, # override the format string
fmt=None, # override the text format string (text mode only) datefmt=None, # override the date format
datefmt=None, # override the text date format (text mode only)
) -> logging.Logger # returns the configured root logger ) -> logging.Logger # returns the configured root logger
``` ```
## Quieting noisy dependencies (`module_levels`)
`level` is the **root default** — every logger inherits it. `module_levels` is an
optional `{logger_name: level}` map of **per-logger overrides** applied at setup, the
standard "turn down the chatty dependency while my own code stays at INFO" case:
```python
setup_logging(
name="run",
level="INFO", # our code logs at INFO
module_levels={
"motor": "WARNING", # quiet the driver
"pymongo": "WARNING",
"aiohttp": "WARNING", # also quiets aiohttp.client / aiohttp.access (hierarchy)
},
)
```
- **Exact-name match — names are NOT discovered.** It calls
`logging.getLogger(name).setLevel(level)` for exactly the name you give. There's no
smart find of noisy modules; you name the loggers. A typo (`"moter"`) silently
configures a logger nothing uses — no error, no effect. Get the names right.
- **Hierarchy applies** (the one "smart" part, and it's just stdlib): naming a **parent**
quiets its whole subtree. `"aiohttp"` also quiets `aiohttp.client`, `aiohttp.access`,
etc. — the way to catch sub-loggers without listing each.
- **str or int** per entry (`"WARNING"` or `logging.WARNING`) — same normalization as the
root `level`.
- **Never crashes:** a bad level for one entry is **skipped with a warning**; the other
entries and the rest of setup still apply. Consistent with the never-crash-over-logging
rule.
- `None`/`{}` (default) → no overrides; existing callers are unaffected.
Common noisy library logger names: `motor`, `pymongo`, `aiohttp` (parent quiets
`aiohttp.client`/`aiohttp.access`), `discord` / `discord.*`, `asyncio`, `urllib3`. Check
a lib's *actual* logger name — some log under a name different from their package.
This already works without the lib (`logging.getLogger("motor").setLevel(WARNING)` after
setup does the same via stdlib hierarchy). The param's value is ergonomic: it keeps the
overrides in the **one** `setup_logging` call at the entry point instead of scattering
`setLevel` calls afterward — which is the whole point of `log_setup`.
## Async-friendly (`queue=True`) ## Async-friendly (`queue=True`)
For async-heavy apps, `queue=True` routes records through a stdlib `QueueHandler` to a For async-heavy apps, `queue=True` routes records through a stdlib `QueueHandler` to a
@ -214,10 +97,9 @@ handlers. Getting files to a backend is a separate concern (e.g. Promtail tails
backend can change without touching any app, and the consistent format here is what backend can change without touching any app, and the consistent format here is what
makes downstream parsing and alerting easy. makes downstream parsing and alerting easy.
Structured/JSON output is **in** as of v0.2.0 (`output="json"`) — text and json only. Also out of v0.1.0 (possible later additions): structured/JSON logging, color
Still deliberately out: logfmt or other formats, a format DSL, per-handler formats, formatting, per-logger filters, remote handlers.
color formatting, per-logger filters, remote handlers.
## Versioning ## Versioning
Releases are tagged `vX.Y.Z`. The install line above pins a release; drop the `@vX.Y.Z` suffix to install the latest unpinned. Pin deliberately for reproducible installs. Tagged `vX.Y.Z`. Pin the tag.

View File

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "log_setup" name = "log_setup"
version = "0.4.1" version = "0.1.0"
description = "stdlib app-entry-point logging setup: live run.log, rotation, gzip, retention, consistent format" description = "stdlib app-entry-point logging setup: live run.log, rotation, gzip, retention, consistent format"
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [] dependencies = []

View File

@ -19,4 +19,4 @@ from .setup import setup_logging
__all__ = ["setup_logging"] __all__ = ["setup_logging"]
__version__ = "0.4.1" __version__ = "0.1.0"

View File

@ -1,76 +1,15 @@
"""log formats for the app-wide setup: human-readable text + structured JSON lines. """default log format + datefmt for the app-wide setup.
two output formats, two proven needs. `text` (default) is the human `tail -f` format one format for v0.1.0, used on both console and file. `%(name)s` is the getLogger
(`time | module | level | message`, local time). `json` is the Grafana/Loki path name the emitting module used, so each library/module shows in the line.
one JSON object per line (JSON Lines), fields parsed into labels natively, UTC
timestamps so logs aggregated from many machines/containers sort unambiguously.
`%(name)s` is the getLogger name the emitting module used, so each module shows.
""" """
import datetime
import json
import logging import logging
DEFAULT_FORMAT = "%(asctime)s | %(name)s | %(levelname)s | %(message)s" DEFAULT_FORMAT = "%(asctime)s | %(name)s | %(levelname)s | %(message)s"
DEFAULT_DATEFMT = "%Y-%m-%d %H:%M:%S" DEFAULT_DATEFMT = "%Y-%m-%d %H:%M:%S"
_RESERVED = frozenset(vars(logging.makeLogRecord({})).keys()) | {"message", "asctime"}
# this formatter's own canonical output keys — stdlib's LogRecord rejects `extra` keys
# colliding with real attribute names (e.g. `module`), but `time`/`level` are NOT
# LogRecord attrs, so a caller's extra={"time":...}/{"level":...} would otherwise
# overwrite the UTC timestamp / levelname. guard them explicitly
_OUTPUT_KEYS = frozenset({"time", "level", "module", "message"})
def build_formatter(fmt=None, datefmt=None) -> logging.Formatter:
class JsonLinesFormatter(logging.Formatter): """build a logging.Formatter from overrides, falling back to the defaults"""
"""format each record as a single-line JSON object (JSON Lines / .jsonl)
emits at minimum time/level/module/message. time is UTC ISO-8601 with a `Z`
suffix (e.g. 2026-06-28T14:03:11Z) so logs aggregated across machines and
containers sort unambiguously Grafana converts to local for display. any
field passed via logging `extra={...}` lands as a top-level JSON field, which
is how a caller stamps monitor/service/request-id for Loki labels without the
lib knowing those domain concepts. a traceback (exc_info) is rendered into an
`exc_info` string field rather than dropped.
"""
def format(self, record: logging.LogRecord) -> str:
when = datetime.datetime.fromtimestamp(record.created, datetime.timezone.utc)
payload = {
"time": when.strftime("%Y-%m-%dT%H:%M:%SZ"),
"level": record.levelname,
"module": record.name,
"message": record.getMessage(),
}
for key, value in record.__dict__.items():
if key not in _RESERVED and key not in _OUTPUT_KEYS and not key.startswith("_"):
payload[key] = value
if record.exc_info:
# cache the rendered traceback on the record (as stdlib Formatter does) so a
# second handler/format() of the same record doesn't re-render it
if not record.exc_text:
record.exc_text = self.formatException(record.exc_info)
payload["exc_info"] = record.exc_text
elif record.exc_text:
payload["exc_info"] = record.exc_text
if record.stack_info:
payload["stack_info"] = self.formatStack(record.stack_info)
return json.dumps(payload, default=str)
def build_formatter(output: str = "text", fmt=None, datefmt=None) -> logging.Formatter:
"""build the formatter for the chosen output format
`output="text"` (default) returns the human-readable text formatter, honoring
the raw `fmt`/`datefmt` format-string overrides. `output="json"` returns the
structured `JsonLinesFormatter` (which ignores `fmt`/`datefmt` it builds
fields, not a format string). an unrecognized `output` falls back to text and
warns, never raising a bad format arg must not take the app down.
"""
if output == "json":
return JsonLinesFormatter()
if output != "text":
logging.getLogger(__name__).warning(
"log_setup: unknown output %r; falling back to 'text'", output
)
return logging.Formatter(fmt or DEFAULT_FORMAT, datefmt or DEFAULT_DATEFMT) return logging.Formatter(fmt or DEFAULT_FORMAT, datefmt or DEFAULT_DATEFMT)

View File

@ -10,43 +10,7 @@ import gzip
import os import os
import shutil import shutil
import time import time
from typing import Callable, Optional, Tuple from typing import Callable, Tuple
def _move(source: str, dest: str) -> None:
"""rename source to dest, falling back to copy+unlink across filesystems
os.replace is atomic but raises OSError(EXDEV) when source and dest are on
different filesystems exactly the container bind-mount / separate-logs-volume
case this lib targets. fall back to shutil.move (copy+unlink) so the roll still
lands instead of failing every rotation via the handler's silent handleError.
precondition: `dest` is a free, non-directory path (all call sites generate a unique
timestamped/dated dest). os.replace and shutil.move differ on a dest that already
exists as a directory, so this helper is not safe for arbitrary dests only the
rotation paths that guarantee a fresh file dest.
"""
try:
os.replace(source, dest)
except OSError:
shutil.move(source, dest)
def _gzip_file(source: str, dest: str) -> None:
"""gzip source into dest then remove source (the rolled-file compression idiom)
the source mtime is carried onto dest so a file keeps its position when it crosses
the plain->gz tier boundary retier ranks by mtime, and a fresh write would
otherwise make a just-compressed file look like the newest one and reshuffle tiers.
"""
mtime = _safe_mtime(source)
with open(source, "rb") as src, gzip.open(dest, "wb") as dst:
shutil.copyfileobj(src, dst)
os.remove(source)
try:
os.utime(dest, (mtime, mtime))
except OSError:
pass
def make_namer(log_dir: str, compress: bool) -> Callable[[str], str]: def make_namer(log_dir: str, compress: bool) -> Callable[[str], str]:
@ -62,131 +26,38 @@ def make_namer(log_dir: str, compress: bool) -> Callable[[str], str]:
return namer return namer
def make_rotator( def make_rotator(compress: bool) -> Callable[[str, str], None]:
compress: bool, log_dir: Optional[str] = None, """rotator: move (or gzip) the source live file to the destination rolled path"""
prune_stem: Optional[str] = None, backup_count: int = 0,
keep_uncompressed: Optional[int] = None, keep_compressed: Optional[int] = None,
) -> Callable[[str, str], None]:
"""rotator: move (or gzip) the source live file to the destination rolled path
legacy mode (default): gzip on roll when `compress`, then prune `log_dir` to
`backup_count` newest rolled files. the stdlib handler's own retention
(`getFilesToDelete`) only scans the live file's directory, so it never sees the
rolled files we redirect into `log_dir` pruning here is what bounds retention for
the daily and size rolling modes.
tiered mode (when `keep_uncompressed`/`keep_compressed` are given): land the rolled
file PLAIN and re-tier `log_dir` newest `keep_uncompressed` stay uncompressed, the
next `keep_compressed` are gzipped, the rest deleted. `compress`/`backup_count` are
ignored in this mode (the tier counts bound retention instead).
"""
tiered = keep_uncompressed is not None or keep_compressed is not None
def rotator(source: str, dest: str) -> None: def rotator(source: str, dest: str) -> None:
if not os.path.exists(source): if not os.path.exists(source):
return return
if tiered:
# dest carries the namer's .gz suffix in compress mode; strip it so the
# freshly-rolled file lands plain and retier decides its tier
plain_dest = dest[:-3] if dest.endswith(".gz") else dest
_move(source, plain_dest)
if log_dir is not None and prune_stem is not None:
retier(log_dir, prune_stem, keep_uncompressed or 0, keep_compressed or 0)
return
if compress: if compress:
_gzip_file(source, dest) with open(source, "rb") as src, gzip.open(dest, "wb") as dst:
shutil.copyfileobj(src, dst)
os.remove(source)
else: else:
_move(source, dest) os.replace(source, dest)
if log_dir is not None and prune_stem is not None:
prune(log_dir, prune_stem, backup_count)
return rotator return rotator
def rotate_on_start( def rotate_on_start(live_path: str, log_dir: str, compress: bool, clock=time.localtime) -> None:
live_path: str, log_dir: str, compress: bool, clock=time.localtime,
keep_uncompressed: Optional[int] = None, keep_compressed: Optional[int] = None,
) -> None:
"""move an existing live file into log_dir with a timestamp, gzipped if asked """move an existing live file into log_dir with a timestamp, gzipped if asked
no-op if the live file doesn't exist. used by rotate="on_start" before the fresh no-op if the live file doesn't exist. used by rotate="on_start" before the fresh
handler opens a new live file. the timestamp form is run.<%Y-%m-%d_%H-%M-%S>.log. handler opens a new live file. the timestamp form is run.<%Y-%m-%d_%H-%M-%S>.log.
tiered mode (when `keep_uncompressed`/`keep_compressed` are given): the rolled file
always lands PLAIN (so it can occupy the newest uncompressed tier) and `retier`
decides compression/deletion across the whole stem `compress` is ignored for the
just-rolled file.
""" """
if not os.path.exists(live_path): if not os.path.exists(live_path):
return return
tiered = keep_uncompressed is not None or keep_compressed is not None
stem = os.path.splitext(os.path.basename(live_path))[0] stem = os.path.splitext(os.path.basename(live_path))[0]
stamp = time.strftime("%Y-%m-%d_%H-%M-%S", clock()) stamp = time.strftime("%Y-%m-%d_%H-%M-%S", clock())
suffix = ".log.gz" if (compress and not tiered) else ".log" dest = os.path.join(log_dir, f"{stem}.{stamp}.log")
# the stamp is 1-second resolution; two starts in the same second would collide if compress:
# and the second clobber the first. disambiguate with a numeric counter so a rapid dest += ".gz"
# crash-restart loop doesn't lose the earlier rolled file. check BOTH the .log and with open(live_path, "rb") as src, gzip.open(dest, "wb") as dst:
# .log.gz forms of each candidate: in tiered mode an earlier same-stamp roll may have shutil.copyfileobj(src, dst)
# already been compressed to .log.gz, and reusing its bare stem would create a second os.remove(live_path)
# file for the same logical roll and break the tier counts
def _taken(path: str) -> bool:
base = path[:-3] if path.endswith(".gz") else path
return os.path.exists(base) or os.path.exists(base + ".gz")
dest = os.path.join(log_dir, f"{stem}.{stamp}{suffix}")
counter = 1
while _taken(dest):
dest = os.path.join(log_dir, f"{stem}.{stamp}.{counter}{suffix}")
counter += 1
if compress and not tiered:
_gzip_file(live_path, dest)
else: else:
_move(live_path, dest) os.replace(live_path, dest)
if tiered:
retier(log_dir, stem, keep_uncompressed or 0, keep_compressed or 0)
def retier(log_dir: str, stem: str, keep_uncompressed: int, keep_compressed: int) -> None:
"""re-tier rolled files for stem: newest plain, next gzipped, rest deleted
newest-first by mtime: the first `keep_uncompressed` stay uncompressed, the next
`keep_compressed` are gzipped in place (a still-plain file in that band is compressed
to <name>.gz and the plain source removed), and everything beyond
keep_uncompressed+keep_compressed is deleted. the live <stem>.log is never touched.
fail-soft per file (skip on OSError) so retention never crashes setup.
`stem` is reduced to its basename: rolled files land in log_dir under the basename
(the namer/rotate_on_start basename them), so a `name` containing a directory (e.g.
"sub/run") must be matched by "run." here or nothing matches and retention silently
never fires (unbounded pileup).
"""
stem = os.path.basename(stem)
try:
names = [
name for name in os.listdir(log_dir)
if name.startswith(f"{stem}.") and name != f"{stem}.log"
]
except OSError:
return
entries = [os.path.join(log_dir, name) for name in names]
files = [(p, _safe_mtime(p)) for p in entries if os.path.isfile(p)]
files.sort(key=lambda pair: pair[1], reverse=True)
keep = keep_uncompressed + keep_compressed
for index, (path, _) in enumerate(files):
if index >= keep:
try:
os.remove(path)
except OSError:
pass
elif index >= keep_uncompressed and not path.endswith(".gz"):
dest = path + ".gz"
if os.path.exists(dest):
continue
try:
_gzip_file(path, dest)
except OSError:
pass
def prune(log_dir: str, stem: str, backup_count: int) -> None: def prune(log_dir: str, stem: str, backup_count: int) -> None:
@ -194,14 +65,9 @@ def prune(log_dir: str, stem: str, backup_count: int) -> None:
matches files beginning with `<stem>.` (e.g. run.*), sorted by mtime, deleting the matches files beginning with `<stem>.` (e.g. run.*), sorted by mtime, deleting the
oldest beyond the count. used for on_start, which the handlers don't auto-prune. oldest beyond the count. used for on_start, which the handlers don't auto-prune.
`stem` is reduced to its basename so a `name` containing a directory (e.g. "sub/run")
still matches the basenamed rolled files in log_dir (else nothing matches and old
files pile up forever).
""" """
if backup_count <= 0: if backup_count <= 0:
return return
stem = os.path.basename(stem)
try: try:
entries = [ entries = [
os.path.join(log_dir, name) os.path.join(log_dir, name)
@ -227,22 +93,10 @@ def _safe_mtime(path: str) -> float:
return 0.0 return 0.0
def attach_rolling( def attach_rolling(handler, log_dir: str, compress: bool) -> Tuple[Callable, Callable]:
handler, log_dir: str, compress: bool, """wire the custom namer + rotator onto a rotating handler; return them"""
prune_stem: Optional[str] = None, backup_count: int = 0,
keep_uncompressed: Optional[int] = None, keep_compressed: Optional[int] = None,
) -> Tuple[Callable, Callable]:
"""wire the custom namer + rotator onto a rotating handler; return them
pass `prune_stem`/`backup_count` so the rotator prunes `log_dir` after each roll
(the handler's own retention can't see the redirected rolled files). pass
`keep_uncompressed`/`keep_compressed` instead to use tiered retention (newest plain,
next gzipped, rest deleted) see make_rotator.
"""
namer = make_namer(log_dir, compress) namer = make_namer(log_dir, compress)
rotator = make_rotator( rotator = make_rotator(compress)
compress, log_dir, prune_stem, backup_count, keep_uncompressed, keep_compressed,
)
handler.namer = namer handler.namer = namer
handler.rotator = rotator handler.rotator = rotator
return namer, rotator return namer, rotator

View File

@ -12,8 +12,8 @@ import atexit
import logging import logging
import logging.handlers import logging.handlers
import os import os
import queue as _queue import queue
from typing import Dict, Optional, Union from typing import Optional, Union
from .formats import build_formatter from .formats import build_formatter
from .rotation import attach_rolling, prune, rotate_on_start from .rotation import attach_rolling, prune, rotate_on_start
@ -22,58 +22,13 @@ log = logging.getLogger(__name__)
_MARKER = "_log_setup_owned" _MARKER = "_log_setup_owned"
_listener = None _listener = None
_atexit_registered = False
def _level_value(level: Union[int, str]) -> int: def _level_value(level: Union[int, str]) -> int:
"""coerce a level name or int to a logging level int (defaults to INFO)""" """coerce a level name or int to a logging level int (defaults to INFO)"""
if isinstance(level, bool):
# bool is an int subclass (True==1, below DEBUG) but is never a real level —
# reject it consistently with the per-module path rather than set level 1
return logging.INFO
if isinstance(level, int): if isinstance(level, int):
return level return level
if not isinstance(level, str): return logging.getLevelName(str(level).upper()) if isinstance(level, str) else logging.INFO
return logging.INFO
resolved = logging.getLevelName(level.upper())
# getLevelName returns the string "Level XXX" for an unknown name, which
# setLevel then rejects — never crash the app over a bad level, fall back to INFO
return resolved if isinstance(resolved, int) else logging.INFO
def _strict_level_value(level: Union[int, str]) -> Optional[int]:
"""coerce a level name or int to a logging level int, or None if invalid
unlike `_level_value` (which falls back to INFO for the root `level`), this reports
an invalid value as None so the per-module path can skip + warn rather than silently
apply INFO to a logger the caller named with a typo'd level
"""
if isinstance(level, bool):
return None
if isinstance(level, int):
return level
if not isinstance(level, str):
return None
resolved = logging.getLevelName(level.upper())
return resolved if isinstance(resolved, int) else None
def _apply_module_levels(module_levels: Optional[Dict[str, Union[int, str]]]) -> None:
"""set per-logger level overrides by exact logger name, never crashing
each name->level entry calls `logging.getLogger(name).setLevel(<level>)`. names are
matched exactly (no discovery); stdlib hierarchy still applies, so a parent name
quiets its whole subtree. a bad level for one entry is skipped with a warning so the
other entries and the rest of setup still proceed.
"""
if not module_levels:
return
for mod_name, raw_level in module_levels.items():
value = _strict_level_value(raw_level)
if value is None:
log.warning("log_setup: invalid level %r for logger %r; skipping", raw_level, mod_name)
continue
logging.getLogger(mod_name).setLevel(value)
def _clear_owned(root: logging.Logger) -> None: def _clear_owned(root: logging.Logger) -> None:
@ -81,14 +36,6 @@ def _clear_owned(root: logging.Logger) -> None:
global _listener global _listener
if _listener is not None: if _listener is not None:
_listener.stop() _listener.stop()
# the listener owns the real file/console handlers (only the QueueHandler is
# root-attached + marked); stopping it doesn't close them, so close them here
# to avoid relying on GC finalizers across a re-setup
for wrapped in getattr(_listener, "handlers", ()):
try:
wrapped.close()
except Exception:
log.warning("log_setup: failed to close queued handler %r", wrapped, exc_info=True)
_listener = None _listener = None
for handler in list(root.handlers): for handler in list(root.handlers):
if getattr(handler, _MARKER, False): if getattr(handler, _MARKER, False):
@ -96,9 +43,7 @@ def _clear_owned(root: logging.Logger) -> None:
try: try:
handler.close() handler.close()
except Exception: except Exception:
# a handler failing to close must not abort re-setup, but log it pass
# rather than swallow silently (consistent with the lib's warn pattern)
log.warning("log_setup: failed to close handler %r during re-setup", handler, exc_info=True)
def _tag(handler: logging.Handler) -> logging.Handler: def _tag(handler: logging.Handler) -> logging.Handler:
@ -107,52 +52,25 @@ def _tag(handler: logging.Handler) -> logging.Handler:
return handler return handler
def _normalize_name(name: str) -> str:
"""strip one trailing '.log' (case-insensitive) so the stem is extension-free
`name` is allowed to be passed with or without the extension "latest" and
"latest.log" both yield stem "latest" (live file latest.log), never latest.log.log.
only one level is stripped: "app.log.log" -> "app.log" so a legit ".log" inside a
name survives.
"""
if name.lower().endswith(".log"):
return name[:-4]
return name
def _file_handler( def _file_handler(
name: str, live_path: str, log_dir: str, rotate: Optional[str], name: str, live_path: str, log_dir: str, rotate: Optional[str],
backup_count: int, max_bytes: int, compress: bool, backup_count: int, max_bytes: int, compress: bool,
keep_uncompressed: Optional[int], keep_compressed: Optional[int],
) -> logging.Handler: ) -> logging.Handler:
"""build the configured file handler with custom rolling into log_dir""" """build the configured file handler with custom rolling into log_dir"""
tiered = keep_uncompressed is not None or keep_compressed is not None
if rotate == "size": if rotate == "size":
handler = logging.handlers.RotatingFileHandler( handler = logging.handlers.RotatingFileHandler(
live_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8", live_path, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8",
) )
attach_rolling( attach_rolling(handler, log_dir, compress)
handler, log_dir, compress, prune_stem=name, backup_count=backup_count,
keep_uncompressed=keep_uncompressed, keep_compressed=keep_compressed,
)
elif rotate == "daily": elif rotate == "daily":
handler = logging.handlers.TimedRotatingFileHandler( handler = logging.handlers.TimedRotatingFileHandler(
live_path, when="midnight", backupCount=backup_count, encoding="utf-8", live_path, when="midnight", backupCount=backup_count, encoding="utf-8",
) )
attach_rolling( attach_rolling(handler, log_dir, compress)
handler, log_dir, compress, prune_stem=name, backup_count=backup_count,
keep_uncompressed=keep_uncompressed, keep_compressed=keep_compressed,
)
else: else:
if rotate == "on_start": if rotate == "on_start":
if tiered: rotate_on_start(live_path, log_dir, compress)
rotate_on_start( prune(log_dir, name, backup_count)
live_path, log_dir, compress,
keep_uncompressed=keep_uncompressed, keep_compressed=keep_compressed,
)
else:
rotate_on_start(live_path, log_dir, compress)
prune(log_dir, name, backup_count)
handler = logging.FileHandler(live_path, encoding="utf-8") handler = logging.FileHandler(live_path, encoding="utf-8")
return handler return handler
@ -161,62 +79,33 @@ def setup_logging(
name: str = "run", name: str = "run",
log_dir: str = "logs", log_dir: str = "logs",
level: Union[int, str] = "INFO", level: Union[int, str] = "INFO",
module_levels: Optional[Dict[str, Union[int, str]]] = None,
rotate: Optional[str] = "daily", rotate: Optional[str] = "daily",
backup_count: int = 14, backup_count: int = 14,
keep_uncompressed: Optional[int] = None,
keep_compressed: Optional[int] = None,
max_bytes: int = 10_000_000, max_bytes: int = 10_000_000,
compress: bool = True, compress: bool = True,
console: bool = False, console: bool = False,
queue: bool = False, queue: bool = False,
output: str = "text",
fmt: Optional[str] = None, fmt: Optional[str] = None,
datefmt: Optional[str] = None, datefmt: Optional[str] = None,
) -> logging.Logger: ) -> logging.Logger:
"""configure the root logger for the whole process and return it """configure the root logger for the whole process and return it
`name` -> <name>.log live file at cwd; rolled/compressed copies go to `log_dir`. a `name` -> <name>.log live file at cwd; rolled/compressed copies go to `log_dir`.
trailing ".log" in `name` is stripped so "latest" and "latest.log" both produce the
live file latest.log (never latest.log.log).
`keep_uncompressed`/`keep_compressed` (default None) enable TIERED retention: when
either is given, rolled files are kept as the newest `keep_uncompressed` uncompressed
+ the next `keep_compressed` gzipped, and the rest are deleted (total retained =
sum). this applies to "on_start", "daily", and "size". `backup_count` and the
gzip-on-roll behavior of `compress` are IGNORED in tiered mode (the tier counts bound
retention). pass NEITHER knob and rotation behaves exactly as before (backup_count +
compress) existing callers are unaffected.
`level` is the root default every logger inherits. `module_levels` is an optional
map of exact logger name -> level applied after the root is set, the ergonomic way
to quiet noisy dependencies (e.g. {"motor": "WARNING", "aiohttp": "WARNING"}) from
the one setup call instead of scattering `getLogger(...).setLevel(...)` afterwards
it's stdlib hierarchy under the hood, not new capability. names match EXACTLY (no
discovery: a typo'd name silently configures an unused logger), but stdlib hierarchy
applies, so naming a parent ("aiohttp") quiets its whole subtree (aiohttp.client,
aiohttp.access, ...). each entry accepts a str or int level; a bad value for one
entry is skipped with a warning and never aborts the others or the setup.
`rotate` is "daily" (default), "size", "on_start", or None. `console=True` adds a `rotate` is "daily" (default), "size", "on_start", or None. `console=True` adds a
stdout handler (off by default the file is the output). `queue=True` routes records stdout handler (off by default the file is the output). `queue=True` routes records
through a background QueueListener so file I/O never blocks the caller (the listener through a background QueueListener so file I/O never blocks the caller (the listener
is stopped at exit). `output` is "text" (default, human `time | module | level | is stopped at exit). idempotent: a repeat call clears only the handlers this function
message`, local time) or "json" (structured one-JSON-object-per-line for the added. never raises over logging an unwritable `log_dir` falls back to console-only
Grafana/Loki path, UTC timestamps, `extra=` fields surfaced as top-level keys); both with a warning even when `console` is off, so output is never silently lost.
file and console use the chosen format and the live-file name is the same regardless.
the raw `fmt`/`datefmt` overrides apply to text output only. idempotent: a repeat call
clears only the handlers this function added. never raises over logging an
unwritable `log_dir` falls back to console-only with a warning even when `console` is
off, so output is never silently lost; an unknown `output` falls back to text.
""" """
global _listener, _atexit_registered global _listener
root = logging.getLogger() root = logging.getLogger()
root.setLevel(_level_value(level)) root.setLevel(_level_value(level))
_apply_module_levels(module_levels)
_clear_owned(root) _clear_owned(root)
formatter = build_formatter(output, fmt, datefmt) formatter = build_formatter(fmt, datefmt)
stem = _normalize_name(name) live_path = f"{name}.log"
live_path = f"{stem}.log"
handlers = [] handlers = []
@ -228,10 +117,7 @@ def setup_logging(
if file_ok: if file_ok:
try: try:
fh = _file_handler( fh = _file_handler(name, live_path, log_dir, rotate, backup_count, max_bytes, compress)
stem, live_path, log_dir, rotate, backup_count, max_bytes, compress,
keep_uncompressed, keep_compressed,
)
fh.setFormatter(formatter) fh.setFormatter(formatter)
handlers.append(fh) handlers.append(fh)
except OSError: except OSError:
@ -243,16 +129,12 @@ def setup_logging(
handlers.append(sh) handlers.append(sh)
if queue: if queue:
record_queue: "_queue.Queue" = _make_queue() record_queue: "queue.Queue" = _make_queue()
qh = _tag(logging.handlers.QueueHandler(record_queue)) qh = _tag(logging.handlers.QueueHandler(record_queue))
root.addHandler(qh) root.addHandler(qh)
_listener = logging.handlers.QueueListener(record_queue, *handlers, respect_handler_level=True) _listener = logging.handlers.QueueListener(record_queue, *handlers, respect_handler_level=True)
_listener.start() _listener.start()
if not _atexit_registered: atexit.register(_stop_listener)
# register once — atexit doesn't dedupe, so repeated queue re-setups would
# otherwise stack identical callbacks (harmless but unbounded)
atexit.register(_stop_listener)
_atexit_registered = True
else: else:
for handler in handlers: for handler in handlers:
root.addHandler(_tag(handler)) root.addHandler(_tag(handler))
@ -263,9 +145,9 @@ def setup_logging(
return root return root
def _make_queue() -> "_queue.Queue": def _make_queue() -> "queue.Queue":
"""unbounded in-memory queue for the QueueHandler -> QueueListener path""" """unbounded in-memory queue for the QueueHandler -> QueueListener path"""
return _queue.Queue(-1) return queue.Queue(-1)
def _stop_listener() -> None: def _stop_listener() -> None: