add package: pyproject + src
EnvelopeCrypto: hybrid envelope encryption for dict records — a random AES-256-GCM data key (DEK) encrypts the data, wrapped per-system via RSA-OAEP (SHA-256) for distribution. config-free (DEK + key paths injected), storage-agnostic, object-only. covers bootstrap/self_test, authorize/deauthorize, rotate + reencrypt, and record-level decrypt. src/ layout, hatchling build, cryptography backend. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
0b708cdf9a
commit
41859d70f8
15
pyproject.toml
Normal file
15
pyproject.toml
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
[build-system]
|
||||||
|
requires = ["hatchling"]
|
||||||
|
build-backend = "hatchling.build"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "envelope_crypto"
|
||||||
|
version = "0.1.0"
|
||||||
|
description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable."
|
||||||
|
requires-python = ">=3.10"
|
||||||
|
dependencies = [
|
||||||
|
"cryptography>=42.0",
|
||||||
|
]
|
||||||
|
|
||||||
|
[tool.hatch.build.targets.wheel]
|
||||||
|
packages = ["src/envelope_crypto"]
|
||||||
27
src/envelope_crypto/__init__.py
Normal file
27
src/envelope_crypto/__init__.py
Normal file
@ -0,0 +1,27 @@
|
|||||||
|
from .envelope_crypto import (
|
||||||
|
EnvelopeCrypto,
|
||||||
|
DocumentCrypto,
|
||||||
|
RecordCrypto,
|
||||||
|
PCICrypto,
|
||||||
|
is_encrypted_record,
|
||||||
|
is_encrypted_document,
|
||||||
|
is_encrypted_dict,
|
||||||
|
decrypt_record,
|
||||||
|
decrypt_document,
|
||||||
|
decrypt_dict,
|
||||||
|
fingerprint_data,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"EnvelopeCrypto",
|
||||||
|
"DocumentCrypto",
|
||||||
|
"RecordCrypto",
|
||||||
|
"PCICrypto",
|
||||||
|
"is_encrypted_record",
|
||||||
|
"is_encrypted_document",
|
||||||
|
"is_encrypted_dict",
|
||||||
|
"decrypt_record",
|
||||||
|
"decrypt_document",
|
||||||
|
"decrypt_dict",
|
||||||
|
"fingerprint_data",
|
||||||
|
]
|
||||||
377
src/envelope_crypto/envelope_crypto.py
Normal file
377
src/envelope_crypto/envelope_crypto.py
Normal file
@ -0,0 +1,377 @@
|
|||||||
|
"""
|
||||||
|
envelope encryption for dict records
|
||||||
|
|
||||||
|
hybrid encryption: a random AES-256-GCM data key (DEK) encrypts the data, and
|
||||||
|
that key is wrapped (RSA-OAEP) per authorized system's public key (KEK) for
|
||||||
|
distribution. the wrapped key is stored by the caller, keyed by fingerprint;
|
||||||
|
each system unwraps its own copy with its private key. this is the same
|
||||||
|
envelope-encryption pattern used by KMS-style systems.
|
||||||
|
|
||||||
|
from envelope_crypto import EnvelopeCrypto
|
||||||
|
|
||||||
|
crypto = EnvelopeCrypto()
|
||||||
|
crypto.initialize(master_key) # 32-byte AES DEK
|
||||||
|
enc = crypto.encrypt_data({"ssn": "..."}) # -> {secure, iv, data}
|
||||||
|
plain = crypto.decrypt_data(enc) # -> original
|
||||||
|
|
||||||
|
first-time setup: generate the DEK and wrap it for the first system in one call,
|
||||||
|
then verify the pipeline before storing anything:
|
||||||
|
|
||||||
|
crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap("public_key.pem")
|
||||||
|
crypto.self_test("public_key.pem", "private_key.pem") # raises if anything is wrong
|
||||||
|
caller_store({"_id": fingerprint, "key": wrapped}) # the only record of the DEK
|
||||||
|
|
||||||
|
boot (already set up): fingerprint own pubkey, fetch the wrapped DEK, unwrap:
|
||||||
|
|
||||||
|
fp = crypto.get_rsa_key_fingerprint("public_key.pem")
|
||||||
|
record = caller_lookup(fp)
|
||||||
|
crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], "private_key.pem"))
|
||||||
|
|
||||||
|
authorize another system (this instance must already hold the DEK):
|
||||||
|
|
||||||
|
fp, wrapped = crypto.authorize_system(other_pub_path)
|
||||||
|
caller_store({"_id": fp, "key": wrapped})
|
||||||
|
|
||||||
|
deauthorize: caller deletes that fingerprint's record. note this stops future
|
||||||
|
unwraps but does not revoke a DEK already in a running system's memory — rotate
|
||||||
|
if compromised.
|
||||||
|
|
||||||
|
rotate (new DEK + re-encrypt): generate a new DEK, wrap for the still-authorized
|
||||||
|
set, then re-encrypt existing records old -> new:
|
||||||
|
|
||||||
|
new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b])
|
||||||
|
new_crypto = EnvelopeCrypto(); new_crypto.initialize(new_key)
|
||||||
|
for record in caller_iter():
|
||||||
|
caller_update(new_crypto.reencrypt(crypto, record))
|
||||||
|
|
||||||
|
config-free: the host supplies the DEK and RSA key paths; this lib never imports
|
||||||
|
config, configures logging, or touches a database. storage-agnostic — the
|
||||||
|
encrypted blob is a plain dict; store it in mongo, a sql json column, or a file.
|
||||||
|
|
||||||
|
naming: EnvelopeCrypto is canonical. PCICrypto / DocumentCrypto / RecordCrypto are
|
||||||
|
aliases (PCICrypto is a deprecated legacy alias). the document/record/dict
|
||||||
|
function variants are the same functions — use whichever fits your storage.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
import json
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
||||||
|
|
||||||
|
from cryptography.hazmat.primitives import hashes, serialization
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import padding
|
||||||
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
||||||
|
from cryptography.hazmat.primitives.serialization import load_ssh_public_key
|
||||||
|
|
||||||
|
_log = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class EnvelopeCrypto:
|
||||||
|
"""hybrid RSA/AES-256-GCM envelope encryption for dict records
|
||||||
|
|
||||||
|
holds an AES data key (DEK), injected via initialize. encrypts/decrypts record
|
||||||
|
fields, wraps/unwraps the DEK with RSA keys for distribution, and re-encrypts
|
||||||
|
records across key rotations. config-free and storage-agnostic.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.master_key: Optional[bytes] = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def bootstrap(cls, rsa_public_key: str, is_file: bool = True) -> Tuple["EnvelopeCrypto", str, str]:
|
||||||
|
"""first-time setup: generate a DEK and wrap it for the first system
|
||||||
|
|
||||||
|
returns (crypto, fingerprint, wrapped_key) — an initialized instance plus
|
||||||
|
the record to store as the first authorization. the plaintext DEK is never
|
||||||
|
returned or persisted; it survives only as the wrapped copy. run self_test
|
||||||
|
before storing to confirm the keypair round-trips.
|
||||||
|
"""
|
||||||
|
crypto = cls()
|
||||||
|
crypto.initialize(crypto.create_aes_key())
|
||||||
|
fingerprint, wrapped = crypto.authorize_system(rsa_public_key, is_file=is_file)
|
||||||
|
return crypto, fingerprint, wrapped
|
||||||
|
|
||||||
|
def self_test(
|
||||||
|
self, rsa_public_key: str, rsa_private_key: str, *,
|
||||||
|
is_file: bool = True, password: Optional[str] = None,
|
||||||
|
) -> bool:
|
||||||
|
"""verify the full pipeline against a keypair; raises on any mismatch
|
||||||
|
|
||||||
|
round-trips sample data through this instance's DEK, then wraps the DEK
|
||||||
|
with the public key and unwraps with the private key, confirming they
|
||||||
|
match. run after bootstrap (or anytime as a health check) to catch a bad
|
||||||
|
keypair or wrong key path before relying on it. returns True on success.
|
||||||
|
"""
|
||||||
|
if not self.master_key:
|
||||||
|
raise ValueError("self_test: not initialized with a key")
|
||||||
|
|
||||||
|
sample = {"_selftest": "ok", "n": 12345}
|
||||||
|
if self.decrypt_data(self.encrypt_data(sample)) != sample:
|
||||||
|
raise RuntimeError("self_test: data round-trip failed")
|
||||||
|
|
||||||
|
_, wrapped = self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)
|
||||||
|
try:
|
||||||
|
recovered = self.decrypt_aes_key_with_rsa(wrapped, rsa_private_key, password=password)
|
||||||
|
except Exception as error:
|
||||||
|
raise RuntimeError(
|
||||||
|
"self_test: key unwrap failed (public/private keys do not pair, "
|
||||||
|
"or wrong password)"
|
||||||
|
) from error
|
||||||
|
if recovered != self.master_key:
|
||||||
|
raise RuntimeError("self_test: key wrap/unwrap mismatch (public/private keys do not pair)")
|
||||||
|
|
||||||
|
_log.info("self_test passed")
|
||||||
|
return True
|
||||||
|
|
||||||
|
def initialize(self, master_key: bytes) -> None:
|
||||||
|
"""arm the instance with the AES data key (DEK)"""
|
||||||
|
self.master_key = master_key
|
||||||
|
_log.info("crypto initialized with data key")
|
||||||
|
|
||||||
|
def shutdown(self) -> None:
|
||||||
|
"""drop the data key reference
|
||||||
|
|
||||||
|
note: python cannot guarantee zeroing of immutable bytes in memory; this
|
||||||
|
only releases the reference for garbage collection. do not rely on it to
|
||||||
|
scrub the key from RAM.
|
||||||
|
"""
|
||||||
|
self.master_key = None
|
||||||
|
_log.info("data key reference cleared")
|
||||||
|
|
||||||
|
def create_aes_key(self) -> bytes:
|
||||||
|
"""generate a random AES-256 data key"""
|
||||||
|
key = os.urandom(32)
|
||||||
|
_log.info("generated new AES data key")
|
||||||
|
return key
|
||||||
|
|
||||||
|
def get_rsa_key_fingerprint(
|
||||||
|
self, key_path_or_data: str, is_private: bool = False, is_file: bool = True
|
||||||
|
) -> str:
|
||||||
|
"""return a base64 SHA-256 fingerprint of an RSA key for identification"""
|
||||||
|
if is_file:
|
||||||
|
with open(key_path_or_data, "rb") as key_file:
|
||||||
|
key_data = key_file.read()
|
||||||
|
else:
|
||||||
|
key_data = (
|
||||||
|
key_path_or_data.encode()
|
||||||
|
if isinstance(key_path_or_data, str)
|
||||||
|
else key_path_or_data
|
||||||
|
)
|
||||||
|
|
||||||
|
if is_private:
|
||||||
|
private_key = serialization.load_pem_private_key(key_data, password=None)
|
||||||
|
public_key = private_key.public_key()
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
public_key = serialization.load_pem_public_key(key_data)
|
||||||
|
except ValueError:
|
||||||
|
public_key = load_ssh_public_key(key_data)
|
||||||
|
|
||||||
|
key_bytes = public_key.public_bytes(
|
||||||
|
encoding=serialization.Encoding.DER,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
)
|
||||||
|
digest = hashes.Hash(hashes.SHA256())
|
||||||
|
digest.update(key_bytes)
|
||||||
|
fingerprint = base64.b64encode(digest.finalize()).decode()
|
||||||
|
_log.info("generated %s key fingerprint", "private" if is_private else "public")
|
||||||
|
return fingerprint
|
||||||
|
|
||||||
|
def encrypt_aes_key_with_rsa(
|
||||||
|
self, aes_key: bytes, rsa_key: str, is_file: bool = True
|
||||||
|
) -> Tuple[str, str]:
|
||||||
|
"""wrap an AES key with an RSA public key; returns (fingerprint, wrapped_b64)"""
|
||||||
|
if is_file:
|
||||||
|
with open(rsa_key, "rb") as key_file:
|
||||||
|
key_data = key_file.read()
|
||||||
|
else:
|
||||||
|
key_data = rsa_key.encode() if isinstance(rsa_key, str) else rsa_key
|
||||||
|
|
||||||
|
try:
|
||||||
|
public_key = serialization.load_pem_public_key(key_data)
|
||||||
|
except ValueError:
|
||||||
|
public_key = load_ssh_public_key(key_data)
|
||||||
|
|
||||||
|
wrapped = public_key.encrypt(
|
||||||
|
aes_key,
|
||||||
|
padding.OAEP(
|
||||||
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
label=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
fingerprint = self.get_rsa_key_fingerprint(rsa_key, is_private=False, is_file=is_file)
|
||||||
|
wrapped_b64 = base64.b64encode(wrapped).decode()
|
||||||
|
_log.info("wrapped data key for fingerprint %s", fingerprint[:8])
|
||||||
|
return fingerprint, wrapped_b64
|
||||||
|
|
||||||
|
def decrypt_aes_key_with_rsa(
|
||||||
|
self, encrypted_key_base64: str, rsa_private_key_path: str,
|
||||||
|
password: Optional[str] = None,
|
||||||
|
) -> bytes:
|
||||||
|
"""unwrap an AES key with an RSA private key"""
|
||||||
|
with open(rsa_private_key_path, "rb") as key_file:
|
||||||
|
key_data = key_file.read()
|
||||||
|
try:
|
||||||
|
private_key = serialization.load_pem_private_key(
|
||||||
|
key_data, password=password.encode() if password else None
|
||||||
|
)
|
||||||
|
except ValueError as error:
|
||||||
|
if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
|
||||||
|
private_key = serialization.load_ssh_private_key(
|
||||||
|
key_data, password=password.encode() if password else None
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
raise error
|
||||||
|
|
||||||
|
wrapped = base64.b64decode(encrypted_key_base64)
|
||||||
|
aes_key = private_key.decrypt(
|
||||||
|
wrapped,
|
||||||
|
padding.OAEP(
|
||||||
|
mgf=padding.MGF1(algorithm=hashes.SHA256()),
|
||||||
|
algorithm=hashes.SHA256(),
|
||||||
|
label=None,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
_log.info("unwrapped data key with RSA private key")
|
||||||
|
return aes_key
|
||||||
|
|
||||||
|
def authorize_system(self, rsa_public_key: str, is_file: bool = True) -> Tuple[str, str]:
|
||||||
|
"""wrap the current data key for another system's public key
|
||||||
|
|
||||||
|
returns (fingerprint, wrapped_b64) for the caller to store as that
|
||||||
|
system's key-authorization record. requires this instance to already
|
||||||
|
hold the data key — only an authorized system can authorize others.
|
||||||
|
"""
|
||||||
|
if not self.master_key:
|
||||||
|
raise ValueError("cannot authorize another system: not initialized")
|
||||||
|
return self.encrypt_aes_key_with_rsa(self.master_key, rsa_public_key, is_file=is_file)
|
||||||
|
|
||||||
|
def rotate_master_key(
|
||||||
|
self, authorized_public_keys: List[str], is_file: bool = True
|
||||||
|
) -> Tuple[bytes, Dict[str, str]]:
|
||||||
|
"""generate a NEW data key and wrap it for each authorized public key
|
||||||
|
|
||||||
|
returns (new_key, {fingerprint: wrapped_b64}). does NOT re-encrypt existing
|
||||||
|
data — build a new instance with the new key and call reencrypt() on each
|
||||||
|
record. systems not in the list get no wrapped copy (deauthorized).
|
||||||
|
"""
|
||||||
|
new_key = self.create_aes_key()
|
||||||
|
wrapped = {}
|
||||||
|
for pub in authorized_public_keys:
|
||||||
|
fingerprint, wrapped_b64 = self.encrypt_aes_key_with_rsa(new_key, pub, is_file=is_file)
|
||||||
|
wrapped[fingerprint] = wrapped_b64
|
||||||
|
_log.info("rotated data key, wrapped for %d systems", len(wrapped))
|
||||||
|
return new_key, wrapped
|
||||||
|
|
||||||
|
def encrypt_data(self, data: Union[Dict[str, Any], str]) -> Dict[str, str]:
|
||||||
|
"""encrypt a dict or string under the data key with a unique IV"""
|
||||||
|
if not self.master_key:
|
||||||
|
raise ValueError("not initialized with data key")
|
||||||
|
|
||||||
|
data_str = json.dumps(data) if isinstance(data, dict) else data
|
||||||
|
iv = os.urandom(12)
|
||||||
|
aesgcm = AESGCM(self.master_key)
|
||||||
|
ciphertext = aesgcm.encrypt(iv, data_str.encode(), None)
|
||||||
|
return {
|
||||||
|
"secure": True,
|
||||||
|
"iv": base64.b64encode(iv).decode(),
|
||||||
|
"data": base64.b64encode(ciphertext).decode(),
|
||||||
|
}
|
||||||
|
|
||||||
|
def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]:
|
||||||
|
"""decrypt a {secure, iv, data} blob; returns the original dict or string"""
|
||||||
|
if not self.master_key:
|
||||||
|
raise ValueError("not initialized with data key")
|
||||||
|
|
||||||
|
iv = base64.b64decode(encrypted_data["iv"])
|
||||||
|
ciphertext = base64.b64decode(encrypted_data["data"])
|
||||||
|
aesgcm = AESGCM(self.master_key)
|
||||||
|
plaintext = aesgcm.decrypt(iv, ciphertext, None).decode()
|
||||||
|
try:
|
||||||
|
return json.loads(plaintext)
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
return plaintext
|
||||||
|
|
||||||
|
def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
|
||||||
|
"""re-encrypt a record's encrypted fields from source_crypto's key to this one's
|
||||||
|
|
||||||
|
self holds the destination (new) key; source_crypto holds the source (old)
|
||||||
|
key. only {secure, iv, data} fields are touched; plaintext fields are left
|
||||||
|
as-is. returns a new dict; the input is not mutated. used during rotation.
|
||||||
|
"""
|
||||||
|
if not self.master_key:
|
||||||
|
raise ValueError("destination not initialized with data key")
|
||||||
|
|
||||||
|
result = record.copy()
|
||||||
|
for key, value in record.items():
|
||||||
|
if isinstance(value, dict) and value.get("secure") is True and "iv" in value and "data" in value:
|
||||||
|
result[key] = self.encrypt_data(source_crypto.decrypt_data(value))
|
||||||
|
elif traversal_level > 0 and isinstance(value, dict):
|
||||||
|
result[key] = self.reencrypt(source_crypto, value, traversal_level - 1)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
# naming aliases — same class
|
||||||
|
DocumentCrypto = EnvelopeCrypto
|
||||||
|
RecordCrypto = EnvelopeCrypto
|
||||||
|
PCICrypto = EnvelopeCrypto # deprecated legacy alias; remove after all systems migrate
|
||||||
|
|
||||||
|
|
||||||
|
def is_encrypted_record(record, traversal_level: int = 2) -> bool:
|
||||||
|
"""return whether a record has any encrypted ({secure, iv, data}) fields
|
||||||
|
|
||||||
|
aliases: is_encrypted_document, is_encrypted_dict — same function
|
||||||
|
"""
|
||||||
|
if not isinstance(record, dict):
|
||||||
|
return False
|
||||||
|
|
||||||
|
for value in record.values():
|
||||||
|
if isinstance(value, dict) and value.get("secure") is True:
|
||||||
|
if "iv" in value and "data" in value:
|
||||||
|
return True
|
||||||
|
|
||||||
|
if traversal_level > 0:
|
||||||
|
for value in record.values():
|
||||||
|
if isinstance(value, dict) and is_encrypted_record(value, traversal_level - 1):
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def decrypt_record(crypto: EnvelopeCrypto, record, traversal_level: int = 2) -> dict:
|
||||||
|
"""decrypt a record's encrypted fields into a new dict (up to traversal_level deep)
|
||||||
|
|
||||||
|
failures on a single field are logged and that field is left encrypted, so a
|
||||||
|
partial failure is visible (the {secure,...} blob remains) rather than silent.
|
||||||
|
|
||||||
|
aliases: decrypt_document, decrypt_dict — same function
|
||||||
|
"""
|
||||||
|
if not crypto.master_key:
|
||||||
|
raise ValueError("not initialized with data key")
|
||||||
|
if not isinstance(record, dict):
|
||||||
|
return record
|
||||||
|
|
||||||
|
result = record.copy()
|
||||||
|
for key, value in record.items():
|
||||||
|
if isinstance(value, dict) and value.get("secure") is True and "iv" in value and "data" in value:
|
||||||
|
try:
|
||||||
|
result[key] = crypto.decrypt_data(value)
|
||||||
|
except Exception:
|
||||||
|
_log.exception("failed to decrypt field %s", key)
|
||||||
|
elif traversal_level > 0 and isinstance(value, dict):
|
||||||
|
result[key] = decrypt_record(crypto, value, traversal_level - 1)
|
||||||
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def fingerprint_data(data: dict) -> str:
|
||||||
|
"""return a deterministic SHA-256 hex fingerprint of a dict"""
|
||||||
|
return hashlib.sha256(json.dumps(data, sort_keys=True).encode()).hexdigest()
|
||||||
|
|
||||||
|
|
||||||
|
# function aliases — same functions, naming preference only
|
||||||
|
is_encrypted_document = is_encrypted_record
|
||||||
|
is_encrypted_dict = is_encrypted_record
|
||||||
|
decrypt_document = decrypt_record
|
||||||
|
decrypt_dict = decrypt_record
|
||||||
Loading…
Reference in New Issue
Block a user