# Source: django-auditlog/auditlog/diff.py (web view: 208 lines, 6.7 KiB, Python)

import json
from datetime import timezone
from typing import Optional
from django.conf import settings
from django.core.exceptions import ObjectDoesNotExist
from django.db.models import NOT_PROVIDED, DateTimeField, ForeignKey, JSONField, Model
from django.utils import timezone as django_timezone
from django.utils.encoding import smart_str
2013-10-20 13:25:48 +00:00
def track_field(field):
    """
    Decide whether Auditlog should record changes to the given field.

    Two kinds of field are left untracked: many-to-many relations, and any relation whose
    target model is Auditlog's own ``LogEntry`` model.

    :param field: The field to check.
    :type field: Field
    :return: ``True`` when the field should be tracked, ``False`` otherwise.
    :rtype: bool
    """
    # Function-scope import; presumably avoids a circular import with auditlog.models -- confirm.
    from auditlog.models import LogEntry

    # Many-to-many relations are never tracked.
    if field.many_to_many:
        return False

    # Fields without a ``remote_field`` attribute (or with it set to None) are not relations.
    remote = getattr(field, "remote_field", None)
    targets_log_entry = remote is not None and remote.model == LogEntry
    return not targets_log_entry
2015-06-03 14:06:25 +00:00
def get_fields_in_model(instance):
    """
    Return the fields of the given model instance that Auditlog tracks.

    The candidates come from ``instance._meta.get_fields()``; only those accepted by
    ``track_field`` are kept, so many-to-many fields and relations to ``LogEntry`` are left out.

    :param instance: The model instance to get the fields for
    :type instance: Model
    :return: The list of tracked fields for the given model (instance)
    :rtype: list
    """
    assert isinstance(instance, Model)

    every_field = instance._meta.get_fields()
    return list(filter(track_field, every_field))
2015-06-03 14:06:25 +00:00
def get_field_value(obj, field):
    """
    Read the value of a model field in a form suitable for comparing two instances.

    The representation depends on the kind of field:

    * ``DateTimeField``: the value converted with ``field.to_python``; when ``USE_TZ`` is
      enabled and the result is timezone-aware it is made naive in UTC. If ``to_python``
      raises ``TypeError`` the raw attribute value is returned unchanged.
    * ``JSONField``: the value serialized with ``json.dumps`` (sorted keys, the field's
      encoder); if serialization raises ``TypeError`` the converted value is returned as is.
    * one-to-one / many-to-one fields that have a ``rel_class`` attribute: the attribute named
      by ``field.get_attname()``, passed through ``smart_str`` with ``strings_only=True``.
    * anything else: the attribute passed through ``smart_str``.

    A missing attribute (including ``obj`` being ``None``) is read as ``None``.

    :param obj: The model instance.
    :type obj: Model
    :param field: The field you want to find the value of.
    :type field: Any
    :return: The comparable value of the field, or the field's default (``None`` if it has
             no default) when reading it raises ``ObjectDoesNotExist``.
    :rtype: Any
    """
    try:
        if isinstance(field, DateTimeField):
            # Aware datetimes are normalised to naive UTC so that old and new values can be
            # compared reliably for changes.
            raw = getattr(obj, field.name, None)
            try:
                moment = field.to_python(raw)
            except TypeError:
                return raw
            must_make_naive = (
                moment is not None
                and settings.USE_TZ
                and not django_timezone.is_naive(moment)
            )
            if must_make_naive:
                moment = django_timezone.make_naive(moment, timezone=timezone.utc)
            return moment

        if isinstance(field, JSONField):
            parsed = field.to_python(getattr(obj, field.name, None))
            try:
                # Sorted keys give a stable string regardless of key order.
                return json.dumps(parsed, sort_keys=True, cls=field.encoder)
            except TypeError:
                return parsed

        is_single_relation = field.one_to_one or field.many_to_one
        if is_single_relation and hasattr(field, "rel_class"):
            # Read the attname attribute rather than the related object itself.
            related_key = getattr(obj, field.get_attname(), None)
            return smart_str(related_key, strings_only=True)

        text = smart_str(getattr(obj, field.name, None))
        # Objects whose type is named ``__proxy__`` (presumably Django lazy strings) are
        # forced to a real ``str``.
        if type(text).__name__ == "__proxy__":
            text = str(text)
        return text
    except ObjectDoesNotExist:
        fallback = getattr(field, "default", NOT_PROVIDED)
        return None if fallback is NOT_PROVIDED else fallback
def mask_str(value: str) -> str:
    """
    Hide the first half of a string behind asterisks to remove sensitive data.

    The number of masked characters is half the length, rounded down, so a one-character
    string is returned unchanged and the second half always stays readable.

    :param value: The value to mask.
    :type value: str
    :return: The masked version of the string.
    :rtype: str
    """
    hidden_count = len(value) // 2
    visible_tail = value[hidden_count:]
    return f"{'*' * hidden_count}{visible_tail}"
def model_instance_diff(
    old: Optional[Model], new: Optional[Model], fields_to_check=None
):
    """
    Compute which field values differ between two states of a model instance.

    Either instance may be ``None`` (a newly created or a deleted model); in that case every
    field whose value differs from the one read from ``None`` is reported as changed.

    The fields compared are narrowed, in order, by ``fields_to_check`` and then by the
    ``include_fields`` / ``exclude_fields`` registered for the model. Values of fields listed in
    the registered ``mask_fields`` are partially masked in the result.

    :param old: The old state of the model instance.
    :type old: Model
    :param new: The new state of the model instance.
    :type new: Model
    :param fields_to_check: An iterable of the field names to restrict the diff to, while ignoring the rest of
                            the model's fields. This is used to pass the `update_fields` kwarg from the model's
                            `save` method. A foreign key matches by its attname as well as by its name.
    :type fields_to_check: Iterable
    :return: A dictionary with the names of the changed fields as keys and a two tuple of the old and new
             field values (as strings) as value, or ``None`` when nothing changed.
    :rtype: dict
    :raises TypeError: If ``old`` or ``new`` is neither ``None`` nor a model instance.
    """
    from auditlog.registry import auditlog

    if old is not None and not isinstance(old, Model):
        raise TypeError("The supplied old instance is not a valid model instance.")
    if new is not None and not isinstance(new, Model):
        raise TypeError("The supplied new instance is not a valid model instance.")

    # Choose the candidate fields and the registered options for the model involved.
    if old is None and new is None:
        fields = set()
        model_fields = None
    elif old is None:
        fields = set(get_fields_in_model(new))
        model_fields = auditlog.get_model_fields(new._meta.model)
    elif new is None:
        fields = set(get_fields_in_model(old))
        model_fields = auditlog.get_model_fields(old._meta.model)
    else:
        fields = set(old._meta.fields + new._meta.fields)
        model_fields = auditlog.get_model_fields(new._meta.model)

    if fields_to_check:

        def _was_requested(candidate):
            # Foreign keys may be named by attname as well as by field name.
            if isinstance(candidate, ForeignKey) and candidate.attname in fields_to_check:
                return True
            return candidate.name in fields_to_check

        fields = {candidate for candidate in fields if _was_requested(candidate)}

    # Apply the registered include/exclude lists, when any are configured.
    has_filters = bool(model_fields) and bool(
        model_fields["include_fields"] or model_fields["exclude_fields"]
    )
    if has_filters and fields:
        if model_fields["include_fields"]:
            fields = [
                candidate
                for candidate in fields
                if candidate.name in model_fields["include_fields"]
            ]
        if model_fields["exclude_fields"]:
            fields = [
                candidate
                for candidate in fields
                if candidate.name not in model_fields["exclude_fields"]
            ]

    def _masked_text(raw):
        # String form of a value with its first half hidden.
        return mask_str(smart_str(raw))

    diff = {}
    for field in fields:
        before = get_field_value(old, field)
        after = get_field_value(new, field)

        if before != after:
            is_masked = model_fields and field.name in model_fields["mask_fields"]
            render = _masked_text if is_masked else smart_str
            diff[field.name] = (render(before), render(after))

    # An empty diff is reported as None rather than as an empty dict.
    return diff or None