Merge pull request #14 from faradayyg/feature/encrypt-json-fields

Add support for encrypted JSON fields
This commit is contained in:
fragment 2023-07-24 15:55:13 +01:00 committed by GitHub
commit 295f84f40c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 75 additions and 1 deletions

View file

@ -53,6 +53,7 @@ Currently build in and unit-tested fields. They have the same APIs as their non-
- `EncryptedFloatField` - `EncryptedFloatField`
- `EncryptedEmailField` - `EncryptedEmailField`
- `EncryptedBooleanField` - `EncryptedBooleanField`
- `EncryptedJSONField`
### Compatible Django Version ### Compatible Django Version

View file

@ -1,4 +1,5 @@
import base64 import base64
import json
from cryptography.fernet import Fernet, MultiFernet, InvalidToken from cryptography.fernet import Fernet, MultiFernet, InvalidToken
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -10,7 +11,6 @@ from django.db import models
from django.db.backends.base.operations import BaseDatabaseOperations from django.db.backends.base.operations import BaseDatabaseOperations
from django.utils.functional import cached_property from django.utils.functional import cached_property
class EncryptedFieldMixin(object): class EncryptedFieldMixin(object):
@cached_property @cached_property
def keys(self): def keys(self):
@ -153,3 +153,46 @@ class EncryptedEmailField(EncryptedFieldMixin, models.EmailField):
class EncryptedBooleanField(EncryptedFieldMixin, models.BooleanField): class EncryptedBooleanField(EncryptedFieldMixin, models.BooleanField):
pass pass
class EncryptedJSONField(EncryptedFieldMixin, models.JSONField):
def _encrypt_values(self, value):
if isinstance(value, dict):
return {key: self._encrypt_values(data) for key, data in value.items()}
elif isinstance(value, list):
return [self._encrypt_values(data) for data in value]
else:
value = str(value)
return self.f.encrypt(bytes(value, "utf-8")).decode("utf-8")
def _decrypt_values(self, value):
if value is None:
return value
if isinstance(value, dict):
return {key: self._decrypt_values(data) for key, data in value.items()}
elif isinstance(value, list):
return [self._decrypt_values(data) for data in value]
else:
value = str(value)
return self.f.decrypt(bytes(value, "utf-8")).decode("utf-8")
def get_prep_value(self, value):
return json.dumps(self._encrypt_values(value=value), cls=self.encoder)
def get_internal_type(self):
return "JSONField"
def to_python(self, value):
if (
value is None
or not isinstance(value, str)
or hasattr(self, "_already_decrypted")
):
return value
try:
value = self._decrypt_values(value=json.loads(value))
except InvalidToken:
pass
except UnicodeEncodeError:
pass
return super(EncryptedFieldMixin, self).to_python(value)

View file

@ -10,3 +10,4 @@ class TestModel(models.Model):
floating = EncryptedFloatField(null=True, blank=True) floating = EncryptedFloatField(null=True, blank=True)
email = EncryptedEmailField(null=True, blank=True) email = EncryptedEmailField(null=True, blank=True)
boolean = EncryptedBooleanField(default=False, null=True) boolean = EncryptedBooleanField(default=False, null=True)
json = EncryptedJSONField(default=dict, null=True, blank=True)

View file

@ -1,3 +1,4 @@
import json
import re import re
from django.db import connection from django.db import connection
@ -199,6 +200,34 @@ class FieldTest(TestCase):
model.full_clean() model.full_clean()
model.save() model.save()
def test_json_field_encrypted(self):
dict_values = {"key": "value", "list": ["nested", {"key": "val"}], "nested": {"child": "sibling"}}
model = TestModel()
model.json = dict_values
model.full_clean()
model.save()
ciphertext = json.loads(self.get_db_value("json", model.id))
self.assertNotEqual(dict_values, ciphertext)
fresh_model = TestModel.objects.get(id=model.id)
self.assertEqual(fresh_model.json, dict_values)
def test_json_field_retains_keys(self):
plain_value = {"key": "value", "another_key": "some value"}
model = TestModel()
model.json = plain_value
model.full_clean()
model.save()
ciphertext = json.loads(self.get_db_value("json", model.id))
self.assertEqual(plain_value.keys(), ciphertext.keys())
class RotatedSaltTestCase(TestCase): class RotatedSaltTestCase(TestCase):
@classmethod @classmethod