fix: EC-1 single key load for fingerprint; encrypt/decrypt type guards
EC-1: factor _fingerprint_of(public_key) so encrypt_aes_key_with_rsa fingerprints the already-loaded key instead of re-opening/parsing the file. encrypt_data rejects a non-dict/str with a clear TypeError (was: opaque .encode() AttributeError); decrypt_data raises ValueError on a malformed blob (was: raw KeyError); a wrong-password PEM load gives a clearer message; reencrypt's dict-only traversal (list-nested blobs skipped) documented. Signed-off-by: disqualifier <dev@disqualifier.me>
This commit is contained in:
parent
254826f86c
commit
306e5b8057
@ -87,6 +87,10 @@ def _load_private_key(key_data: bytes, pw: Optional[bytes]):
|
|||||||
raise ValueError(
|
raise ValueError(
|
||||||
"private key is encrypted but no password was provided"
|
"private key is encrypted but no password was provided"
|
||||||
) from ssh_error
|
) from ssh_error
|
||||||
|
if pw is not None:
|
||||||
|
# a password was given but the PEM load still failed — most likely a wrong
|
||||||
|
# password; give a clearer message than cryptography's raw "Bad decrypt"
|
||||||
|
raise ValueError("could not load private key (wrong password or malformed key)") from error
|
||||||
raise error
|
raise error
|
||||||
except TypeError as error:
|
except TypeError as error:
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
@ -94,6 +98,21 @@ def _load_private_key(key_data: bytes, pw: Optional[bytes]):
|
|||||||
) from error
|
) from error
|
||||||
|
|
||||||
|
|
||||||
|
def _fingerprint_of(public_key) -> str:
|
||||||
|
"""base64 SHA-256 fingerprint of an already-loaded public key
|
||||||
|
|
||||||
|
factored so callers that already hold a loaded key (e.g. encrypt_aes_key_with_rsa)
|
||||||
|
don't re-open and re-parse the key file just to fingerprint it.
|
||||||
|
"""
|
||||||
|
key_bytes = public_key.public_bytes(
|
||||||
|
encoding=serialization.Encoding.DER,
|
||||||
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
||||||
|
)
|
||||||
|
digest = hashes.Hash(hashes.SHA256())
|
||||||
|
digest.update(key_bytes)
|
||||||
|
return base64.b64encode(digest.finalize()).decode()
|
||||||
|
|
||||||
|
|
||||||
def _load_public_key(key_data: bytes):
|
def _load_public_key(key_data: bytes):
|
||||||
"""load a PEM or OpenSSH public key, normalizing non-key input to ValueError
|
"""load a PEM or OpenSSH public key, normalizing non-key input to ValueError
|
||||||
|
|
||||||
@ -218,13 +237,7 @@ class EnvelopeCrypto:
|
|||||||
else:
|
else:
|
||||||
public_key = _load_public_key(key_data)
|
public_key = _load_public_key(key_data)
|
||||||
|
|
||||||
key_bytes = public_key.public_bytes(
|
fingerprint = _fingerprint_of(public_key)
|
||||||
encoding=serialization.Encoding.DER,
|
|
||||||
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
||||||
)
|
|
||||||
digest = hashes.Hash(hashes.SHA256())
|
|
||||||
digest.update(key_bytes)
|
|
||||||
fingerprint = base64.b64encode(digest.finalize()).decode()
|
|
||||||
_log.info("generated %s key fingerprint", "private" if is_private else "public")
|
_log.info("generated %s key fingerprint", "private" if is_private else "public")
|
||||||
return fingerprint
|
return fingerprint
|
||||||
|
|
||||||
@ -248,7 +261,8 @@ class EnvelopeCrypto:
|
|||||||
label=None,
|
label=None,
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
fingerprint = self.get_rsa_key_fingerprint(rsa_key, is_private=False, is_file=is_file)
|
# fingerprint from the already-loaded public_key — no second open/parse of the file
|
||||||
|
fingerprint = _fingerprint_of(public_key)
|
||||||
wrapped_b64 = base64.b64encode(wrapped).decode()
|
wrapped_b64 = base64.b64encode(wrapped).decode()
|
||||||
_log.info("wrapped data key for fingerprint %s", fingerprint[:8])
|
_log.info("wrapped data key for fingerprint %s", fingerprint[:8])
|
||||||
return fingerprint, wrapped_b64
|
return fingerprint, wrapped_b64
|
||||||
@ -307,6 +321,10 @@ class EnvelopeCrypto:
|
|||||||
"""encrypt a dict or string under the data key with a unique IV"""
|
"""encrypt a dict or string under the data key with a unique IV"""
|
||||||
if not self.master_key:
|
if not self.master_key:
|
||||||
raise ValueError("not initialized with data key")
|
raise ValueError("not initialized with data key")
|
||||||
|
if not isinstance(data, (dict, str)):
|
||||||
|
# a non-dict/non-str would .encode()-fail with an opaque AttributeError below;
|
||||||
|
# reject it clearly (decrypt_data only round-trips dict or str anyway)
|
||||||
|
raise TypeError(f"encrypt_data expects a dict or str, got {type(data).__name__}")
|
||||||
|
|
||||||
data_str = json.dumps(data) if isinstance(data, dict) else data
|
data_str = json.dumps(data) if isinstance(data, dict) else data
|
||||||
iv = os.urandom(12)
|
iv = os.urandom(12)
|
||||||
@ -333,6 +351,10 @@ class EnvelopeCrypto:
|
|||||||
"""
|
"""
|
||||||
if not self.master_key:
|
if not self.master_key:
|
||||||
raise ValueError("not initialized with data key")
|
raise ValueError("not initialized with data key")
|
||||||
|
if not isinstance(encrypted_data, dict) or "iv" not in encrypted_data or "data" not in encrypted_data:
|
||||||
|
# a structurally-malformed blob would raise a raw KeyError/TypeError; surface
|
||||||
|
# a clear ValueError instead, matching the documented {secure, iv, data} shape
|
||||||
|
raise ValueError("decrypt_data expects a {secure, iv, data} blob")
|
||||||
|
|
||||||
iv = base64.b64decode(encrypted_data["iv"])
|
iv = base64.b64decode(encrypted_data["iv"])
|
||||||
ciphertext = base64.b64decode(encrypted_data["data"])
|
ciphertext = base64.b64decode(encrypted_data["data"])
|
||||||
@ -354,6 +376,12 @@ class EnvelopeCrypto:
|
|||||||
unlike decrypt_record (which logs a failed field and leaves it encrypted), a
|
unlike decrypt_record (which logs a failed field and leaves it encrypted), a
|
||||||
per-field decrypt failure here RAISES — rotation must fail loud, since silently
|
per-field decrypt failure here RAISES — rotation must fail loud, since silently
|
||||||
keeping a field under the old key would lose it once the old key is retired.
|
keeping a field under the old key would lose it once the old key is retired.
|
||||||
|
|
||||||
|
traversal recurses into nested DICTS only (up to `traversal_level`); a blob nested
|
||||||
|
inside a LIST is NOT re-encrypted. records in this scheme key blobs by field name,
|
||||||
|
not inside arrays, so this doesn't arise in practice — but if you store
|
||||||
|
list-nested blobs, flatten them to dict fields before rotation or they'll be left
|
||||||
|
under the old key.
|
||||||
"""
|
"""
|
||||||
if not self.master_key:
|
if not self.master_key:
|
||||||
raise ValueError("destination not initialized with data key")
|
raise ValueError("destination not initialized with data key")
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user