mirror of
https://github.com/jazzband/django-auditlog.git
synced 2026-03-17 06:30:27 +00:00
Compare commits
100 commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
cc90f32ada | ||
|
|
48adbc5a1e | ||
|
|
465bfded80 | ||
|
|
bd68e2a619 | ||
|
|
aa4f7a8108 | ||
|
|
bdf595e4dc | ||
|
|
a13f8d9e60 | ||
|
|
d7a73e5cbf | ||
|
|
f12e8a74fc | ||
|
|
1fc1e678c7 | ||
|
|
d067de73b6 | ||
|
|
cb99bcfbc4 | ||
|
|
093b8093de | ||
|
|
cf63860dad | ||
|
|
943d5a9e6b | ||
|
|
ea57dfd9c1 | ||
|
|
ccf4b69113 | ||
|
|
327f8c7067 | ||
|
|
94182f86e9 | ||
|
|
59bf633fbf | ||
|
|
783316331f | ||
|
|
e404e82795 | ||
|
|
458f3e3766 | ||
|
|
4232d685bd | ||
|
|
49d92b30fe | ||
|
|
b130e3088e | ||
|
|
7801239387 | ||
|
|
c57be3a2c1 | ||
|
|
5d2bc88b2d | ||
|
|
f647966210 | ||
|
|
2b44eebd50 | ||
|
|
6c0c83e7e5 | ||
|
|
034ba57d93 | ||
|
|
13cad5b25a | ||
|
|
9629f3f8d7 | ||
|
|
3eb5d66c39 | ||
|
|
a0a0726982 | ||
|
|
ff6349a89a | ||
|
|
8b97cc2acb | ||
|
|
5dcb069bb8 | ||
|
|
d7a6496ad8 | ||
|
|
df16b2a8da | ||
|
|
be82018266 | ||
|
|
de693dd092 | ||
|
|
6379683f77 | ||
|
|
c346adb8b5 | ||
|
|
701f867a04 | ||
|
|
0ca00faafc | ||
|
|
cb1fefb793 | ||
|
|
536a841bf3 | ||
|
|
08a7b82acc | ||
|
|
8d2bb0f319 | ||
|
|
3ac4311ae4 | ||
|
|
917b490ee4 | ||
|
|
47a268eef9 | ||
|
|
ca29848d78 | ||
|
|
a6cea38c9e | ||
|
|
33a35e841b | ||
|
|
a11d1a3e46 | ||
|
|
45a27ec1d6 | ||
|
|
e6f3f12bae | ||
|
|
28a60af5f3 | ||
|
|
fd169771df | ||
|
|
d97ac056d4 | ||
|
|
33c35b42db | ||
|
|
0f3c7b430f | ||
|
|
bf8ba7a0be | ||
|
|
2b536fc87e | ||
|
|
106417c684 | ||
|
|
358971aafe | ||
|
|
f58b3d7685 | ||
|
|
5cd55ac38c | ||
|
|
8397754a20 | ||
|
|
da49432924 | ||
|
|
3af06e13c7 | ||
|
|
ae57b0c322 | ||
|
|
bde49bdb4f | ||
|
|
c66b36c700 | ||
|
|
2ae401a04f | ||
|
|
5ba554af56 | ||
|
|
df2bf0a05c | ||
|
|
05e6b179fd | ||
|
|
07b38a9345 | ||
|
|
5784247180 | ||
|
|
2a43cff96f | ||
|
|
b16b1a0df3 | ||
|
|
9152d225bb | ||
|
|
c53b766132 | ||
|
|
e35d0f4194 | ||
|
|
e60876ae14 | ||
|
|
5dbea8a9a1 | ||
|
|
cfbc588cc1 | ||
|
|
ee6bb33bc9 | ||
|
|
c9c97b6861 | ||
|
|
5f5cc7f7e9 | ||
|
|
a5381b6195 | ||
|
|
2dc0ac43b5 | ||
|
|
aa28009d3b | ||
|
|
03b8616dac | ||
|
|
62c1e676cc |
23 changed files with 859 additions and 329 deletions
1
LICENSE
1
LICENSE
|
|
@ -1,6 +1,7 @@
|
|||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2013-2020 Jan-Jelle Kester
|
||||
Copyright (c) 2019-2020 Alieh Rymašeuski <alieh.rymasheuski@gmail.com>
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
import django
|
||||
from pkg_resources import DistributionNotFound, get_distribution
|
||||
|
||||
try:
|
||||
|
|
@ -6,4 +7,5 @@ except DistributionNotFound:
|
|||
# package is not installed
|
||||
pass
|
||||
|
||||
default_app_config = "auditlog.apps.AuditlogConfig"
|
||||
if django.VERSION < (3, 2):
|
||||
default_app_config = "auditlog.apps.AuditlogConfig"
|
||||
|
|
|
|||
|
|
@ -1,10 +1,30 @@
|
|||
from django.conf import settings
|
||||
from django.contrib import admin
|
||||
from django.core.paginator import Paginator
|
||||
from django.utils.functional import cached_property
|
||||
|
||||
from auditlog.filters import ResourceTypeFilter
|
||||
from auditlog.count import limit_query_time
|
||||
from auditlog.filters import (
|
||||
FieldFilter,
|
||||
ResourceTypeFilter,
|
||||
ShortActorFilter,
|
||||
get_timestamp_filter,
|
||||
)
|
||||
from auditlog.mixins import LogEntryAdminMixin
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
|
||||
class TimeLimitedPaginator(Paginator):
|
||||
"""A PostgreSQL-specific paginator with a hard time limit for total count of pages."""
|
||||
|
||||
@cached_property
|
||||
@limit_query_time(
|
||||
getattr(settings, "AUDITLOG_PAGINATOR_TIMEOUT", 500), default=100000
|
||||
)
|
||||
def count(self):
|
||||
return super().count
|
||||
|
||||
|
||||
class LogEntryAdmin(admin.ModelAdmin, LogEntryAdminMixin):
|
||||
list_display = ["created", "resource_url", "action", "msg_short", "user_url"]
|
||||
search_fields = [
|
||||
|
|
@ -14,12 +34,21 @@ class LogEntryAdmin(admin.ModelAdmin, LogEntryAdminMixin):
|
|||
"actor__first_name",
|
||||
"actor__last_name",
|
||||
]
|
||||
list_filter = ["action", ResourceTypeFilter]
|
||||
list_filter = [
|
||||
"action",
|
||||
ShortActorFilter,
|
||||
ResourceTypeFilter,
|
||||
FieldFilter,
|
||||
("timestamp", get_timestamp_filter()),
|
||||
]
|
||||
readonly_fields = ["created", "resource_url", "action", "user_url", "msg"]
|
||||
fieldsets = [
|
||||
(None, {"fields": ["created", "user_url", "resource_url"]}),
|
||||
("Changes", {"fields": ["action", "msg"]}),
|
||||
]
|
||||
list_select_related = ["actor", "content_type"]
|
||||
show_full_result_count = False
|
||||
paginator = TimeLimitedPaginator
|
||||
|
||||
|
||||
admin.site.register(LogEntry, LogEntryAdmin)
|
||||
|
|
|
|||
67
auditlog/context.py
Normal file
67
auditlog/context.py
Normal file
|
|
@ -0,0 +1,67 @@
|
|||
import contextlib
|
||||
import threading
|
||||
import time
|
||||
from functools import partial
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.db.models.signals import pre_save
|
||||
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
threadlocal = threading.local()
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def set_actor(actor, remote_addr=None):
|
||||
"""Connect a signal receiver with current user attached."""
|
||||
# Initialize thread local storage
|
||||
threadlocal.auditlog = {
|
||||
"signal_duid": ("set_actor", time.time()),
|
||||
"remote_addr": remote_addr,
|
||||
}
|
||||
|
||||
# Connect signal for automatic logging
|
||||
set_actor = partial(
|
||||
_set_actor, user=actor, signal_duid=threadlocal.auditlog["signal_duid"]
|
||||
)
|
||||
pre_save.connect(
|
||||
set_actor,
|
||||
sender=LogEntry,
|
||||
dispatch_uid=threadlocal.auditlog["signal_duid"],
|
||||
weak=False,
|
||||
)
|
||||
|
||||
try:
|
||||
yield
|
||||
|
||||
finally:
|
||||
try:
|
||||
auditlog = threadlocal.auditlog
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
pre_save.disconnect(sender=LogEntry, dispatch_uid=auditlog["signal_duid"])
|
||||
del threadlocal.auditlog
|
||||
|
||||
|
||||
def _set_actor(user, sender, instance, signal_duid, **kwargs):
|
||||
"""Signal receiver with extra 'user' and 'signal_duid' kwargs.
|
||||
|
||||
This function becomes a valid signal receiver when it is curried with the actor and a dispatch id.
|
||||
"""
|
||||
try:
|
||||
auditlog = threadlocal.auditlog
|
||||
except AttributeError:
|
||||
pass
|
||||
else:
|
||||
if signal_duid != auditlog["signal_duid"]:
|
||||
return
|
||||
auth_user_model = get_user_model()
|
||||
if (
|
||||
sender == LogEntry
|
||||
and isinstance(user, auth_user_model)
|
||||
and instance.actor is None
|
||||
):
|
||||
instance.actor = user
|
||||
|
||||
instance.remote_addr = auditlog["remote_addr"]
|
||||
23
auditlog/count.py
Normal file
23
auditlog/count.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
from django.db import OperationalError, connection, transaction
|
||||
|
||||
|
||||
def limit_query_time(timeout, default=None):
|
||||
"""A PostgreSQL-specific decorator with a hard time limit and a default return value.
|
||||
|
||||
Timeout in milliseconds.
|
||||
|
||||
Courtesy of https://medium.com/@hakibenita/optimizing-django-admin-paginator-53c4eb6bfca3
|
||||
"""
|
||||
|
||||
def decorator(function):
|
||||
def _limit_query_time(*args, **kwargs):
|
||||
with transaction.atomic(), connection.cursor() as cursor:
|
||||
cursor.execute("SET LOCAL statement_timeout TO %s;", (timeout,))
|
||||
try:
|
||||
return function(*args, **kwargs)
|
||||
except OperationalError:
|
||||
return default
|
||||
|
||||
return _limit_query_time
|
||||
|
||||
return decorator
|
||||
|
|
@ -1,4 +1,29 @@
|
|||
from django.apps import apps
|
||||
from django.contrib.admin import SimpleListFilter
|
||||
from django.contrib.admin.filters import DateFieldListFilter
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.contrib.postgres.fields import JSONField
|
||||
from django.db import connection
|
||||
from django.db.models import Value
|
||||
from django.db.models.functions import Cast, Concat
|
||||
|
||||
from auditlog.registry import auditlog
|
||||
|
||||
|
||||
class ShortActorFilter(SimpleListFilter):
|
||||
title = "Actor"
|
||||
parameter_name = "actor"
|
||||
|
||||
def lookups(self, request, model_admin):
|
||||
return [("null", "System"), ("not_null", "Users")]
|
||||
|
||||
def queryset(self, request, queryset):
|
||||
value = self.value()
|
||||
if value is None:
|
||||
return queryset
|
||||
if value == "null":
|
||||
return queryset.filter(actor__isnull=True)
|
||||
return queryset.filter(actor__isnull=False)
|
||||
|
||||
|
||||
class ResourceTypeFilter(SimpleListFilter):
|
||||
|
|
@ -6,11 +31,68 @@ class ResourceTypeFilter(SimpleListFilter):
|
|||
parameter_name = "resource_type"
|
||||
|
||||
def lookups(self, request, model_admin):
|
||||
qs = model_admin.get_queryset(request)
|
||||
types = qs.values_list("content_type_id", "content_type__model")
|
||||
return list(types.order_by("content_type__model").distinct())
|
||||
tracked_model_names = [
|
||||
"{}.{}".format(m._meta.app_label, m._meta.model_name)
|
||||
for m in auditlog.get_models()
|
||||
]
|
||||
model_name_concat = Concat("app_label", Value("."), "model")
|
||||
content_types = ContentType.objects.annotate(
|
||||
model_name=model_name_concat,
|
||||
).filter(
|
||||
model_name__in=tracked_model_names,
|
||||
)
|
||||
return content_types.order_by("model_name").values_list("id", "model_name")
|
||||
|
||||
def queryset(self, request, queryset):
|
||||
if self.value() is None:
|
||||
return queryset
|
||||
return queryset.filter(content_type_id=self.value())
|
||||
|
||||
|
||||
class FieldFilter(SimpleListFilter):
|
||||
title = "Field"
|
||||
parameter_name = "field"
|
||||
parent = ResourceTypeFilter
|
||||
|
||||
def __init__(self, request, *args, **kwargs):
|
||||
self.target_model = self._get_target_model(request)
|
||||
super().__init__(request, *args, **kwargs)
|
||||
|
||||
def _get_target_model(self, request):
|
||||
# the parameters consumed by previous filters aren't passed to subsequent filters,
|
||||
# so we have to look into the request parameters explicitly
|
||||
content_type_id = request.GET.get(self.parent.parameter_name)
|
||||
if not content_type_id:
|
||||
return None
|
||||
|
||||
return ContentType.objects.get(id=content_type_id).model_class()
|
||||
|
||||
def lookups(self, request, model_admin):
|
||||
if connection.vendor != "postgresql":
|
||||
# filtering inside JSON is PostgreSQL-specific for now
|
||||
return []
|
||||
if not self.target_model:
|
||||
return []
|
||||
return sorted(
|
||||
(field.name, field.name) for field in self.target_model._meta.fields
|
||||
)
|
||||
|
||||
def queryset(self, request, queryset):
|
||||
if self.value() is None:
|
||||
return queryset
|
||||
return queryset.annotate(changes_json=Cast("changes", JSONField())).filter(
|
||||
**{"changes_json__{}__isnull".format(self.value()): False}
|
||||
)
|
||||
|
||||
|
||||
def get_timestamp_filter():
|
||||
"""Returns rangefilter filter class if able or a simple list filter as a fallback."""
|
||||
if apps.is_installed("rangefilter"):
|
||||
try:
|
||||
from rangefilter.filter import DateTimeRangeFilter
|
||||
|
||||
return DateTimeRangeFilter
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
return DateFieldListFilter
|
||||
|
|
|
|||
|
|
@ -1,97 +1,35 @@
|
|||
import threading
|
||||
import time
|
||||
from functools import partial
|
||||
import contextlib
|
||||
|
||||
from django.apps import apps
|
||||
from django.conf import settings
|
||||
from django.db.models.signals import pre_save
|
||||
from django.utils.deprecation import MiddlewareMixin
|
||||
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
threadlocal = threading.local()
|
||||
from auditlog.context import set_actor
|
||||
|
||||
|
||||
class AuditlogMiddleware(MiddlewareMixin):
|
||||
@contextlib.contextmanager
|
||||
def nullcontext():
|
||||
"""Equivalent to contextlib.nullcontext(None) from Python 3.7."""
|
||||
yield
|
||||
|
||||
|
||||
class AuditlogMiddleware(object):
|
||||
"""
|
||||
Middleware to couple the request's user to log items. This is accomplished by currying the signal receiver with the
|
||||
user from the request (or None if the user is not authenticated).
|
||||
"""
|
||||
|
||||
def process_request(self, request):
|
||||
"""
|
||||
Gets the current user from the request and prepares and connects a signal receiver with the user already
|
||||
attached to it.
|
||||
"""
|
||||
# Initialize thread local storage
|
||||
threadlocal.auditlog = {
|
||||
"signal_duid": (self.__class__, time.time()),
|
||||
"remote_addr": request.META.get("REMOTE_ADDR"),
|
||||
}
|
||||
def __init__(self, get_response=None):
|
||||
self.get_response = get_response
|
||||
|
||||
def __call__(self, request):
|
||||
|
||||
# In case of proxy, set 'original' address
|
||||
if request.META.get("HTTP_X_FORWARDED_FOR"):
|
||||
threadlocal.auditlog["remote_addr"] = request.META.get(
|
||||
"HTTP_X_FORWARDED_FOR"
|
||||
).split(",")[0]
|
||||
# In case of proxy, set 'original' address
|
||||
remote_addr = request.META.get("HTTP_X_FORWARDED_FOR").split(",")[0]
|
||||
else:
|
||||
remote_addr = request.META.get("REMOTE_ADDR")
|
||||
|
||||
# Connect signal for automatic logging
|
||||
if hasattr(request, "user") and getattr(
|
||||
request.user, "is_authenticated", False
|
||||
):
|
||||
set_actor = partial(
|
||||
self.set_actor,
|
||||
user=request.user,
|
||||
signal_duid=threadlocal.auditlog["signal_duid"],
|
||||
)
|
||||
pre_save.connect(
|
||||
set_actor,
|
||||
sender=LogEntry,
|
||||
dispatch_uid=threadlocal.auditlog["signal_duid"],
|
||||
weak=False,
|
||||
)
|
||||
if hasattr(request, "user") and request.user.is_authenticated:
|
||||
context = set_actor(actor=request.user, remote_addr=remote_addr)
|
||||
else:
|
||||
context = nullcontext()
|
||||
|
||||
def process_response(self, request, response):
|
||||
"""
|
||||
Disconnects the signal receiver to prevent it from staying active.
|
||||
"""
|
||||
if hasattr(threadlocal, "auditlog"):
|
||||
pre_save.disconnect(
|
||||
sender=LogEntry, dispatch_uid=threadlocal.auditlog["signal_duid"]
|
||||
)
|
||||
|
||||
return response
|
||||
|
||||
def process_exception(self, request, exception):
|
||||
"""
|
||||
Disconnects the signal receiver to prevent it from staying active in case of an exception.
|
||||
"""
|
||||
if hasattr(threadlocal, "auditlog"):
|
||||
pre_save.disconnect(
|
||||
sender=LogEntry, dispatch_uid=threadlocal.auditlog["signal_duid"]
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def set_actor(user, sender, instance, signal_duid, **kwargs):
|
||||
"""
|
||||
Signal receiver with an extra, required 'user' kwarg. This method becomes a real (valid) signal receiver when
|
||||
it is curried with the actor.
|
||||
"""
|
||||
if hasattr(threadlocal, "auditlog"):
|
||||
if signal_duid != threadlocal.auditlog["signal_duid"]:
|
||||
return
|
||||
try:
|
||||
app_label, model_name = settings.AUTH_USER_MODEL.split(".")
|
||||
auth_user_model = apps.get_model(app_label, model_name)
|
||||
except ValueError:
|
||||
auth_user_model = apps.get_model("auth", "user")
|
||||
if (
|
||||
sender == LogEntry
|
||||
and isinstance(user, auth_user_model)
|
||||
and instance.actor is None
|
||||
):
|
||||
instance.actor = user
|
||||
|
||||
instance.remote_addr = threadlocal.auditlog["remote_addr"]
|
||||
with context:
|
||||
return self.get_response(request)
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import jsonfield.fields
|
||||
from django.db import migrations, models
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
import jsonfield.fields
|
||||
from django.db import migrations, models
|
||||
from django.db import migrations
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
|
|
|||
19
auditlog/migrations/0008_timestamp_index.py
Normal file
19
auditlog/migrations/0008_timestamp_index.py
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("auditlog", "0007_object_pk_type"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterField(
|
||||
model_name="logentry",
|
||||
name="timestamp",
|
||||
field=models.DateTimeField(
|
||||
auto_now_add=True, db_index=True, verbose_name="timestamp"
|
||||
),
|
||||
),
|
||||
]
|
||||
21
auditlog/migrations/0009_timestamp_id_index.py
Normal file
21
auditlog/migrations/0009_timestamp_id_index.py
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from django.db import migrations, models
|
||||
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
|
||||
dependencies = [
|
||||
("auditlog", "0008_timestamp_index"),
|
||||
]
|
||||
|
||||
operations = [
|
||||
migrations.AlterIndexTogether(
|
||||
name="logentry",
|
||||
index_together={("timestamp", "id")},
|
||||
),
|
||||
migrations.AlterField(
|
||||
model_name="logentry",
|
||||
name="timestamp",
|
||||
field=models.DateTimeField(auto_now_add=True, verbose_name="timestamp"),
|
||||
),
|
||||
]
|
||||
|
|
@ -3,8 +3,9 @@ import json
|
|||
from django import urls as urlresolvers
|
||||
from django.conf import settings
|
||||
from django.urls.exceptions import NoReverseMatch
|
||||
from django.utils.html import format_html
|
||||
from django.utils.html import format_html, format_html_join
|
||||
from django.utils.safestring import mark_safe
|
||||
from django.utils.timezone import localtime
|
||||
|
||||
from auditlog.models import LogEntry
|
||||
|
||||
|
|
@ -13,7 +14,7 @@ MAX = 75
|
|||
|
||||
class LogEntryAdminMixin(object):
|
||||
def created(self, obj):
|
||||
return obj.timestamp.strftime("%Y-%m-%d %H:%M:%S")
|
||||
return localtime(obj.timestamp).strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
created.short_description = "Created"
|
||||
|
||||
|
|
@ -61,16 +62,61 @@ class LogEntryAdminMixin(object):
|
|||
if obj.action == LogEntry.Action.DELETE:
|
||||
return "" # delete
|
||||
changes = json.loads(obj.changes)
|
||||
msg = "<table><tr><th>#</th><th>Field</th><th>From</th><th>To</th></tr>"
|
||||
for i, field in enumerate(sorted(changes), 1):
|
||||
value = [i, field] + (
|
||||
["***", "***"] if field == "password" else changes[field]
|
||||
)
|
||||
msg += format_html(
|
||||
"<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>", *value
|
||||
)
|
||||
|
||||
msg += "</table>"
|
||||
return mark_safe(msg)
|
||||
atom_changes = {}
|
||||
m2m_changes = {}
|
||||
|
||||
for field, change in changes.items():
|
||||
if isinstance(change, dict):
|
||||
assert (
|
||||
change["type"] == "m2m"
|
||||
), "Only m2m operations are expected to produce dict changes now"
|
||||
m2m_changes[field] = change
|
||||
else:
|
||||
atom_changes[field] = change
|
||||
|
||||
msg = []
|
||||
|
||||
if atom_changes:
|
||||
msg.append("<table>")
|
||||
msg.append(self._format_header("#", "Field", "From", "To"))
|
||||
for i, (field, change) in enumerate(sorted(atom_changes.items()), 1):
|
||||
value = [i, field] + (["***", "***"] if field == "password" else change)
|
||||
msg.append(self._format_line(*value))
|
||||
msg.append("</table>")
|
||||
|
||||
if m2m_changes:
|
||||
msg.append("<table>")
|
||||
msg.append(self._format_header("#", "Relationship", "Action", "Objects"))
|
||||
for i, (field, change) in enumerate(sorted(m2m_changes.items()), 1):
|
||||
change_html = format_html_join(
|
||||
mark_safe("</br>"),
|
||||
"{}",
|
||||
[(value,) for value in change["objects"]],
|
||||
)
|
||||
|
||||
msg.append(
|
||||
format_html(
|
||||
"<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td>",
|
||||
i,
|
||||
field,
|
||||
change["operation"],
|
||||
change_html,
|
||||
)
|
||||
)
|
||||
|
||||
msg.append("</table>")
|
||||
|
||||
return mark_safe("".join(msg))
|
||||
|
||||
msg.short_description = "Changes"
|
||||
|
||||
def _format_header(self, *labels):
|
||||
return format_html(
|
||||
"".join(["<tr>", "<th>{}</th>" * len(labels), "</tr>"]), *labels
|
||||
)
|
||||
|
||||
def _format_line(self, *values):
|
||||
return format_html(
|
||||
"".join(["<tr>", "<td>{}</td>" * len(values), "</tr>"]), *values
|
||||
)
|
||||
|
|
|
|||
|
|
@ -8,10 +8,10 @@ from django.contrib.contenttypes.fields import GenericRelation
|
|||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.core.exceptions import FieldDoesNotExist
|
||||
from django.db import DEFAULT_DB_ALIAS, models
|
||||
from django.db.models import Field, Q, QuerySet
|
||||
from django.db.models import Q, QuerySet
|
||||
from django.utils import formats, timezone
|
||||
from django.utils.encoding import smart_str
|
||||
from django.utils.translation import ugettext_lazy as _
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
from jsonfield.fields import JSONField
|
||||
|
||||
|
||||
|
|
@ -76,6 +76,55 @@ class LogEntryManager(models.Manager):
|
|||
)
|
||||
return None
|
||||
|
||||
def log_m2m_changes(
|
||||
self, changed_queryset, instance, operation, field_name, **kwargs
|
||||
):
|
||||
"""Create a new "changed" log entry from m2m record.
|
||||
|
||||
:param instance: The model instance to log a change for.
|
||||
:type instance: Model
|
||||
:param operation: "add" or "delete".
|
||||
:type action: str
|
||||
:param kwargs: Field overrides for the :py:class:`LogEntry` object.
|
||||
:return: The new log entry or `None` if there were no changes.
|
||||
:rtype: LogEntry
|
||||
"""
|
||||
|
||||
pk = self._get_pk_value(instance)
|
||||
if changed_queryset is not None:
|
||||
kwargs.setdefault(
|
||||
"content_type", ContentType.objects.get_for_model(instance)
|
||||
)
|
||||
kwargs.setdefault("object_pk", pk)
|
||||
kwargs.setdefault("object_repr", smart_str(instance))
|
||||
kwargs.setdefault("action", LogEntry.Action.UPDATE)
|
||||
|
||||
if isinstance(pk, int):
|
||||
kwargs.setdefault("object_id", pk)
|
||||
|
||||
get_additional_data = getattr(instance, "get_additional_data", None)
|
||||
if callable(get_additional_data):
|
||||
kwargs.setdefault("additional_data", get_additional_data())
|
||||
|
||||
objects = [smart_str(instance) for instance in changed_queryset]
|
||||
kwargs["changes"] = json.dumps(
|
||||
{
|
||||
field_name: {
|
||||
"type": "m2m",
|
||||
"operation": operation,
|
||||
"objects": objects,
|
||||
}
|
||||
}
|
||||
)
|
||||
db = instance._state.db
|
||||
return (
|
||||
self.create(**kwargs)
|
||||
if db is None or db == ""
|
||||
else self.using(db).create(**kwargs)
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def get_for_object(self, instance):
|
||||
"""
|
||||
Get log entries for the specified model instance.
|
||||
|
|
@ -239,6 +288,7 @@ class LogEntry(models.Model):
|
|||
ordering = ["-timestamp"]
|
||||
verbose_name = _("log entry")
|
||||
verbose_name_plural = _("log entries")
|
||||
index_together = ("timestamp", "id")
|
||||
|
||||
def __str__(self):
|
||||
if self.action == self.Action.CREATE:
|
||||
|
|
@ -310,14 +360,9 @@ class LogEntry(models.Model):
|
|||
values_display = []
|
||||
# handle choices fields and Postgres ArrayField to get human readable version
|
||||
choices_dict = None
|
||||
if getattr(field, "choices") and len(field.choices) > 0:
|
||||
if getattr(field, "choices", []):
|
||||
choices_dict = dict(field.choices)
|
||||
if (
|
||||
hasattr(field, "base_field")
|
||||
and isinstance(field.base_field, Field)
|
||||
and getattr(field.base_field, "choices")
|
||||
and len(field.base_field.choices) > 0
|
||||
):
|
||||
if getattr(getattr(field, "base_field", None), "choices", []):
|
||||
choices_dict = dict(field.base_field.choices)
|
||||
|
||||
if choices_dict:
|
||||
|
|
@ -332,9 +377,7 @@ class LogEntry(models.Model):
|
|||
)
|
||||
else:
|
||||
values_display.append(choices_dict.get(value, "None"))
|
||||
except ValueError:
|
||||
values_display.append(choices_dict.get(value, "None"))
|
||||
except:
|
||||
except Exception:
|
||||
values_display.append(choices_dict.get(value, "None"))
|
||||
else:
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -59,3 +59,34 @@ def log_delete(sender, instance, **kwargs):
|
|||
action=LogEntry.Action.DELETE,
|
||||
changes=json.dumps(changes),
|
||||
)
|
||||
|
||||
|
||||
def make_log_m2m_changes(field_name):
|
||||
"""Return a handler for m2m_changed with field_name enclosed."""
|
||||
|
||||
def log_m2m_changes(signal, action, **kwargs):
|
||||
"""Handle m2m_changed and call LogEntry.objects.log_m2m_changes as needed."""
|
||||
if action not in ["post_add", "post_clear", "post_remove"]:
|
||||
return
|
||||
|
||||
if action == "post_clear":
|
||||
changed_queryset = kwargs["model"].objects.all()
|
||||
else:
|
||||
changed_queryset = kwargs["model"].objects.filter(pk__in=kwargs["pk_set"])
|
||||
|
||||
if action in ["post_add"]:
|
||||
LogEntry.objects.log_m2m_changes(
|
||||
changed_queryset,
|
||||
kwargs["instance"],
|
||||
"add",
|
||||
field_name,
|
||||
)
|
||||
elif action in ["post_remove", "post_clear"]:
|
||||
LogEntry.objects.log_m2m_changes(
|
||||
changed_queryset,
|
||||
kwargs["instance"],
|
||||
"delete",
|
||||
field_name,
|
||||
)
|
||||
|
||||
return log_m2m_changes
|
||||
|
|
|
|||
|
|
@ -1,10 +1,17 @@
|
|||
from typing import Callable, Dict, List, Optional, Tuple
|
||||
from collections import defaultdict
|
||||
from typing import Callable, Collection, Dict, List, Optional, Tuple
|
||||
|
||||
from django.db.models import Model
|
||||
from django.db.models.base import ModelBase
|
||||
from django.db.models.signals import ModelSignal, post_delete, post_save, pre_save
|
||||
from django.db.models.signals import (
|
||||
ModelSignal,
|
||||
m2m_changed,
|
||||
post_delete,
|
||||
post_save,
|
||||
pre_save,
|
||||
)
|
||||
|
||||
DispatchUID = Tuple[int, str, int]
|
||||
DispatchUID = Tuple[int, int, int]
|
||||
|
||||
|
||||
class AuditlogModelRegistry(object):
|
||||
|
|
@ -17,12 +24,14 @@ class AuditlogModelRegistry(object):
|
|||
create: bool = True,
|
||||
update: bool = True,
|
||||
delete: bool = True,
|
||||
m2m: bool = True,
|
||||
custom: Optional[Dict[ModelSignal, Callable]] = None,
|
||||
):
|
||||
from auditlog.receivers import log_create, log_delete, log_update
|
||||
|
||||
self._registry = {}
|
||||
self._signals = {}
|
||||
self._m2m_signals = defaultdict(dict)
|
||||
|
||||
if create:
|
||||
self._signals[post_save] = log_create
|
||||
|
|
@ -30,6 +39,7 @@ class AuditlogModelRegistry(object):
|
|||
self._signals[pre_save] = log_update
|
||||
if delete:
|
||||
self._signals[post_delete] = log_delete
|
||||
self._m2m = m2m
|
||||
|
||||
if custom is not None:
|
||||
self._signals.update(custom)
|
||||
|
|
@ -40,6 +50,7 @@ class AuditlogModelRegistry(object):
|
|||
include_fields: Optional[List[str]] = None,
|
||||
exclude_fields: Optional[List[str]] = None,
|
||||
mapping_fields: Optional[Dict[str, str]] = None,
|
||||
m2m_fields: Optional[Collection[str]] = None,
|
||||
):
|
||||
"""
|
||||
Register a model with auditlog. Auditlog will then track mutations on this model's instances.
|
||||
|
|
@ -48,6 +59,7 @@ class AuditlogModelRegistry(object):
|
|||
:param include_fields: The fields to include. Implicitly excludes all other fields.
|
||||
:param exclude_fields: The fields to exclude. Overrides the fields to include.
|
||||
:param mapping_fields: Mapping from field names to strings in diff.
|
||||
:param m2m_fields: The fields to map as many to many.
|
||||
|
||||
"""
|
||||
|
||||
|
|
@ -57,6 +69,8 @@ class AuditlogModelRegistry(object):
|
|||
exclude_fields = []
|
||||
if mapping_fields is None:
|
||||
mapping_fields = {}
|
||||
if m2m_fields is None:
|
||||
m2m_fields = set()
|
||||
|
||||
def registrar(cls):
|
||||
"""Register models for a given class."""
|
||||
|
|
@ -67,6 +81,7 @@ class AuditlogModelRegistry(object):
|
|||
"include_fields": include_fields,
|
||||
"exclude_fields": exclude_fields,
|
||||
"mapping_fields": mapping_fields,
|
||||
"m2m_fields": m2m_fields,
|
||||
}
|
||||
self._connect_signals(cls)
|
||||
|
||||
|
|
@ -107,6 +122,7 @@ class AuditlogModelRegistry(object):
|
|||
self._disconnect_signals(model)
|
||||
|
||||
def get_models(self) -> List[ModelBase]:
|
||||
"""Get a list of all registered models."""
|
||||
return list(self._registry.keys())
|
||||
|
||||
def get_model_fields(self, model: ModelBase):
|
||||
|
|
@ -120,11 +136,26 @@ class AuditlogModelRegistry(object):
|
|||
"""
|
||||
Connect signals for the model.
|
||||
"""
|
||||
for signal in self._signals:
|
||||
receiver = self._signals[signal]
|
||||
from auditlog.receivers import make_log_m2m_changes
|
||||
|
||||
for signal, receiver in self._signals.items():
|
||||
signal.connect(
|
||||
receiver, sender=model, dispatch_uid=self._dispatch_uid(signal, model)
|
||||
receiver,
|
||||
sender=model,
|
||||
dispatch_uid=self._dispatch_uid(signal, receiver),
|
||||
)
|
||||
if self._m2m:
|
||||
for field_name in self._registry[model]["m2m_fields"]:
|
||||
receiver = make_log_m2m_changes(field_name)
|
||||
self._m2m_signals[model][field_name] = receiver
|
||||
field = getattr(model, field_name)
|
||||
m2m_model = getattr(field, "through")
|
||||
|
||||
m2m_changed.connect(
|
||||
receiver,
|
||||
sender=m2m_model,
|
||||
dispatch_uid=self._dispatch_uid(m2m_changed, receiver),
|
||||
)
|
||||
|
||||
def _disconnect_signals(self, model):
|
||||
"""
|
||||
|
|
@ -132,14 +163,20 @@ class AuditlogModelRegistry(object):
|
|||
"""
|
||||
for signal, receiver in self._signals.items():
|
||||
signal.disconnect(
|
||||
sender=model, dispatch_uid=self._dispatch_uid(signal, model)
|
||||
sender=model, dispatch_uid=self._dispatch_uid(signal, receiver)
|
||||
)
|
||||
for field_name, receiver in self._m2m_signals[model].items():
|
||||
field = getattr(model, field_name)
|
||||
m2m_model = getattr(field, "through")
|
||||
m2m_changed.disconnect(
|
||||
sender=m2m_model,
|
||||
dispatch_uid=self._dispatch_uid(m2m_changed, receiver),
|
||||
)
|
||||
del self._m2m_signals[model]
|
||||
|
||||
def _dispatch_uid(self, signal, model) -> DispatchUID:
|
||||
"""
|
||||
Generate a dispatch_uid.
|
||||
"""
|
||||
return self.__hash__(), model.__qualname__, signal.__hash__()
|
||||
def _dispatch_uid(self, signal, receiver) -> DispatchUID:
|
||||
"""Generate a dispatch_uid which is unique for a combination of self, signal, and receiver."""
|
||||
return id(self), id(signal), id(receiver)
|
||||
|
||||
|
||||
auditlog = AuditlogModelRegistry()
|
||||
|
|
|
|||
|
|
@ -1 +1,4 @@
|
|||
default_app_config = "auditlog_tests.apps.AuditlogTestConfig"
|
||||
import django
|
||||
|
||||
if django.VERSION < (3, 2):
|
||||
default_app_config = "auditlog_tests.apps.AuditlogTestConfig"
|
||||
|
|
|
|||
|
|
@ -4,7 +4,9 @@ from django.contrib.postgres.fields import ArrayField
|
|||
from django.db import models
|
||||
|
||||
from auditlog.models import AuditlogHistoryField
|
||||
from auditlog.registry import auditlog
|
||||
from auditlog.registry import AuditlogModelRegistry, auditlog
|
||||
|
||||
m2m_only_auditlog = AuditlogModelRegistry(create=False, update=False, delete=False)
|
||||
|
||||
|
||||
@auditlog.register()
|
||||
|
|
@ -80,6 +82,24 @@ class ManyRelatedModel(models.Model):
|
|||
history = AuditlogHistoryField()
|
||||
|
||||
|
||||
class FirstManyRelatedModel(models.Model):
|
||||
"""
|
||||
A model with a many to many relation to another model similar.
|
||||
"""
|
||||
|
||||
related = models.ManyToManyField("OtherManyRelatedModel", related_name="related")
|
||||
|
||||
history = AuditlogHistoryField()
|
||||
|
||||
|
||||
class OtherManyRelatedModel(models.Model):
|
||||
"""
|
||||
A model that 'receives' the other side of the many to many relation from 'FirstManyRelatedModel'.
|
||||
"""
|
||||
|
||||
history = AuditlogHistoryField()
|
||||
|
||||
|
||||
@auditlog.register(include_fields=["label"])
|
||||
class SimpleIncludeModel(models.Model):
|
||||
"""
|
||||
|
|
@ -224,6 +244,9 @@ auditlog.register(ProxyModel)
|
|||
auditlog.register(RelatedModel)
|
||||
auditlog.register(ManyRelatedModel)
|
||||
auditlog.register(ManyRelatedModel.related.through)
|
||||
m2m_only_auditlog.register(
|
||||
FirstManyRelatedModel, include_fields=["pk", "history"], m2m_fields={"related"}
|
||||
)
|
||||
auditlog.register(SimpleExcludeModel, exclude_fields=["text"])
|
||||
auditlog.register(SimpleMappingModel, mapping_fields={"sku": "Product No."})
|
||||
auditlog.register(AdditionalDataIncludedModel)
|
||||
|
|
|
|||
|
|
@ -18,18 +18,20 @@ INSTALLED_APPS = [
|
|||
"auditlog_tests",
|
||||
]
|
||||
|
||||
MIDDLEWARE = (
|
||||
MIDDLEWARE = [
|
||||
"django.middleware.common.CommonMiddleware",
|
||||
"django.contrib.sessions.middleware.SessionMiddleware",
|
||||
"django.contrib.auth.middleware.AuthenticationMiddleware",
|
||||
"django.contrib.messages.middleware.MessageMiddleware",
|
||||
"auditlog.middleware.AuditlogMiddleware",
|
||||
)
|
||||
]
|
||||
|
||||
DATABASES = {
|
||||
"default": {
|
||||
"ENGINE": "django.db.backends.postgresql",
|
||||
"NAME": os.getenv("TEST_DB_NAME", "auditlog_tests_db"),
|
||||
"NAME": os.getenv(
|
||||
"TEST_DB_NAME", "auditlog" + os.environ.get("TOX_PARALLEL_ENV", "")
|
||||
),
|
||||
"USER": os.getenv("TEST_DB_USER", "postgres"),
|
||||
"PASSWORD": os.getenv("TEST_DB_PASS", ""),
|
||||
"HOST": os.getenv("TEST_DB_HOST", "127.0.0.1"),
|
||||
|
|
|
|||
|
|
@ -1,17 +1,19 @@
|
|||
import datetime
|
||||
import itertools
|
||||
import json
|
||||
|
||||
import django
|
||||
import mock
|
||||
from dateutil.tz import gettz
|
||||
from django.conf import settings
|
||||
from django.contrib import auth
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.contrib.auth.models import AnonymousUser, User
|
||||
from django.core.exceptions import ValidationError
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db.models.signals import pre_save
|
||||
from django.http import HttpResponse
|
||||
from django.test import RequestFactory, TestCase
|
||||
from django.utils import dateformat, formats, timezone
|
||||
|
||||
from auditlog.context import set_actor
|
||||
from auditlog.middleware import AuditlogMiddleware
|
||||
from auditlog.models import LogEntry
|
||||
from auditlog.registry import auditlog
|
||||
|
|
@ -21,8 +23,10 @@ from auditlog_tests.models import (
|
|||
CharfieldTextfieldModel,
|
||||
ChoicesFieldModel,
|
||||
DateTimeFieldModel,
|
||||
FirstManyRelatedModel,
|
||||
ManyRelatedModel,
|
||||
NoDeleteHistoryModel,
|
||||
OtherManyRelatedModel,
|
||||
PostgresArrayFieldModel,
|
||||
ProxyModel,
|
||||
RelatedModel,
|
||||
|
|
@ -36,7 +40,11 @@ from auditlog_tests.models import (
|
|||
|
||||
class SimpleModelTest(TestCase):
|
||||
def setUp(self):
|
||||
self.obj = SimpleModel.objects.create(text="I am not difficult.")
|
||||
self.obj = self.make_object()
|
||||
super().setUp()
|
||||
|
||||
def make_object(self):
|
||||
return SimpleModel.objects.create(text="I am not difficult.")
|
||||
|
||||
def test_create(self):
|
||||
"""Creation is logged correctly."""
|
||||
|
|
@ -44,19 +52,16 @@ class SimpleModelTest(TestCase):
|
|||
obj = self.obj
|
||||
|
||||
# Check for log entries
|
||||
self.assertTrue(obj.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(obj.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
try:
|
||||
history = obj.history.get()
|
||||
except obj.history.DoesNotExist:
|
||||
self.assertTrue(False, "Log entry exists")
|
||||
else:
|
||||
self.assertEqual(
|
||||
history.action, LogEntry.Action.CREATE, msg="Action is 'CREATE'"
|
||||
)
|
||||
self.assertEqual(
|
||||
history.object_repr, str(obj), msg="Representation is equal"
|
||||
)
|
||||
history = obj.history.get()
|
||||
self.check_create_log_entry(obj, history)
|
||||
|
||||
def check_create_log_entry(self, obj, history):
|
||||
self.assertEqual(
|
||||
history.action, LogEntry.Action.CREATE, msg="Action is 'CREATE'"
|
||||
)
|
||||
self.assertEqual(history.object_repr, str(obj), msg="Representation is equal")
|
||||
|
||||
def test_update(self):
|
||||
"""Updates are logged correctly."""
|
||||
|
|
@ -64,17 +69,23 @@ class SimpleModelTest(TestCase):
|
|||
obj = self.obj
|
||||
|
||||
# Change something
|
||||
obj.boolean = True
|
||||
obj.save()
|
||||
self.update(obj)
|
||||
|
||||
# Check for log entries
|
||||
self.assertTrue(
|
||||
obj.history.filter(action=LogEntry.Action.UPDATE).count() == 1,
|
||||
self.assertEqual(
|
||||
obj.history.filter(action=LogEntry.Action.UPDATE).count(),
|
||||
1,
|
||||
msg="There is one log entry for 'UPDATE'",
|
||||
)
|
||||
|
||||
history = obj.history.get(action=LogEntry.Action.UPDATE)
|
||||
self.check_update_log_entry(obj, history)
|
||||
|
||||
def update(self, obj):
|
||||
obj.boolean = True
|
||||
obj.save()
|
||||
|
||||
def check_update_log_entry(self, obj, history):
|
||||
self.assertJSONEqual(
|
||||
history.changes,
|
||||
'{"boolean": ["False", "True"]}',
|
||||
|
|
@ -85,39 +96,104 @@ class SimpleModelTest(TestCase):
|
|||
"""Deletion is logged correctly."""
|
||||
# Get the object to work with
|
||||
obj = self.obj
|
||||
|
||||
history = obj.history.latest()
|
||||
content_type = ContentType.objects.get_for_model(obj.__class__)
|
||||
pk = obj.pk
|
||||
|
||||
# Delete the object
|
||||
obj.delete()
|
||||
self.delete(obj)
|
||||
|
||||
# Check for log entries
|
||||
self.assertTrue(
|
||||
LogEntry.objects.filter(
|
||||
content_type=history.content_type,
|
||||
object_pk=history.object_pk,
|
||||
action=LogEntry.Action.DELETE,
|
||||
).count()
|
||||
== 1,
|
||||
msg="There is one log entry for 'DELETE'",
|
||||
)
|
||||
qs = LogEntry.objects.filter(content_type=content_type, object_pk=pk)
|
||||
self.assertEqual(qs.count(), 1, msg="There is one log entry for 'DELETE'")
|
||||
|
||||
history = qs.get()
|
||||
self.check_delete_log_entry(obj, history)
|
||||
|
||||
def delete(self, obj):
|
||||
obj.delete()
|
||||
|
||||
def check_delete_log_entry(self, obj, history):
|
||||
pass
|
||||
|
||||
def test_recreate(self):
|
||||
SimpleModel.objects.all().delete()
|
||||
self.obj.delete()
|
||||
self.setUp()
|
||||
self.test_create()
|
||||
|
||||
|
||||
class AltPrimaryKeyModelTest(SimpleModelTest):
|
||||
class NoActorMixin:
|
||||
def check_create_log_entry(self, obj, log_entry):
|
||||
super().check_create_log_entry(obj, log_entry)
|
||||
self.assertIsNone(log_entry.actor)
|
||||
|
||||
def check_update_log_entry(self, obj, log_entry):
|
||||
super().check_update_log_entry(obj, log_entry)
|
||||
self.assertIsNone(log_entry.actor)
|
||||
|
||||
def check_delete_log_entry(self, obj, log_entry):
|
||||
super().check_delete_log_entry(obj, log_entry)
|
||||
self.assertIsNone(log_entry.actor)
|
||||
|
||||
|
||||
class WithActorMixin:
|
||||
sequence = itertools.count()
|
||||
|
||||
def setUp(self):
|
||||
self.obj = AltPrimaryKeyModel.objects.create(
|
||||
username = "actor_{}".format(next(self.sequence))
|
||||
self.user = get_user_model().objects.create(
|
||||
username=username,
|
||||
email="{}@example.com".format(username),
|
||||
password="secret",
|
||||
)
|
||||
super().setUp()
|
||||
|
||||
def tearDown(self):
|
||||
self.user.delete()
|
||||
super().tearDown()
|
||||
|
||||
def make_object(self):
|
||||
with set_actor(self.user):
|
||||
return super().make_object()
|
||||
|
||||
def check_create_log_entry(self, obj, log_entry):
|
||||
super().check_create_log_entry(obj, log_entry)
|
||||
self.assertEqual(log_entry.actor, self.user)
|
||||
|
||||
def update(self, obj):
|
||||
with set_actor(self.user):
|
||||
return super().update(obj)
|
||||
|
||||
def check_update_log_entry(self, obj, log_entry):
|
||||
super().check_update_log_entry(obj, log_entry)
|
||||
self.assertEqual(log_entry.actor, self.user)
|
||||
|
||||
def delete(self, obj):
|
||||
with set_actor(self.user):
|
||||
return super().delete(obj)
|
||||
|
||||
def check_delete_log_entry(self, obj, log_entry):
|
||||
super().check_delete_log_entry(obj, log_entry)
|
||||
self.assertEqual(log_entry.actor, self.user)
|
||||
|
||||
|
||||
class AltPrimaryKeyModelBase(SimpleModelTest):
|
||||
def make_object(self):
|
||||
return AltPrimaryKeyModel.objects.create(
|
||||
key=str(datetime.datetime.now()), text="I am strange."
|
||||
)
|
||||
|
||||
|
||||
class UUIDPrimaryKeyModelModelTest(SimpleModelTest):
|
||||
def setUp(self):
|
||||
self.obj = UUIDPrimaryKeyModel.objects.create(text="I am strange.")
|
||||
class AltPrimaryKeyModelTest(NoActorMixin, AltPrimaryKeyModelBase):
|
||||
pass
|
||||
|
||||
|
||||
class AltPrimaryKeyModelWithActorTest(WithActorMixin, AltPrimaryKeyModelBase):
|
||||
pass
|
||||
|
||||
|
||||
class UUIDPrimaryKeyModelModelBase(SimpleModelTest):
|
||||
def make_object(self):
|
||||
return UUIDPrimaryKeyModel.objects.create(text="I am strange.")
|
||||
|
||||
def test_get_for_object(self):
|
||||
self.obj.boolean = True
|
||||
|
|
@ -135,14 +211,32 @@ class UUIDPrimaryKeyModelModelTest(SimpleModelTest):
|
|||
)
|
||||
|
||||
|
||||
class ProxyModelTest(SimpleModelTest):
|
||||
def setUp(self):
|
||||
self.obj = ProxyModel.objects.create(text="I am not what you think.")
|
||||
class UUIDPrimaryKeyModelModelTest(NoActorMixin, UUIDPrimaryKeyModelModelBase):
|
||||
pass
|
||||
|
||||
|
||||
class UUIDPrimaryKeyModelModelWithActorTest(
|
||||
WithActorMixin, UUIDPrimaryKeyModelModelBase
|
||||
):
|
||||
pass
|
||||
|
||||
|
||||
class ProxyModelBase(SimpleModelTest):
|
||||
def make_object(self):
|
||||
return ProxyModel.objects.create(text="I am not what you think.")
|
||||
|
||||
|
||||
class ProxyModelTest(NoActorMixin, ProxyModelBase):
|
||||
pass
|
||||
|
||||
|
||||
class ProxyModelWithActorTest(WithActorMixin, ProxyModelBase):
|
||||
pass
|
||||
|
||||
|
||||
class ManyRelatedModelTest(TestCase):
|
||||
"""
|
||||
Test the behaviour of a many-to-many relationship.
|
||||
Test the behaviour of a default many-to-many relationship.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
|
|
@ -161,78 +255,126 @@ class ManyRelatedModelTest(TestCase):
|
|||
)
|
||||
|
||||
|
||||
class FirstManyRelatedModelTest(TestCase):
|
||||
"""
|
||||
Test the behaviour of a many-to-many relationship.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.obj = FirstManyRelatedModel.objects.create()
|
||||
self.rel_obj = OtherManyRelatedModel.objects.create()
|
||||
|
||||
def test_related_add_from_first_side(self):
|
||||
self.obj.related.add(self.rel_obj)
|
||||
self.assertEqual(
|
||||
LogEntry.objects.get_for_objects(self.obj.related.all()).count(),
|
||||
self.rel_obj.history.count(),
|
||||
)
|
||||
self.assertEqual(
|
||||
LogEntry.objects.get_for_objects(self.obj.related.all()).first(),
|
||||
self.rel_obj.history.first(),
|
||||
)
|
||||
self.assertEqual(LogEntry.objects.count(), 1)
|
||||
|
||||
def test_related_add_from_other_side(self):
|
||||
self.rel_obj.related.add(self.obj)
|
||||
self.assertEqual(
|
||||
LogEntry.objects.get_for_objects(self.obj.related.all()).count(),
|
||||
self.rel_obj.history.count(),
|
||||
)
|
||||
self.assertEqual(
|
||||
LogEntry.objects.get_for_objects(self.obj.related.all()).first(),
|
||||
self.rel_obj.history.first(),
|
||||
)
|
||||
self.assertEqual(LogEntry.objects.count(), 1)
|
||||
|
||||
def test_related_remove_from_first_side(self):
|
||||
self.obj.related.add(self.rel_obj)
|
||||
self.obj.related.remove(self.rel_obj)
|
||||
self.assertEqual(LogEntry.objects.count(), 2)
|
||||
|
||||
def test_related_remove_from_other_side(self):
|
||||
self.rel_obj.related.add(self.obj)
|
||||
self.rel_obj.related.remove(self.obj)
|
||||
self.assertEqual(LogEntry.objects.count(), 2)
|
||||
|
||||
def test_related_clear_from_first_side(self):
|
||||
self.obj.related.add(self.rel_obj)
|
||||
self.obj.related.clear()
|
||||
self.assertEqual(LogEntry.objects.count(), 2)
|
||||
|
||||
def test_related_clear_from_other_side(self):
|
||||
self.rel_obj.related.add(self.obj)
|
||||
self.rel_obj.related.clear()
|
||||
self.assertEqual(LogEntry.objects.count(), 2)
|
||||
|
||||
|
||||
class MiddlewareTest(TestCase):
|
||||
"""
|
||||
Test the middleware responsible for connecting and disconnecting the signals used in automatic logging.
|
||||
"""
|
||||
|
||||
def setUp(self):
|
||||
self.middleware = AuditlogMiddleware()
|
||||
self.get_response_mock = mock.Mock()
|
||||
self.response_mock = mock.Mock()
|
||||
self.middleware = AuditlogMiddleware(get_response=self.get_response_mock)
|
||||
self.factory = RequestFactory()
|
||||
self.user = User.objects.create_user(
|
||||
username="test", email="test@example.com", password="top_secret"
|
||||
)
|
||||
|
||||
def side_effect(self, assertion):
|
||||
def inner(request):
|
||||
assertion()
|
||||
return self.response_mock
|
||||
|
||||
return inner
|
||||
|
||||
def assert_has_listeners(self):
|
||||
self.assertTrue(pre_save.has_listeners(LogEntry))
|
||||
|
||||
def assert_no_listeners(self):
|
||||
self.assertFalse(pre_save.has_listeners(LogEntry))
|
||||
|
||||
def test_request_anonymous(self):
|
||||
"""No actor will be logged when a user is not logged in."""
|
||||
# Create a request
|
||||
request = self.factory.get("/")
|
||||
request.user = AnonymousUser()
|
||||
|
||||
# Run middleware
|
||||
self.middleware.process_request(request)
|
||||
self.get_response_mock.side_effect = self.side_effect(self.assert_no_listeners)
|
||||
|
||||
# Validate result
|
||||
self.assertFalse(pre_save.has_listeners(LogEntry))
|
||||
response = self.middleware(request)
|
||||
|
||||
# Finalize transaction
|
||||
self.middleware.process_exception(request, None)
|
||||
self.assertIs(response, self.response_mock)
|
||||
self.get_response_mock.assert_called_once_with(request)
|
||||
self.assert_no_listeners()
|
||||
|
||||
def test_request(self):
|
||||
"""The actor will be logged when a user is logged in."""
|
||||
# Create a request
|
||||
request = self.factory.get("/")
|
||||
request.user = self.user
|
||||
# Run middleware
|
||||
self.middleware.process_request(request)
|
||||
|
||||
# Validate result
|
||||
self.assertTrue(pre_save.has_listeners(LogEntry))
|
||||
|
||||
# Finalize transaction
|
||||
self.middleware.process_exception(request, None)
|
||||
|
||||
def test_response(self):
|
||||
"""The signal will be disconnected when the request is processed."""
|
||||
# Create a request
|
||||
request = self.factory.get("/")
|
||||
request.user = self.user
|
||||
|
||||
# Run middleware
|
||||
self.middleware.process_request(request)
|
||||
self.assertTrue(
|
||||
pre_save.has_listeners(LogEntry)
|
||||
) # The signal should be present before trying to disconnect it.
|
||||
self.middleware.process_response(request, HttpResponse())
|
||||
self.get_response_mock.side_effect = self.side_effect(self.assert_has_listeners)
|
||||
|
||||
# Validate result
|
||||
self.assertFalse(pre_save.has_listeners(LogEntry))
|
||||
response = self.middleware(request)
|
||||
|
||||
self.assertIs(response, self.response_mock)
|
||||
self.get_response_mock.assert_called_once_with(request)
|
||||
self.assert_no_listeners()
|
||||
|
||||
def test_exception(self):
|
||||
"""The signal will be disconnected when an exception is raised."""
|
||||
# Create a request
|
||||
request = self.factory.get("/")
|
||||
request.user = self.user
|
||||
|
||||
# Run middleware
|
||||
self.middleware.process_request(request)
|
||||
self.assertTrue(
|
||||
pre_save.has_listeners(LogEntry)
|
||||
) # The signal should be present before trying to disconnect it.
|
||||
self.middleware.process_exception(request, ValidationError("Test"))
|
||||
SomeException = type("SomeException", (Exception,), {})
|
||||
|
||||
# Validate result
|
||||
self.assertFalse(pre_save.has_listeners(LogEntry))
|
||||
self.get_response_mock.side_effect = SomeException
|
||||
|
||||
with self.assertRaises(SomeException):
|
||||
self.middleware(request)
|
||||
|
||||
self.assert_no_listeners()
|
||||
|
||||
|
||||
class SimpeIncludeModelTest(TestCase):
|
||||
|
|
@ -241,17 +383,17 @@ class SimpeIncludeModelTest(TestCase):
|
|||
def test_register_include_fields(self):
|
||||
sim = SimpleIncludeModel(label="Include model", text="Looong text")
|
||||
sim.save()
|
||||
self.assertTrue(sim.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(sim.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change label, record
|
||||
sim.label = "Changed label"
|
||||
sim.save()
|
||||
self.assertTrue(sim.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(sim.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
# Change text, ignore
|
||||
sim.text = "Short text"
|
||||
sim.save()
|
||||
self.assertTrue(sim.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(sim.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
|
||||
class SimpeExcludeModelTest(TestCase):
|
||||
|
|
@ -260,17 +402,17 @@ class SimpeExcludeModelTest(TestCase):
|
|||
def test_register_exclude_fields(self):
|
||||
sem = SimpleExcludeModel(label="Exclude model", text="Looong text")
|
||||
sem.save()
|
||||
self.assertTrue(sem.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(sem.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change label, ignore
|
||||
sem.label = "Changed label"
|
||||
sem.save()
|
||||
self.assertTrue(sem.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(sem.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
# Change text, record
|
||||
sem.text = "Short text"
|
||||
sem.save()
|
||||
self.assertTrue(sem.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(sem.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
|
||||
class SimpleMappingModelTest(TestCase):
|
||||
|
|
@ -281,28 +423,32 @@ class SimpleMappingModelTest(TestCase):
|
|||
sku="ASD301301A6", vtxt="2.1.5", not_mapped="Not mapped"
|
||||
)
|
||||
smm.save()
|
||||
self.assertTrue(
|
||||
smm.history.latest().changes_dict["sku"][1] == "ASD301301A6",
|
||||
self.assertEqual(
|
||||
smm.history.latest().changes_dict["sku"][1],
|
||||
"ASD301301A6",
|
||||
msg="The diff function retains 'sku' and can be retrieved.",
|
||||
)
|
||||
self.assertTrue(
|
||||
smm.history.latest().changes_dict["not_mapped"][1] == "Not mapped",
|
||||
self.assertEqual(
|
||||
smm.history.latest().changes_dict["not_mapped"][1],
|
||||
"Not mapped",
|
||||
msg="The diff function does not map 'not_mapped' and can be retrieved.",
|
||||
)
|
||||
self.assertTrue(
|
||||
smm.history.latest().changes_display_dict["Product No."][1]
|
||||
== "ASD301301A6",
|
||||
self.assertEqual(
|
||||
smm.history.latest().changes_display_dict["Product No."][1],
|
||||
"ASD301301A6",
|
||||
msg="The diff function maps 'sku' as 'Product No.' and can be retrieved.",
|
||||
)
|
||||
self.assertTrue(
|
||||
smm.history.latest().changes_display_dict["Version"][1] == "2.1.5",
|
||||
self.assertEqual(
|
||||
smm.history.latest().changes_display_dict["Version"][1],
|
||||
"2.1.5",
|
||||
msg=(
|
||||
"The diff function maps 'vtxt' as 'Version' through verbose_name"
|
||||
" setting on the model field and can be retrieved."
|
||||
),
|
||||
)
|
||||
self.assertTrue(
|
||||
smm.history.latest().changes_display_dict["not mapped"][1] == "Not mapped",
|
||||
self.assertEqual(
|
||||
smm.history.latest().changes_display_dict["not mapped"][1],
|
||||
"Not mapped",
|
||||
msg=(
|
||||
"The diff function uses the django default verbose name for 'not_mapped'"
|
||||
" and can be retrieved."
|
||||
|
|
@ -326,8 +472,8 @@ class AdditionalDataModelTest(TestCase):
|
|||
label="Additional data to log entries", related=related_model
|
||||
)
|
||||
obj_with_additional_data.save()
|
||||
self.assertTrue(
|
||||
obj_with_additional_data.history.count() == 1, msg="There is 1 log entry"
|
||||
self.assertEqual(
|
||||
obj_with_additional_data.history.count(), 1, msg="There is 1 log entry"
|
||||
)
|
||||
log_entry = obj_with_additional_data.history.get()
|
||||
# FIXME: Work-around for the fact that additional_data isn't working
|
||||
|
|
@ -337,12 +483,14 @@ class AdditionalDataModelTest(TestCase):
|
|||
else:
|
||||
extra_data = log_entry.additional_data
|
||||
self.assertIsNotNone(extra_data)
|
||||
self.assertTrue(
|
||||
extra_data["related_model_text"] == related_model.text,
|
||||
self.assertEqual(
|
||||
extra_data["related_model_text"],
|
||||
related_model.text,
|
||||
msg="Related model's text is logged",
|
||||
)
|
||||
self.assertTrue(
|
||||
extra_data["related_model_id"] == related_model.id,
|
||||
self.assertEqual(
|
||||
extra_data["related_model_id"],
|
||||
related_model.id,
|
||||
msg="Related model's id is logged",
|
||||
)
|
||||
|
||||
|
|
@ -365,7 +513,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change timestamp to same datetime and timezone
|
||||
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -375,7 +523,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.save()
|
||||
|
||||
# Nothing should have changed
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
def test_model_with_different_timezone(self):
|
||||
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -389,7 +537,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change timestamp to same datetime in another timezone
|
||||
timestamp = datetime.datetime(2017, 1, 10, 13, 0, tzinfo=self.utc_plus_one)
|
||||
|
|
@ -397,7 +545,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.save()
|
||||
|
||||
# Nothing should have changed
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
def test_model_with_different_datetime(self):
|
||||
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -411,7 +559,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change timestamp to another datetime in the same timezone
|
||||
timestamp = datetime.datetime(2017, 1, 10, 13, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -419,7 +567,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.save()
|
||||
|
||||
# The time should have changed.
|
||||
self.assertTrue(dtm.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(dtm.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
def test_model_with_different_date(self):
|
||||
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -433,7 +581,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change timestamp to another datetime in the same timezone
|
||||
date = datetime.datetime(2017, 1, 11)
|
||||
|
|
@ -441,7 +589,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.save()
|
||||
|
||||
# The time should have changed.
|
||||
self.assertTrue(dtm.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(dtm.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
def test_model_with_different_time(self):
|
||||
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -455,7 +603,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change timestamp to another datetime in the same timezone
|
||||
time = datetime.time(6, 0)
|
||||
|
|
@ -463,7 +611,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.save()
|
||||
|
||||
# The time should have changed.
|
||||
self.assertTrue(dtm.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(dtm.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
def test_model_with_different_time_and_timezone(self):
|
||||
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -477,7 +625,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
|
||||
self.assertEqual(dtm.history.count(), 1, msg="There is one log entry")
|
||||
|
||||
# Change timestamp to another datetime and another timezone
|
||||
timestamp = datetime.datetime(2017, 1, 10, 14, 0, tzinfo=self.utc_plus_one)
|
||||
|
|
@ -485,7 +633,7 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.save()
|
||||
|
||||
# The time should have changed.
|
||||
self.assertTrue(dtm.history.count() == 2, msg="There are two log entries")
|
||||
self.assertEqual(dtm.history.count(), 2, msg="There are two log entries")
|
||||
|
||||
def test_changes_display_dict_datetime(self):
|
||||
timestamp = datetime.datetime(2017, 1, 10, 15, 0, tzinfo=timezone.utc)
|
||||
|
|
@ -500,9 +648,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
)
|
||||
dtm.save()
|
||||
localized_timestamp = timestamp.astimezone(gettz(settings.TIME_ZONE))
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["timestamp"][1]
|
||||
== dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["timestamp"][1],
|
||||
dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
|
||||
msg=(
|
||||
"The datetime should be formatted according to Django's settings for"
|
||||
" DATETIME_FORMAT"
|
||||
|
|
@ -512,9 +660,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
dtm.timestamp = timestamp
|
||||
dtm.save()
|
||||
localized_timestamp = timestamp.astimezone(gettz(settings.TIME_ZONE))
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["timestamp"][1]
|
||||
== dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["timestamp"][1],
|
||||
dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
|
||||
msg=(
|
||||
"The datetime should be formatted according to Django's settings for"
|
||||
" DATETIME_FORMAT"
|
||||
|
|
@ -523,9 +671,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
|
||||
# Change USE_L10N = True
|
||||
with self.settings(USE_L10N=True, LANGUAGE_CODE="en-GB"):
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["timestamp"][1]
|
||||
== formats.localize(localized_timestamp),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["timestamp"][1],
|
||||
formats.localize(localized_timestamp),
|
||||
msg=(
|
||||
"The datetime should be formatted according to Django's settings for"
|
||||
" USE_L10N is True with a different LANGUAGE_CODE."
|
||||
|
|
@ -544,9 +692,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["date"][1]
|
||||
== dateformat.format(date, settings.DATE_FORMAT),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["date"][1],
|
||||
dateformat.format(date, settings.DATE_FORMAT),
|
||||
msg=(
|
||||
"The date should be formatted according to Django's settings for"
|
||||
" DATE_FORMAT unless USE_L10N is True."
|
||||
|
|
@ -555,9 +703,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
date = datetime.date(2017, 1, 11)
|
||||
dtm.date = date
|
||||
dtm.save()
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["date"][1]
|
||||
== dateformat.format(date, settings.DATE_FORMAT),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["date"][1],
|
||||
dateformat.format(date, settings.DATE_FORMAT),
|
||||
msg=(
|
||||
"The date should be formatted according to Django's settings for"
|
||||
" DATE_FORMAT unless USE_L10N is True."
|
||||
|
|
@ -566,9 +714,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
|
||||
# Change USE_L10N = True
|
||||
with self.settings(USE_L10N=True, LANGUAGE_CODE="en-GB"):
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["date"][1]
|
||||
== formats.localize(date),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["date"][1],
|
||||
formats.localize(date),
|
||||
msg=(
|
||||
"The date should be formatted according to Django's settings for"
|
||||
" USE_L10N is True with a different LANGUAGE_CODE."
|
||||
|
|
@ -587,9 +735,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
naive_dt=self.now,
|
||||
)
|
||||
dtm.save()
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["time"][1]
|
||||
== dateformat.format(time, settings.TIME_FORMAT),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["time"][1],
|
||||
dateformat.format(time, settings.TIME_FORMAT),
|
||||
msg=(
|
||||
"The time should be formatted according to Django's settings for"
|
||||
" TIME_FORMAT unless USE_L10N is True."
|
||||
|
|
@ -598,9 +746,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
time = datetime.time(6, 0)
|
||||
dtm.time = time
|
||||
dtm.save()
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["time"][1]
|
||||
== dateformat.format(time, settings.TIME_FORMAT),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["time"][1],
|
||||
dateformat.format(time, settings.TIME_FORMAT),
|
||||
msg=(
|
||||
"The time should be formatted according to Django's settings for"
|
||||
" TIME_FORMAT unless USE_L10N is True."
|
||||
|
|
@ -609,9 +757,9 @@ class DateTimeFieldModelTest(TestCase):
|
|||
|
||||
# Change USE_L10N = True
|
||||
with self.settings(USE_L10N=True, LANGUAGE_CODE="en-GB"):
|
||||
self.assertTrue(
|
||||
dtm.history.latest().changes_display_dict["time"][1]
|
||||
== formats.localize(time),
|
||||
self.assertEqual(
|
||||
dtm.history.latest().changes_display_dict["time"][1],
|
||||
formats.localize(time),
|
||||
msg=(
|
||||
"The time should be formatted according to Django's settings for"
|
||||
" USE_L10N is True with a different LANGUAGE_CODE."
|
||||
|
|
@ -651,7 +799,7 @@ class UnregisterTest(TestCase):
|
|||
obj = self.obj
|
||||
|
||||
# Check for log entries
|
||||
self.assertTrue(obj.history.count() == 0, msg="There are no log entries")
|
||||
self.assertEqual(obj.history.count(), 0, msg="There are no log entries")
|
||||
|
||||
def test_unregister_update(self):
|
||||
"""Updates are not logged after unregistering."""
|
||||
|
|
@ -663,7 +811,7 @@ class UnregisterTest(TestCase):
|
|||
obj.save()
|
||||
|
||||
# Check for log entries
|
||||
self.assertTrue(obj.history.count() == 0, msg="There are no log entries")
|
||||
self.assertEqual(obj.history.count(), 0, msg="There are no log entries")
|
||||
|
||||
def test_unregister_delete(self):
|
||||
"""Deletion is not logged after unregistering."""
|
||||
|
|
@ -674,7 +822,7 @@ class UnregisterTest(TestCase):
|
|||
obj.delete()
|
||||
|
||||
# Check for log entries
|
||||
self.assertTrue(LogEntry.objects.count() == 0, msg="There are no log entries")
|
||||
self.assertEqual(LogEntry.objects.count(), 0, msg="There are no log entries")
|
||||
|
||||
|
||||
class ChoicesFieldModelTest(TestCase):
|
||||
|
|
@ -690,28 +838,30 @@ class ChoicesFieldModelTest(TestCase):
|
|||
|
||||
def test_changes_display_dict_single_choice(self):
|
||||
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["status"][1] == "Red",
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["status"][1],
|
||||
"Red",
|
||||
msg="The human readable text 'Red' is displayed.",
|
||||
)
|
||||
self.obj.status = ChoicesFieldModel.GREEN
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["status"][1] == "Green",
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["status"][1],
|
||||
"Green",
|
||||
msg="The human readable text 'Green' is displayed.",
|
||||
)
|
||||
|
||||
def test_changes_display_dict_multiplechoice(self):
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["multiplechoice"][1]
|
||||
== "Red, Yellow, Green",
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["multiplechoice"][1],
|
||||
"Red, Yellow, Green",
|
||||
msg="The human readable text 'Red, Yellow, Green' is displayed.",
|
||||
)
|
||||
self.obj.multiplechoice = ChoicesFieldModel.RED
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["multiplechoice"][1]
|
||||
== "Red",
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["multiplechoice"][1],
|
||||
"Red",
|
||||
msg="The human readable text 'Red' is displayed.",
|
||||
)
|
||||
|
||||
|
|
@ -726,32 +876,32 @@ class CharfieldTextfieldModelTest(TestCase):
|
|||
)
|
||||
|
||||
def test_changes_display_dict_longchar(self):
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["longchar"][1]
|
||||
== "{}...".format(self.PLACEHOLDER_LONGCHAR[:140]),
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["longchar"][1],
|
||||
"{}...".format(self.PLACEHOLDER_LONGCHAR[:140]),
|
||||
msg="The string should be truncated at 140 characters with an ellipsis at the end.",
|
||||
)
|
||||
SHORTENED_PLACEHOLDER = self.PLACEHOLDER_LONGCHAR[:139]
|
||||
self.obj.longchar = SHORTENED_PLACEHOLDER
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["longchar"][1]
|
||||
== SHORTENED_PLACEHOLDER,
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["longchar"][1],
|
||||
SHORTENED_PLACEHOLDER,
|
||||
msg="The field should display the entire string because it is less than 140 characters",
|
||||
)
|
||||
|
||||
def test_changes_display_dict_longtextfield(self):
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["longtextfield"][1]
|
||||
== "{}...".format(self.PLACEHOLDER_LONGTEXTFIELD[:140]),
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["longtextfield"][1],
|
||||
"{}...".format(self.PLACEHOLDER_LONGTEXTFIELD[:140]),
|
||||
msg="The string should be truncated at 140 characters with an ellipsis at the end.",
|
||||
)
|
||||
SHORTENED_PLACEHOLDER = self.PLACEHOLDER_LONGTEXTFIELD[:139]
|
||||
self.obj.longtextfield = SHORTENED_PLACEHOLDER
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.obj.history.latest().changes_display_dict["longtextfield"][1]
|
||||
== SHORTENED_PLACEHOLDER,
|
||||
self.assertEqual(
|
||||
self.obj.history.latest().changes_display_dict["longtextfield"][1],
|
||||
SHORTENED_PLACEHOLDER,
|
||||
msg="The field should display the entire string because it is less than 140 characters",
|
||||
)
|
||||
|
||||
|
|
@ -769,26 +919,28 @@ class PostgresArrayFieldModelTest(TestCase):
|
|||
return self.obj.history.latest().changes_display_dict["arrayfield"][1]
|
||||
|
||||
def test_changes_display_dict_arrayfield(self):
|
||||
self.assertTrue(
|
||||
self.latest_array_change == "Red, Green",
|
||||
self.assertEqual(
|
||||
self.latest_array_change,
|
||||
"Red, Green",
|
||||
msg="The human readable text for the two choices, 'Red, Green' is displayed.",
|
||||
)
|
||||
self.obj.arrayfield = [PostgresArrayFieldModel.GREEN]
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.latest_array_change == "Green",
|
||||
self.assertEqual(
|
||||
self.latest_array_change,
|
||||
"Green",
|
||||
msg="The human readable text 'Green' is displayed.",
|
||||
)
|
||||
self.obj.arrayfield = []
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.latest_array_change == "",
|
||||
msg="The human readable text '' is displayed.",
|
||||
self.assertEqual(
|
||||
self.latest_array_change, "", msg="The human readable text '' is displayed."
|
||||
)
|
||||
self.obj.arrayfield = [PostgresArrayFieldModel.GREEN]
|
||||
self.obj.save()
|
||||
self.assertTrue(
|
||||
self.latest_array_change == "Green",
|
||||
self.assertEqual(
|
||||
self.latest_array_change,
|
||||
"Green",
|
||||
msg="The human readable text 'Green' is displayed.",
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@
|
|||
import os
|
||||
import sys
|
||||
from datetime import date
|
||||
|
||||
from pkg_resources import get_distribution
|
||||
|
||||
# -- Path setup --------------------------------------------------------------
|
||||
|
|
@ -18,22 +19,23 @@ from pkg_resources import get_distribution
|
|||
# documentation root, use os.path.abspath to make it absolute, like shown here.
|
||||
|
||||
# Add sources folder
|
||||
sys.path.insert(0, os.path.abspath('../../'))
|
||||
sys.path.insert(0, os.path.abspath("../../"))
|
||||
|
||||
# Setup Django for autodoc
|
||||
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'auditlog_tests.test_settings')
|
||||
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "auditlog_tests.test_settings")
|
||||
import django
|
||||
|
||||
django.setup()
|
||||
|
||||
# -- Project information -----------------------------------------------------
|
||||
|
||||
project = 'django-auditlog'
|
||||
author = 'Jan-Jelle Kester and contributors'
|
||||
copyright = f'2013-{date.today().year}, {author}'
|
||||
project = "django-auditlog"
|
||||
author = "Jan-Jelle Kester and contributors"
|
||||
copyright = f"2013-{date.today().year}, {author}"
|
||||
|
||||
release = get_distribution('django-auditlog').version
|
||||
release = get_distribution("django-auditlog").version
|
||||
# for example take major/minor
|
||||
version = '.'.join(release.split('.')[:2])
|
||||
version = ".".join(release.split(".")[:2])
|
||||
|
||||
# -- General configuration ---------------------------------------------------
|
||||
|
||||
|
|
@ -41,27 +43,27 @@ version = '.'.join(release.split('.')[:2])
|
|||
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
|
||||
# ones.
|
||||
extensions = [
|
||||
'sphinx.ext.autodoc',
|
||||
'sphinx.ext.viewcode',
|
||||
"sphinx.ext.autodoc",
|
||||
"sphinx.ext.viewcode",
|
||||
]
|
||||
|
||||
# Master document that contains the root table of contents
|
||||
master_doc = 'index'
|
||||
master_doc = "index"
|
||||
|
||||
# Add any paths that contain templates here, relative to this directory.
|
||||
templates_path = ['_templates']
|
||||
templates_path = ["_templates"]
|
||||
|
||||
# List of patterns, relative to source directory, that match files and
|
||||
# directories to ignore when looking for source files.
|
||||
# This pattern also affects html_static_path and html_extra_path.
|
||||
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
|
||||
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
|
||||
|
||||
# -- Options for HTML output -------------------------------------------------
|
||||
|
||||
# The theme to use for HTML and HTML Help pages. See the documentation for
|
||||
# a list of builtin themes.
|
||||
#
|
||||
html_theme = 'sphinx_rtd_theme'
|
||||
html_theme = "sphinx_rtd_theme"
|
||||
|
||||
# Add any paths that contain custom static files (such as style sheets) here,
|
||||
# relative to this directory. They are copied after the builtin static files,
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
[tool.black]
|
||||
target-version = ["py36"]
|
||||
|
||||
# black compatible isort
|
||||
[tool.isort]
|
||||
profile = "black"
|
||||
known_first_party = "auditlog"
|
||||
|
|
|
|||
9
setup.py
9
setup.py
|
|
@ -16,13 +16,18 @@ setup(
|
|||
"auditlog.management",
|
||||
"auditlog.management.commands",
|
||||
],
|
||||
url="https://github.com/jazzband/django-auditlog",
|
||||
url="https://github.com/MacmillanPlatform/django-auditlog/",
|
||||
license="MIT",
|
||||
author="Jan-Jelle Kester",
|
||||
maintainer="Alieh Rymašeŭski",
|
||||
description="Audit log app for Django",
|
||||
long_description=long_description,
|
||||
long_description_content_type="text/markdown",
|
||||
install_requires=["django-jsonfield>=1.0.0", "python-dateutil>=2.6.0"],
|
||||
install_requires=[
|
||||
"django-admin-rangefilter>=0.5.0",
|
||||
"django-jsonfield>=1.0.0",
|
||||
"python-dateutil>=2.6.0",
|
||||
],
|
||||
zip_safe=False,
|
||||
classifiers=[
|
||||
"Programming Language :: Python :: 3",
|
||||
|
|
|
|||
6
tox.ini
6
tox.ini
|
|
@ -5,6 +5,8 @@ envlist =
|
|||
py38-qa
|
||||
|
||||
[testenv]
|
||||
setenv =
|
||||
COVERAGE_FILE={toxworkdir}/.coverage.{envname}
|
||||
commands =
|
||||
coverage run --source auditlog runtests.py
|
||||
coverage xml
|
||||
|
|
@ -15,7 +17,9 @@ deps =
|
|||
# Test requirements
|
||||
coverage
|
||||
codecov
|
||||
psycopg2-binary
|
||||
django-multiselectfield
|
||||
mock
|
||||
psycopg2-binary>=2.8,<2.9
|
||||
passenv=
|
||||
TEST_DB_HOST
|
||||
TEST_DB_USER
|
||||
|
|
|
|||
Loading…
Reference in a new issue