envelope_crypto

Envelope encryption for dict records. A random AES-256-GCM data key (DEK) encrypts the data. The DEK is then wrapped with RSA-OAEP once per authorized system, using that system's public key (KEK), and the caller stores each wrapped copy keyed by the public key's fingerprint. Each system unwraps its own copy with its private key. This is the same envelope pattern that KMS-style systems use, but config-free and storage-agnostic.
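To make the flow concrete, here is a minimal sketch of the same envelope pattern built directly on the cryptography package. It is not this library's implementation: the helper names (fingerprint_of, wrap_dek, unwrap_dek), the SHA-256 OAEP parameters, the 2048-bit demo keys, and the SHA-256-over-DER fingerprint are all assumptions chosen for illustration.

```python
import hashlib
import json
import os

from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

# Assumed OAEP parameters; the library's actual choice may differ.
OAEP = padding.OAEP(
    mgf=padding.MGF1(algorithm=hashes.SHA256()),
    algorithm=hashes.SHA256(),
    label=None,
)


def fingerprint_of(public_key) -> str:
    """Hypothetical fingerprint: SHA-256 over the DER-encoded public key."""
    der = public_key.public_bytes(
        serialization.Encoding.DER,
        serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return hashlib.sha256(der).hexdigest()


def wrap_dek(dek: bytes, public_key) -> bytes:
    """Wrap the data key for one system's public key (the KEK)."""
    return public_key.encrypt(dek, OAEP)


def unwrap_dek(wrapped: bytes, private_key) -> bytes:
    """Recover the data key with the matching private key."""
    return private_key.decrypt(wrapped, OAEP)


# One DEK, two authorized systems (2048-bit keys only to keep the demo fast).
dek = AESGCM.generate_key(bit_length=256)
system_a = rsa.generate_private_key(public_exponent=65537, key_size=2048)
system_b = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# The caller stores one wrapped copy per system, keyed by fingerprint.
key_records = {
    fingerprint_of(k.public_key()): wrap_dek(dek, k.public_key())
    for k in (system_a, system_b)
}

# Data is encrypted once, under the DEK.
nonce = os.urandom(12)
payload = json.dumps({"ssn": "000-00-0000"}).encode()
ciphertext = AESGCM(dek).encrypt(nonce, payload, None)

# System B boots: it finds its own record, unwraps the DEK, and decrypts.
dek_b = unwrap_dek(key_records[fingerprint_of(system_b.public_key())], system_b)
plain = json.loads(AESGCM(dek_b).decrypt(nonce, ciphertext, None))
```

The data is encrypted only once, while the small DEK is wrapped separately for each system. Adding a system therefore costs one more wrapped copy rather than a second copy of the data.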

Install

requirements.txt:

envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.1

Direct:

pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.1"

Requires the cryptography package, which pip installs automatically as a dependency.

First-time setup

Run once, ever, to create the data key and authorize the first system. You need an RSA keypair first:

# generate an RSA keypair for the first system (PEM)
openssl genrsa -out private_key.pem 4096
openssl rsa -in private_key.pem -pubout -out public_key.pem

Then bootstrap from Python:

from envelope_crypto import EnvelopeCrypto

# generate the DEK and wrap it for this system in one call
crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap("public_key.pem")

# verify the keypair actually round-trips BEFORE storing anything
crypto.self_test("public_key.pem", "private_key.pem")        # raises if keys don't pair

# store the wrapped key — this is now the ONLY record of the DEK
await db.create_document("keys", {"_id": fingerprint, "key": wrapped})

The plaintext DEK is never stored. It survives only as the RSA-wrapped copy and is recovered on each boot by unwrapping that copy with the private key. Never persist the plaintext key.

Boot (already set up)

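from envelope_crypto import EnvelopeCrypto

crypto = EnvelopeCrypto()
# look up this system's wrapped DEK by its public-key fingerprint
fingerprint = crypto.get_rsa_key_fingerprint("public_key.pem")
record = await db.get_document("keys", {"_id": fingerprint})
if not record:
    raise RuntimeError("this system is not authorized")
# unwrap the DEK with the private key and load it into this instance
crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], "private_key.pem"))
bot.crypto = crypto  # keep the instance wherever the app needs it (bot is the caller's own object)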

The keys schema (_id = fingerprint, key = wrapped) is the caller's choice; this lib only produces (fingerprint, wrapped_key).

Encrypt / decrypt

enc = crypto.encrypt_data({"ssn": "..."})      # -> {"secure": True, "iv": ..., "data": ...}
plain = crypto.decrypt_data(enc)               # -> {"ssn": "..."}

For whole records: decrypt_record(crypto, doc) decrypts every {secure, iv, data} field (nested up to traversal_level, default 2); is_encrypted_record(doc) reports whether any encrypted field exists.

from envelope_crypto import is_encrypted_record, decrypt_record

if is_encrypted_record(doc):
    doc = decrypt_record(crypto, doc)
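As a rough illustration of depth-limited traversal, the sketch below lists which fields a walk would treat as encrypted. It is not the library's code. The helper names looks_encrypted and encrypted_paths are hypothetical, and the reading of traversal_level (1 means top-level fields only, 2 also covers dicts nested one level down) is an assumption.

```python
from typing import Any, List, Tuple

BLOB_KEYS = {"secure", "iv", "data"}


def looks_encrypted(value: Any) -> bool:
    """True for a dict shaped like the {secure, iv, data} blob."""
    return (
        isinstance(value, dict)
        and BLOB_KEYS.issubset(value)
        and value["secure"] is True
    )


def encrypted_paths(
    record: dict, traversal_level: int = 2, _prefix: Tuple = ()
) -> List[Tuple]:
    """Return key paths of encrypted fields, descending at most
    traversal_level dict levels (assumed meaning of the parameter)."""
    if traversal_level < 1:
        return []
    paths: List[Tuple] = []
    for key, value in record.items():
        path = _prefix + (key,)
        if looks_encrypted(value):
            paths.append(path)
        elif isinstance(value, dict):
            # Plain nested dict: keep looking, one level deeper.
            paths.extend(encrypted_paths(value, traversal_level - 1, path))
    return paths


# Placeholder blob; real iv/data values come from encrypt_data.
blob = {"secure": True, "iv": "<iv>", "data": "<ciphertext>"}
doc = {
    "_id": 1,
    "ssn": blob,
    "billing": {"card": blob, "vault": {"pin": blob}},
}

paths = encrypted_paths(doc)                         # default depth of 2
deeper = encrypted_paths(doc, traversal_level=3)     # also reaches vault.pin
```

Under this reading, the default depth finds ssn and billing.card but stops before billing.vault.pin, so deeply nested blobs need a larger traversal_level.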

Naming aliases (same objects): EnvelopeCrypto = DocumentCrypto = RecordCrypto = PCICrypto (deprecated legacy alias). decrypt_record = decrypt_document = decrypt_dict; is_encrypted_record = is_encrypted_document = is_encrypted_dict.

Authorize another system

An initialized system wraps the DEK for another system's public key. authorize_system returns the (fingerprint, wrapped key) pair to store.

fingerprint, wrapped = crypto.authorize_system(other_pub_path)
await db.create_document("keys", {"_id": fingerprint, "key": wrapped})

Only a system that already holds the DEK can authorize others.

Deauthorize

Delete that fingerprint's key record — the system can no longer unwrap at boot. This does not revoke a DEK already held in memory by a running system; rotate if a system is compromised.
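The effect of deleting a key record can be sketched with an in-memory stand-in for the caller's storage. InMemoryKeyStore and load_wrapped_key are hypothetical names, and the wrapped values are opaque placeholders; a real deployment would use its own database layer.

```python
import asyncio
from typing import Dict, List, Optional


class InMemoryKeyStore:
    """Hypothetical stand-in for the caller's "keys" collection."""

    def __init__(self) -> None:
        self._records: Dict[str, str] = {}

    async def put(self, fingerprint: str, wrapped: str) -> None:
        self._records[fingerprint] = wrapped

    async def get(self, fingerprint: str) -> Optional[str]:
        return self._records.get(fingerprint)

    async def delete(self, fingerprint: str) -> bool:
        return self._records.pop(fingerprint, None) is not None


async def load_wrapped_key(store: InMemoryKeyStore, fingerprint: str) -> str:
    """Boot-time lookup: no record means the system is not authorized."""
    wrapped = await store.get(fingerprint)
    if wrapped is None:
        raise RuntimeError("this system is not authorized")
    return wrapped


async def demo() -> List[str]:
    store = InMemoryKeyStore()
    await store.put("fp-a", "wrapped-for-a")   # placeholder values
    await store.put("fp-b", "wrapped-for-b")

    events = []
    await load_wrapped_key(store, "fp-b")
    events.append("b boots")

    await store.delete("fp-b")                 # deauthorize system B
    try:
        await load_wrapped_key(store, "fp-b")
    except RuntimeError as exc:
        events.append(f"b refused: {exc}")

    await load_wrapped_key(store, "fp-a")      # system A is unaffected
    events.append("a boots")
    return events


events = asyncio.run(demo())
```

The sketch only models the next boot. A process that unwrapped the DEK before the delete still holds it in memory, which is why a compromised system calls for rotation.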

Rotate (new key + re-encrypt)

Generate a new DEK, wrap it for each system that remains authorized, then re-encrypt existing data.

new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b])   # omit a system to drop it

new_crypto = EnvelopeCrypto()
new_crypto.initialize(new_key)

# re-encrypt every record (caller owns the DB loop)
for doc in await db.get_documents("settings", {}):
    fresh = new_crypto.reencrypt(crypto, doc)                 # decrypt(old) -> encrypt(new)
    await db.update_document("settings", {"_id": doc["_id"]}, fresh)

# replace the key records
await db.delete_documents("keys", {})
for fingerprint, wrapped_key in wrapped.items():
    await db.create_document("keys", {"_id": fingerprint, "key": wrapped_key})

reencrypt(source_crypto, record) is a method on the destination (new-key) instance: it decrypts each encrypted field with source_crypto (old key) and re-encrypts with itself. Only {secure, ...} fields are touched.
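The decrypt-then-encrypt step can be sketched without any real cryptography by passing the two operations in as callables. Everything here is hypothetical: reencrypt_record, is_blob, and make_tagger are illustration names, the tagger functions are stand-ins that only label the payload and are NOT encryption, and the sketch handles top-level fields only. Deep-copying the input is a design choice of this sketch so that the caller's record is left untouched.

```python
import copy
from typing import Any, Callable, Dict


def is_blob(value: Any) -> bool:
    """True for a dict marked as an encrypted field."""
    return isinstance(value, dict) and value.get("secure") is True


def reencrypt_record(
    record: Dict[str, Any],
    decrypt_old: Callable[[Dict[str, Any]], Any],
    encrypt_new: Callable[[Any], Dict[str, Any]],
) -> Dict[str, Any]:
    """Return a copy with every top-level encrypted field moved to the new key."""
    fresh = copy.deepcopy(record)  # never alias the caller's nested objects
    for field, value in fresh.items():
        if is_blob(value):
            fresh[field] = encrypt_new(decrypt_old(value))
    return fresh


def make_tagger(key_name: str):
    """Stand-in 'cipher' that just tags the payload with a key name (NOT encryption)."""

    def encrypt(plain: Any) -> Dict[str, Any]:
        return {
            "secure": True,
            "iv": f"iv-{key_name}",
            "data": {"key": key_name, "payload": plain},
        }

    def decrypt(blob: Dict[str, Any]) -> Any:
        if blob["data"]["key"] != key_name:
            raise ValueError("blob was produced under a different key")
        return blob["data"]["payload"]

    return encrypt, decrypt


old_encrypt, old_decrypt = make_tagger("old")
new_encrypt, new_decrypt = make_tagger("new")

doc = {"_id": 7, "tags": ["a"], "ssn": old_encrypt({"ssn": "000-00-0000"})}
fresh = reencrypt_record(doc, old_decrypt, new_encrypt)

# Mutating the result must not leak back into the input.
fresh["tags"].append("b")
```

Unencrypted fields such as _id and tags pass through unchanged, and because the copy is deep, later edits to fresh do not reach doc.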

Storage patterns

The encrypted blob is just a dict — store it wherever:

  • Mongo: store the dict directly (Mongo is dict-native).
  • MariaDB / Postgres: json.dumps(enc) into a JSON (or TEXT) column; json.loads on read, then decrypt_data.
  • File: json.dump(enc, f).

The lib never touches a database; only the caller's storage layer differs.
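The SQL pattern above can be sketched with the standard library's sqlite3 standing in for MariaDB or Postgres. The table and column names are hypothetical, and the blob is a hand-written placeholder: representing iv and data as base64 text is an assumption about what encrypt_data returns.

```python
import json
import sqlite3

# Placeholder blob in the documented {secure, iv, data} shape.
# Base64 strings for iv/data are an assumption, not confirmed by the library.
enc = {"secure": True, "iv": "aXYtcGxhY2Vob2xkZXI=", "data": "Y2lwaGVydGV4dA=="}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE settings (id INTEGER PRIMARY KEY, secret TEXT NOT NULL)")

# Write: serialize the blob into a TEXT column.
conn.execute(
    "INSERT INTO settings (id, secret) VALUES (?, ?)",
    (1, json.dumps(enc)),
)

# Read: parse the column back into a dict; this is what decrypt_data would receive.
(raw,) = conn.execute("SELECT secret FROM settings WHERE id = ?", (1,)).fetchone()
restored = json.loads(raw)
conn.close()
```

The round trip only works if every value in the blob is JSON-serializable, so it is worth checking the actual types of iv and data before relying on this pattern.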

Notes

  • shutdown() drops the key reference but cannot guarantee that the key is zeroed from RAM (Python bytes objects are immutable, so they cannot be overwritten in place).
  • A failed field decryption in decrypt_record is logged and left encrypted (the blob stays visible) rather than silently dropped.
  • The scheme is envelope/hybrid encryption (AES-256-GCM data key wrapped by RSA-OAEP). Using it does not by itself confer PCI-DSS or any other compliance — that is a whole-system property.
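The first note, on shutdown(), comes down to a property of Python's built-in types, which this small sketch demonstrates. The wipe helper is hypothetical and is not part of this library.

```python
import os


def wipe(buffer: bytearray) -> None:
    """Overwrite a mutable buffer with zeros in place."""
    for i in range(len(buffer)):
        buffer[i] = 0


# bytes objects reject in-place writes, so a key held as bytes cannot be zeroed.
immutable_key = os.urandom(32)
try:
    immutable_key[0] = 0
    bytes_are_mutable = True
except TypeError:
    bytes_are_mutable = False

# A bytearray can be overwritten, but this clears only this one buffer.
mutable_key = bytearray(os.urandom(32))
wipe(mutable_key)
```

Even with a bytearray, copies made elsewhere (for example inside a crypto library or by the interpreter) are out of reach, so in-place wiping reduces exposure without guaranteeing removal.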

Versioning

Tagged vX.Y.Z. Pin the tag in requirements.txt.