fix: EA-1 clean error on malformed flag/doc; EA-2 fingerprint ambiguity + fd-leak

EA-1: main dispatch catches KeyError/TypeError so a structurally-malformed flag/doc prints
a clean [x] line instead of a traceback. EA-2: fingerprint revoke rejects an empty prefix
and an ambiguous prefix (was: silently revoked the first match). json_store closes the raw
fd if os.fdopen raises before taking ownership (was: leaked). init TOCTOU documented as
by-design (trusted-DEK model, save upserts by _id). list '?' wording clarified.

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-29 21:35:03 -04:00
parent 130c62e31c
commit 88e1eaef39
5 changed files with 39 additions and 10 deletions

View File

@ -94,9 +94,10 @@ def main() -> int:
return 0 return 0
except InvalidTag: except InvalidTag:
return _fail("capability flag failed authentication — tampered or wrong DEK") return _fail("capability flag failed authentication — tampered or wrong DEK")
except (ConfigError, CommandError, RuntimeError, ValueError, OSError) as error: except (ConfigError, CommandError, RuntimeError, ValueError, OSError, KeyError, TypeError) as error:
# OSError covers the FileNotFoundError/PermissionError/IsADirectoryError family # OSError covers the FileNotFoundError/PermissionError/IsADirectoryError family;
# so an i/o failure prints a clean [✘] line instead of a traceback # KeyError/TypeError cover a structurally-malformed flag/doc (unguarded indexing
# of ['iv']/['meta']['authorizer']/['key']) — all print a clean [✘] line, not a traceback
return _fail(str(error)) return _fail(str(error))

View File

@ -11,7 +11,14 @@ from . import CommandError, build_doc, find_by_friendly, make_flag
def run(config, storage, args) -> None: def run(config, storage, args) -> None:
"""initialize the key system on this machine as the first authorizer""" """initialize the key system on this machine as the first authorizer
the already-initialized / duplicate-friendly checks are non-atomic (a check-then-act
TOCTOU under two concurrent CLIs), but this is a one-shot human admin tool and `save`
upserts by `_id`, so key material can never collide the worst case is a cosmetic
double-init under a race, which carries no security consequence in the trusted-
DEK-holder threat model. left non-atomic by design.
"""
if storage.get_all(): if storage.get_all():
raise CommandError( raise CommandError(
"already initialized; use `authorizer list` to see existing keys" "already initialized; use `authorizer list` to see existing keys"

View File

@ -11,7 +11,12 @@ from . import boot_local, read_flag
def _can_authorize(crypto, doc) -> str: def _can_authorize(crypto, doc) -> str:
"""decrypted authority of a doc as Yes/No, or `?` if not readable here""" """decrypted authority of a doc as Yes/No, or `?` if not readable here
`?` means the flag could not be read for ANY reason the local key can't unwrap it,
or the doc is missing/malformed so the table always renders rather than crashing on
one bad row. it is not specifically a corruption signal.
"""
try: try:
return "Yes" if read_flag(crypto, doc["meta"]["authorizer"]) else "No" return "Yes" if read_flag(crypto, doc["meta"]["authorizer"]) else "No"
except Exception: except Exception:

View File

@ -18,11 +18,18 @@ _WARNING = (
def _find_by_fingerprint(storage, prefix: str): def _find_by_fingerprint(storage, prefix: str):
"""return the doc whose `_id` starts with the given prefix, or None""" """return the single doc whose `_id` starts with the given prefix, or None
for doc in storage.get_all():
if doc.get("_id", "").startswith(prefix): rejects an empty prefix (which would match every key) and an ambiguous prefix
return doc that matches more than one key, rather than silently revoking the first match.
return None """
if not prefix:
raise CommandError("fingerprint prefix must not be empty")
matches = [doc for doc in storage.get_all() if doc.get("_id", "").startswith(prefix)]
if len(matches) > 1:
ids = ", ".join(d["_id"][:16] for d in matches)
raise CommandError(f"fingerprint prefix '{prefix}' is ambiguous; matches: {ids}")
return matches[0] if matches else None
def run(config, storage, args) -> None: def run(config, storage, args) -> None:

View File

@ -45,12 +45,21 @@ class JsonStore(StorageBackend):
fd, tmp = tempfile.mkstemp( fd, tmp = tempfile.mkstemp(
dir=self.path.parent, prefix=self.path.name + ".", suffix=".tmp" dir=self.path.parent, prefix=self.path.name + ".", suffix=".tmp"
) )
wrapped = False
try: try:
with os.fdopen(fd, "w", encoding="utf-8") as handle: with os.fdopen(fd, "w", encoding="utf-8") as handle:
wrapped = True # fdopen took ownership of fd; its close() handles it
json.dump(docs, handle, indent=2) json.dump(docs, handle, indent=2)
handle.write("\n") handle.write("\n")
os.replace(tmp, self.path) os.replace(tmp, self.path)
except BaseException: except BaseException:
if not wrapped:
# fdopen raised before taking ownership — close the raw fd ourselves so
# it isn't leaked (the `with` only closes once fdopen returns a file object)
try:
os.close(fd)
except OSError:
pass
try: try:
os.unlink(tmp) os.unlink(tmp)
except OSError: except OSError: