# Mirror of https://github.com/jazzband/django-auditlog.git
# Synced 2026-03-16 22:20:26 +00:00 (426 lines, 17 KiB, Python)
import ast
|
|
import json
|
|
|
|
from dateutil import parser
|
|
from dateutil.tz import gettz
|
|
from django.conf import settings
|
|
from django.contrib.contenttypes.fields import GenericRelation
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core.exceptions import FieldDoesNotExist
|
|
from django.db import DEFAULT_DB_ALIAS, models
|
|
from django.db.models import Field, Q, QuerySet
|
|
from django.utils import formats, timezone
|
|
from django.utils.encoding import smart_str
|
|
from django.utils.translation import gettext_lazy as _
|
|
from django_jsonfield_backport.models import JSONField
|
|
|
|
|
|
class LogEntryManager(models.Manager):
    """
    Custom manager for the :py:class:`LogEntry` model.
    """

    def log_create(self, instance, **kwargs):
        """
        Helper method to create a new log entry. This method automatically populates some fields when no explicit value
        is given.

        :param instance: The model instance to log a change for.
        :type instance: Model
        :param kwargs: Field overrides for the :py:class:`LogEntry` object.
        :return: The new log entry or `None` if there were no changes.
        :rtype: LogEntry
        """
        changes = kwargs.get("changes", None)
        pk = self._get_pk_value(instance)

        # Nothing to record when the caller did not supply any changes.
        if changes is None:
            return None

        kwargs.setdefault("content_type", ContentType.objects.get_for_model(instance))
        kwargs.setdefault("object_pk", pk)
        kwargs.setdefault("object_repr", smart_str(instance))

        # The numeric object_id column is only filled in for integer primary keys.
        if isinstance(pk, int):
            kwargs.setdefault("object_id", pk)

        get_additional_data = getattr(instance, "get_additional_data", None)
        if callable(get_additional_data):
            kwargs.setdefault("additional_data", get_additional_data())

        # Delete log entries with the same pk as a newly created model. This should only be necessary when a pk is
        # used twice. The action is a plain integer, so it is compared by value (``==``) rather than by identity.
        if kwargs.get("action", None) == LogEntry.Action.CREATE:
            content_type = kwargs.get("content_type")
            object_id = kwargs.get("object_id", None)
            if (
                object_id is not None
                and self.filter(content_type=content_type, object_id=object_id).exists()
            ):
                self.filter(content_type=content_type, object_id=object_id).delete()
            else:
                self.filter(
                    content_type=content_type,
                    object_pk=kwargs.get("object_pk", ""),
                ).delete()

        # Save the LogEntry to the same database the instance is using.
        db = instance._state.db
        target = self if db is None or db == "" else self.using(db)
        return target.create(**kwargs)

    def get_for_object(self, instance):
        """
        Get log entries for the specified model instance.

        :param instance: The model instance to get log entries for.
        :type instance: Model
        :return: QuerySet of log entries for the given model instance.
        :rtype: QuerySet
        """
        # Return empty queryset if the given model instance is not a model instance.
        if not isinstance(instance, models.Model):
            return self.none()

        content_type = ContentType.objects.get_for_model(instance.__class__)
        pk = self._get_pk_value(instance)

        if isinstance(pk, int):
            return self.filter(content_type=content_type, object_id=pk)
        return self.filter(content_type=content_type, object_pk=smart_str(pk))

    def get_for_objects(self, queryset):
        """
        Get log entries for the objects in the specified queryset.

        :param queryset: The queryset to get the log entries for.
        :type queryset: QuerySet
        :return: The LogEntry objects for the objects in the given queryset.
        :rtype: QuerySet
        """
        if not isinstance(queryset, QuerySet):
            return self.none()

        # Fetch the primary keys once and test the fetched list for emptiness. This avoids a separate COUNT query
        # and guarantees that ``primary_keys[0]`` below exists.
        primary_keys = list(
            queryset.values_list(queryset.model._meta.pk.name, flat=True)
        )
        if not primary_keys:
            return self.none()

        content_type = ContentType.objects.get_for_model(queryset.model)

        if isinstance(primary_keys[0], int):
            lookup = Q(object_id__in=primary_keys)
        else:
            # Non-integer keys are matched against the textual object_pk column; UUIDs are converted to their
            # string form first.
            if isinstance(queryset.model._meta.pk, models.UUIDField):
                primary_keys = [smart_str(pk) for pk in primary_keys]
            lookup = Q(object_pk__in=primary_keys)

        return self.filter(content_type=content_type).filter(lookup).distinct()

    def get_for_model(self, model):
        """
        Get log entries for all objects of a specified type.

        :param model: The model to get log entries for.
        :type model: class
        :return: QuerySet of log entries for the given model.
        :rtype: QuerySet
        """
        # Return empty queryset if the given object is not valid. ``issubclass`` raises TypeError when its first
        # argument is not a class, so that is checked first.
        if not (isinstance(model, type) and issubclass(model, models.Model)):
            return self.none()

        content_type = ContentType.objects.get_for_model(model)

        return self.filter(content_type=content_type)

    def _get_pk_value(self, instance):
        """
        Get the primary key field value for a model instance.

        :param instance: The model instance to get the primary key for.
        :type instance: Model
        :return: The primary key value of the given model instance.
        """
        pk_field = instance._meta.pk.name
        pk = getattr(instance, pk_field, None)

        # Check to make sure that we got a pk and not a model object; if the primary key attribute holds a model
        # instance, resolve that instance's own primary key instead.
        if isinstance(pk, models.Model):
            pk = self._get_pk_value(pk)
        return pk
|
|
|
|
|
|
class LogEntry(models.Model):
    """
    Represents an entry in the audit log. The content type is saved along with the textual and numeric (if available)
    primary key, as well as the textual representation of the object when it was saved. It holds the action performed
    and the fields that were changed in the transaction.

    If AuditlogMiddleware is used, the actor will be set automatically. Keep in mind that editing / re-saving LogEntry
    instances may set the actor to a wrong value - editing LogEntry instances is not recommended (and it should not be
    necessary).
    """

    class Action:
        """
        The actions that Auditlog distinguishes: creating, updating and deleting objects. Viewing objects is not logged.
        The values of the actions are numeric, a higher integer value means a more intrusive action. This may be useful
        in some cases when comparing actions because the ``__lt``, ``__lte``, ``__gt``, ``__gte`` lookup filters can be
        used in queries.

        The valid actions are :py:attr:`Action.CREATE`, :py:attr:`Action.UPDATE` and :py:attr:`Action.DELETE`.
        """

        CREATE = 0
        UPDATE = 1
        DELETE = 2

        choices = (
            (CREATE, _("create")),
            (UPDATE, _("update")),
            (DELETE, _("delete")),
        )

    content_type = models.ForeignKey(
        to="contenttypes.ContentType",
        on_delete=models.CASCADE,
        related_name="+",
        verbose_name=_("content type"),
    )
    object_pk = models.CharField(
        db_index=True, max_length=255, verbose_name=_("object pk")
    )
    object_id = models.BigIntegerField(
        blank=True, db_index=True, null=True, verbose_name=_("object id")
    )
    object_repr = models.TextField(verbose_name=_("object representation"))
    action = models.PositiveSmallIntegerField(
        choices=Action.choices, verbose_name=_("action"), db_index=True
    )
    changes = models.TextField(blank=True, verbose_name=_("change message"))
    actor = models.ForeignKey(
        to=settings.AUTH_USER_MODEL,
        on_delete=models.SET_NULL,
        blank=True,
        null=True,
        related_name="+",
        verbose_name=_("actor"),
    )
    remote_addr = models.GenericIPAddressField(
        blank=True, null=True, verbose_name=_("remote address")
    )
    timestamp = models.DateTimeField(auto_now_add=True, verbose_name=_("timestamp"))
    additional_data = JSONField(
        blank=True, null=True, verbose_name=_("additional data")
    )

    objects = LogEntryManager()

    class Meta:
        get_latest_by = "timestamp"
        ordering = ["-timestamp"]
        verbose_name = _("log entry")
        verbose_name_plural = _("log entries")

    def __str__(self):
        """
        :return: A short description made of the action verb and the stored object representation.
        """
        if self.action == self.Action.CREATE:
            fstring = _("Created {repr:s}")
        elif self.action == self.Action.UPDATE:
            fstring = _("Updated {repr:s}")
        elif self.action == self.Action.DELETE:
            fstring = _("Deleted {repr:s}")
        else:
            fstring = _("Logged {repr:s}")

        return fstring.format(repr=self.object_repr)

    @property
    def changes_dict(self):
        """
        :return: The changes recorded in this log entry as a dictionary object. An empty dictionary is returned when
                 the stored text cannot be decoded as JSON.
        """
        try:
            return json.loads(self.changes)
        except ValueError:
            return {}

    @property
    def changes_str(self, colon=": ", arrow=" \u2192 ", separator="; "):
        """
        Return the changes recorded in this log entry as a string. The formatting of the string can be customized by
        setting alternate values for colon, arrow and separator. If the formatting is still not satisfying, please use
        :py:func:`LogEntry.changes_dict` and format the string yourself.

        Note that this is declared as a property, so plain attribute access (``entry.changes_str``) always uses the
        default values of the parameters below.

        :param colon: The string to place between the field name and the values.
        :param arrow: The string to place between each old and new value.
        :param separator: The string to place between each field.
        :return: A readable string of the changes in this log entry.
        """
        substrings = []

        for field, values in self.changes_dict.items():
            substring = "{field_name:s}{colon:s}{old:s}{arrow:s}{new:s}".format(
                field_name=field,
                colon=colon,
                old=values[0],
                arrow=arrow,
                new=values[1],
            )
            substrings.append(substring)

        return separator.join(substrings)

    @property
    def changes_display_dict(self):
        """
        :return: The changes recorded in this log entry intended for display to users as a dictionary object.
        """
        # Get the model and model_fields. The registry is imported locally, as in the original code
        # (presumably to avoid a circular import -- TODO confirm).
        from auditlog.registry import auditlog

        model = self.content_type.model_class()
        if model is None:
            # The content type no longer resolves to a model class, so there is no field metadata to build
            # display values from; fall back to the raw recorded changes.
            return self.changes_dict

        model_fields = auditlog.get_model_fields(model._meta.model)
        changes_display_dict = {}
        temporal_types = ("DateTimeField", "DateField", "TimeField")

        # grab the changes_dict and iterate through
        for field_name, values in self.changes_dict.items():
            # try to get the field attribute on the model
            try:
                field = model._meta.get_field(field_name)
            except FieldDoesNotExist:
                changes_display_dict[field_name] = values
                continue

            values_display = []

            # handle choices fields and Postgres ArrayField to get human readable version.
            # ``getattr`` is given a default because not every field object has a ``choices`` attribute.
            choices_dict = None
            if getattr(field, "choices", None):
                choices_dict = dict(field.choices)
            base_field = getattr(field, "base_field", None)
            if isinstance(base_field, Field) and getattr(base_field, "choices", None):
                choices_dict = dict(base_field.choices)

            if choices_dict:
                for value in values:
                    try:
                        value = ast.literal_eval(value)
                        if isinstance(value, list):
                            values_display.append(
                                ", ".join(
                                    [choices_dict.get(val, "None") for val in value]
                                )
                            )
                        else:
                            values_display.append(choices_dict.get(value, "None"))
                    except Exception:
                        # Best-effort fallback: look the value up as-is. ``Exception`` is caught instead of using a
                        # bare ``except`` so that KeyboardInterrupt / SystemExit are not swallowed.
                        values_display.append(choices_dict.get(value, "None"))
            else:
                try:
                    field_type = field.get_internal_type()
                except AttributeError:
                    # if the field is a relationship it has no internal type and exclude it
                    continue

                for value in values:
                    # handle case where field is a datetime, date, or time type
                    if field_type in temporal_types:
                        try:
                            parsed = parser.parse(value)
                            if field_type == "DateField":
                                parsed = parsed.date()
                            elif field_type == "TimeField":
                                parsed = parsed.time()
                            elif field_type == "DateTimeField":
                                parsed = parsed.replace(tzinfo=timezone.utc)
                                parsed = parsed.astimezone(gettz(settings.TIME_ZONE))
                            # Only overwrite the raw value once the whole conversion succeeded, so a failure
                            # part-way through never leaves a date object behind for the length check below.
                            value = formats.localize(parsed)
                        except (ValueError, OverflowError):
                            # Not a parseable date/time; keep the raw recorded value.
                            pass

                    # check if length is longer than 140 and truncate with ellipsis
                    if len(value) > 140:
                        value = f"{value[:140]}..."

                    values_display.append(value)

            verbose_name = model_fields["mapping_fields"].get(
                field.name, getattr(field, "verbose_name", field.name)
            )
            changes_display_dict[verbose_name] = values_display

        return changes_display_dict
|
|
|
|
|
|
class AuditlogHistoryField(GenericRelation):
    """
    A :py:class:`django.contrib.contenttypes.fields.GenericRelation` preconfigured to point at :py:class:`LogEntry`,
    which makes an object's log entries easy to reach, for example from templates.

    The relation is joined on the numeric ``object_id`` column by default, since integer primary keys are the most
    common case. For models with a non-integer primary key pass ``pk_indexable=False``; the relation is then joined
    on the text based ``object_pk`` column instead.

    Adding this field to a model does not register the model for automatic logging; registration stays a separate
    step so the field can be used flexibly.

    :param pk_indexable: Whether the primary key of the model is an integer. Pass False for non-integer keys.
    :type pk_indexable: bool
    :param delete_related: Whether the related log entries are cascade-deleted together with the parent object.
                           Defaults to True, which is Django's default behavior for generic relations; pass False
                           to retain the full auditlog history of deleted objects.
    :type delete_related: bool
    """

    def __init__(self, pk_indexable=True, delete_related=True, **kwargs):
        # These three options are always forced, overriding anything the caller passed for them.
        kwargs.update(
            to=LogEntry,
            object_id_field="object_id" if pk_indexable else "object_pk",
            content_type_field="content_type",
        )
        self.delete_related = delete_related
        super().__init__(**kwargs)

    def bulk_related_objects(self, objs, using=DEFAULT_DB_ALIAS):
        """
        Return all objects related to ``objs`` via this ``GenericRelation``.
        """
        if not self.delete_related:
            # Collector.collect() calls this method to find related objects when deleting. Reporting none
            # keeps the log entries from being deleted along with the parent object.
            return []

        return super().bulk_related_objects(objs, using)
|
|
|
|
|
|
# South compatibility for AuditlogHistoryField.
# When South is importable, register an introspection rule for the field and emit a deprecation warning.
# The warning is issued with ``warnings.warn`` instead of being raised: raising here would abort the import of
# this module, because only ImportError is handled below.
try:
    from south.modelsinspector import add_introspection_rules

    import warnings

    add_introspection_rules([], [r"^auditlog\.models\.AuditlogHistoryField"])
    warnings.warn(
        "South support will be dropped in django-auditlog 0.4.0 or later.",
        DeprecationWarning,
    )
except ImportError:
    # South is not installed; nothing to register.
    pass
|