add package: pyproject + src

EnvelopeCrypto: hybrid envelope encryption for dict records — a random
AES-256-GCM data key (DEK) encrypts the data, wrapped per-system via
RSA-OAEP (SHA-256) for distribution. config-free (DEK + key paths
injected), storage-agnostic, object-only. covers bootstrap/self_test,
authorize/deauthorize, rotate + reencrypt, and record-level decrypt.
src/ layout, hatchling build, cryptography backend.

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-24 21:25:27 -04:00
parent 0b708cdf9a
commit 659aa7849d
3 changed files with 419 additions and 0 deletions

15
pyproject.toml Normal file
View File

@ -0,0 +1,15 @@
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[project]
name = "envelope_crypto"
version = "0.1.0"
description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable."
requires-python = ">=3.10"
dependencies = [
"cryptography>=42.0",
]
[tool.hatch.build.targets.wheel]
packages = ["src/envelope_crypto"]

View File

@ -0,0 +1,27 @@
from .envelope_crypto import (
EnvelopeCrypto,
DocumentCrypto,
RecordCrypto,
PCICrypto,
is_encrypted_record,
is_encrypted_document,
is_encrypted_dict,
decrypt_record,
decrypt_document,
decrypt_dict,
fingerprint_data,
)
# public API of the package: every name listed here is re-exported from
# .envelope_crypto by the import above (the canonical EnvelopeCrypto class, its
# naming aliases, and the record/document/dict helper functions)
__all__ = [
"EnvelopeCrypto",
"DocumentCrypto",
"RecordCrypto",
"PCICrypto",
"is_encrypted_record",
"is_encrypted_document",
"is_encrypted_dict",
"decrypt_record",
"decrypt_document",
"decrypt_dict",
"fingerprint_data",
]

View File

@ -0,0 +1,377 @@
"""
envelope encryption for dict records
hybrid encryption: a random AES-256-GCM data key (DEK) encrypts the data, and
that key is wrapped (RSA-OAEP) per authorized system's public key (KEK) for
distribution. the wrapped key is stored by the caller, keyed by fingerprint;
each system unwraps its own copy with its private key. this is the same
envelope-encryption pattern used by KMS-style systems.
from envelope_crypto import EnvelopeCrypto
crypto = EnvelopeCrypto()
crypto.initialize(master_key) # 32-byte AES DEK
enc = crypto.encrypt_data({"ssn": "..."}) # -> {secure, iv, data}
plain = crypto.decrypt_data(enc) # -> original
first-time setup: generate the DEK and wrap it for the first system in one call,
then verify the pipeline before storing anything:
crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap("public_key.pem")
crypto.self_test("public_key.pem", "private_key.pem") # raises if anything is wrong
caller_store({"_id": fingerprint, "key": wrapped}) # the only record of the DEK
boot (already set up): fingerprint own pubkey, fetch the wrapped DEK, unwrap:
fp = crypto.get_rsa_key_fingerprint("public_key.pem")
record = caller_lookup(fp)
crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], "private_key.pem"))
authorize another system (this instance must already hold the DEK):
fp, wrapped = crypto.authorize_system(other_pub_path)
caller_store({"_id": fp, "key": wrapped})
deauthorize: caller deletes that fingerprint's record. note this stops future
unwraps but does not revoke a DEK already in a running system's memory — rotate
if compromised.
rotate (new DEK + re-encrypt): generate a new DEK, wrap for the still-authorized
set, then re-encrypt existing records old -> new:
new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b])
new_crypto = EnvelopeCrypto(); new_crypto.initialize(new_key)
for record in caller_iter():
caller_update(new_crypto.reencrypt(crypto, record))
config-free: the host supplies the DEK and RSA key paths; this lib never imports
config, configures logging, or touches a database. storage-agnostic: the
encrypted blob is a plain dict; store it in mongo, a sql json column, or a file.
naming: EnvelopeCrypto is canonical. PCICrypto / DocumentCrypto / RecordCrypto are
aliases (PCICrypto is a deprecated legacy alias). the document/record/dict
function variants are the same functions; use whichever fits your storage.
"""
import os
import json
import base64
import hashlib
import logging
from typing import Any, Dict, List, Optional, Tuple, Union
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.serialization import load_ssh_public_key
_log = logging.getLogger(__name__)
class EnvelopeCrypto:
    """hybrid RSA/AES-256-GCM envelope encryption for dict records

    holds an AES data key (DEK), injected via initialize. encrypts/decrypts record
    fields, wraps/unwraps the DEK with RSA keys for distribution, and re-encrypts
    records across key rotations. config-free and storage-agnostic.
    """

    def __init__(self):
        # the AES data key (DEK); stays None until initialize() is called
        self.master_key: Optional[bytes] = None

    @classmethod
    def bootstrap(cls, rsa_public_key: str, is_file: bool = True) -> Tuple["EnvelopeCrypto", str, str]:
        """first-time setup: generate a DEK and wrap it for the first system

        returns (crypto, fingerprint, wrapped_key): an initialized instance plus
        the record to store as the first authorization. the plaintext DEK is not
        returned separately or persisted by this method; it lives on the returned
        instance and in the wrapped copy. run self_test before storing to confirm
        the keypair round-trips.

        rsa_public_key is a path when is_file is True, otherwise the key text.
        """
        crypto = cls()
        crypto.initialize(crypto.create_aes_key())
        fingerprint, wrapped = crypto.authorize_system(rsa_public_key, is_file=is_file)
        return crypto, fingerprint, wrapped

    def self_test(
        self, rsa_public_key: str, rsa_private_key: str, *,
        is_file: bool = True, password: Optional[str] = None,
    ) -> bool:
        """verify the full pipeline against a keypair; raises on any mismatch

        round-trips sample data through this instance's DEK, then wraps the DEK
        with the public key and unwraps with the private key, confirming they
        match. run after bootstrap (or anytime as a health check) to catch a bad
        keypair or wrong key path before relying on it. returns True on success.

        is_file applies to rsa_public_key only; rsa_private_key is always a path.
        raises ValueError when no data key is set and RuntimeError when the data
        round-trip, the unwrap, or the wrap/unwrap comparison fails.
        """
        if not self.master_key:
            raise ValueError("self_test: not initialized with a key")
        sample = {"_selftest": "ok", "n": 12345}
        if self.decrypt_data(self.encrypt_data(sample)) != sample:
            raise RuntimeError("self_test: data round-trip failed")
        _, wrapped = self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)
        try:
            recovered = self.decrypt_aes_key_with_rsa(wrapped, rsa_private_key, password=password)
        except Exception as error:
            # any unwrap failure is reported uniformly, with the cause chained
            raise RuntimeError(
                "self_test: key unwrap failed (public/private keys do not pair, "
                "or wrong password)"
            ) from error
        if recovered != self.master_key:
            raise RuntimeError("self_test: key wrap/unwrap mismatch (public/private keys do not pair)")
        _log.info("self_test passed")
        return True

    def initialize(self, master_key: bytes) -> None:
        """arm the instance with the AES data key (DEK)"""
        self.master_key = master_key
        _log.info("crypto initialized with data key")

    def shutdown(self) -> None:
        """drop the data key reference

        note: python cannot guarantee zeroing of immutable bytes in memory; this
        only releases the reference for garbage collection. do not rely on it to
        scrub the key from RAM.
        """
        self.master_key = None
        _log.info("data key reference cleared")

    def create_aes_key(self) -> bytes:
        """generate a random AES-256 data key (32 bytes from os.urandom)"""
        key = os.urandom(32)
        _log.info("generated new AES data key")
        return key

    @staticmethod
    def _read_key_material(key_path_or_data, is_file: bool) -> bytes:
        """return raw key bytes, read from a path or taken from inline str/bytes"""
        if is_file:
            with open(key_path_or_data, "rb") as key_file:
                return key_file.read()
        if isinstance(key_path_or_data, str):
            return key_path_or_data.encode()
        return key_path_or_data

    @classmethod
    def _load_public_key(cls, key_path_or_data, is_file: bool):
        """parse a public key, trying PEM first and the OpenSSH format second"""
        key_data = cls._read_key_material(key_path_or_data, is_file)
        try:
            return serialization.load_pem_public_key(key_data)
        except ValueError:
            return load_ssh_public_key(key_data)

    @staticmethod
    def _fingerprint_public_key(public_key) -> str:
        """base64 SHA-256 over the key's DER SubjectPublicKeyInfo encoding"""
        key_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        digest = hashes.Hash(hashes.SHA256())
        digest.update(key_bytes)
        return base64.b64encode(digest.finalize()).decode()

    @staticmethod
    def _oaep():
        """the OAEP padding (MGF1/SHA-256, SHA-256, no label) used for wrap and unwrap"""
        return padding.OAEP(
            mgf=padding.MGF1(algorithm=hashes.SHA256()),
            algorithm=hashes.SHA256(),
            label=None,
        )

    def get_rsa_key_fingerprint(
        self, key_path_or_data: str, is_private: bool = False, is_file: bool = True
    ) -> str:
        """return a base64 SHA-256 fingerprint of an RSA key for identification

        the fingerprint is always computed over the public half, so a private key
        and its public key yield the same value. with is_private=True the key is
        loaded as an unencrypted PEM private key (password=None).
        """
        if is_private:
            key_data = self._read_key_material(key_path_or_data, is_file)
            private_key = serialization.load_pem_private_key(key_data, password=None)
            public_key = private_key.public_key()
        else:
            public_key = self._load_public_key(key_path_or_data, is_file)
        fingerprint = self._fingerprint_public_key(public_key)
        _log.info("generated %s key fingerprint", "private" if is_private else "public")
        return fingerprint

    def encrypt_aes_key_with_rsa(
        self, aes_key: bytes, rsa_key: str, is_file: bool = True
    ) -> Tuple[str, str]:
        """wrap an AES key with an RSA public key; returns (fingerprint, wrapped_b64)

        the public key is loaded once and that same key object is both used for
        wrapping and fingerprinted, so the returned fingerprint always identifies
        the key that produced the wrapped blob.
        """
        public_key = self._load_public_key(rsa_key, is_file)
        wrapped = public_key.encrypt(aes_key, self._oaep())
        fingerprint = self._fingerprint_public_key(public_key)
        wrapped_b64 = base64.b64encode(wrapped).decode()
        # only a short prefix of the fingerprint is logged
        _log.info("wrapped data key for fingerprint %s", fingerprint[:8])
        return fingerprint, wrapped_b64

    def decrypt_aes_key_with_rsa(
        self, encrypted_key_base64: str, rsa_private_key_path: str,
        password: Optional[str] = None,
    ) -> bytes:
        """unwrap an AES key with an RSA private key

        the private key is always read from rsa_private_key_path. PEM is tried
        first; if that fails and the file carries an OpenSSH private-key header,
        the OpenSSH loader is used instead, otherwise the PEM error propagates.
        """
        with open(rsa_private_key_path, "rb") as key_file:
            key_data = key_file.read()
        password_bytes = password.encode() if password else None
        try:
            private_key = serialization.load_pem_private_key(key_data, password=password_bytes)
        except ValueError:
            if b"BEGIN OPENSSH PRIVATE KEY" not in key_data:
                raise
            private_key = serialization.load_ssh_private_key(key_data, password=password_bytes)
        wrapped = base64.b64decode(encrypted_key_base64)
        aes_key = private_key.decrypt(wrapped, self._oaep())
        _log.info("unwrapped data key with RSA private key")
        return aes_key

    def authorize_system(self, rsa_public_key: str, is_file: bool = True) -> Tuple[str, str]:
        """wrap the current data key for another system's public key

        returns (fingerprint, wrapped_b64) for the caller to store as that
        system's key-authorization record. requires this instance to already
        hold the data key: only an authorized system can authorize others.
        raises ValueError when no data key is set.
        """
        if not self.master_key:
            raise ValueError("cannot authorize another system: not initialized")
        return self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)

    def rotate_master_key(
        self, authorized_public_keys: List[str], is_file: bool = True
    ) -> Tuple[bytes, Dict[str, str]]:
        """generate a NEW data key and wrap it for each authorized public key

        returns (new_key, {fingerprint: wrapped_b64}). does NOT re-encrypt existing
        data and does not change this instance's key: build a new instance with
        the new key and call reencrypt() on each record. systems not in the list
        get no wrapped copy (deauthorized).
        """
        new_key = self.create_aes_key()
        wrapped: Dict[str, str] = {}
        for pub in authorized_public_keys:
            fingerprint, wrapped_b64 = self.encrypt_aes_key_with_rsa(new_key, pub, is_file=is_file)
            wrapped[fingerprint] = wrapped_b64
        _log.info("rotated data key, wrapped for %d systems", len(wrapped))
        return new_key, wrapped

    def _encrypt_bytes(self, plaintext: bytes) -> Dict[str, Any]:
        """AES-GCM encrypt raw bytes under the data key with a fresh 12-byte IV"""
        if not self.master_key:
            raise ValueError("not initialized with data key")
        iv = os.urandom(12)
        ciphertext = AESGCM(self.master_key).encrypt(iv, plaintext, None)
        return {
            "secure": True,
            "iv": base64.b64encode(iv).decode(),
            "data": base64.b64encode(ciphertext).decode(),
        }

    def _decrypt_bytes(self, encrypted_data: Dict[str, Any]) -> bytes:
        """AES-GCM decrypt a {secure, iv, data} blob to its raw plaintext bytes"""
        if not self.master_key:
            raise ValueError("not initialized with data key")
        iv = base64.b64decode(encrypted_data["iv"])
        ciphertext = base64.b64decode(encrypted_data["data"])
        return AESGCM(self.master_key).decrypt(iv, ciphertext, None)

    def encrypt_data(self, data: Union[Dict[str, Any], str]) -> Dict[str, str]:
        """encrypt a dict or string under the data key with a unique IV

        dicts are serialized with json.dumps; strings are encrypted as-is.
        returns a {secure, iv, data} blob with base64 iv and data.
        raises ValueError when no data key is set and TypeError for any other
        input type.
        """
        if not self.master_key:
            raise ValueError("not initialized with data key")
        if isinstance(data, dict):
            data_str = json.dumps(data)
        elif isinstance(data, str):
            data_str = data
        else:
            raise TypeError(
                f"encrypt_data expects a dict or str, got {type(data).__name__}"
            )
        return self._encrypt_bytes(data_str.encode())

    def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]:
        """decrypt a {secure, iv, data} blob; returns the original dict or string

        encrypt_data JSON-encodes dicts only, so the plaintext is turned back into
        a dict only when it parses to a JSON object. anything else (including
        strings that merely look like JSON numbers, booleans, null or arrays) is
        returned as the exact string that was encrypted. a string whose content
        is itself a JSON object cannot be told apart from an encrypted dict and
        is returned as a dict.
        """
        plaintext = self._decrypt_bytes(encrypted_data).decode()
        try:
            parsed = json.loads(plaintext)
        except json.JSONDecodeError:
            return plaintext
        return parsed if isinstance(parsed, dict) else plaintext

    def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
        """re-encrypt a record's encrypted fields from source_crypto's key to this one's

        self holds the destination (new) key; source_crypto holds the source (old)
        key. only {secure, iv, data} fields are touched; plaintext fields are left
        as-is. returns a new dict; the input is not mutated. used during rotation.

        nested dicts are followed up to traversal_level levels deep. the plaintext
        is carried across as raw bytes, without a JSON decode/encode in between,
        so every value keeps its exact original content and type.
        raises ValueError when either side has no data key.
        """
        if not self.master_key:
            raise ValueError("destination not initialized with data key")
        result = record.copy()
        for key, value in record.items():
            if not isinstance(value, dict):
                continue
            if value.get("secure") is True and "iv" in value and "data" in value:
                result[key] = self._encrypt_bytes(source_crypto._decrypt_bytes(value))
            elif traversal_level > 0:
                result[key] = self.reencrypt(source_crypto, value, traversal_level - 1)
        return result
# naming aliases: all three names are bound to the very same class object.
# PCICrypto is the deprecated legacy alias; remove after all systems migrate.
DocumentCrypto = RecordCrypto = PCICrypto = EnvelopeCrypto
def is_encrypted_record(record, traversal_level: int = 2) -> bool:
    """report whether a record holds at least one encrypted field

    a field counts as encrypted when its value is a dict with "secure" set to
    exactly True plus both an "iv" and a "data" key. direct fields are checked
    first; nested dicts are then searched up to traversal_level further levels.
    anything that is not a dict yields False.

    aliases: is_encrypted_document, is_encrypted_dict (same function)
    """
    if not isinstance(record, dict):
        return False

    nested = [child for child in record.values() if isinstance(child, dict)]

    # pass 1: is any direct child an encrypted blob?
    if any(
        child.get("secure") is True and "iv" in child and "data" in child
        for child in nested
    ):
        return True

    # pass 2: descend only while there is depth budget left
    if traversal_level <= 0:
        return False
    return any(is_encrypted_record(child, traversal_level - 1) for child in nested)
def decrypt_record(crypto: EnvelopeCrypto, record, traversal_level: int = 2) -> dict:
    """return a copy of record with its encrypted fields decrypted

    fields shaped like {secure: True, iv, data} are replaced by the result of
    crypto.decrypt_data; other nested dicts are processed recursively up to
    traversal_level levels deep. a field that fails to decrypt is logged and
    left as its encrypted blob, so a partial failure stays visible rather than
    silent. a non-dict record is handed back unchanged.

    raises ValueError when crypto holds no data key.
    aliases: decrypt_document, decrypt_dict (same function)
    """
    if not crypto.master_key:
        raise ValueError("not initialized with data key")
    if not isinstance(record, dict):
        return record

    decrypted = record.copy()
    for field, payload in record.items():
        if not isinstance(payload, dict):
            continue  # scalars and lists are carried over by the copy

        is_blob = payload.get("secure") is True and "iv" in payload and "data" in payload
        if is_blob:
            try:
                decrypted[field] = crypto.decrypt_data(payload)
            except Exception:
                # best effort: keep the encrypted blob in place and log the failure
                _log.exception("failed to decrypt field %s", field)
            continue

        if traversal_level > 0:
            decrypted[field] = decrypt_record(crypto, payload, traversal_level - 1)
    return decrypted
def fingerprint_data(data: dict) -> str:
    """return a deterministic SHA-256 hex fingerprint of a dict

    the dict is serialized with json.dumps(sort_keys=True), so key order does
    not affect the digest.
    """
    canonical = json.dumps(data, sort_keys=True)
    hasher = hashlib.sha256(canonical.encode())
    return hasher.hexdigest()
# function aliases: alternate names bound to the very same function objects;
# which one to call is a naming preference only
is_encrypted_document = is_encrypted_dict = is_encrypted_record
decrypt_document = decrypt_dict = decrypt_record