fix: normalize key-load exceptions; type-faithful decrypt_data (v0.1.3)

- encrypted OpenSSH private key with no password now raises ValueError (not a raw
  TypeError from load_ssh_private_key), matching the PEM path and the docstring (L14)
- a non-PEM/non-SSH public key raises a clear ValueError instead of cryptography's
  UnsupportedAlgorithm, consistent with the private-key paths (L15)
- decrypt_data only treats a json-OBJECT plaintext as a dict, so json-shaped strings
  ('123','true','[1,2]') round-trip as strings; existing dict blobs unaffected (L16)
- both key loads route through shared _load_private_key/_load_public_key helpers
- document reencrypt's fail-loud (vs decrypt_record's per-field swallow) asymmetry (nit).

Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
disqualifier 2026-06-29 17:58:26 -04:00
parent 5de8b5d736
commit 72c7aa936e
3 changed files with 66 additions and 35 deletions

View File

@ -11,13 +11,13 @@ and storage-agnostic.
`requirements.txt`: `requirements.txt`:
``` ```
envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.2 envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.3
``` ```
Direct: Direct:
```bash ```bash
pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.2" pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.3"
``` ```
Requires `cryptography` (pulled transitively). Requires `cryptography` (pulled transitively).

View File

@ -4,7 +4,7 @@ build-backend = "hatchling.build"
[project] [project]
name = "envelope_crypto" name = "envelope_crypto"
version = "0.1.2" version = "0.1.3"
description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable." description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable."
requires-python = ">=3.10" requires-python = ">=3.10"
dependencies = [ dependencies = [

View File

@ -61,6 +61,7 @@ import hashlib
import logging import logging
from typing import Any, Dict, List, Optional, Tuple, Union from typing import Any, Dict, List, Optional, Tuple, Union
from cryptography.exceptions import UnsupportedAlgorithm
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.ciphers.aead import AESGCM
@ -69,6 +70,46 @@ from cryptography.hazmat.primitives.serialization import load_ssh_public_key
_log = logging.getLogger(__name__) _log = logging.getLogger(__name__)
def _load_private_key(key_data: bytes, pw: Optional[bytes]):
"""load a PEM or OpenSSH private key, normalizing the missing-password error
cryptography raises TypeError when a key is encrypted but no password was given
(PEM raises it on the first call; OpenSSH raises it inside the openssh fallback).
both are normalized to a clear ValueError so callers see one error type.
"""
try:
return serialization.load_pem_private_key(key_data, password=pw)
except ValueError as error:
if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
try:
return serialization.load_ssh_private_key(key_data, password=pw)
except TypeError as ssh_error:
raise ValueError(
"private key is encrypted but no password was provided"
) from ssh_error
raise error
except TypeError as error:
raise ValueError(
"private key is encrypted but no password was provided"
) from error
def _load_public_key(key_data: bytes):
"""load a PEM or OpenSSH public key, normalizing non-key input to ValueError
load_ssh_public_key raises UnsupportedAlgorithm (not ValueError) on non-SSH/garbage
input; normalize it so a bad public key always surfaces as a clear ValueError,
consistent with the private-key path.
"""
try:
return serialization.load_pem_public_key(key_data)
except ValueError:
try:
return load_ssh_public_key(key_data)
except (UnsupportedAlgorithm, ValueError) as error:
raise ValueError("not a valid PEM or OpenSSH public key") from error
class EnvelopeCrypto: class EnvelopeCrypto:
"""hybrid RSA/AES-256-GCM envelope encryption for dict records """hybrid RSA/AES-256-GCM envelope encryption for dict records
@ -172,23 +213,10 @@ class EnvelopeCrypto:
if is_private: if is_private:
pw = password.encode() if password else None pw = password.encode() if password else None
try: private_key = _load_private_key(key_data, pw)
private_key = serialization.load_pem_private_key(key_data, password=pw)
except ValueError as error:
if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
private_key = serialization.load_ssh_private_key(key_data, password=pw)
else:
raise error
except TypeError as error:
raise ValueError(
"private key is encrypted but no password was provided"
) from error
public_key = private_key.public_key() public_key = private_key.public_key()
else: else:
try: public_key = _load_public_key(key_data)
public_key = serialization.load_pem_public_key(key_data)
except ValueError:
public_key = load_ssh_public_key(key_data)
key_bytes = public_key.public_bytes( key_bytes = public_key.public_bytes(
encoding=serialization.Encoding.DER, encoding=serialization.Encoding.DER,
@ -210,10 +238,7 @@ class EnvelopeCrypto:
else: else:
key_data = rsa_key.encode() if isinstance(rsa_key, str) else rsa_key key_data = rsa_key.encode() if isinstance(rsa_key, str) else rsa_key
try: public_key = _load_public_key(key_data)
public_key = serialization.load_pem_public_key(key_data)
except ValueError:
public_key = load_ssh_public_key(key_data)
wrapped = public_key.encrypt( wrapped = public_key.encrypt(
aes_key, aes_key,
@ -236,17 +261,7 @@ class EnvelopeCrypto:
with open(rsa_private_key_path, "rb") as key_file: with open(rsa_private_key_path, "rb") as key_file:
key_data = key_file.read() key_data = key_file.read()
pw = password.encode() if password else None pw = password.encode() if password else None
try: private_key = _load_private_key(key_data, pw)
private_key = serialization.load_pem_private_key(key_data, password=pw)
except ValueError as error:
if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
private_key = serialization.load_ssh_private_key(key_data, password=pw)
else:
raise error
except TypeError as error:
raise ValueError(
"private key is encrypted but no password was provided"
) from error
wrapped = base64.b64decode(encrypted_key_base64) wrapped = base64.b64decode(encrypted_key_base64)
aes_key = private_key.decrypt( aes_key = private_key.decrypt(
@ -304,7 +319,18 @@ class EnvelopeCrypto:
} }
def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]: def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]:
"""decrypt a {secure, iv, data} blob; returns the original dict or string""" """decrypt a {secure, iv, data} blob; returns the original dict or string
`encrypt_data` only json-encodes dicts (a string is stored verbatim), so decrypt
only treats a json-OBJECT plaintext as a dict and returns everything else as the
raw string. this keeps the type faithful for the common cases: a string that is
json-shaped but NOT an object ('123'->'123', 'true'->'true', '[1,2]'->'[1,2]')
round-trips as a STRING, not an int/bool/list. the one irreducible ambiguity: a
string whose exact value is a json OBJECT ('{"a":1}') decrypts to a dict, because
without a type marker it is indistinguishable from a stored dict don't store a
bare string that is a json object if you need it back as a string. existing stored
blobs are unaffected a dict was stored as a json object and still parses to a dict.
"""
if not self.master_key: if not self.master_key:
raise ValueError("not initialized with data key") raise ValueError("not initialized with data key")
@ -313,9 +339,10 @@ class EnvelopeCrypto:
aesgcm = AESGCM(self.master_key) aesgcm = AESGCM(self.master_key)
plaintext = aesgcm.decrypt(iv, ciphertext, None).decode() plaintext = aesgcm.decrypt(iv, ciphertext, None).decode()
try: try:
return json.loads(plaintext) parsed = json.loads(plaintext)
except json.JSONDecodeError: except json.JSONDecodeError:
return plaintext return plaintext
return parsed if isinstance(parsed, dict) else plaintext
def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict: def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
"""re-encrypt a record's encrypted fields from source_crypto's key to this one's """re-encrypt a record's encrypted fields from source_crypto's key to this one's
@ -323,6 +350,10 @@ class EnvelopeCrypto:
self holds the destination (new) key; source_crypto holds the source (old) self holds the destination (new) key; source_crypto holds the source (old)
key. only {secure, iv, data} fields are touched; plaintext fields are left key. only {secure, iv, data} fields are touched; plaintext fields are left
as-is. returns a new dict; the input is not mutated. used during rotation. as-is. returns a new dict; the input is not mutated. used during rotation.
unlike decrypt_record (which logs a failed field and leaves it encrypted), a
per-field decrypt failure here RAISES rotation must fail loud, since silently
keeping a field under the old key would lose it once the old key is retired.
""" """
if not self.master_key: if not self.master_key:
raise ValueError("destination not initialized with data key") raise ValueError("destination not initialized with data key")