add package: pyproject + src
EnvelopeCrypto: hybrid envelope encryption for dict records — a random AES-256-GCM data key (DEK) encrypts the data, wrapped per-system via RSA-OAEP (SHA-256) for distribution. config-free (DEK + key paths injected), storage-agnostic, object-only. covers bootstrap/self_test, authorize/deauthorize, rotate + reencrypt, and record-level decrypt. src/ layout, hatchling build, cryptography backend. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
1864612d64
commit
8a220a8810
15
pyproject.toml
Normal file
15
pyproject.toml
Normal file
@ -0,0 +1,15 @@
|
||||
[build-system]
|
||||
requires = ["hatchling"]
|
||||
build-backend = "hatchling.build"
|
||||
|
||||
[project]
|
||||
name = "envelope_crypto"
|
||||
version = "0.1.0"
|
||||
description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable."
|
||||
requires-python = ">=3.10"
|
||||
dependencies = [
|
||||
"cryptography>=42.0",
|
||||
]
|
||||
|
||||
[tool.hatch.build.targets.wheel]
|
||||
packages = ["src/envelope_crypto"]
|
||||
27
src/envelope_crypto/__init__.py
Normal file
27
src/envelope_crypto/__init__.py
Normal file
@ -0,0 +1,27 @@
|
||||
from .envelope_crypto import (
|
||||
EnvelopeCrypto,
|
||||
DocumentCrypto,
|
||||
RecordCrypto,
|
||||
PCICrypto,
|
||||
is_encrypted_record,
|
||||
is_encrypted_document,
|
||||
is_encrypted_dict,
|
||||
decrypt_record,
|
||||
decrypt_document,
|
||||
decrypt_dict,
|
||||
fingerprint_data,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"EnvelopeCrypto",
|
||||
"DocumentCrypto",
|
||||
"RecordCrypto",
|
||||
"PCICrypto",
|
||||
"is_encrypted_record",
|
||||
"is_encrypted_document",
|
||||
"is_encrypted_dict",
|
||||
"decrypt_record",
|
||||
"decrypt_document",
|
||||
"decrypt_dict",
|
||||
"fingerprint_data",
|
||||
]
|
||||
377
src/envelope_crypto/envelope_crypto.py
Normal file
377
src/envelope_crypto/envelope_crypto.py
Normal file
@ -0,0 +1,377 @@
|
||||
"""
|
||||
envelope encryption for dict records
|
||||
|
||||
hybrid encryption: a random AES-256-GCM data key (DEK) encrypts the data, and
|
||||
that key is wrapped (RSA-OAEP) per authorized system's public key (KEK) for
|
||||
distribution. the wrapped key is stored by the caller, keyed by fingerprint;
|
||||
each system unwraps its own copy with its private key. this is the same
|
||||
envelope-encryption pattern used by KMS-style systems.
|
||||
|
||||
from envelope_crypto import EnvelopeCrypto
|
||||
|
||||
crypto = EnvelopeCrypto()
|
||||
crypto.initialize(master_key) # 32-byte AES DEK
|
||||
enc = crypto.encrypt_data({"ssn": "..."}) # -> {secure, iv, data}
|
||||
plain = crypto.decrypt_data(enc) # -> original
|
||||
|
||||
first-time setup: generate the DEK and wrap it for the first system in one call,
|
||||
then verify the pipeline before storing anything:
|
||||
|
||||
crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap(cfg.local_pub)
|
||||
crypto.self_test(cfg.local_pub, cfg.local_priv) # raises if anything is wrong
|
||||
caller_store({"_id": fingerprint, "key": wrapped}) # the only record of the DEK
|
||||
|
||||
boot (already set up): fingerprint own pubkey, fetch the wrapped DEK, unwrap:
|
||||
|
||||
fp = crypto.get_rsa_key_fingerprint(cfg.local_pub)
|
||||
record = caller_lookup(fp)
|
||||
crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], cfg.local_priv))
|
||||
|
||||
authorize another system (this instance must already hold the DEK):
|
||||
|
||||
fp, wrapped = crypto.authorize_system(other_pub_path)
|
||||
caller_store({"_id": fp, "key": wrapped})
|
||||
|
||||
deauthorize: caller deletes that fingerprint's record. note this stops future
|
||||
unwraps but does not revoke a DEK already in a running system's memory — rotate
|
||||
if compromised.
|
||||
|
||||
rotate (new DEK + re-encrypt): generate a new DEK, wrap for the still-authorized
|
||||
set, then re-encrypt existing records old -> new:
|
||||
|
||||
new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b])
|
||||
new_crypto = EnvelopeCrypto(); new_crypto.initialize(new_key)
|
||||
for record in caller_iter():
|
||||
caller_update(new_crypto.reencrypt(crypto, record))
|
||||
|
||||
config-free: the host supplies the DEK and RSA key paths; this lib never imports
|
||||
config, configures logging, or touches a database. storage-agnostic — the
|
||||
encrypted blob is a plain dict; store it in mongo, a sql json column, or a file.
|
||||
|
||||
naming: EnvelopeCrypto is canonical. PCICrypto / DocumentCrypto / RecordCrypto are
|
||||
aliases (PCICrypto is a deprecated legacy alias). the document/record/dict
|
||||
function variants are the same functions — use whichever fits your storage.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
import base64
|
||||
import hashlib
|
||||
import logging
|
||||
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||
|
||||
from cryptography.hazmat.primitives import hashes, serialization
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||
from cryptography.hazmat.primitives.serialization import load_ssh_public_key
|
||||
|
||||
_log = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EnvelopeCrypto:
    """hybrid RSA/AES-256-GCM envelope encryption for dict records

    holds an AES data key (DEK), injected via initialize. encrypts/decrypts record
    fields, wraps/unwraps the DEK with RSA keys for distribution, and re-encrypts
    records across key rotations. config-free and storage-agnostic.
    """

    # key lengths (bytes) AES-GCM accepts; create_aes_key always produces 32
    _VALID_KEY_SIZES = (16, 24, 32)

    def __init__(self):
        # None until initialize() is called; every data operation checks this
        self.master_key: Optional[bytes] = None

    @classmethod
    def bootstrap(cls, rsa_public_key: str, is_file: bool = True) -> Tuple["EnvelopeCrypto", str, str]:
        """first-time setup: generate a DEK and wrap it for the first system

        returns (crypto, fingerprint, wrapped_key) — an initialized instance plus
        the record to store as the first authorization. the plaintext DEK is never
        returned or persisted; it survives only as the wrapped copy. run self_test
        before storing to confirm the keypair round-trips.
        """
        crypto = cls()
        crypto.initialize(crypto.create_aes_key())
        fingerprint, wrapped = crypto.authorize_system(rsa_public_key, is_file=is_file)
        return crypto, fingerprint, wrapped

    def self_test(
        self, rsa_public_key: str, rsa_private_key: str, *,
        is_file: bool = True, password: Optional[str] = None,
    ) -> bool:
        """verify the full pipeline against a keypair; raises on any mismatch

        round-trips sample data through this instance's DEK, then wraps the DEK
        with the public key and unwraps with the private key, confirming they
        match. run after bootstrap (or anytime as a health check) to catch a bad
        keypair or wrong key path before relying on it. returns True on success.

        is_file applies to rsa_public_key only; rsa_private_key is always a file
        path. raises ValueError when not initialized and RuntimeError on any
        round-trip failure.
        """
        if not self.master_key:
            raise ValueError("self_test: not initialized with a key")

        sample = {"_selftest": "ok", "n": 12345}
        if self.decrypt_data(self.encrypt_data(sample)) != sample:
            raise RuntimeError("self_test: data round-trip failed")

        _, wrapped = self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)
        try:
            recovered = self.decrypt_aes_key_with_rsa(wrapped, rsa_private_key, password=password)
        except Exception as error:
            raise RuntimeError(
                "self_test: key unwrap failed (public/private keys do not pair, "
                "or wrong password)"
            ) from error
        if recovered != self.master_key:
            raise RuntimeError("self_test: key wrap/unwrap mismatch (public/private keys do not pair)")

        _log.info("self_test passed")
        return True

    def initialize(self, master_key: bytes) -> None:
        """arm the instance with the AES data key (DEK)

        raises TypeError if master_key is not bytes-like and ValueError if its
        length is not a valid AES key size, so a bad key fails here instead of
        on the first encrypt/decrypt call.
        """
        try:
            key_length = memoryview(master_key).nbytes
        except TypeError:
            raise TypeError("initialize: master_key must be bytes-like") from None
        if key_length not in self._VALID_KEY_SIZES:
            raise ValueError(
                "initialize: master_key must be 16, 24 or 32 bytes, got %d" % key_length
            )
        self.master_key = master_key
        _log.info("crypto initialized with data key")

    def shutdown(self) -> None:
        """drop the data key reference

        note: python cannot guarantee zeroing of immutable bytes in memory; this
        only releases the reference for garbage collection. do not rely on it to
        scrub the key from RAM.
        """
        self.master_key = None
        _log.info("data key reference cleared")

    def create_aes_key(self) -> bytes:
        """generate a random AES-256 data key"""
        key = os.urandom(32)
        _log.info("generated new AES data key")
        return key

    @staticmethod
    def _read_key_material(key_path_or_data, is_file: bool) -> bytes:
        """return key bytes, read from a path (is_file) or taken from the argument"""
        if is_file:
            with open(key_path_or_data, "rb") as key_file:
                return key_file.read()
        if isinstance(key_path_or_data, str):
            return key_path_or_data.encode()
        return key_path_or_data

    @staticmethod
    def _load_public_key(key_data: bytes):
        """parse a public key, trying PEM first and the OpenSSH format second"""
        try:
            return serialization.load_pem_public_key(key_data)
        except ValueError:
            return load_ssh_public_key(key_data)

    @staticmethod
    def _load_private_key(key_data: bytes, password: Optional[str] = None):
        """parse a private key, trying PEM first and the OpenSSH format second"""
        secret = password.encode() if password else None
        try:
            return serialization.load_pem_private_key(key_data, password=secret)
        except ValueError:
            if b"BEGIN OPENSSH PRIVATE KEY" not in key_data:
                raise
            return serialization.load_ssh_private_key(key_data, password=secret)

    @staticmethod
    def _fingerprint_public_key(public_key) -> str:
        """return the base64 SHA-256 of a public key's DER SubjectPublicKeyInfo"""
        key_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PublicFormat.SubjectPublicKeyInfo,
        )
        digest = hashes.Hash(hashes.SHA256())
        digest.update(key_bytes)
        return base64.b64encode(digest.finalize()).decode()

    def get_rsa_key_fingerprint(
        self, key_path_or_data: str, is_private: bool = False, is_file: bool = True,
        password: Optional[str] = None,
    ) -> str:
        """return a base64 SHA-256 fingerprint of an RSA key for identification

        the fingerprint is always computed over the public half, so a private
        key and its public key yield the same value. password is only used when
        is_private is set and the private key is encrypted.
        """
        key_data = self._read_key_material(key_path_or_data, is_file)
        if is_private:
            public_key = self._load_private_key(key_data, password).public_key()
        else:
            public_key = self._load_public_key(key_data)

        fingerprint = self._fingerprint_public_key(public_key)
        _log.info("generated %s key fingerprint", "private" if is_private else "public")
        return fingerprint

    def encrypt_aes_key_with_rsa(
        self, aes_key: bytes, rsa_key: str, is_file: bool = True
    ) -> Tuple[str, str]:
        """wrap an AES key with an RSA public key; returns (fingerprint, wrapped_b64)"""
        public_key = self._load_public_key(self._read_key_material(rsa_key, is_file))

        wrapped = public_key.encrypt(
            aes_key,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )
        # fingerprint the key object that did the wrapping rather than re-reading
        # the source, so the fingerprint can never describe a different key
        fingerprint = self._fingerprint_public_key(public_key)
        wrapped_b64 = base64.b64encode(wrapped).decode()
        _log.info("wrapped data key for fingerprint %s", fingerprint[:8])
        return fingerprint, wrapped_b64

    def decrypt_aes_key_with_rsa(
        self, encrypted_key_base64: str, rsa_private_key_path: str,
        password: Optional[str] = None,
    ) -> bytes:
        """unwrap an AES key with an RSA private key

        rsa_private_key_path is always a file path (PEM or OpenSSH format).
        """
        with open(rsa_private_key_path, "rb") as key_file:
            key_data = key_file.read()
        private_key = self._load_private_key(key_data, password)

        wrapped = base64.b64decode(encrypted_key_base64)
        aes_key = private_key.decrypt(
            wrapped,
            padding.OAEP(
                mgf=padding.MGF1(algorithm=hashes.SHA256()),
                algorithm=hashes.SHA256(),
                label=None,
            ),
        )
        _log.info("unwrapped data key with RSA private key")
        return aes_key

    def authorize_system(self, rsa_public_key: str, is_file: bool = True) -> Tuple[str, str]:
        """wrap the current data key for another system's public key

        returns (fingerprint, wrapped_b64) for the caller to store as that
        system's key-authorization record. requires this instance to already
        hold the data key — only an authorized system can authorize others.
        """
        if not self.master_key:
            raise ValueError("cannot authorize another system: not initialized")
        return self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)

    def rotate_master_key(
        self, authorized_public_keys: List[str], is_file: bool = True
    ) -> Tuple[bytes, Dict[str, str]]:
        """generate a NEW data key and wrap it for each authorized public key

        returns (new_key, {fingerprint: wrapped_b64}). does NOT re-encrypt existing
        data — build a new instance with the new key and call reencrypt() on each
        record. systems not in the list get no wrapped copy (deauthorized).
        """
        new_key = self.create_aes_key()
        wrapped = {}
        for pub in authorized_public_keys:
            fingerprint, wrapped_b64 = self.encrypt_aes_key_with_rsa(new_key, pub, is_file=is_file)
            wrapped[fingerprint] = wrapped_b64
        if not wrapped:
            # the returned plaintext key is then the only copy of the new DEK
            _log.warning("rotated data key but wrapped it for no systems")
        _log.info("rotated data key, wrapped for %d systems", len(wrapped))
        return new_key, wrapped

    def _encrypt_bytes(self, plaintext: bytes) -> Dict[str, Any]:
        """AES-GCM encrypt raw bytes under the data key into a {secure, iv, data} blob"""
        if not self.master_key:
            raise ValueError("not initialized with data key")
        iv = os.urandom(12)  # fresh 96-bit nonce for every encryption
        ciphertext = AESGCM(self.master_key).encrypt(iv, plaintext, None)
        return {
            "secure": True,
            "iv": base64.b64encode(iv).decode(),
            "data": base64.b64encode(ciphertext).decode(),
        }

    def _decrypt_bytes(self, encrypted_data: Dict[str, Any]) -> bytes:
        """AES-GCM decrypt a {secure, iv, data} blob to its raw plaintext bytes"""
        if not self.master_key:
            raise ValueError("not initialized with data key")
        iv = base64.b64decode(encrypted_data["iv"])
        ciphertext = base64.b64decode(encrypted_data["data"])
        return AESGCM(self.master_key).decrypt(iv, ciphertext, None)

    def encrypt_data(self, data: Union[Dict[str, Any], str]) -> Dict[str, Any]:
        """encrypt a dict or string under the data key with a unique IV

        dicts are serialized as JSON, strings are encrypted as-is. raises
        ValueError when not initialized and TypeError for any other input type.
        """
        if not self.master_key:
            raise ValueError("not initialized with data key")

        if isinstance(data, dict):
            data_str = json.dumps(data)
        elif isinstance(data, str):
            data_str = data
        else:
            raise TypeError(
                "encrypt_data: data must be a dict or str, got %s" % type(data).__name__
            )
        return self._encrypt_bytes(data_str.encode())

    def decrypt_data(self, encrypted_data: Dict[str, Any]) -> Union[Dict[str, Any], str]:
        """decrypt a {secure, iv, data} blob; returns the original dict or string

        caveat: the blob does not record whether the plaintext was a dict or a
        string, so any plaintext that parses as JSON is returned parsed. a string
        such as "12345" or "true" therefore comes back as an int / bool, not as
        the original string.
        """
        plaintext = self._decrypt_bytes(encrypted_data).decode()
        try:
            return json.loads(plaintext)
        except json.JSONDecodeError:
            return plaintext

    def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
        """re-encrypt a record's encrypted fields from source_crypto's key to this one's

        self holds the destination (new) key; source_crypto holds the source (old)
        key. only {secure, iv, data} fields are touched; plaintext fields are left
        as-is. returns a new dict; the input is not mutated. used during rotation.
        nested dicts are followed up to traversal_level levels deep.
        """
        if not self.master_key:
            raise ValueError("destination not initialized with data key")

        result = record.copy()
        for key, value in record.items():
            if not isinstance(value, dict):
                continue
            if value.get("secure") is True and "iv" in value and "data" in value:
                # carry the raw plaintext bytes across untouched: going through
                # decrypt_data would JSON-parse string fields like "12345" into
                # non-string values that encrypt_data cannot take back
                result[key] = self._encrypt_bytes(source_crypto._decrypt_bytes(value))
            elif traversal_level > 0:
                result[key] = self.reencrypt(source_crypto, value, traversal_level - 1)
        return result
|
||||
|
||||
|
||||
# naming aliases: each name below is bound to the very same class object
DocumentCrypto = RecordCrypto = EnvelopeCrypto
# deprecated legacy alias; remove after all systems migrate
PCICrypto = EnvelopeCrypto
|
||||
|
||||
|
||||
def is_encrypted_record(record, traversal_level: int = 2) -> bool:
    """report whether a record contains an encrypted ({secure, iv, data}) field

    a field counts as encrypted when its value is a dict with "secure" set to
    True and both "iv" and "data" present. direct fields are always checked;
    nested dicts are searched up to traversal_level further levels. anything
    that is not a dict yields False.

    aliases: is_encrypted_document, is_encrypted_dict — same function
    """
    if not isinstance(record, dict):
        return False

    dict_fields = [field for field in record.values() if isinstance(field, dict)]

    # direct hit: one of this level's values is itself an encrypted blob
    if any(
        field.get("secure") is True and "iv" in field and "data" in field
        for field in dict_fields
    ):
        return True

    if traversal_level <= 0:
        return False
    return any(is_encrypted_record(field, traversal_level - 1) for field in dict_fields)
|
||||
|
||||
|
||||
def decrypt_record(crypto: EnvelopeCrypto, record, traversal_level: int = 2) -> dict:
    """return a copy of a record with its encrypted fields decrypted

    {secure, iv, data} values are replaced by crypto.decrypt_data's result;
    other nested dicts are processed recursively up to traversal_level levels.
    a non-dict record is returned unchanged. raises ValueError when crypto
    holds no data key.

    a field that fails to decrypt is logged and keeps its encrypted blob, so a
    partial failure stays visible in the output instead of vanishing silently.

    aliases: decrypt_document, decrypt_dict — same function
    """
    if not crypto.master_key:
        raise ValueError("not initialized with data key")
    if not isinstance(record, dict):
        return record

    decrypted = record.copy()
    for field, value in record.items():
        if not isinstance(value, dict):
            continue
        is_blob = value.get("secure") is True and "iv" in value and "data" in value
        if is_blob:
            try:
                decrypted[field] = crypto.decrypt_data(value)
            except Exception:
                # deliberate best effort: leave the blob in place and move on
                _log.exception("failed to decrypt field %s", field)
        elif traversal_level > 0:
            decrypted[field] = decrypt_record(crypto, value, traversal_level - 1)
    return decrypted
|
||||
|
||||
|
||||
def fingerprint_data(data: dict) -> str:
    """return a deterministic SHA-256 hex fingerprint of a dict

    the dict is serialized as JSON with sorted keys, so key insertion order
    does not affect the result.
    """
    canonical = json.dumps(data, sort_keys=True)
    hasher = hashlib.sha256()
    hasher.update(canonical.encode())
    return hasher.hexdigest()
|
||||
|
||||
|
||||
# function aliases: alternate names bound to the same function objects,
# offered purely as a naming preference
is_encrypted_document = is_encrypted_dict = is_encrypted_record
decrypt_document = decrypt_dict = decrypt_record
|
||||
Loading…
Reference in New Issue
Block a user