From 72c7aa936eb29856743c406ea0ddce40a4e1d8c9 Mon Sep 17 00:00:00 2001 From: disqualifier Date: Mon, 29 Jun 2026 17:58:26 -0400 Subject: [PATCH] fix: normalize key-load exceptions; type-faithful decrypt_data (v0.1.3) - encrypted OpenSSH private key with no password now raises ValueError (not a raw TypeError from load_ssh_private_key), matching the PEM path and the docstring (L14) - a non-PEM/non-SSH public key raises a clear ValueError instead of cryptography's UnsupportedAlgorithm, consistent with the private-key paths (L15) - decrypt_data only treats a json-OBJECT plaintext as a dict, so json-shaped strings ('123','true','[1,2]') round-trip as strings; existing dict blobs unaffected (L16) - both key loads route through shared _load_private_key/_load_public_key helpers - document reencrypt's fail-loud (vs decrypt_record's per-field swallow) asymmetry (nit). Signed-off-by: disqualifier --- README.md | 4 +- pyproject.toml | 2 +- src/envelope_crypto/envelope_crypto.py | 95 +++++++++++++++++--------- 3 files changed, 66 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index 290a425..62833d3 100644 --- a/README.md +++ b/README.md @@ -11,13 +11,13 @@ and storage-agnostic. `requirements.txt`: ``` -envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.2 +envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.3 ``` Direct: ```bash -pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.2" +pip install "envelope_crypto @ git+ssh://git@git.rethinkstudios.io/rethink-public/envelope_crypto.git@v0.1.3" ``` Requires `cryptography` (pulled transitively). diff --git a/pyproject.toml b/pyproject.toml index 611f95f..78aaa14 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "envelope_crypto" -version = "0.1.2" +version = "0.1.3" description = "Envelope encryption (RSA-OAEP wrapped AES-256-GCM) for dict records — config-free, storage-agnostic, installable." requires-python = ">=3.10" dependencies = [ diff --git a/src/envelope_crypto/envelope_crypto.py b/src/envelope_crypto/envelope_crypto.py index 34ddf1c..5509324 100644 --- a/src/envelope_crypto/envelope_crypto.py +++ b/src/envelope_crypto/envelope_crypto.py @@ -61,6 +61,7 @@ import hashlib import logging from typing import Any, Dict, List, Optional, Tuple, Union +from cryptography.exceptions import UnsupportedAlgorithm from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.ciphers.aead import AESGCM @@ -69,6 +70,46 @@ from cryptography.hazmat.primitives.serialization import load_ssh_public_key _log = logging.getLogger(__name__) +def _load_private_key(key_data: bytes, pw: Optional[bytes]): + """load a PEM or OpenSSH private key, normalizing the missing-password error + + cryptography raises TypeError when a key is encrypted but no password was given + (PEM raises it on the first call; OpenSSH raises it inside the openssh fallback). + both are normalized to a clear ValueError so callers see one error type. + """ + try: + return serialization.load_pem_private_key(key_data, password=pw) + except ValueError as error: + if b"BEGIN OPENSSH PRIVATE KEY" in key_data: + try: + return serialization.load_ssh_private_key(key_data, password=pw) + except TypeError as ssh_error: + raise ValueError( + "private key is encrypted but no password was provided" + ) from ssh_error + raise error + except TypeError as error: + raise ValueError( + "private key is encrypted but no password was provided" + ) from error + + +def _load_public_key(key_data: bytes): + """load a PEM or OpenSSH public key, normalizing non-key input to ValueError + + load_ssh_public_key raises UnsupportedAlgorithm (not ValueError) on non-SSH/garbage + input; normalize it so a bad public key always surfaces as a clear ValueError, + consistent with the private-key path. + """ + try: + return serialization.load_pem_public_key(key_data) + except ValueError: + try: + return load_ssh_public_key(key_data) + except (UnsupportedAlgorithm, ValueError) as error: + raise ValueError("not a valid PEM or OpenSSH public key") from error + + class EnvelopeCrypto: """hybrid RSA/AES-256-GCM envelope encryption for dict records @@ -172,23 +213,10 @@ class EnvelopeCrypto: if is_private: pw = password.encode() if password else None - try: - private_key = serialization.load_pem_private_key(key_data, password=pw) - except ValueError as error: - if b"BEGIN OPENSSH PRIVATE KEY" in key_data: - private_key = serialization.load_ssh_private_key(key_data, password=pw) - else: - raise error - except TypeError as error: - raise ValueError( - "private key is encrypted but no password was provided" - ) from error + private_key = _load_private_key(key_data, pw) public_key = private_key.public_key() else: - try: - public_key = serialization.load_pem_public_key(key_data) - except ValueError: - public_key = load_ssh_public_key(key_data) + public_key = _load_public_key(key_data) key_bytes = public_key.public_bytes( encoding=serialization.Encoding.DER, @@ -210,10 +238,7 @@ class EnvelopeCrypto: else: key_data = rsa_key.encode() if isinstance(rsa_key, str) else rsa_key - try: - public_key = serialization.load_pem_public_key(key_data) - except ValueError: - public_key = load_ssh_public_key(key_data) + public_key = _load_public_key(key_data) wrapped = public_key.encrypt( aes_key, @@ -236,17 +261,7 @@ class EnvelopeCrypto: with open(rsa_private_key_path, "rb") as key_file: key_data = key_file.read() pw = password.encode() if password else None - try: - private_key = serialization.load_pem_private_key(key_data, password=pw) - except ValueError as error: - if b"BEGIN OPENSSH PRIVATE KEY" in key_data: - private_key = serialization.load_ssh_private_key(key_data, password=pw) - else: - raise error - except TypeError as error: - raise ValueError( - "private key is encrypted but no password was provided" - ) from error + private_key = _load_private_key(key_data, pw) wrapped = base64.b64decode(encrypted_key_base64) aes_key = private_key.decrypt( @@ -304,7 +319,18 @@ class EnvelopeCrypto: } def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]: - """decrypt a {secure, iv, data} blob; returns the original dict or string""" + """decrypt a {secure, iv, data} blob; returns the original dict or string + + `encrypt_data` only json-encodes dicts (a string is stored verbatim), so decrypt + only treats a json-OBJECT plaintext as a dict and returns everything else as the + raw string. this keeps the type faithful for the common cases: a string that is + json-shaped but NOT an object ('123'->'123', 'true'->'true', '[1,2]'->'[1,2]') + round-trips as a STRING, not an int/bool/list. the one irreducible ambiguity: a + string whose exact value is a json OBJECT ('{"a":1}') decrypts to a dict, because + without a type marker it is indistinguishable from a stored dict — don't store a + bare string that is a json object if you need it back as a string. existing stored + blobs are unaffected — a dict was stored as a json object and still parses to a dict. + """ if not self.master_key: raise ValueError("not initialized with data key") @@ -313,9 +339,10 @@ class EnvelopeCrypto: aesgcm = AESGCM(self.master_key) plaintext = aesgcm.decrypt(iv, ciphertext, None).decode() try: - return json.loads(plaintext) + parsed = json.loads(plaintext) except json.JSONDecodeError: return plaintext + return parsed if isinstance(parsed, dict) else plaintext def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict: """re-encrypt a record's encrypted fields from source_crypto's key to this one's @@ -323,6 +350,10 @@ class EnvelopeCrypto: self holds the destination (new) key; source_crypto holds the source (old) key. only {secure, iv, data} fields are touched; plaintext fields are left as-is. returns a new dict; the input is not mutated. used during rotation. + + unlike decrypt_record (which logs a failed field and leaves it encrypted), a + per-field decrypt failure here RAISES — rotation must fail loud, since silently + keeping a field under the old key would lose it once the old key is retired. """ if not self.master_key: raise ValueError("destination not initialized with data key")