From 659aa7849d9c0965809b0d301876b9d8cffc2102 Mon Sep 17 00:00:00 2001
From: disqualifier
Date: Wed, 24 Jun 2026 21:25:27 -0400
Subject: [PATCH] add package: pyproject + src
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

EnvelopeCrypto: hybrid envelope encryption for dict records — a random
AES-256-GCM data key (DEK) encrypts the data, wrapped per-system via
RSA-OAEP (SHA-256) for distribution. config-free (DEK + key paths
injected), storage-agnostic, object-only. covers bootstrap/self_test,
authorize/deauthorize, rotate + reencrypt, and record-level decrypt.
src/ layout, hatchling build, cryptography backend.

Signed-off-by: disqualifier
---
 pyproject.toml                         |  15 +
 src/envelope_crypto/__init__.py        |  27 ++
 src/envelope_crypto/envelope_crypto.py | 422 +++++++++++++++++++++++++
 3 files changed, 464 insertions(+)
 create mode 100644 pyproject.toml
 create mode 100644 src/envelope_crypto/__init__.py
 create mode 100644 src/envelope_crypto/envelope_crypto.py

diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..e38238c
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,15 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "envelope_crypto"
+version = "0.1.0"
+description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable."
+requires-python = ">=3.10"
+dependencies = [
+    "cryptography>=42.0",
+]
+
+[tool.hatch.build.targets.wheel]
+packages = ["src/envelope_crypto"]
diff --git a/src/envelope_crypto/__init__.py b/src/envelope_crypto/__init__.py
new file mode 100644
index 0000000..4baa820
--- /dev/null
+++ b/src/envelope_crypto/__init__.py
@@ -0,0 +1,27 @@
+from .envelope_crypto import (
+    EnvelopeCrypto,
+    DocumentCrypto,
+    RecordCrypto,
+    PCICrypto,
+    is_encrypted_record,
+    is_encrypted_document,
+    is_encrypted_dict,
+    decrypt_record,
+    decrypt_document,
+    decrypt_dict,
+    fingerprint_data,
+)
+
+__all__ = [
+    "EnvelopeCrypto",
+    "DocumentCrypto",
+    "RecordCrypto",
+    "PCICrypto",
+    "is_encrypted_record",
+    "is_encrypted_document",
+    "is_encrypted_dict",
+    "decrypt_record",
+    "decrypt_document",
+    "decrypt_dict",
+    "fingerprint_data",
+]
diff --git a/src/envelope_crypto/envelope_crypto.py b/src/envelope_crypto/envelope_crypto.py
new file mode 100644
index 0000000..43127cf
--- /dev/null
+++ b/src/envelope_crypto/envelope_crypto.py
@@ -0,0 +1,422 @@
+"""
+envelope encryption for dict records
+
+hybrid encryption: a random AES-256-GCM data key (DEK) encrypts the data, and
+that key is wrapped (RSA-OAEP) per authorized system's public key (KEK) for
+distribution. the wrapped key is stored by the caller, keyed by fingerprint;
+each system unwraps its own copy with its private key. this is the same
+envelope-encryption pattern used by KMS-style systems.
+
+    from envelope_crypto import EnvelopeCrypto
+
+    crypto = EnvelopeCrypto()
+    crypto.initialize(master_key)               # 32-byte AES DEK
+    enc = crypto.encrypt_data({"ssn": "..."})   # -> {secure, iv, data}
+    plain = crypto.decrypt_data(enc)            # -> original
+
+first-time setup: generate the DEK and wrap it for the first system in one call,
+then verify the pipeline before storing anything:
+
+    crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap("public_key.pem")
+    crypto.self_test("public_key.pem", "private_key.pem")   # raises if anything is wrong
+    caller_store({"_id": fingerprint, "key": wrapped})      # the only record of the DEK
+
+boot (already set up): fingerprint own pubkey, fetch the wrapped DEK, unwrap:
+
+    fp = crypto.get_rsa_key_fingerprint("public_key.pem")
+    record = caller_lookup(fp)
+    crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], "private_key.pem"))
+
+authorize another system (this instance must already hold the DEK):
+
+    fp, wrapped = crypto.authorize_system(other_pub_path)
+    caller_store({"_id": fp, "key": wrapped})
+
+deauthorize: caller deletes that fingerprint's record. note this stops future
+unwraps but does not revoke a DEK already in a running system's memory — rotate
+if compromised.
+
+rotate (new DEK + re-encrypt): generate a new DEK, wrap for the still-authorized
+set, then re-encrypt existing records old -> new:
+
+    new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b])
+    new_crypto = EnvelopeCrypto(); new_crypto.initialize(new_key)
+    for record in caller_iter():
+        caller_update(new_crypto.reencrypt(crypto, record))
+
+config-free: the host supplies the DEK and RSA key paths; this lib never imports
+config, configures logging, or touches a database. storage-agnostic — the
+encrypted blob is a plain dict; store it in mongo, a sql json column, or a file.
+
+naming: EnvelopeCrypto is canonical. PCICrypto / DocumentCrypto / RecordCrypto are
+aliases (PCICrypto is a deprecated legacy alias). the document/record/dict
+function variants are the same functions — use whichever fits your storage.
+"""
+
+import os
+import json
+import base64
+import hashlib
+import logging
+from typing import Any, Dict, List, Optional, Tuple, Union
+
+from cryptography.hazmat.primitives import hashes, serialization
+from cryptography.hazmat.primitives.asymmetric import padding
+from cryptography.hazmat.primitives.ciphers.aead import AESGCM
+from cryptography.hazmat.primitives.serialization import load_ssh_public_key
+
+_log = logging.getLogger(__name__)
+
+# key lengths (in bytes) that AES-GCM accepts: AES-128 / AES-192 / AES-256
+_AES_KEY_SIZES = (16, 24, 32)
+
+
+def _read_key_material(key_path_or_data: Union[str, bytes], is_file: bool) -> bytes:
+    """return raw key bytes from a file path (is_file) or from inline str/bytes"""
+    if is_file:
+        with open(key_path_or_data, "rb") as key_file:
+            return key_file.read()
+    if isinstance(key_path_or_data, str):
+        return key_path_or_data.encode()
+    return key_path_or_data
+
+
+def _load_public_key(key_data: bytes):
+    """parse a PEM public key, falling back to the OpenSSH public-key format"""
+    try:
+        return serialization.load_pem_public_key(key_data)
+    except ValueError:
+        return load_ssh_public_key(key_data)
+
+
+def _load_private_key(key_data: bytes, password: Optional[str] = None):
+    """parse a PEM private key, falling back to OpenSSH when the header says so"""
+    secret = password.encode() if password else None
+    try:
+        return serialization.load_pem_private_key(key_data, password=secret)
+    except ValueError:
+        if b"BEGIN OPENSSH PRIVATE KEY" not in key_data:
+            raise
+        return serialization.load_ssh_private_key(key_data, password=secret)
+
+
+def _fingerprint_public_key(public_key) -> str:
+    """return base64 SHA-256 of the key's DER SubjectPublicKeyInfo encoding"""
+    key_bytes = public_key.public_bytes(
+        encoding=serialization.Encoding.DER,
+        format=serialization.PublicFormat.SubjectPublicKeyInfo,
+    )
+    digest = hashes.Hash(hashes.SHA256())
+    digest.update(key_bytes)
+    return base64.b64encode(digest.finalize()).decode()
+
+
+def _oaep_sha256() -> padding.OAEP:
+    """return the OAEP padding (MGF1/SHA-256, no label) used for every wrap/unwrap"""
+    return padding.OAEP(
+        mgf=padding.MGF1(algorithm=hashes.SHA256()),
+        algorithm=hashes.SHA256(),
+        label=None,
+    )
+
+
+def _is_encrypted_blob(value: Any) -> bool:
+    """return whether value is shaped like an encrypt_data result: {secure: True, iv, data}"""
+    return (
+        isinstance(value, dict)
+        and value.get("secure") is True
+        and "iv" in value
+        and "data" in value
+    )
+
+
+class EnvelopeCrypto:
+    """hybrid RSA/AES-256-GCM envelope encryption for dict records
+
+    holds an AES data key (DEK), injected via initialize. encrypts/decrypts record
+    fields, wraps/unwraps the DEK with RSA keys for distribution, and re-encrypts
+    records across key rotations. config-free and storage-agnostic.
+    """
+
+    def __init__(self):
+        self.master_key: Optional[bytes] = None
+
+    @classmethod
+    def bootstrap(cls, rsa_public_key: str, is_file: bool = True) -> Tuple["EnvelopeCrypto", str, str]:
+        """first-time setup: generate a DEK and wrap it for the first system
+
+        returns (crypto, fingerprint, wrapped_key) — an initialized instance plus
+        the record to store as the first authorization. the plaintext DEK is never
+        returned or persisted; it survives only as the wrapped copy. run self_test
+        before storing to confirm the keypair round-trips.
+        """
+        crypto = cls()
+        crypto.initialize(crypto.create_aes_key())
+        fingerprint, wrapped = crypto.authorize_system(rsa_public_key, is_file=is_file)
+        return crypto, fingerprint, wrapped
+
+    def self_test(
+        self, rsa_public_key: str, rsa_private_key: str, *,
+        is_file: bool = True, password: Optional[str] = None,
+    ) -> bool:
+        """verify the full pipeline against a keypair; raises on any mismatch
+
+        round-trips sample data through this instance's DEK, then wraps the DEK
+        with the public key and unwraps with the private key, confirming they
+        match. run after bootstrap (or anytime as a health check) to catch a bad
+        keypair or wrong key path before relying on it. returns True on success.
+        """
+        if not self.master_key:
+            raise ValueError("self_test: not initialized with a key")
+
+        sample = {"_selftest": "ok", "n": 12345}
+        if self.decrypt_data(self.encrypt_data(sample)) != sample:
+            raise RuntimeError("self_test: data round-trip failed")
+
+        _, wrapped = self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)
+        try:
+            recovered = self.decrypt_aes_key_with_rsa(wrapped, rsa_private_key, password=password)
+        except Exception as error:
+            raise RuntimeError(
+                "self_test: key unwrap failed (public/private keys do not pair, "
+                "or wrong password)"
+            ) from error
+        if recovered != self.master_key:
+            raise RuntimeError("self_test: key wrap/unwrap mismatch (public/private keys do not pair)")
+
+        _log.info("self_test passed")
+        return True
+
+    def initialize(self, master_key: bytes) -> None:
+        """arm the instance with the AES data key (DEK)
+
+        raises TypeError when master_key is not bytes and ValueError when its
+        length is not a valid AES key size (16, 24 or 32 bytes), so a bad key
+        fails here instead of on the first encrypt/decrypt.
+        """
+        if not isinstance(master_key, (bytes, bytearray)):
+            raise TypeError("master_key must be bytes")
+        if len(master_key) not in _AES_KEY_SIZES:
+            raise ValueError("master_key must be 16, 24, or 32 bytes long")
+        self.master_key = bytes(master_key)
+        _log.info("crypto initialized with data key")
+
+    def shutdown(self) -> None:
+        """drop the data key reference
+
+        note: python cannot guarantee zeroing of immutable bytes in memory; this
+        only releases the reference for garbage collection. do not rely on it to
+        scrub the key from RAM.
+        """
+        self.master_key = None
+        _log.info("data key reference cleared")
+
+    def create_aes_key(self) -> bytes:
+        """generate a random AES-256 data key"""
+        key = os.urandom(32)
+        _log.info("generated new AES data key")
+        return key
+
+    def get_rsa_key_fingerprint(
+        self, key_path_or_data: str, is_private: bool = False, is_file: bool = True,
+        password: Optional[str] = None,
+    ) -> str:
+        """return a base64 SHA-256 fingerprint of an RSA key for identification
+
+        the fingerprint is taken over the public half (DER SubjectPublicKeyInfo),
+        so a private key and its public key yield the same value. password is
+        only used to open an encrypted private key (is_private=True).
+        """
+        key_data = _read_key_material(key_path_or_data, is_file)
+        if is_private:
+            public_key = _load_private_key(key_data, password).public_key()
+        else:
+            public_key = _load_public_key(key_data)
+        fingerprint = _fingerprint_public_key(public_key)
+        _log.info("generated %s key fingerprint", "private" if is_private else "public")
+        return fingerprint
+
+    def encrypt_aes_key_with_rsa(
+        self, aes_key: bytes, rsa_key: str, is_file: bool = True
+    ) -> Tuple[str, str]:
+        """wrap an AES key with an RSA public key; returns (fingerprint, wrapped_b64)"""
+        public_key = _load_public_key(_read_key_material(rsa_key, is_file))
+        wrapped = public_key.encrypt(aes_key, _oaep_sha256())
+        # fingerprint the key object that did the wrapping rather than re-reading
+        # the path, so the pair always describes the same key
+        fingerprint = _fingerprint_public_key(public_key)
+        wrapped_b64 = base64.b64encode(wrapped).decode()
+        _log.info("wrapped data key for fingerprint %s", fingerprint[:8])
+        return fingerprint, wrapped_b64
+
+    def decrypt_aes_key_with_rsa(
+        self, encrypted_key_base64: str, rsa_private_key_path: str,
+        password: Optional[str] = None,
+    ) -> bytes:
+        """unwrap an AES key with an RSA private key"""
+        with open(rsa_private_key_path, "rb") as key_file:
+            private_key = _load_private_key(key_file.read(), password)
+
+        wrapped = base64.b64decode(encrypted_key_base64)
+        aes_key = private_key.decrypt(wrapped, _oaep_sha256())
+        _log.info("unwrapped data key with RSA private key")
+        return aes_key
+
+    def authorize_system(self, rsa_public_key: str, is_file: bool = True) -> Tuple[str, str]:
+        """wrap the current data key for another system's public key
+
+        returns (fingerprint, wrapped_b64) for the caller to store as that
+        system's key-authorization record. requires this instance to already
+        hold the data key — only an authorized system can authorize others.
+        """
+        if not self.master_key:
+            raise ValueError("cannot authorize another system: not initialized")
+        return self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)
+
+    def rotate_master_key(
+        self, authorized_public_keys: List[str], is_file: bool = True
+    ) -> Tuple[bytes, Dict[str, str]]:
+        """generate a NEW data key and wrap it for each authorized public key
+
+        returns (new_key, {fingerprint: wrapped_b64}). does NOT re-encrypt existing
+        data — build a new instance with the new key and call reencrypt() on each
+        record. systems not in the list get no wrapped copy (deauthorized).
+        """
+        new_key = self.create_aes_key()
+        wrapped = {}
+        for pub in authorized_public_keys:
+            fingerprint, wrapped_b64 = self.encrypt_aes_key_with_rsa(new_key, pub, is_file=is_file)
+            wrapped[fingerprint] = wrapped_b64
+        _log.info("rotated data key, wrapped for %d systems", len(wrapped))
+        return new_key, wrapped
+
+    def _encrypt_bytes(self, plaintext: bytes) -> Dict[str, Any]:
+        """AES-GCM encrypt raw bytes under the data key with a fresh 12-byte IV"""
+        if not self.master_key:
+            raise ValueError("not initialized with data key")
+        iv = os.urandom(12)
+        ciphertext = AESGCM(self.master_key).encrypt(iv, plaintext, None)
+        return {
+            "secure": True,
+            "iv": base64.b64encode(iv).decode(),
+            "data": base64.b64encode(ciphertext).decode(),
+        }
+
+    def _decrypt_bytes(self, encrypted_data: Dict[str, Any]) -> bytes:
+        """AES-GCM decrypt a {secure, iv, data} blob back to its raw plaintext bytes"""
+        if not self.master_key:
+            raise ValueError("not initialized with data key")
+        iv = base64.b64decode(encrypted_data["iv"])
+        ciphertext = base64.b64decode(encrypted_data["data"])
+        return AESGCM(self.master_key).decrypt(iv, ciphertext, None)
+
+    def encrypt_data(self, data: Union[Dict[str, Any], str]) -> Dict[str, Any]:
+        """encrypt a dict or string under the data key with a unique IV
+
+        a dict is serialized with json.dumps first; a string is encrypted as-is.
+        raises TypeError for any other input type.
+        """
+        if not self.master_key:
+            raise ValueError("not initialized with data key")
+
+        data_str = json.dumps(data) if isinstance(data, dict) else data
+        if not isinstance(data_str, str):
+            raise TypeError("encrypt_data expects a dict or str")
+        return self._encrypt_bytes(data_str.encode())
+
+    def decrypt_data(self, encrypted_data: Dict[str, Any]) -> Union[Dict[str, Any], str]:
+        """decrypt a {secure, iv, data} blob; returns the original dict or string
+
+        only a JSON object is turned back into a dict. encrypt_data JSON-encodes
+        dicts alone, so any other JSON-looking plaintext ("12345", "true", "[1]")
+        was stored as a string and is returned unchanged as a string.
+        """
+        plaintext = self._decrypt_bytes(encrypted_data).decode()
+        try:
+            parsed = json.loads(plaintext)
+        except json.JSONDecodeError:
+            return plaintext
+        return parsed if isinstance(parsed, dict) else plaintext
+
+    def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
+        """re-encrypt a record's encrypted fields from source_crypto's key to this one's
+
+        self holds the destination (new) key; source_crypto holds the source (old)
+        key. only {secure, iv, data} fields are touched; plaintext fields are left
+        as-is. returns a new dict; the input is not mutated. used during rotation.
+        the plaintext bytes are carried over verbatim (never re-parsed), so every
+        value decrypts to exactly what it did under the old key.
+        """
+        if not self.master_key:
+            raise ValueError("destination not initialized with data key")
+
+        result = record.copy()
+        for key, value in record.items():
+            if _is_encrypted_blob(value):
+                result[key] = self._encrypt_bytes(source_crypto._decrypt_bytes(value))
+            elif traversal_level > 0 and isinstance(value, dict):
+                result[key] = self.reencrypt(source_crypto, value, traversal_level - 1)
+        return result
+
+
+# naming aliases — same class
+DocumentCrypto = EnvelopeCrypto
+RecordCrypto = EnvelopeCrypto
+PCICrypto = EnvelopeCrypto  # deprecated legacy alias; remove after all systems migrate
+
+
+def is_encrypted_record(record, traversal_level: int = 2) -> bool:
+    """return whether a record has any encrypted ({secure, iv, data}) fields
+
+    aliases: is_encrypted_document, is_encrypted_dict — same function
+    """
+    if not isinstance(record, dict):
+        return False
+
+    values = list(record.values())
+    if any(_is_encrypted_blob(value) for value in values):
+        return True
+    if traversal_level <= 0:
+        return False
+    return any(
+        isinstance(value, dict) and is_encrypted_record(value, traversal_level - 1)
+        for value in values
+    )
+
+
+def decrypt_record(crypto: EnvelopeCrypto, record, traversal_level: int = 2) -> dict:
+    """decrypt a record's encrypted fields into a new dict (up to traversal_level deep)
+
+    failures on a single field are logged and that field is left encrypted, so a
+    partial failure is visible (the {secure,...} blob remains) rather than silent.
+
+    aliases: decrypt_document, decrypt_dict — same function
+    """
+    if not crypto.master_key:
+        raise ValueError("not initialized with data key")
+    if not isinstance(record, dict):
+        return record
+
+    result = record.copy()
+    for key, value in record.items():
+        if _is_encrypted_blob(value):
+            try:
+                result[key] = crypto.decrypt_data(value)
+            except Exception:
+                # deliberate best-effort: keep the encrypted blob and log the failure
+                _log.exception("failed to decrypt field %s", key)
+        elif traversal_level > 0 and isinstance(value, dict):
+            result[key] = decrypt_record(crypto, value, traversal_level - 1)
+    return result
+
+
+def fingerprint_data(data: dict) -> str:
+    """return a deterministic SHA-256 hex fingerprint of a dict"""
+    return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()
+
+
+# function aliases — same functions, naming preference only
+is_encrypted_document = is_encrypted_record
+is_encrypted_dict = is_encrypted_record
+decrypt_document = decrypt_record
+decrypt_dict = decrypt_record