django-fernet-encrypted-fields/encrypted_fields/fields.py

219 lines
7 KiB
Python
Raw Normal View History

2025-01-06 02:16:57 +00:00
from __future__ import annotations
2021-09-30 14:27:19 +00:00
import base64
2023-07-18 04:55:25 +00:00
import json
2025-01-06 02:16:57 +00:00
from typing import Any
2022-05-06 13:38:10 +00:00
2025-01-06 02:16:57 +00:00
from cryptography.fernet import Fernet, InvalidToken, MultiFernet
2021-09-30 14:27:19 +00:00
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
2022-05-06 13:38:10 +00:00
from django.conf import settings
2025-01-06 02:16:57 +00:00
from django.core.validators import MaxValueValidator, MinValueValidator
2021-09-30 14:27:19 +00:00
from django.db import models
2025-01-06 02:16:57 +00:00
from django.db.backends.base.base import BaseDatabaseWrapper
2022-05-06 23:23:24 +00:00
from django.db.backends.base.operations import BaseDatabaseOperations
2025-01-06 02:16:57 +00:00
from django.db.models.expressions import Expression
from django.utils.functional import cached_property
2021-09-30 14:27:19 +00:00
2025-01-06 02:16:57 +00:00
_TypeAny = Any
class EncryptedFieldMixin:
@cached_property
2025-01-06 02:16:57 +00:00
def keys(self) -> list[bytes]:
keys = []
salt_keys = (
settings.SALT_KEY
if isinstance(settings.SALT_KEY, list)
else [settings.SALT_KEY]
)
secret_keys = [settings.SECRET_KEY] + getattr(settings, "SECRET_KEY_FALLBACKS", list())
for secret_key in secret_keys:
for salt_key in salt_keys:
salt = bytes(salt_key, "utf-8")
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=100_000,
backend=default_backend(),
)
keys.append(
base64.urlsafe_b64encode(
kdf.derive(secret_key.encode("utf-8"))
)
)
return keys
2021-09-30 14:27:19 +00:00
@cached_property
2025-01-06 02:16:57 +00:00
def f(self) -> Fernet | MultiFernet:
if len(self.keys) == 1:
return Fernet(self.keys[0])
return MultiFernet([Fernet(k) for k in self.keys])
2021-09-30 14:27:19 +00:00
2025-01-06 02:16:57 +00:00
def get_internal_type(self) -> str:
2021-09-30 14:27:19 +00:00
"""
To treat everything as text
"""
return "TextField"
2021-09-30 14:27:19 +00:00
2025-01-06 02:16:57 +00:00
def get_prep_value(self, value: _TypeAny) -> _TypeAny:
value = super().get_prep_value(value)
2021-09-30 14:27:19 +00:00
if value:
if not isinstance(value, str):
value = str(value)
return self.f.encrypt(bytes(value, "utf-8")).decode("utf-8")
2021-09-30 14:27:19 +00:00
return None
2025-01-06 02:16:57 +00:00
def get_db_prep_value(
self,
value: _TypeAny,
connection: BaseDatabaseWrapper, # noqa: ARG002
prepared: bool = False, # noqa: FBT001, FBT002
) -> _TypeAny:
2021-09-30 14:27:19 +00:00
if not prepared:
value = self.get_prep_value(value)
return value
2025-01-06 02:16:57 +00:00
def from_db_value(
self,
value: _TypeAny,
expression: Expression, # noqa: ARG002
connection: BaseDatabaseWrapper, # noqa: ARG002
) -> _TypeAny:
2021-09-30 14:27:19 +00:00
return self.to_python(value)
2025-01-06 02:16:57 +00:00
def to_python(self, value: _TypeAny) -> _TypeAny:
if (
value is None
or not isinstance(value, str)
or hasattr(self, "_already_decrypted")
):
2021-09-30 14:27:19 +00:00
return value
try:
value = self.f.decrypt(bytes(value, "utf-8")).decode("utf-8")
except InvalidToken:
pass
except UnicodeEncodeError:
pass
2025-01-06 02:16:57 +00:00
return super().to_python(value)
2021-09-30 14:27:19 +00:00
2025-01-06 02:16:57 +00:00
def clean(self, value: _TypeAny, model_instance: models.Field) -> _TypeAny:
"""
2025-01-06 02:16:57 +00:00
Create and assign a semaphore so that to_python method will not try
to decrypt an already decrypted value during cleaning of a form
"""
self._already_decrypted = True
ret = super().clean(value, model_instance)
del self._already_decrypted
return ret
2021-09-30 14:27:19 +00:00
class EncryptedCharField(EncryptedFieldMixin, models.CharField):
pass
class EncryptedTextField(EncryptedFieldMixin, models.TextField):
pass
class EncryptedDateTimeField(EncryptedFieldMixin, models.DateTimeField):
pass
class EncryptedIntegerField(EncryptedFieldMixin, models.IntegerField):
2022-05-06 00:38:20 +00:00
@cached_property
2025-01-06 02:16:57 +00:00
def validators(self) -> list[MinValueValidator | MaxValueValidator]:
2022-05-06 23:23:24 +00:00
# These validators can't be added at field initialization time since
# they're based on values retrieved from `connection`.
validators_ = [*self.default_validators, *self._validators]
internal_type = models.IntegerField().get_internal_type()
2025-01-06 02:16:57 +00:00
min_value, max_value = BaseDatabaseOperations.integer_field_ranges[
internal_type
]
2022-05-06 23:23:24 +00:00
if min_value is not None and not any(
(
2025-01-06 02:16:57 +00:00
isinstance(validator, MinValueValidator)
2022-05-06 23:23:24 +00:00
and (
validator.limit_value()
if callable(validator.limit_value)
else validator.limit_value
)
>= min_value
)
for validator in validators_
):
2025-01-06 02:16:57 +00:00
validators_.append(MinValueValidator(min_value))
2022-05-06 23:23:24 +00:00
if max_value is not None and not any(
(
2025-01-06 02:16:57 +00:00
isinstance(validator, MaxValueValidator)
2022-05-06 23:23:24 +00:00
and (
validator.limit_value()
if callable(validator.limit_value)
else validator.limit_value
)
<= max_value
)
for validator in validators_
):
2025-01-06 02:16:57 +00:00
validators_.append(MaxValueValidator(max_value))
2022-05-06 23:23:24 +00:00
return validators_
2021-09-30 14:27:19 +00:00
class EncryptedDateField(EncryptedFieldMixin, models.DateField):
pass
class EncryptedFloatField(EncryptedFieldMixin, models.FloatField):
pass
class EncryptedEmailField(EncryptedFieldMixin, models.EmailField):
pass
class EncryptedBooleanField(EncryptedFieldMixin, models.BooleanField):
pass
2023-07-18 04:55:25 +00:00
class EncryptedJSONField(EncryptedFieldMixin, models.JSONField):
2025-01-06 02:16:57 +00:00
def _encrypt_values(self, value: _TypeAny) -> _TypeAny:
2023-07-18 04:55:25 +00:00
if isinstance(value, dict):
return {key: self._encrypt_values(data) for key, data in value.items()}
2025-01-06 02:16:57 +00:00
if isinstance(value, list):
return [self._encrypt_values(data) for data in value]
value = str(value)
2023-07-18 04:55:25 +00:00
return self.f.encrypt(bytes(value, "utf-8")).decode("utf-8")
2025-01-06 02:16:57 +00:00
def _decrypt_values(self, value: _TypeAny) -> _TypeAny:
2023-07-18 04:55:25 +00:00
if value is None:
return value
if isinstance(value, dict):
return {key: self._decrypt_values(data) for key, data in value.items()}
2025-01-06 02:16:57 +00:00
if isinstance(value, list):
return [self._decrypt_values(data) for data in value]
value = str(value)
2023-07-18 04:55:25 +00:00
return self.f.decrypt(bytes(value, "utf-8")).decode("utf-8")
2025-01-06 02:16:57 +00:00
def get_prep_value(self, value: _TypeAny) -> str:
2023-07-18 04:55:25 +00:00
return json.dumps(self._encrypt_values(value=value), cls=self.encoder)
2025-01-06 02:16:57 +00:00
def get_internal_type(self) -> str:
2023-07-18 04:55:25 +00:00
return "JSONField"
2025-01-06 02:16:57 +00:00
def to_python(self, value: _TypeAny) -> _TypeAny:
2023-07-18 04:55:25 +00:00
if (
value is None
or not isinstance(value, str)
or hasattr(self, "_already_decrypted")
):
return value
try:
value = self._decrypt_values(value=json.loads(value))
except InvalidToken:
pass
except UnicodeEncodeError:
pass
2025-01-06 02:16:57 +00:00
return super().to_python(value)