add package: pyproject + src (authorizer CLI, json/mongo storage, gated capability flag)

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-25 00:04:40 -04:00
parent f6c43f49c6
commit 9288ec73ce
16 changed files with 747 additions and 0 deletions

27
pyproject.toml Normal file
View File

@ -0,0 +1,27 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "envelope_authorizer"
version = "0.1.0"
description = "CLI key-authorization manager for envelope_crypto"
requires-python = ">=3.10"
dependencies = [
"envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.0",
"tomli>=2.0; python_version<'3.11'",
]
[project.optional-dependencies]
mongo = [
"mongo @ git+ssh://git@git.rethinkstudios.io/rethink-public/mongo.git@v0.1.0",
]
[project.scripts]
authorizer = "envelope_authorizer.__main__:main"
[tool.hatch.metadata]
allow-direct-references = true
[tool.hatch.build.targets.wheel]
packages = ["src/envelope_authorizer"]

View File

@ -0,0 +1 @@
__version__ = "0.1.0"

View File

@ -0,0 +1,6 @@
import sys
from .cli import main
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,96 @@
"""argparse setup and dispatch for the authorizer CLI
`config init` runs without an existing config (it writes one); every other
command loads config and resolves a storage backend first. expected failures
(ConfigError, CommandError, RuntimeError from a guarded backend) print a clean
`[] ...` line and exit non-zero no traceback.
"""
import argparse
import sys
from . import __version__
from .config import ConfigError, load_config
from .commands import CommandError, authorize, config_init, init, list_keys, revoke, verify
from .storage import resolve
def _build_parser() -> argparse.ArgumentParser:
"""construct the argument parser with all subcommands"""
parser = argparse.ArgumentParser(
prog="authorizer",
description="key-authorization manager for envelope_crypto",
)
parser.add_argument("--version", action="version", version=f"envelope_authorizer {__version__}")
sub = parser.add_subparsers(dest="cmd")
config_cmd = sub.add_parser("config", help="manage the authorizer config file")
config_sub = config_cmd.add_subparsers(dest="config_cmd")
config_sub.add_parser("init", help="write a starter .authorizer.toml in the current directory")
init_cmd = sub.add_parser("init", help="initialize the key system on this machine")
init_cmd.add_argument("--friendly", required=True, help="human name for this key")
auth_cmd = sub.add_parser("authorize", help="authorize another machine's public key")
auth_cmd.add_argument("--key", required=True, help="path to the public key to authorize")
auth_cmd.add_argument("--friendly", required=True, help="human name for the new key")
auth_cmd.add_argument(
"--can-authorize", action="store_true", dest="can_authorize",
help="let the new key authorize others (omit for servers)",
)
sub.add_parser("verify", help="health-check the local crypto setup")
sub.add_parser("list", help="list all authorized keys")
revoke_cmd = sub.add_parser("revoke", help="remove a key's authorization record")
group = revoke_cmd.add_mutually_exclusive_group(required=True)
group.add_argument("--friendly", help="friendly name of the key to revoke")
group.add_argument("--fingerprint", help="fingerprint prefix of the key to revoke")
return parser
def _fail(message: str) -> int:
"""print a clean error line and return a non-zero exit code"""
print(f"[✘] {message}")
return 1
def main() -> int:
"""parse args, dispatch the command, and translate errors to exit codes"""
parser = _build_parser()
args = parser.parse_args()
if args.cmd is None:
parser.print_help()
return 0
if args.cmd == "config":
if args.config_cmd == "init":
try:
config_init.run(None, None, args)
return 0
except CommandError as error:
return _fail(str(error))
parser.parse_args(["config", "--help"])
return 0
handlers = {
"init": init.run,
"authorize": authorize.run,
"verify": verify.run,
"list": list_keys.run,
"revoke": revoke.run,
}
try:
config = load_config()
storage = resolve(config)
handlers[args.cmd](config, storage, args)
return 0
except (ConfigError, CommandError, RuntimeError, ValueError, FileNotFoundError) as error:
return _fail(str(error))
if __name__ == "__main__":
sys.exit(main())

View File

@ -0,0 +1,72 @@
"""command implementations + shared crypto helpers for the authorizer CLI
each command module exposes `run(config, storage, args)`. helpers here own the
recurring crypto plumbing: minting/reading the encrypted capability flag and the
boot sequence (unwrap the local DEK and arm a crypto instance). these never
print key material.
"""
import time
from typing import Optional, Tuple
from envelope_crypto import EnvelopeCrypto
class CommandError(Exception):
"""raised by a command for an expected, user-facing failure"""
def make_flag(crypto: EnvelopeCrypto, allowed: bool) -> dict:
"""seal the authorizer capability flag under the DEK as {secure,iv,data}"""
return crypto.encrypt_data({"allowed": bool(allowed)})
def read_flag(crypto: EnvelopeCrypto, blob: dict) -> bool:
"""decrypt a capability flag; return the `allowed` bool"""
data = crypto.decrypt_data(blob)
return bool(data.get("allowed", False)) if isinstance(data, dict) else False
def build_doc(fingerprint: str, wrapped: str, flag: dict, identity: str, friendly: str) -> dict:
"""assemble a key document in the envelope_authorizer schema"""
return {
"_id": fingerprint,
"key": wrapped,
"meta": {
"authorizer": flag,
"created_by": identity,
"created_at": int(time.time()),
"friendly": friendly,
},
}
def local_fingerprint(crypto: EnvelopeCrypto, config) -> str:
"""fingerprint of the local public key from config"""
return crypto.get_rsa_key_fingerprint(config.public_key)
def boot_local(config, storage) -> Tuple[EnvelopeCrypto, dict]:
"""unwrap the local DEK and return an armed crypto plus the local key doc
raises CommandError if this machine has no key doc (not initialized /
authorized here). never logs or prints the unwrapped key.
"""
crypto = EnvelopeCrypto()
fingerprint = local_fingerprint(crypto, config)
doc = storage.get(fingerprint)
if not doc:
raise CommandError(
"not initialized on this machine (no key doc for the local public key)"
)
aes_key = crypto.decrypt_aes_key_with_rsa(doc["key"], config.private_key)
crypto.initialize(aes_key)
return crypto, doc
def find_by_friendly(storage, friendly: str) -> Optional[dict]:
"""return the doc whose meta.friendly matches, or None"""
for doc in storage.get_all():
if doc.get("meta", {}).get("friendly") == friendly:
return doc
return None

View File

@ -0,0 +1,38 @@
"""`authorizer authorize` — grant another machine access to the DEK
boots the local DEK, verifies the local key is itself an authorizer, then wraps
the same DEK to the target public key and stores a new key doc. `--can-authorize`
decides whether the new key may authorize others (omit it for servers).
"""
from . import (
CommandError,
boot_local,
build_doc,
find_by_friendly,
make_flag,
read_flag,
)
def run(config, storage, args) -> None:
"""authorize a target public key, gated on the local key's authority"""
if find_by_friendly(storage, args.friendly):
raise CommandError(f"friendly name '{args.friendly}' is already in use")
crypto, local_doc = boot_local(config, storage)
if not read_flag(crypto, local_doc["meta"]["authorizer"]):
raise CommandError("this key is not permitted to authorize others")
new_fp, new_wrapped = crypto.encrypt_aes_key_with_rsa(
crypto.master_key, args.key
)
flag = make_flag(crypto, args.can_authorize)
doc = build_doc(new_fp, new_wrapped, flag, config.identity, args.friendly)
storage.save(doc)
print(
f"[✔] Authorized {new_fp} | friendly: {args.friendly} "
f"[can_authorize={bool(args.can_authorize)}]"
)

View File

@ -0,0 +1,35 @@
"""`authorizer config init` — scaffold a starter .authorizer.toml in cwd"""
from pathlib import Path
from ..config import CONFIG_NAME
from . import CommandError
_TEMPLATE = """[keys]
public = "~/.ssh/id_rsa.pub"
private = "~/.ssh/id_rsa"
identity = "user@hostname"
[storage]
backend = "json"
path = ".authorizer_keys.json"
# mongo backend (shared dev->server storage); install envelope_authorizer[mongo]:
# [storage]
# backend = "mongo"
# uri = "mongodb://localhost:27017"
# database = "myapp"
# collection = "keys"
"""
def run(config, storage, args) -> None:
"""write a commented starter config; refuse to overwrite an existing one"""
target = Path.cwd() / CONFIG_NAME
if target.exists():
raise CommandError(f"{target} already exists — refusing to overwrite")
with open(target, "w", encoding="utf-8") as handle:
handle.write(_TEMPLATE)
print(f"[✔] Wrote starter config: {target}")
print(" Edit [keys] and [storage], then run `authorizer init --friendly NAME`")

View File

@ -0,0 +1,31 @@
"""`authorizer init` — create a fresh DEK and authorize the local key
generates a new AES data key, wraps it to the local public key, and stores the
first key doc marked as an authorizer (allowed: True). refuses if the system is
already initialized or the friendly name is taken.
"""
from envelope_crypto import EnvelopeCrypto
from . import CommandError, build_doc, find_by_friendly, make_flag
def run(config, storage, args) -> None:
"""initialize the key system on this machine as the first authorizer"""
if storage.get_all():
raise CommandError(
"already initialized; use `authorizer list` to see existing keys"
)
if find_by_friendly(storage, args.friendly):
raise CommandError(f"friendly name '{args.friendly}' is already in use")
crypto = EnvelopeCrypto()
aes_key = crypto.create_aes_key()
fingerprint, wrapped = crypto.encrypt_aes_key_with_rsa(aes_key, config.public_key)
crypto.initialize(aes_key)
flag = make_flag(crypto, True)
doc = build_doc(fingerprint, wrapped, flag, config.identity, args.friendly)
storage.save(doc)
print(f"[✔] Initialized — fingerprint: {fingerprint} | friendly: {args.friendly} [authorizer=True]")

View File

@ -0,0 +1,46 @@
"""`authorizer list` — show every authorized key in a padded table
boots the local DEK once, then for each doc decrypts its capability flag to show
CAN_AUTHORIZE. a doc the local key cannot unwrap shows `?` rather than crashing.
prints only fingerprint/metadata never the wrapped key or DEK.
"""
from datetime import datetime, timezone
from . import boot_local, read_flag
def _can_authorize(crypto, doc) -> str:
"""decrypted authority of a doc as Yes/No, or `?` if not readable here"""
try:
return "Yes" if read_flag(crypto, doc["meta"]["authorizer"]) else "No"
except Exception:
return "?"
def _created(doc) -> str:
"""format created_at as a UTC timestamp string, or '-' if absent"""
raw = doc.get("meta", {}).get("created_at")
if not raw:
return "-"
return datetime.fromtimestamp(int(raw), tz=timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
def run(config, storage, args) -> None:
"""print the authorized-key table with a count footer"""
docs = storage.get_all()
crypto, _ = boot_local(config, storage)
header = f"{'FINGERPRINT':<18} {'FRIENDLY':<16} {'CREATED_BY':<18} {'CREATED_AT':<21} {'CAN_AUTH':<8}"
print(header)
print("-" * len(header))
for doc in docs:
meta = doc.get("meta", {})
print(
f"{doc.get('_id', '')[:16]:<18} "
f"{str(meta.get('friendly', '-')):<16} "
f"{str(meta.get('created_by', '-')):<18} "
f"{_created(doc):<21} "
f"{_can_authorize(crypto, doc):<8}"
)
print(f"\n{len(docs)} key(s) authorized")

View File

@ -0,0 +1,47 @@
"""`authorizer revoke` — remove a key's authorization record
finds the target by friendly name or fingerprint prefix, refuses to revoke the
local key, deletes the record, and prints an honesty warning that revoke is
bookkeeping only it does not rotate the DEK or scrub it from a machine that
already unwrapped it.
"""
from envelope_crypto import EnvelopeCrypto
from . import CommandError, find_by_friendly, local_fingerprint
_WARNING = (
"[!] Revoke removes the record only. It does NOT rotate the DEK or scrub it\n"
" from any machine that already unwrapped it. If this key was compromised,\n"
" re-init the system and re-authorize trusted machines."
)
def _find_by_fingerprint(storage, prefix: str):
"""return the doc whose `_id` starts with the given prefix, or None"""
for doc in storage.get_all():
if doc.get("_id", "").startswith(prefix):
return doc
return None
def run(config, storage, args) -> None:
"""delete a key record by friendly or fingerprint, guarding the local key"""
if args.friendly:
doc = find_by_friendly(storage, args.friendly)
label = args.friendly
else:
doc = _find_by_fingerprint(storage, args.fingerprint)
label = args.fingerprint
if not doc:
raise CommandError(f"no key found matching '{label}'")
crypto = EnvelopeCrypto()
if doc["_id"] == local_fingerprint(crypto, config):
raise CommandError("refusing to revoke the local key")
storage.delete(doc["_id"])
friendly = doc.get("meta", {}).get("friendly", "?")
print(f"[✔] Revoked: {friendly} ({doc['_id'][:16]})")
print(_WARNING)

View File

@ -0,0 +1,22 @@
"""`authorizer verify` — health-check the local crypto setup
boots the local DEK, then runs envelope_crypto's self_test against the local
keypair. if the installed envelope_crypto lacks self_test, falls back to a
minimal in-CLI round-trip. never prints key material.
"""
from . import CommandError, boot_local
def run(config, storage, args) -> None:
"""verify the local keypair + DEK round-trip without leaking secrets"""
crypto, _ = boot_local(config, storage)
if hasattr(crypto, "self_test"):
crypto.self_test(config.public_key, config.private_key)
else:
sample = {"_authorizer_verify": "ok", "n": 12345}
if crypto.decrypt_data(crypto.encrypt_data(sample)) != sample:
raise CommandError("verify failed: DEK round-trip mismatch")
print("[✔] Crypto system OK")

View File

@ -0,0 +1,106 @@
"""TOML config loader for the authorizer CLI
resolves a `.authorizer.toml` (cwd first, then ~), expands user paths, and
exposes typed accessors. no defaults are baked in: a missing required field
raises a clear error naming the field and the config path that was searched.
the loaded config is the only place key paths, identity, and storage live
this lib never imports a host `config` module.
"""
import os
import sys
from pathlib import Path
from typing import Any, Dict, List
if sys.version_info >= (3, 11):
import tomllib
else:
import tomli as tomllib
CONFIG_NAME = ".authorizer.toml"
class ConfigError(Exception):
"""raised when config is missing or a required field is absent"""
def _search_paths() -> List[Path]:
"""ordered candidate config paths: cwd first, then home"""
return [Path.cwd() / CONFIG_NAME, Path.home() / CONFIG_NAME]
def _expand(path: str) -> str:
"""expand ~ and environment vars in a filesystem path"""
return os.path.expanduser(os.path.expandvars(path))
class Config:
"""parsed authorizer config with section/field accessors
wraps the raw TOML mapping and the path it came from; `require` and the
typed helpers raise ConfigError naming the missing field and source file.
"""
def __init__(self, data: Dict[str, Any], source: Path):
self._data = data
self.source = source
def require(self, section: str, field: str) -> Any:
"""return a required field or raise naming the field and config path"""
block = self._data.get(section)
if not isinstance(block, dict) or field not in block:
raise ConfigError(
f"missing required field [{section}].{field} in {self.source}"
)
return block[field]
def optional(self, section: str, field: str, default: Any = None) -> Any:
"""return a field if present, else the default (no raise)"""
block = self._data.get(section)
if not isinstance(block, dict):
return default
return block.get(field, default)
@property
def public_key(self) -> str:
"""path to the local RSA public key (expanded)"""
return _expand(self.require("keys", "public"))
@property
def private_key(self) -> str:
"""path to the local RSA private key (expanded)"""
return _expand(self.require("keys", "private"))
@property
def identity(self) -> str:
"""human identity stamped as created_by on every key doc"""
return self.require("keys", "identity")
@property
def storage_backend(self) -> str:
"""selected storage backend name ("json" or "mongo")"""
return self.require("storage", "backend")
def load_config() -> Config:
"""find and parse the authorizer config, or raise with guidance
searches cwd then home for `.authorizer.toml`; raises ConfigError telling
the user to run `authorizer config init` if none is found.
"""
for candidate in _search_paths():
if candidate.is_file():
with open(candidate, "rb") as handle:
data = tomllib.load(handle)
return Config(data, candidate)
seen = []
for path in _search_paths():
if str(path) not in seen:
seen.append(str(path))
searched = ", ".join(seen)
raise ConfigError(
f"no {CONFIG_NAME} found (searched: {searched}). "
f"run `authorizer config init` to create one, or add it manually"
)

View File

@ -0,0 +1,28 @@
"""storage backend resolution for the authorizer CLI
`resolve(config)` reads `[storage].backend` and returns the matching sync
StorageBackend (json or mongo). the mongo backend is guarded importing this
package never requires the mongo extra; only selecting the mongo backend does.
"""
from .base import StorageBackend
from .json_store import JsonStore
def resolve(config) -> StorageBackend:
"""build the storage backend named by the config, or raise on unknown"""
backend = config.storage_backend
if backend == "json":
path = config.require("storage", "path")
return JsonStore(path)
if backend == "mongo":
from .mongo_store import MongoStore
return MongoStore(
uri=config.require("storage", "uri"),
database=config.require("storage", "database"),
collection=config.require("storage", "collection"),
)
raise ValueError(f"unknown storage backend: {backend!r} (use 'json' or 'mongo')")
__all__ = ["StorageBackend", "JsonStore", "resolve"]

View File

@ -0,0 +1,29 @@
"""abstract storage interface for authorizer key documents
a backend persists key docs keyed by `_id` (the RSA fingerprint). the interface
is intentionally sync the CLI is a one-shot tool; an async backend (mongo)
bridges to sync internally. `save` is an upsert by `_id`, never a duplicate-
inserting or duplicate-raising operation.
"""
from typing import List, Optional
class StorageBackend:
"""sync key-document store keyed on the `_id` fingerprint"""
def get(self, fingerprint: str) -> Optional[dict]:
"""return the key doc with this `_id`, or None"""
raise NotImplementedError
def get_all(self) -> List[dict]:
"""return every stored key doc"""
raise NotImplementedError
def save(self, doc: dict) -> None:
"""upsert a key doc by its `_id` (replace if present, else insert)"""
raise NotImplementedError
def delete(self, fingerprint: str) -> bool:
"""delete the key doc with this `_id`; return whether one was removed"""
raise NotImplementedError

View File

@ -0,0 +1,74 @@
"""JSON-file storage backend (base install, stdlib only)
key docs live as a JSON list in a single file. writes are atomic (temp file
then os.replace) so a crash mid-write never leaves a partial file. upsert
replaces the doc whose `_id` matches, else appends. suited to single-machine
setups; for dev->server flows across machines, use the mongo backend.
"""
import json
import os
from pathlib import Path
from typing import List, Optional
from .base import StorageBackend
class JsonStore(StorageBackend):
"""list-of-docs JSON file backend with atomic upserting writes"""
def __init__(self, path: str):
self.path = Path(os.path.expanduser(path))
def _read(self) -> List[dict]:
"""load the docs list, returning [] if the file is absent or empty"""
if not self.path.is_file():
return []
with open(self.path, "r", encoding="utf-8") as handle:
content = handle.read().strip()
if not content:
return []
data = json.loads(content)
if not isinstance(data, list):
raise ValueError(f"{self.path} is not a JSON list of key docs")
return data
def _write(self, docs: List[dict]) -> None:
"""atomically write the docs list (temp file then replace)"""
self.path.parent.mkdir(parents=True, exist_ok=True)
tmp = self.path.with_suffix(self.path.suffix + ".tmp")
with open(tmp, "w", encoding="utf-8") as handle:
json.dump(docs, handle, indent=2)
handle.write("\n")
os.replace(tmp, self.path)
def get(self, fingerprint: str) -> Optional[dict]:
"""return the doc whose `_id` matches, or None"""
for doc in self._read():
if doc.get("_id") == fingerprint:
return doc
return None
def get_all(self) -> List[dict]:
"""return every stored doc"""
return self._read()
def save(self, doc: dict) -> None:
"""replace the doc with a matching `_id`, else append; atomic write"""
docs = self._read()
for index, existing in enumerate(docs):
if existing.get("_id") == doc["_id"]:
docs[index] = doc
break
else:
docs.append(doc)
self._write(docs)
def delete(self, fingerprint: str) -> bool:
"""remove the doc with this `_id`; return whether one was removed"""
docs = self._read()
remaining = [doc for doc in docs if doc.get("_id") != fingerprint]
if len(remaining) == len(docs):
return False
self._write(remaining)
return True

View File

@ -0,0 +1,89 @@
"""mongo storage backend (behind the [mongo] extra)
bridges the async rethink-public `mongo` lib to the sync StorageBackend by
wrapping each operation in its own asyncio.run: connect -> op -> close, fully
self-contained per call. no module-level client and no shared event loop, so it
never collides with a running loop. a per-call connect is an accepted tradeoff
for a one-shot admin CLI. requires envelope_authorizer[mongo]; without it every
method raises a clear RuntimeError.
"""
import asyncio
from typing import List, Optional
from .base import StorageBackend
try:
from mongo import Mongo
_HAVE_MONGO = True
except ImportError:
_HAVE_MONGO = False
_MISSING = "mongo backend requires envelope_authorizer[mongo]"
class MongoStore(StorageBackend):
"""async-mongo-backed key store presented as a sync interface"""
def __init__(self, uri: str, database: str, collection: str):
if not _HAVE_MONGO:
raise RuntimeError(_MISSING)
self.uri = uri
self.database = database
self.collection = collection
async def _get(self, fingerprint: str) -> Optional[dict]:
db = Mongo(self.uri, self.database)
try:
return await db.get_document(self.collection, {"_id": fingerprint})
finally:
await db.close()
async def _get_all(self) -> List[dict]:
db = Mongo(self.uri, self.database)
try:
return await db.get_documents(self.collection, {})
finally:
await db.close()
async def _save(self, doc: dict) -> None:
db = Mongo(self.uri, self.database)
try:
await db.update_document(
self.collection, {"_id": doc["_id"]}, doc, do_upsert=True
)
finally:
await db.close()
async def _delete(self, fingerprint: str) -> bool:
db = Mongo(self.uri, self.database)
try:
removed = await db.delete_document(self.collection, {"_id": fingerprint})
return bool(removed)
finally:
await db.close()
def get(self, fingerprint: str) -> Optional[dict]:
"""return the key doc with this `_id`, or None"""
if not _HAVE_MONGO:
raise RuntimeError(_MISSING)
return asyncio.run(self._get(fingerprint))
def get_all(self) -> List[dict]:
"""return every stored key doc"""
if not _HAVE_MONGO:
raise RuntimeError(_MISSING)
return asyncio.run(self._get_all())
def save(self, doc: dict) -> None:
"""upsert a key doc by `_id` (replace_one with upsert)"""
if not _HAVE_MONGO:
raise RuntimeError(_MISSING)
asyncio.run(self._save(doc))
def delete(self, fingerprint: str) -> bool:
"""delete the key doc with this `_id`; return whether one was removed"""
if not _HAVE_MONGO:
raise RuntimeError(_MISSING)
return asyncio.run(self._delete(fingerprint))