envelope_crypto
Envelope encryption for dict records. A random 256-bit AES-GCM data key (DEK) encrypts the data. The DEK is wrapped with RSA-OAEP under each authorized system's public key (KEK), and the caller stores each wrapped copy keyed by that public key's fingerprint. Each system unwraps its own copy with its private key. This is the same envelope pattern that KMS-style systems use, but config-free and storage-agnostic.
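To make the pattern concrete, here is a minimal sketch of the wrap/unwrap flow written directly against the cryptography package. It is not this library's implementation: the OAEP parameters (SHA-256 for both the hash and MGF1), the 12-byte nonce, the 2048-bit demo key, and every variable name are assumptions chosen for illustration.

```python
import os

from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# Assumed OAEP parameters; the library's actual choice may differ.
OAEP = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)

# KEK: one RSA keypair per authorized system (2048 bits keeps the demo fast).
kek_private = rsa.generate_private_key(public_exponent=65537, key_size=2048)
kek_public = kek_private.public_key()

# DEK: a random 256-bit AES key that encrypts the actual data.
dek = AESGCM.generate_key(bit_length=256)

# Wrap the DEK under the system's public key; only this wrapped copy is stored.
wrapped_dek = kek_public.encrypt(dek, OAEP)

# Encrypt a payload with the DEK (AES-GCM needs a unique nonce per message).
nonce = os.urandom(12)
ciphertext = AESGCM(dek).encrypt(nonce, b'{"ssn": "..."}', None)

# At boot: unwrap with the private key, then decrypt with the recovered DEK.
recovered_dek = kek_private.decrypt(wrapped_dek, OAEP)
plaintext = AESGCM(recovered_dek).decrypt(nonce, ciphertext, None)
```

Authorizing a second system would repeat only the wrap step with that system's public key; the data itself is never re-encrypted for that.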
Install
requirements.txt:
envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.1
Direct:
pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.1"
Requires cryptography (pulled transitively).
First-time setup
Run once, ever, to create the data key and authorize the first system. You need an RSA keypair first:
# generate an RSA keypair for the first system (PEM)
openssl genrsa -out private_key.pem 4096
openssl rsa -in private_key.pem -pubout -out public_key.pem
from envelope_crypto import EnvelopeCrypto
# generate the DEK and wrap it for this system in one call
crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap("public_key.pem")
# verify the keypair actually round-trips BEFORE storing anything
crypto.self_test("public_key.pem", "private_key.pem") # raises if keys don't pair
# store the wrapped key — this is now the ONLY record of the DEK
await db.create_document("keys", {"_id": fingerprint, "key": wrapped})
The plaintext DEK is never stored. It survives only in RSA-wrapped form and is recovered at each boot by unwrapping. Never persist the plaintext key.
Boot (already set up)
crypto = EnvelopeCrypto()
fingerprint = crypto.get_rsa_key_fingerprint("public_key.pem")
record = await db.get_document("keys", {"_id": fingerprint})
if not record:
    raise RuntimeError("this system is not authorized")
crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], "private_key.pem"))
bot.crypto = crypto
The keys schema (_id = fingerprint, key = wrapped) is the caller's choice;
this lib only produces (fingerprint, wrapped_key).
Encrypt / decrypt
enc = crypto.encrypt_data({"ssn": "..."}) # -> {"secure": True, "iv": ..., "data": ...}
plain = crypto.decrypt_data(enc) # -> {"ssn": "..."}
For whole records: decrypt_record(crypto, doc) decrypts every {secure, iv, data}
field (nested up to traversal_level, default 2); is_encrypted_record(doc) reports
whether any encrypted field exists.
from envelope_crypto import is_encrypted_record, decrypt_record
if is_encrypted_record(doc):
    doc = decrypt_record(crypto, doc)
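The depth-limited search can be pictured roughly as below. This is a sketch, not the library's code: looks_encrypted and has_encrypted_field are hypothetical names, and the depth semantics (level 1 means top-level fields only) and the traversal_level parameter on the detector are assumptions.

```python
from typing import Any


def looks_encrypted(value: Any) -> bool:
    """True if value has the {"secure": True, "iv": ..., "data": ...} shape."""
    return (
        isinstance(value, dict)
        and value.get("secure") is True
        and "iv" in value
        and "data" in value
    )


def has_encrypted_field(record: dict, traversal_level: int = 2) -> bool:
    """Hypothetical stand-in for is_encrypted_record: depth-limited search."""
    for value in record.values():
        if looks_encrypted(value):
            return True
        # Descend into plain nested dicts only while depth budget remains.
        if isinstance(value, dict) and traversal_level > 1:
            if has_encrypted_field(value, traversal_level - 1):
                return True
    return False
```

Under these assumptions, a blob nested two dicts deep is missed at the default level and found once traversal_level is raised to 3.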
Naming aliases (same objects): EnvelopeCrypto = DocumentCrypto = RecordCrypto
= PCICrypto (deprecated legacy alias). decrypt_record = decrypt_document =
decrypt_dict; is_encrypted_record = is_encrypted_document = is_encrypted_dict.
Authorize another system
An initialized system wraps the DEK for another system's public key; authorize_system returns the fingerprint and wrapped key to store.
fingerprint, wrapped = crypto.authorize_system(other_pub_path)
await db.create_document("keys", {"_id": fingerprint, "key": wrapped})
Only a system that already holds the DEK can authorize others.
Deauthorize
Delete that fingerprint's key record — the system can no longer unwrap at boot. This does not revoke a DEK already held in memory by a running system; rotate if a system is compromised.
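A sketch of deauthorization against an in-memory stand-in for the caller's storage layer. InMemoryDB, deauthorize, can_boot and the "fp-system-b" fingerprint are all hypothetical; only the keys collection layout (_id = fingerprint, key = wrapped) follows the examples in this README.

```python
import asyncio
from typing import Optional


class InMemoryDB:
    """Hypothetical stand-in for the caller's async storage layer."""

    def __init__(self) -> None:
        self._collections = {}

    async def create_document(self, collection: str, doc: dict) -> None:
        self._collections.setdefault(collection, {})[doc["_id"]] = dict(doc)

    async def get_document(self, collection: str, query: dict) -> Optional[dict]:
        return self._collections.get(collection, {}).get(query["_id"])

    async def delete_documents(self, collection: str, query: dict) -> None:
        self._collections.get(collection, {}).pop(query["_id"], None)


async def deauthorize(db: InMemoryDB, fingerprint: str) -> None:
    # Removing the wrapped-key record is the whole deauthorization step.
    await db.delete_documents("keys", {"_id": fingerprint})


async def can_boot(db: InMemoryDB, fingerprint: str) -> bool:
    # Mirrors the boot check: no key record means "not authorized".
    return await db.get_document("keys", {"_id": fingerprint}) is not None


async def demo() -> tuple:
    db = InMemoryDB()
    await db.create_document("keys", {"_id": "fp-system-b", "key": "wrapped-dek"})
    before = await can_boot(db, "fp-system-b")
    await deauthorize(db, "fp-system-b")
    after = await can_boot(db, "fp-system-b")
    return before, after
```

Running asyncio.run(demo()) shows the record present before and absent after. Nothing here reaches into a process that has already unwrapped the DEK, which is why rotation is the answer to compromise.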
Rotate (new key + re-encrypt)
Generate a new DEK, wrap for the still-authorized set, then re-encrypt existing data.
new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b]) # omit a system to drop it
new_crypto = EnvelopeCrypto()
new_crypto.initialize(new_key)
# re-encrypt every record (caller owns the DB loop)
for doc in await db.get_documents("settings", {}):
    fresh = new_crypto.reencrypt(crypto, doc) # decrypt(old) -> encrypt(new)
    await db.update_document("settings", {"_id": doc["_id"]}, fresh)
# replace the key records
await db.delete_documents("keys", {})
for fingerprint, wrapped_key in wrapped.items():
    await db.create_document("keys", {"_id": fingerprint, "key": wrapped_key})
reencrypt(source_crypto, record) is a method on the destination (new-key)
instance: it decrypts each encrypted field with source_crypto (old key) and
re-encrypts with itself. Only {secure, ...} fields are touched.
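The field-wise behaviour can be sketched as follows. ToyCrypto is NOT encryption: it is a labelled stand-in (base64 plus a key tag) so the traversal runs without real keys. The free function reencrypt(dest, source, record), the is_blob helper, and the unbounded recursion are assumptions; in the library, reencrypt is a method on the destination instance.

```python
import base64
import copy
import json


class ToyCrypto:
    """NOT real encryption: a labelled stand-in so the traversal can run."""

    def __init__(self, key_id: str) -> None:
        self.key_id = key_id

    def encrypt_data(self, value: dict) -> dict:
        payload = json.dumps({"key_id": self.key_id, "value": value})
        data = base64.b64encode(payload.encode()).decode()
        return {"secure": True, "iv": "toy", "data": data}

    def decrypt_data(self, blob: dict) -> dict:
        payload = json.loads(base64.b64decode(blob["data"]))
        if payload["key_id"] != self.key_id:
            raise ValueError("wrong key")
        return payload["value"]


def is_blob(value: object) -> bool:
    """True for dicts flagged as encrypted fields."""
    return isinstance(value, dict) and value.get("secure") is True


def reencrypt(dest: ToyCrypto, source: ToyCrypto, record: dict) -> dict:
    """Decrypt encrypted fields with source, re-encrypt them with dest."""
    result = copy.deepcopy(record)  # never mutate the caller's record
    for field, value in result.items():
        if is_blob(value):
            result[field] = dest.encrypt_data(source.decrypt_data(value))
        elif isinstance(value, dict):
            result[field] = reencrypt(dest, source, value)  # nested dicts
    return result
```

Plain fields such as _id pass through unchanged, and the deep copy keeps the input record intact, so a failed write during rotation leaves the old document usable.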
Storage patterns
The encrypted blob is just a dict — store it wherever:
- Mongo: store the dict directly (Mongo is dict-native).
- MariaDB / Postgres: json.dumps(enc) into a JSON (or TEXT) column; json.loads on read, then decrypt_data.
- File: json.dump(enc, f).
The lib never touches a database; only the caller's storage layer differs.
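A sketch of the SQL pattern, with the standard library's sqlite3 standing in for MariaDB/Postgres. The table and column names are hypothetical, and the blob values are placeholders that assume iv and data are JSON-serializable strings.

```python
import json
import sqlite3

# Example blob in the shape encrypt_data returns; values are placeholders.
enc = {"secure": True, "iv": "aXY=", "data": "Y2lwaGVydGV4dA=="}

conn = sqlite3.connect(":memory:")  # sqlite3 stands in for MariaDB/Postgres
conn.execute("CREATE TABLE settings (id INTEGER PRIMARY KEY, secret TEXT)")

# Write: serialize the dict into a TEXT column.
conn.execute(
    "INSERT INTO settings (id, secret) VALUES (?, ?)",
    (1, json.dumps(enc)),
)

# Read: parse back into a dict; this is what decrypt_data would receive.
row = conn.execute("SELECT secret FROM settings WHERE id = ?", (1,)).fetchone()
loaded = json.loads(row[0])
```

The round trip returns a dict equal to the original blob, so the storage layer needs no knowledge of the encryption scheme.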
Notes
- shutdown() drops the key reference but cannot guarantee zeroing it from RAM (Python bytes are immutable).
- A failed field decryption in decrypt_record is logged and the field is left encrypted (the blob stays visible) rather than silently dropped.
- The scheme is envelope/hybrid encryption (AES-256-GCM data key wrapped by RSA-OAEP). Using it does not by itself confer PCI-DSS or any other compliance; that is a whole-system property.
Versioning
Tagged vX.Y.Z. Pin the tag in requirements.txt.