# envelope_crypto
Envelope encryption for dict records. A random AES-256-GCM data key (DEK) encrypts
the data; the DEK is then wrapped with RSA-OAEP once per authorized system, using
that system's public key (KEK). The caller stores each wrapped copy keyed by the
public key's fingerprint, and each system unwraps its own copy with its private key.
This is the same envelope pattern KMS-style systems use, here config-free and
storage-agnostic.
## Install
`requirements.txt`:
```
envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.1
```
Direct:
```bash
pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.1"
```
Requires `cryptography` (installed automatically as a dependency).
## First-time setup
Run once, ever, to create the data key and authorize the first system. You need an
RSA keypair first:
```bash
# generate an RSA keypair for the first system (PEM)
# file names match the paths used in the Python snippets below
openssl genrsa -out private_key.pem 4096
openssl rsa -in private_key.pem -pubout -out public_key.pem
```
```python
from envelope_crypto import EnvelopeCrypto
# generate the DEK and wrap it for this system in one call
crypto, fingerprint, wrapped = EnvelopeCrypto.bootstrap("public_key.pem")
# verify the keypair actually round-trips BEFORE storing anything
crypto.self_test("public_key.pem", "private_key.pem") # raises if keys don't pair
# store the wrapped key — this is now the ONLY record of the DEK
await db.create_document("keys", {"_id": fingerprint, "key": wrapped})
```
The plaintext DEK is never stored. It survives only as the RSA-wrapped copy and is
recovered at each boot by unwrapping that copy. **Never persist the plaintext key.**
## Boot (already set up)
```python
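from envelope_crypto import EnvelopeCrypto
crypto = EnvelopeCrypto()
# look up this system's wrapped DEK by its public-key fingerprint
fingerprint = crypto.get_rsa_key_fingerprint("public_key.pem")
record = await db.get_document("keys", {"_id": fingerprint})
if not record:
    raise RuntimeError("this system is not authorized")
# unwrap the DEK with the private key and load it into the instance
crypto.initialize(crypto.decrypt_aes_key_with_rsa(record["key"], "private_key.pem"))
bot.crypto = crypto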
```
The `keys` schema (`_id` = fingerprint, `key` = wrapped) is the **caller's** choice;
this lib only produces `(fingerprint, wrapped_key)`.
## Encrypt / decrypt
```python
enc = crypto.encrypt_data({"ssn": "..."}) # -> {"secure": True, "iv": ..., "data": ...}
plain = crypto.decrypt_data(enc) # -> {"ssn": "..."}
```
For whole records: `decrypt_record(crypto, doc)` decrypts every `{secure, iv, data}`
field (nested up to `traversal_level`, default 2); `is_encrypted_record(doc)` reports
whether any encrypted field exists.
```python
from envelope_crypto import is_encrypted_record, decrypt_record
if is_encrypted_record(doc):
    doc = decrypt_record(crypto, doc)
```
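As a rough illustration of what "nested up to `traversal_level`" could mean, here is a minimal stand-alone sketch. It is not the library's implementation: `looks_encrypted`, `has_encrypted_field`, and the depth-counting convention (level 1 = the record's own fields) are assumptions made for this example.

```python
from typing import Any

BLOB_KEYS = {"secure", "iv", "data"}  # shape shown in the encrypt_data comment


def looks_encrypted(value: Any) -> bool:
    """True if value has the {secure, iv, data} shape with secure set."""
    return (
        isinstance(value, dict)
        and BLOB_KEYS.issubset(value)
        and value["secure"] is True
    )


def has_encrypted_field(record: dict, traversal_level: int = 2) -> bool:
    """Hypothetical stand-in for is_encrypted_record; depth semantics are assumed.

    Level 1 inspects the record's own fields, level 2 also inspects the
    fields of dicts nested one step down, and so on.
    """
    if traversal_level < 1:
        return False
    for value in record.values():
        if looks_encrypted(value):
            return True
        if isinstance(value, dict) and has_encrypted_field(value, traversal_level - 1):
            return True
    return False


# placeholder blob: the iv/data strings are made up, not real ciphertext
blob = {"secure": True, "iv": "aXY=", "data": "ZGF0YQ=="}
shallow = {"name": "a", "ssn": blob}
nested = {"profile": {"ssn": blob}}
too_deep = {"a": {"b": {"ssn": blob}}}
```

Under these assumed semantics, `shallow` and `nested` are detected with the default level of 2, while `too_deep` is only detected if the caller raises the level to 3.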
Naming aliases (same objects): `EnvelopeCrypto` = `DocumentCrypto` = `RecordCrypto`
= `PCICrypto` (deprecated legacy alias). `decrypt_record` = `decrypt_document` =
`decrypt_dict`; `is_encrypted_record` = `is_encrypted_document` = `is_encrypted_dict`.
## Authorize another system
An initialized system wraps the DEK for another system's public key. Returns the
record to store.
```python
fingerprint, wrapped = crypto.authorize_system(other_pub_path)
await db.create_document("keys", {"_id": fingerprint, "key": wrapped})
```
Only a system that already holds the DEK can authorize others.
## Deauthorize
Delete that fingerprint's key record — the system can no longer unwrap at boot. This
does **not** revoke a DEK already held in memory by a running system; rotate if a
system is compromised.
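The effect of deleting a key record can be sketched with an in-memory stand-in for the caller's `keys` collection. `InMemoryKeys`, `can_boot`, and the placeholder fingerprints and wrapped values are all hypothetical; a real deployment would use its own storage layer instead.

```python
import asyncio
from typing import Optional, Tuple


class InMemoryKeys:
    """Hypothetical stand-in for the caller's `keys` collection (not part of the lib)."""

    def __init__(self) -> None:
        self._docs = {}

    async def create(self, fingerprint: str, wrapped: str) -> None:
        self._docs[fingerprint] = {"_id": fingerprint, "key": wrapped}

    async def get(self, fingerprint: str) -> Optional[dict]:
        return self._docs.get(fingerprint)

    async def delete(self, fingerprint: str) -> bool:
        # True if a record was actually removed
        return self._docs.pop(fingerprint, None) is not None


async def can_boot(keys: InMemoryKeys, fingerprint: str) -> bool:
    """Mirrors the boot check: no key record means the system is not authorized."""
    return (await keys.get(fingerprint)) is not None


async def demo() -> Tuple[bool, bool, bool]:
    keys = InMemoryKeys()
    await keys.create("fp-system-a", "wrapped-dek-for-a")  # placeholder values
    await keys.create("fp-system-b", "wrapped-dek-for-b")
    removed = await keys.delete("fp-system-b")  # deauthorize system B
    a_ok = await can_boot(keys, "fp-system-a")
    b_ok = await can_boot(keys, "fp-system-b")
    return removed, a_ok, b_ok


removed, a_boots, b_boots = asyncio.run(demo())
```

System A still finds its record and can boot; system B cannot. Nothing in this sketch touches a process that has already unwrapped the DEK, which is why rotation is still needed after a compromise.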
## Rotate (new key + re-encrypt)
Generate a new DEK, wrap for the still-authorized set, then re-encrypt existing data.
```python
new_key, wrapped = crypto.rotate_master_key([pub_a, pub_b]) # omit a system to drop it
new_crypto = EnvelopeCrypto()
new_crypto.initialize(new_key)
# re-encrypt every record (caller owns the DB loop)
for doc in await db.get_documents("settings", {}):
    fresh = new_crypto.reencrypt(crypto, doc) # decrypt(old) -> encrypt(new)
    await db.update_document("settings", {"_id": doc["_id"]}, fresh)
# replace the key records
await db.delete_documents("keys", {})
for fingerprint, wrapped_key in wrapped.items():
    await db.create_document("keys", {"_id": fingerprint, "key": wrapped_key})
```
`reencrypt(source_crypto, record)` is a method on the **destination** (new-key)
instance: it decrypts each encrypted field with `source_crypto` (old key) and
re-encrypts with itself. Only `{secure, ...}` fields are touched.
## Storage patterns
The encrypted blob is just a dict — store it wherever:
- **Mongo** — store the dict directly (Mongo is dict-native).
- **MariaDB / Postgres** — `json.dumps(enc)` into a `JSON` (or `TEXT`) column;
`json.loads` on read, then `decrypt_data`.
- **File** — `json.dump(enc, f)`.
The lib never touches a database; only the caller's storage layer differs.
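The SQL pattern above can be sketched as a round trip through a `TEXT` column. SQLite (from the standard library) stands in for MariaDB / Postgres here, the `settings` table and `ssn` column are made up for the example, and the blob is a placeholder that assumes `iv` and `data` are JSON-serializable strings.

```python
import json
import sqlite3

# Placeholder blob in the shape encrypt_data returns; values are not real ciphertext.
enc = {"secure": True, "iv": "c2FtcGxlLWl2", "data": "c2FtcGxlLWNpcGhlcnRleHQ="}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE settings (id INTEGER PRIMARY KEY, ssn TEXT NOT NULL)")

# write: serialize the blob into the TEXT column
conn.execute("INSERT INTO settings (id, ssn) VALUES (?, ?)", (1, json.dumps(enc)))
conn.commit()

# read: parse the column back into a dict, ready to hand to decrypt_data
row = conn.execute("SELECT ssn FROM settings WHERE id = ?", (1,)).fetchone()
restored = json.loads(row[0])
conn.close()
```

`restored` is the same dict that went in, so it can be passed to `crypto.decrypt_data` exactly as if it had never left memory.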
## Notes
- `shutdown()` drops the key reference but cannot guarantee the key is zeroed from
RAM, because Python `bytes` objects are immutable and cannot be overwritten in place.
- A failed field decryption in `decrypt_record` is logged and left encrypted (the
blob stays visible) rather than silently dropped.
- The scheme is envelope/hybrid encryption (AES-256-GCM data key wrapped by RSA-OAEP).
Using it does not by itself confer PCI-DSS or any other compliance — that is a
whole-system property.
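The "logged and left encrypted" behavior described in the notes above can be illustrated with a small stand-alone sketch. This is not the library's code: `decrypt_fields` only looks at top-level fields, and `fake_decrypt` is a stand-in for `crypto.decrypt_data` that performs no real cryptography.

```python
import copy
import logging
from typing import Any, Callable

log = logging.getLogger("envelope_sketch")


def decrypt_fields(record: dict, decrypt: Callable[[dict], Any]) -> dict:
    """Hypothetical top-level-only sketch of 'log and leave encrypted'."""
    out = copy.deepcopy(record)  # work on a copy so the caller's dict is untouched
    for name, value in out.items():
        if isinstance(value, dict) and value.get("secure") is True:
            try:
                out[name] = decrypt(value)
            except Exception:
                # keep the blob in place so the failure stays visible downstream
                log.exception("could not decrypt field %r; leaving it encrypted", name)
    return out


def fake_decrypt(blob: dict) -> Any:
    """Stand-in for crypto.decrypt_data: no real cryptography happens here."""
    if blob["data"] == "corrupt":
        raise ValueError("authentication failed")
    return {"value": blob["data"].upper()}


doc = {
    "name": "plain",
    "good": {"secure": True, "iv": "x", "data": "ok"},
    "bad": {"secure": True, "iv": "y", "data": "corrupt"},
}
result = decrypt_fields(doc, fake_decrypt)
```

After the call, `result["good"]` holds the decrypted value while `result["bad"]` is still the original blob, so a caller can detect the failure by checking for leftover `{secure, iv, data}` fields.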
## Versioning
Tagged `vX.Y.Z`. Pin the tag in `requirements.txt`.