|
|
|
@ -54,12 +54,14 @@ function variants are the same functions — use whichever fits your storage.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
import os
|
|
|
|
|
|
|
|
import copy
|
|
|
|
import json
|
|
|
|
import json
|
|
|
|
import base64
|
|
|
|
import base64
|
|
|
|
import hashlib
|
|
|
|
import hashlib
|
|
|
|
import logging
|
|
|
|
import logging
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
from typing import Any, Dict, List, Optional, Tuple, Union
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from cryptography.exceptions import UnsupportedAlgorithm
|
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
|
|
|
|
@ -68,6 +70,65 @@ from cryptography.hazmat.primitives.serialization import load_ssh_public_key
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
_log = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _load_private_key(key_data: bytes, pw: Optional[bytes]):
|
|
|
|
|
|
|
|
"""load a PEM or OpenSSH private key, normalizing the missing-password error
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cryptography raises TypeError when a key is encrypted but no password was given
|
|
|
|
|
|
|
|
(PEM raises it on the first call; OpenSSH raises it inside the openssh fallback).
|
|
|
|
|
|
|
|
both are normalized to a clear ValueError so callers see one error type.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
return serialization.load_pem_private_key(key_data, password=pw)
|
|
|
|
|
|
|
|
except ValueError as error:
|
|
|
|
|
|
|
|
if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
return serialization.load_ssh_private_key(key_data, password=pw)
|
|
|
|
|
|
|
|
except TypeError as ssh_error:
|
|
|
|
|
|
|
|
raise ValueError(
|
|
|
|
|
|
|
|
"private key is encrypted but no password was provided"
|
|
|
|
|
|
|
|
) from ssh_error
|
|
|
|
|
|
|
|
if pw is not None:
|
|
|
|
|
|
|
|
# a password was given but the PEM load still failed — most likely a wrong
|
|
|
|
|
|
|
|
# password; give a clearer message than cryptography's raw "Bad decrypt"
|
|
|
|
|
|
|
|
raise ValueError("could not load private key (wrong password or malformed key)") from error
|
|
|
|
|
|
|
|
raise error
|
|
|
|
|
|
|
|
except TypeError as error:
|
|
|
|
|
|
|
|
raise ValueError(
|
|
|
|
|
|
|
|
"private key is encrypted but no password was provided"
|
|
|
|
|
|
|
|
) from error
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _fingerprint_of(public_key) -> str:
|
|
|
|
|
|
|
|
"""base64 SHA-256 fingerprint of an already-loaded public key
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
factored so callers that already hold a loaded key (e.g. encrypt_aes_key_with_rsa)
|
|
|
|
|
|
|
|
don't re-open and re-parse the key file just to fingerprint it.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
key_bytes = public_key.public_bytes(
|
|
|
|
|
|
|
|
encoding=serialization.Encoding.DER,
|
|
|
|
|
|
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
digest = hashes.Hash(hashes.SHA256())
|
|
|
|
|
|
|
|
digest.update(key_bytes)
|
|
|
|
|
|
|
|
return base64.b64encode(digest.finalize()).decode()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _load_public_key(key_data: bytes):
|
|
|
|
|
|
|
|
"""load a PEM or OpenSSH public key, normalizing non-key input to ValueError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
load_ssh_public_key raises UnsupportedAlgorithm (not ValueError) on non-SSH/garbage
|
|
|
|
|
|
|
|
input; normalize it so a bad public key always surfaces as a clear ValueError,
|
|
|
|
|
|
|
|
consistent with the private-key path.
|
|
|
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
return serialization.load_pem_public_key(key_data)
|
|
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
|
|
|
return load_ssh_public_key(key_data)
|
|
|
|
|
|
|
|
except (UnsupportedAlgorithm, ValueError) as error:
|
|
|
|
|
|
|
|
raise ValueError("not a valid PEM or OpenSSH public key") from error
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class EnvelopeCrypto:
|
|
|
|
class EnvelopeCrypto:
|
|
|
|
"""hybrid RSA/AES-256-GCM envelope encryption for dict records
|
|
|
|
"""hybrid RSA/AES-256-GCM envelope encryption for dict records
|
|
|
|
|
|
|
|
|
|
|
|
@ -147,9 +208,18 @@ class EnvelopeCrypto:
|
|
|
|
return key
|
|
|
|
return key
|
|
|
|
|
|
|
|
|
|
|
|
def get_rsa_key_fingerprint(
|
|
|
|
def get_rsa_key_fingerprint(
|
|
|
|
self, key_path_or_data: str, is_private: bool = False, is_file: bool = True
|
|
|
|
self, key_path_or_data: str, is_private: bool = False, is_file: bool = True,
|
|
|
|
|
|
|
|
password: Optional[str] = None,
|
|
|
|
) -> str:
|
|
|
|
) -> str:
|
|
|
|
"""return a base64 SHA-256 fingerprint of an RSA key for identification"""
|
|
|
|
"""return a base64 SHA-256 fingerprint of an RSA key for identification
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
for an encrypted private key (is_private=True), pass its `password`; an
|
|
|
|
|
|
|
|
unencrypted key ignores it. fingerprinting always uses the public half, so a
|
|
|
|
|
|
|
|
private and its public key produce the same fingerprint. PEM and OpenSSH
|
|
|
|
|
|
|
|
private-key formats are both accepted (mirrors decrypt_aes_key_with_rsa). an
|
|
|
|
|
|
|
|
encrypted key with no/wrong password raises ValueError with a clear message
|
|
|
|
|
|
|
|
(cryptography raises TypeError for the missing-password case — normalized here).
|
|
|
|
|
|
|
|
"""
|
|
|
|
if is_file:
|
|
|
|
if is_file:
|
|
|
|
with open(key_path_or_data, "rb") as key_file:
|
|
|
|
with open(key_path_or_data, "rb") as key_file:
|
|
|
|
key_data = key_file.read()
|
|
|
|
key_data = key_file.read()
|
|
|
|
@ -161,21 +231,13 @@ class EnvelopeCrypto:
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
if is_private:
|
|
|
|
if is_private:
|
|
|
|
private_key = serialization.load_pem_private_key(key_data, password=None)
|
|
|
|
pw = password.encode() if password else None
|
|
|
|
|
|
|
|
private_key = _load_private_key(key_data, pw)
|
|
|
|
public_key = private_key.public_key()
|
|
|
|
public_key = private_key.public_key()
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
try:
|
|
|
|
public_key = _load_public_key(key_data)
|
|
|
|
public_key = serialization.load_pem_public_key(key_data)
|
|
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
|
|
public_key = load_ssh_public_key(key_data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
key_bytes = public_key.public_bytes(
|
|
|
|
fingerprint = _fingerprint_of(public_key)
|
|
|
|
encoding=serialization.Encoding.DER,
|
|
|
|
|
|
|
|
format=serialization.PublicFormat.SubjectPublicKeyInfo,
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
digest = hashes.Hash(hashes.SHA256())
|
|
|
|
|
|
|
|
digest.update(key_bytes)
|
|
|
|
|
|
|
|
fingerprint = base64.b64encode(digest.finalize()).decode()
|
|
|
|
|
|
|
|
_log.info("generated %s key fingerprint", "private" if is_private else "public")
|
|
|
|
_log.info("generated %s key fingerprint", "private" if is_private else "public")
|
|
|
|
return fingerprint
|
|
|
|
return fingerprint
|
|
|
|
|
|
|
|
|
|
|
|
@ -189,10 +251,7 @@ class EnvelopeCrypto:
|
|
|
|
else:
|
|
|
|
else:
|
|
|
|
key_data = rsa_key.encode() if isinstance(rsa_key, str) else rsa_key
|
|
|
|
key_data = rsa_key.encode() if isinstance(rsa_key, str) else rsa_key
|
|
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
public_key = _load_public_key(key_data)
|
|
|
|
public_key = serialization.load_pem_public_key(key_data)
|
|
|
|
|
|
|
|
except ValueError:
|
|
|
|
|
|
|
|
public_key = load_ssh_public_key(key_data)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
wrapped = public_key.encrypt(
|
|
|
|
wrapped = public_key.encrypt(
|
|
|
|
aes_key,
|
|
|
|
aes_key,
|
|
|
|
@ -202,7 +261,8 @@ class EnvelopeCrypto:
|
|
|
|
label=None,
|
|
|
|
label=None,
|
|
|
|
),
|
|
|
|
),
|
|
|
|
)
|
|
|
|
)
|
|
|
|
fingerprint = self.get_rsa_key_fingerprint(rsa_key, is_private=False, is_file=is_file)
|
|
|
|
# fingerprint from the already-loaded public_key — no second open/parse of the file
|
|
|
|
|
|
|
|
fingerprint = _fingerprint_of(public_key)
|
|
|
|
wrapped_b64 = base64.b64encode(wrapped).decode()
|
|
|
|
wrapped_b64 = base64.b64encode(wrapped).decode()
|
|
|
|
_log.info("wrapped data key for fingerprint %s", fingerprint[:8])
|
|
|
|
_log.info("wrapped data key for fingerprint %s", fingerprint[:8])
|
|
|
|
return fingerprint, wrapped_b64
|
|
|
|
return fingerprint, wrapped_b64
|
|
|
|
@ -214,17 +274,8 @@ class EnvelopeCrypto:
|
|
|
|
"""unwrap an AES key with an RSA private key"""
|
|
|
|
"""unwrap an AES key with an RSA private key"""
|
|
|
|
with open(rsa_private_key_path, "rb") as key_file:
|
|
|
|
with open(rsa_private_key_path, "rb") as key_file:
|
|
|
|
key_data = key_file.read()
|
|
|
|
key_data = key_file.read()
|
|
|
|
try:
|
|
|
|
pw = password.encode() if password else None
|
|
|
|
private_key = serialization.load_pem_private_key(
|
|
|
|
private_key = _load_private_key(key_data, pw)
|
|
|
|
key_data, password=password.encode() if password else None
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
except ValueError as error:
|
|
|
|
|
|
|
|
if b"BEGIN OPENSSH PRIVATE KEY" in key_data:
|
|
|
|
|
|
|
|
private_key = serialization.load_ssh_private_key(
|
|
|
|
|
|
|
|
key_data, password=password.encode() if password else None
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
else:
|
|
|
|
|
|
|
|
raise error
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
wrapped = base64.b64decode(encrypted_key_base64)
|
|
|
|
wrapped = base64.b64decode(encrypted_key_base64)
|
|
|
|
aes_key = private_key.decrypt(
|
|
|
|
aes_key = private_key.decrypt(
|
|
|
|
@ -270,6 +321,10 @@ class EnvelopeCrypto:
|
|
|
|
"""encrypt a dict or string under the data key with a unique IV"""
|
|
|
|
"""encrypt a dict or string under the data key with a unique IV"""
|
|
|
|
if not self.master_key:
|
|
|
|
if not self.master_key:
|
|
|
|
raise ValueError("not initialized with data key")
|
|
|
|
raise ValueError("not initialized with data key")
|
|
|
|
|
|
|
|
if not isinstance(data, (dict, str)):
|
|
|
|
|
|
|
|
# a non-dict/non-str would .encode()-fail with an opaque AttributeError below;
|
|
|
|
|
|
|
|
# reject it clearly (decrypt_data only round-trips dict or str anyway)
|
|
|
|
|
|
|
|
raise TypeError(f"encrypt_data expects a dict or str, got {type(data).__name__}")
|
|
|
|
|
|
|
|
|
|
|
|
data_str = json.dumps(data) if isinstance(data, dict) else data
|
|
|
|
data_str = json.dumps(data) if isinstance(data, dict) else data
|
|
|
|
iv = os.urandom(12)
|
|
|
|
iv = os.urandom(12)
|
|
|
|
@ -282,18 +337,34 @@ class EnvelopeCrypto:
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]:
|
|
|
|
def decrypt_data(self, encrypted_data: Dict[str, str]) -> Union[Dict[str, Any], str]:
|
|
|
|
"""decrypt a {secure, iv, data} blob; returns the original dict or string"""
|
|
|
|
"""decrypt a {secure, iv, data} blob; returns the original dict or string
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
`encrypt_data` only json-encodes dicts (a string is stored verbatim), so decrypt
|
|
|
|
|
|
|
|
only treats a json-OBJECT plaintext as a dict and returns everything else as the
|
|
|
|
|
|
|
|
raw string. this keeps the type faithful for the common cases: a string that is
|
|
|
|
|
|
|
|
json-shaped but NOT an object ('123'->'123', 'true'->'true', '[1,2]'->'[1,2]')
|
|
|
|
|
|
|
|
round-trips as a STRING, not an int/bool/list. the one irreducible ambiguity: a
|
|
|
|
|
|
|
|
string whose exact value is a json OBJECT ('{"a":1}') decrypts to a dict, because
|
|
|
|
|
|
|
|
without a type marker it is indistinguishable from a stored dict — don't store a
|
|
|
|
|
|
|
|
bare string that is a json object if you need it back as a string. existing stored
|
|
|
|
|
|
|
|
blobs are unaffected — a dict was stored as a json object and still parses to a dict.
|
|
|
|
|
|
|
|
"""
|
|
|
|
if not self.master_key:
|
|
|
|
if not self.master_key:
|
|
|
|
raise ValueError("not initialized with data key")
|
|
|
|
raise ValueError("not initialized with data key")
|
|
|
|
|
|
|
|
if not isinstance(encrypted_data, dict) or "iv" not in encrypted_data or "data" not in encrypted_data:
|
|
|
|
|
|
|
|
# a structurally-malformed blob would raise a raw KeyError/TypeError; surface
|
|
|
|
|
|
|
|
# a clear ValueError instead, matching the documented {secure, iv, data} shape
|
|
|
|
|
|
|
|
raise ValueError("decrypt_data expects a {secure, iv, data} blob")
|
|
|
|
|
|
|
|
|
|
|
|
iv = base64.b64decode(encrypted_data["iv"])
|
|
|
|
iv = base64.b64decode(encrypted_data["iv"])
|
|
|
|
ciphertext = base64.b64decode(encrypted_data["data"])
|
|
|
|
ciphertext = base64.b64decode(encrypted_data["data"])
|
|
|
|
aesgcm = AESGCM(self.master_key)
|
|
|
|
aesgcm = AESGCM(self.master_key)
|
|
|
|
plaintext = aesgcm.decrypt(iv, ciphertext, None).decode()
|
|
|
|
plaintext = aesgcm.decrypt(iv, ciphertext, None).decode()
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
return json.loads(plaintext)
|
|
|
|
parsed = json.loads(plaintext)
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
except json.JSONDecodeError:
|
|
|
|
return plaintext
|
|
|
|
return plaintext
|
|
|
|
|
|
|
|
return parsed if isinstance(parsed, dict) else plaintext
|
|
|
|
|
|
|
|
|
|
|
|
def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
|
|
|
|
def reencrypt(self, source_crypto: "EnvelopeCrypto", record: dict, traversal_level: int = 2) -> dict:
|
|
|
|
"""re-encrypt a record's encrypted fields from source_crypto's key to this one's
|
|
|
|
"""re-encrypt a record's encrypted fields from source_crypto's key to this one's
|
|
|
|
@ -301,11 +372,21 @@ class EnvelopeCrypto:
|
|
|
|
self holds the destination (new) key; source_crypto holds the source (old)
|
|
|
|
self holds the destination (new) key; source_crypto holds the source (old)
|
|
|
|
key. only {secure, iv, data} fields are touched; plaintext fields are left
|
|
|
|
key. only {secure, iv, data} fields are touched; plaintext fields are left
|
|
|
|
as-is. returns a new dict; the input is not mutated. used during rotation.
|
|
|
|
as-is. returns a new dict; the input is not mutated. used during rotation.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
unlike decrypt_record (which logs a failed field and leaves it encrypted), a
|
|
|
|
|
|
|
|
per-field decrypt failure here RAISES — rotation must fail loud, since silently
|
|
|
|
|
|
|
|
keeping a field under the old key would lose it once the old key is retired.
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
traversal recurses into nested DICTS only (up to `traversal_level`); a blob nested
|
|
|
|
|
|
|
|
inside a LIST is NOT re-encrypted. records in this scheme key blobs by field name,
|
|
|
|
|
|
|
|
not inside arrays, so this doesn't arise in practice — but if you store
|
|
|
|
|
|
|
|
list-nested blobs, flatten them to dict fields before rotation or they'll be left
|
|
|
|
|
|
|
|
under the old key.
|
|
|
|
"""
|
|
|
|
"""
|
|
|
|
if not self.master_key:
|
|
|
|
if not self.master_key:
|
|
|
|
raise ValueError("destination not initialized with data key")
|
|
|
|
raise ValueError("destination not initialized with data key")
|
|
|
|
|
|
|
|
|
|
|
|
result = record.copy()
|
|
|
|
result = copy.deepcopy(record)
|
|
|
|
for key, value in record.items():
|
|
|
|
for key, value in record.items():
|
|
|
|
if isinstance(value, dict) and value.get("secure") is True and "iv" in value and "data" in value:
|
|
|
|
if isinstance(value, dict) and value.get("secure") is True and "iv" in value and "data" in value:
|
|
|
|
result[key] = self.encrypt_data(source_crypto.decrypt_data(value))
|
|
|
|
result[key] = self.encrypt_data(source_crypto.decrypt_data(value))
|
|
|
|
@ -353,7 +434,7 @@ def decrypt_record(crypto: EnvelopeCrypto, record, traversal_level: int = 2) ->
|
|
|
|
if not isinstance(record, dict):
|
|
|
|
if not isinstance(record, dict):
|
|
|
|
return record
|
|
|
|
return record
|
|
|
|
|
|
|
|
|
|
|
|
result = record.copy()
|
|
|
|
result = copy.deepcopy(record)
|
|
|
|
for key, value in record.items():
|
|
|
|
for key, value in record.items():
|
|
|
|
if isinstance(value, dict) and value.get("secure") is True and "iv" in value and "data" in value:
|
|
|
|
if isinstance(value, dict) and value.get("secure") is True and "iv" in value and "data" in value:
|
|
|
|
try:
|
|
|
|
try:
|
|
|
|
|