Add black and format files with black.

This commit is contained in:
Hasan Ramezani 2020-12-06 21:29:24 +01:00
parent b700e40f65
commit f5bb5cb1a2
26 changed files with 778 additions and 373 deletions

View file

@ -6,4 +6,4 @@ except DistributionNotFound:
# package is not installed
pass
default_app_config = 'auditlog.apps.AuditlogConfig'
default_app_config = "auditlog.apps.AuditlogConfig"

View file

@ -5,13 +5,19 @@ from .filters import ResourceTypeFilter
class LogEntryAdmin(admin.ModelAdmin, LogEntryAdminMixin):
list_display = ['created', 'resource_url', 'action', 'msg_short', 'user_url']
search_fields = ['timestamp', 'object_repr', 'changes', 'actor__first_name', 'actor__last_name']
list_filter = ['action', ResourceTypeFilter]
readonly_fields = ['created', 'resource_url', 'action', 'user_url', 'msg']
list_display = ["created", "resource_url", "action", "msg_short", "user_url"]
search_fields = [
"timestamp",
"object_repr",
"changes",
"actor__first_name",
"actor__last_name",
]
list_filter = ["action", ResourceTypeFilter]
readonly_fields = ["created", "resource_url", "action", "user_url", "msg"]
fieldsets = [
(None, {'fields': ['created', 'user_url', 'resource_url']}),
('Changes', {'fields': ['action', 'msg']}),
(None, {"fields": ["created", "user_url", "resource_url"]}),
("Changes", {"fields": ["action", "msg"]}),
]

View file

@ -2,5 +2,5 @@ from django.apps import AppConfig
class AuditlogConfig(AppConfig):
name = 'auditlog'
name = "auditlog"
verbose_name = "Audit log"

View file

@ -17,12 +17,16 @@ def track_field(field):
:rtype: bool
"""
from auditlog.models import LogEntry
# Do not track many to many relations
if field.many_to_many:
return False
# Do not track relations to LogEntry
if getattr(field, 'remote_field', None) is not None and field.remote_field.model == LogEntry:
if (
getattr(field, "remote_field", None) is not None
and field.remote_field.model == LogEntry
):
return False
return True
@ -108,16 +112,26 @@ def model_instance_diff(old, new):
model_fields = None
# Check if fields must be filtered
if model_fields and (model_fields['include_fields'] or model_fields['exclude_fields']) and fields:
if (
model_fields
and (model_fields["include_fields"] or model_fields["exclude_fields"])
and fields
):
filtered_fields = []
if model_fields['include_fields']:
filtered_fields = [field for field in fields
if field.name in model_fields['include_fields']]
if model_fields["include_fields"]:
filtered_fields = [
field
for field in fields
if field.name in model_fields["include_fields"]
]
else:
filtered_fields = fields
if model_fields['exclude_fields']:
filtered_fields = [field for field in filtered_fields
if field.name not in model_fields['exclude_fields']]
if model_fields["exclude_fields"]:
filtered_fields = [
field
for field in filtered_fields
if field.name not in model_fields["exclude_fields"]
]
fields = filtered_fields
for field in fields:

View file

@ -2,13 +2,13 @@ from django.contrib.admin import SimpleListFilter
class ResourceTypeFilter(SimpleListFilter):
title = 'Resource Type'
parameter_name = 'resource_type'
title = "Resource Type"
parameter_name = "resource_type"
def lookups(self, request, model_admin):
qs = model_admin.get_queryset(request)
types = qs.values_list('content_type_id', 'content_type__model')
return list(types.order_by('content_type__model').distinct())
types = qs.values_list("content_type_id", "content_type__model")
return list(types.order_by("content_type__model").distinct())
def queryset(self, request, queryset):
if self.value() is None:

View file

@ -7,16 +7,25 @@ class Command(BaseCommand):
help = "Deletes all log entries from the database."
def add_arguments(self, parser):
parser.add_argument('-y, --yes', action='store_true', default=None,
help="Continue without asking confirmation.", dest='yes')
parser.add_argument(
"-y, --yes",
action="store_true",
default=None,
help="Continue without asking confirmation.",
dest="yes",
)
def handle(self, *args, **options):
answer = options['yes']
answer = options["yes"]
if answer is None:
self.stdout.write("This action will clear all log entries from the database.")
response = input("Are you sure you want to continue? [y/N]: ").lower().strip()
answer = response == 'y'
self.stdout.write(
"This action will clear all log entries from the database."
)
response = (
input("Are you sure you want to continue? [y/N]: ").lower().strip()
)
answer = response == "y"
if answer:
count, _ = LogEntry.objects.all().delete()

View file

@ -25,25 +25,40 @@ class AuditlogMiddleware(MiddlewareMixin):
"""
# Initialize thread local storage
threadlocal.auditlog = {
'signal_duid': (self.__class__, time.time()),
'remote_addr': request.META.get('REMOTE_ADDR'),
"signal_duid": (self.__class__, time.time()),
"remote_addr": request.META.get("REMOTE_ADDR"),
}
# In case of proxy, set 'original' address
if request.META.get('HTTP_X_FORWARDED_FOR'):
threadlocal.auditlog['remote_addr'] = request.META.get('HTTP_X_FORWARDED_FOR').split(',')[0]
if request.META.get("HTTP_X_FORWARDED_FOR"):
threadlocal.auditlog["remote_addr"] = request.META.get(
"HTTP_X_FORWARDED_FOR"
).split(",")[0]
# Connect signal for automatic logging
if hasattr(request, 'user') and getattr(request.user, 'is_authenticated', False):
set_actor = partial(self.set_actor, user=request.user, signal_duid=threadlocal.auditlog['signal_duid'])
pre_save.connect(set_actor, sender=LogEntry, dispatch_uid=threadlocal.auditlog['signal_duid'], weak=False)
if hasattr(request, "user") and getattr(
request.user, "is_authenticated", False
):
set_actor = partial(
self.set_actor,
user=request.user,
signal_duid=threadlocal.auditlog["signal_duid"],
)
pre_save.connect(
set_actor,
sender=LogEntry,
dispatch_uid=threadlocal.auditlog["signal_duid"],
weak=False,
)
def process_response(self, request, response):
"""
Disconnects the signal receiver to prevent it from staying active.
"""
if hasattr(threadlocal, 'auditlog'):
pre_save.disconnect(sender=LogEntry, dispatch_uid=threadlocal.auditlog['signal_duid'])
if hasattr(threadlocal, "auditlog"):
pre_save.disconnect(
sender=LogEntry, dispatch_uid=threadlocal.auditlog["signal_duid"]
)
return response
@ -51,8 +66,10 @@ class AuditlogMiddleware(MiddlewareMixin):
"""
Disconnects the signal receiver to prevent it from staying active in case of an exception.
"""
if hasattr(threadlocal, 'auditlog'):
pre_save.disconnect(sender=LogEntry, dispatch_uid=threadlocal.auditlog['signal_duid'])
if hasattr(threadlocal, "auditlog"):
pre_save.disconnect(
sender=LogEntry, dispatch_uid=threadlocal.auditlog["signal_duid"]
)
return None
@ -62,15 +79,19 @@ class AuditlogMiddleware(MiddlewareMixin):
Signal receiver with an extra, required 'user' kwarg. This method becomes a real (valid) signal receiver when
it is curried with the actor.
"""
if hasattr(threadlocal, 'auditlog'):
if signal_duid != threadlocal.auditlog['signal_duid']:
if hasattr(threadlocal, "auditlog"):
if signal_duid != threadlocal.auditlog["signal_duid"]:
return
try:
app_label, model_name = settings.AUTH_USER_MODEL.split('.')
app_label, model_name = settings.AUTH_USER_MODEL.split(".")
auth_user_model = apps.get_model(app_label, model_name)
except ValueError:
auth_user_model = apps.get_model('auth', 'user')
if sender == LogEntry and isinstance(user, auth_user_model) and instance.actor is None:
auth_user_model = apps.get_model("auth", "user")
if (
sender == LogEntry
and isinstance(user, auth_user_model)
and instance.actor is None
):
instance.actor = user
instance.remote_addr = threadlocal.auditlog['remote_addr']
instance.remote_addr = threadlocal.auditlog["remote_addr"]

View file

@ -7,28 +7,71 @@ class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('contenttypes', '0001_initial'),
("contenttypes", "0001_initial"),
]
operations = [
migrations.CreateModel(
name='LogEntry',
name="LogEntry",
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('object_pk', models.TextField(verbose_name='object pk')),
('object_id', models.PositiveIntegerField(db_index=True, null=True, verbose_name='object id', blank=True)),
('object_repr', models.TextField(verbose_name='object representation')),
('action', models.PositiveSmallIntegerField(verbose_name='action', choices=[(0, 'create'), (1, 'update'), (2, 'delete')])),
('changes', models.TextField(verbose_name='change message', blank=True)),
('timestamp', models.DateTimeField(auto_now_add=True, verbose_name='timestamp')),
('actor', models.ForeignKey(related_name='+', on_delete=django.db.models.deletion.SET_NULL, verbose_name='actor', blank=True, to=settings.AUTH_USER_MODEL, null=True)),
('content_type', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='+', verbose_name='content type', to='contenttypes.ContentType')),
(
"id",
models.AutoField(
verbose_name="ID",
serialize=False,
auto_created=True,
primary_key=True,
),
),
("object_pk", models.TextField(verbose_name="object pk")),
(
"object_id",
models.PositiveIntegerField(
db_index=True, null=True, verbose_name="object id", blank=True
),
),
("object_repr", models.TextField(verbose_name="object representation")),
(
"action",
models.PositiveSmallIntegerField(
verbose_name="action",
choices=[(0, "create"), (1, "update"), (2, "delete")],
),
),
(
"changes",
models.TextField(verbose_name="change message", blank=True),
),
(
"timestamp",
models.DateTimeField(auto_now_add=True, verbose_name="timestamp"),
),
(
"actor",
models.ForeignKey(
related_name="+",
on_delete=django.db.models.deletion.SET_NULL,
verbose_name="actor",
blank=True,
to=settings.AUTH_USER_MODEL,
null=True,
),
),
(
"content_type",
models.ForeignKey(
on_delete=django.db.models.deletion.CASCADE,
related_name="+",
verbose_name="content type",
to="contenttypes.ContentType",
),
),
],
options={
'ordering': ['-timestamp'],
'get_latest_by': 'timestamp',
'verbose_name': 'log entry',
'verbose_name_plural': 'log entries',
"ordering": ["-timestamp"],
"get_latest_by": "timestamp",
"verbose_name": "log entry",
"verbose_name_plural": "log entries",
},
bases=(models.Model,),
),

View file

@ -4,13 +4,15 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('auditlog', '0001_initial'),
("auditlog", "0001_initial"),
]
operations = [
migrations.AlterField(
model_name='logentry',
name='object_id',
field=models.BigIntegerField(db_index=True, null=True, verbose_name='object id', blank=True),
model_name="logentry",
name="object_id",
field=models.BigIntegerField(
db_index=True, null=True, verbose_name="object id", blank=True
),
),
]

View file

@ -4,13 +4,15 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('auditlog', '0002_auto_support_long_primary_keys'),
("auditlog", "0002_auto_support_long_primary_keys"),
]
operations = [
migrations.AddField(
model_name='logentry',
name='remote_addr',
field=models.GenericIPAddressField(null=True, verbose_name='remote address', blank=True),
model_name="logentry",
name="remote_addr",
field=models.GenericIPAddressField(
null=True, verbose_name="remote address", blank=True
),
),
]

View file

@ -5,13 +5,13 @@ import jsonfield.fields
class Migration(migrations.Migration):
dependencies = [
('auditlog', '0003_logentry_remote_addr'),
("auditlog", "0003_logentry_remote_addr"),
]
operations = [
migrations.AddField(
model_name='logentry',
name='additional_data',
model_name="logentry",
name="additional_data",
field=jsonfield.fields.JSONField(null=True, blank=True),
),
]

View file

@ -5,13 +5,15 @@ import jsonfield.fields
class Migration(migrations.Migration):
dependencies = [
('auditlog', '0004_logentry_detailed_object_repr'),
("auditlog", "0004_logentry_detailed_object_repr"),
]
operations = [
migrations.AlterField(
model_name='logentry',
name='additional_data',
field=jsonfield.fields.JSONField(null=True, verbose_name='additional data', blank=True),
model_name="logentry",
name="additional_data",
field=jsonfield.fields.JSONField(
null=True, verbose_name="additional data", blank=True
),
),
]

View file

@ -4,13 +4,15 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('auditlog', '0005_logentry_additional_data_verbose_name'),
("auditlog", "0005_logentry_additional_data_verbose_name"),
]
operations = [
migrations.AlterField(
model_name='logentry',
name='object_pk',
field=models.CharField(verbose_name='object pk', max_length=255, db_index=True),
model_name="logentry",
name="object_pk",
field=models.CharField(
verbose_name="object pk", max_length=255, db_index=True
),
),
]

View file

@ -4,13 +4,15 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('auditlog', '0006_object_pk_index'),
("auditlog", "0006_object_pk_index"),
]
operations = [
migrations.AlterField(
model_name='logentry',
name='object_pk',
field=models.CharField(verbose_name='object pk', max_length=255, db_index=True),
model_name="logentry",
name="object_pk",
field=models.CharField(
verbose_name="object pk", max_length=255, db_index=True
),
),
]

View file

@ -12,62 +12,65 @@ MAX = 75
class LogEntryAdminMixin(object):
def created(self, obj):
return obj.timestamp.strftime('%Y-%m-%d %H:%M:%S')
return obj.timestamp.strftime("%Y-%m-%d %H:%M:%S")
created.short_description = 'Created'
created.short_description = "Created"
def user_url(self, obj):
if obj.actor:
app_label, model = settings.AUTH_USER_MODEL.split('.')
viewname = 'admin:%s_%s_change' % (app_label, model.lower())
app_label, model = settings.AUTH_USER_MODEL.split(".")
viewname = "admin:%s_%s_change" % (app_label, model.lower())
try:
link = urlresolvers.reverse(viewname, args=[obj.actor.pk])
except NoReverseMatch:
return u'%s' % (obj.actor)
return format_html(u'<a href="{}">{}</a>', link, obj.actor)
return "%s" % (obj.actor)
return format_html('<a href="{}">{}</a>', link, obj.actor)
return 'system'
return "system"
user_url.short_description = 'User'
user_url.short_description = "User"
def resource_url(self, obj):
app_label, model = obj.content_type.app_label, obj.content_type.model
viewname = 'admin:%s_%s_change' % (app_label, model)
viewname = "admin:%s_%s_change" % (app_label, model)
try:
args = [obj.object_pk] if obj.object_id is None else [obj.object_id]
link = urlresolvers.reverse(viewname, args=args)
except NoReverseMatch:
return obj.object_repr
else:
return format_html(u'<a href="{}">{}</a>', link, obj.object_repr)
return format_html('<a href="{}">{}</a>', link, obj.object_repr)
resource_url.short_description = 'Resource'
resource_url.short_description = "Resource"
def msg_short(self, obj):
if obj.action == LogEntry.Action.DELETE:
return '' # delete
return "" # delete
changes = json.loads(obj.changes)
s = '' if len(changes) == 1 else 's'
fields = ', '.join(changes.keys())
s = "" if len(changes) == 1 else "s"
fields = ", ".join(changes.keys())
if len(fields) > MAX:
i = fields.rfind(' ', 0, MAX)
fields = fields[:i] + ' ..'
return '%d change%s: %s' % (len(changes), s, fields)
i = fields.rfind(" ", 0, MAX)
fields = fields[:i] + " .."
return "%d change%s: %s" % (len(changes), s, fields)
msg_short.short_description = 'Changes'
msg_short.short_description = "Changes"
def msg(self, obj):
if obj.action == LogEntry.Action.DELETE:
return '' # delete
return "" # delete
changes = json.loads(obj.changes)
msg = '<table><tr><th>#</th><th>Field</th><th>From</th><th>To</th></tr>'
msg = "<table><tr><th>#</th><th>Field</th><th>From</th><th>To</th></tr>"
for i, field in enumerate(sorted(changes), 1):
value = [i, field] + (['***', '***'] if field == 'password' else changes[field])
msg += format_html('<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>', *value)
value = [i, field] + (
["***", "***"] if field == "password" else changes[field]
)
msg += format_html(
"<tr><td>{}</td><td>{}</td><td>{}</td><td>{}</td></tr>", *value
)
msg += '</table>'
msg += "</table>"
return mark_safe(msg)
msg.short_description = 'Changes'
msg.short_description = "Changes"

View file

@ -31,33 +31,49 @@ class LogEntryManager(models.Manager):
:return: The new log entry or `None` if there were no changes.
:rtype: LogEntry
"""
changes = kwargs.get('changes', None)
changes = kwargs.get("changes", None)
pk = self._get_pk_value(instance)
if changes is not None:
kwargs.setdefault('content_type', ContentType.objects.get_for_model(instance))
kwargs.setdefault('object_pk', pk)
kwargs.setdefault('object_repr', smart_str(instance))
kwargs.setdefault(
"content_type", ContentType.objects.get_for_model(instance)
)
kwargs.setdefault("object_pk", pk)
kwargs.setdefault("object_repr", smart_str(instance))
if isinstance(pk, int):
kwargs.setdefault('object_id', pk)
kwargs.setdefault("object_id", pk)
get_additional_data = getattr(instance, 'get_additional_data', None)
get_additional_data = getattr(instance, "get_additional_data", None)
if callable(get_additional_data):
kwargs.setdefault('additional_data', get_additional_data())
kwargs.setdefault("additional_data", get_additional_data())
# Delete log entries with the same pk as a newly created model. This should only be necessary when an pk is
# used twice.
if kwargs.get('action', None) is LogEntry.Action.CREATE:
if kwargs.get('object_id', None) is not None and self.filter(content_type=kwargs.get('content_type'),
object_id=kwargs.get(
'object_id')).exists():
self.filter(content_type=kwargs.get('content_type'), object_id=kwargs.get('object_id')).delete()
if kwargs.get("action", None) is LogEntry.Action.CREATE:
if (
kwargs.get("object_id", None) is not None
and self.filter(
content_type=kwargs.get("content_type"),
object_id=kwargs.get("object_id"),
).exists()
):
self.filter(
content_type=kwargs.get("content_type"),
object_id=kwargs.get("object_id"),
).delete()
else:
self.filter(content_type=kwargs.get('content_type'), object_pk=kwargs.get('object_pk', '')).delete()
self.filter(
content_type=kwargs.get("content_type"),
object_pk=kwargs.get("object_pk", ""),
).delete()
# save LogEntry to same database instance is using
db = instance._state.db
return self.create(**kwargs) if db is None or db == '' else self.using(db).create(**kwargs)
return (
self.create(**kwargs)
if db is None or db == ""
else self.using(db).create(**kwargs)
)
return None
def get_for_object(self, instance):
@ -94,15 +110,29 @@ class LogEntryManager(models.Manager):
return self.none()
content_type = ContentType.objects.get_for_model(queryset.model)
primary_keys = list(queryset.values_list(queryset.model._meta.pk.name, flat=True))
primary_keys = list(
queryset.values_list(queryset.model._meta.pk.name, flat=True)
)
if isinstance(primary_keys[0], int):
return self.filter(content_type=content_type).filter(Q(object_id__in=primary_keys)).distinct()
return (
self.filter(content_type=content_type)
.filter(Q(object_id__in=primary_keys))
.distinct()
)
elif isinstance(queryset.model._meta.pk, models.UUIDField):
primary_keys = [smart_str(pk) for pk in primary_keys]
return self.filter(content_type=content_type).filter(Q(object_pk__in=primary_keys)).distinct()
return (
self.filter(content_type=content_type)
.filter(Q(object_pk__in=primary_keys))
.distinct()
)
else:
return self.filter(content_type=content_type).filter(Q(object_pk__in=primary_keys)).distinct()
return (
self.filter(content_type=content_type)
.filter(Q(object_pk__in=primary_keys))
.distinct()
)
def get_for_model(self, model):
"""
@ -158,6 +188,7 @@ class LogEntry(models.Model):
The valid actions are :py:attr:`Action.CREATE`, :py:attr:`Action.UPDATE` and :py:attr:`Action.DELETE`.
"""
CREATE = 0
UPDATE = 1
DELETE = 2
@ -168,24 +199,44 @@ class LogEntry(models.Model):
(DELETE, _("delete")),
)
content_type = models.ForeignKey(to='contenttypes.ContentType', on_delete=models.CASCADE, related_name='+',
verbose_name=_("content type"))
object_pk = models.CharField(db_index=True, max_length=255, verbose_name=_("object pk"))
object_id = models.BigIntegerField(blank=True, db_index=True, null=True, verbose_name=_("object id"))
content_type = models.ForeignKey(
to="contenttypes.ContentType",
on_delete=models.CASCADE,
related_name="+",
verbose_name=_("content type"),
)
object_pk = models.CharField(
db_index=True, max_length=255, verbose_name=_("object pk")
)
object_id = models.BigIntegerField(
blank=True, db_index=True, null=True, verbose_name=_("object id")
)
object_repr = models.TextField(verbose_name=_("object representation"))
action = models.PositiveSmallIntegerField(choices=Action.choices, verbose_name=_("action"))
action = models.PositiveSmallIntegerField(
choices=Action.choices, verbose_name=_("action")
)
changes = models.TextField(blank=True, verbose_name=_("change message"))
actor = models.ForeignKey(to=settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, blank=True, null=True,
related_name='+', verbose_name=_("actor"))
remote_addr = models.GenericIPAddressField(blank=True, null=True, verbose_name=_("remote address"))
actor = models.ForeignKey(
to=settings.AUTH_USER_MODEL,
on_delete=models.SET_NULL,
blank=True,
null=True,
related_name="+",
verbose_name=_("actor"),
)
remote_addr = models.GenericIPAddressField(
blank=True, null=True, verbose_name=_("remote address")
)
timestamp = models.DateTimeField(auto_now_add=True, verbose_name=_("timestamp"))
additional_data = JSONField(blank=True, null=True, verbose_name=_("additional data"))
additional_data = JSONField(
blank=True, null=True, verbose_name=_("additional data")
)
objects = LogEntryManager()
class Meta:
get_latest_by = 'timestamp'
ordering = ['-timestamp']
get_latest_by = "timestamp"
ordering = ["-timestamp"]
verbose_name = _("log entry")
verbose_name_plural = _("log entries")
@ -212,7 +263,7 @@ class LogEntry(models.Model):
return {}
@property
def changes_str(self, colon=': ', arrow=' \u2192 ', separator='; '):
def changes_str(self, colon=": ", arrow=" \u2192 ", separator="; "):
"""
Return the changes recorded in this log entry as a string. The formatting of the string can be customized by
setting alternate values for colon, arrow and separator. If the formatting is still not satisfying, please use
@ -226,7 +277,7 @@ class LogEntry(models.Model):
substrings = []
for field, values in self.changes_dict.items():
substring = '{field_name:s}{colon:s}{old:s}{arrow:s}{new:s}'.format(
substring = "{field_name:s}{colon:s}{old:s}{arrow:s}{new:s}".format(
field_name=field,
colon=colon,
old=values[0],
@ -244,6 +295,7 @@ class LogEntry(models.Model):
"""
# Get the model and model_fields
from auditlog.registry import auditlog
model = self.content_type.model_class()
model_fields = auditlog.get_model_fields(model._meta.model)
changes_display_dict = {}
@ -258,9 +310,14 @@ class LogEntry(models.Model):
values_display = []
# handle choices fields and Postgres ArrayField to get human readable version
choices_dict = None
if getattr(field, 'choices') and len(field.choices) > 0:
if getattr(field, "choices") and len(field.choices) > 0:
choices_dict = dict(field.choices)
if hasattr(field, 'base_field') and isinstance(field.base_field, Field) and getattr(field.base_field, 'choices') and len(field.base_field.choices) > 0:
if (
hasattr(field, "base_field")
and isinstance(field.base_field, Field)
and getattr(field.base_field, "choices")
and len(field.base_field.choices) > 0
):
choices_dict = dict(field.base_field.choices)
if choices_dict:
@ -268,13 +325,17 @@ class LogEntry(models.Model):
try:
value = ast.literal_eval(value)
if type(value) is [].__class__:
values_display.append(', '.join([choices_dict.get(val, 'None') for val in value]))
values_display.append(
", ".join(
[choices_dict.get(val, "None") for val in value]
)
)
else:
values_display.append(choices_dict.get(value, 'None'))
values_display.append(choices_dict.get(value, "None"))
except ValueError:
values_display.append(choices_dict.get(value, 'None'))
values_display.append(choices_dict.get(value, "None"))
except:
values_display.append(choices_dict.get(value, 'None'))
values_display.append(choices_dict.get(value, "None"))
else:
try:
field_type = field.get_internal_type()
@ -301,7 +362,9 @@ class LogEntry(models.Model):
value = "{}...".format(value[:140])
values_display.append(value)
verbose_name = model_fields['mapping_fields'].get(field.name, getattr(field, 'verbose_name', field.name))
verbose_name = model_fields["mapping_fields"].get(
field.name, getattr(field, "verbose_name", field.name)
)
changes_display_dict[verbose_name] = values_display
return changes_display_dict
@ -327,14 +390,14 @@ class AuditlogHistoryField(GenericRelation):
"""
def __init__(self, pk_indexable=True, delete_related=True, **kwargs):
kwargs['to'] = LogEntry
kwargs["to"] = LogEntry
if pk_indexable:
kwargs['object_id_field'] = 'object_id'
kwargs["object_id_field"] = "object_id"
else:
kwargs['object_id_field'] = 'object_pk'
kwargs["object_id_field"] = "object_pk"
kwargs['content_type_field'] = 'content_type'
kwargs["content_type_field"] = "content_type"
self.delete_related = delete_related
super(AuditlogHistoryField, self).__init__(**kwargs)
@ -356,6 +419,8 @@ try:
from south.modelsinspector import add_introspection_rules
add_introspection_rules([], ["^auditlog\.models\.AuditlogHistoryField"])
raise DeprecationWarning("South support will be dropped in django-auditlog 0.4.0 or later.")
raise DeprecationWarning(
"South support will be dropped in django-auditlog 0.4.0 or later."
)
except ImportError:
pass

View file

@ -12,8 +12,13 @@ class AuditlogModelRegistry(object):
A registry that keeps track of the models that use Auditlog to track changes.
"""
def __init__(self, create: bool = True, update: bool = True, delete: bool = True,
custom: Optional[Dict[ModelSignal, Callable]] = None):
def __init__(
self,
create: bool = True,
update: bool = True,
delete: bool = True,
custom: Optional[Dict[ModelSignal, Callable]] = None,
):
from auditlog.receivers import log_create, log_update, log_delete
self._registry = {}
@ -29,8 +34,13 @@ class AuditlogModelRegistry(object):
if custom is not None:
self._signals.update(custom)
def register(self, model: ModelBase = None, include_fields: Optional[List[str]] = None,
exclude_fields: Optional[List[str]] = None, mapping_fields: Optional[Dict[str, str]] = None):
def register(
self,
model: ModelBase = None,
include_fields: Optional[List[str]] = None,
exclude_fields: Optional[List[str]] = None,
mapping_fields: Optional[Dict[str, str]] = None,
):
"""
Register a model with auditlog. Auditlog will then track mutations on this model's instances.
@ -54,9 +64,9 @@ class AuditlogModelRegistry(object):
raise TypeError("Supplied model is not a valid model.")
self._registry[cls] = {
'include_fields': include_fields,
'exclude_fields': exclude_fields,
'mapping_fields': mapping_fields,
"include_fields": include_fields,
"exclude_fields": exclude_fields,
"mapping_fields": mapping_fields,
}
self._connect_signals(cls)
@ -101,9 +111,9 @@ class AuditlogModelRegistry(object):
def get_model_fields(self, model: ModelBase):
return {
'include_fields': list(self._registry[model]['include_fields']),
'exclude_fields': list(self._registry[model]['exclude_fields']),
'mapping_fields': dict(self._registry[model]['mapping_fields']),
"include_fields": list(self._registry[model]["include_fields"]),
"exclude_fields": list(self._registry[model]["exclude_fields"]),
"mapping_fields": dict(self._registry[model]["mapping_fields"]),
}
def _connect_signals(self, model):
@ -112,14 +122,18 @@ class AuditlogModelRegistry(object):
"""
for signal in self._signals:
receiver = self._signals[signal]
signal.connect(receiver, sender=model, dispatch_uid=self._dispatch_uid(signal, model))
signal.connect(
receiver, sender=model, dispatch_uid=self._dispatch_uid(signal, model)
)
def _disconnect_signals(self, model):
"""
Disconnect signals for the model.
"""
for signal, receiver in self._signals.items():
signal.disconnect(sender=model, dispatch_uid=self._dispatch_uid(signal, model))
signal.disconnect(
sender=model, dispatch_uid=self._dispatch_uid(signal, model)
)
def _dispatch_uid(self, signal, model) -> DispatchUID:
"""

View file

@ -1 +1 @@
default_app_config = 'auditlog_tests.apps.AuditlogTestConfig'
default_app_config = "auditlog_tests.apps.AuditlogTestConfig"

View file

@ -2,4 +2,4 @@ from django.apps import AppConfig
class AuditlogTestConfig(AppConfig):
name = 'auditlog_tests'
name = "auditlog_tests"

View file

@ -64,7 +64,7 @@ class RelatedModel(models.Model):
A model with a foreign key.
"""
related = models.ForeignKey(to='self', on_delete=models.CASCADE)
related = models.ForeignKey(to="self", on_delete=models.CASCADE)
history = AuditlogHistoryField()
@ -74,12 +74,12 @@ class ManyRelatedModel(models.Model):
A model with a many to many relation.
"""
related = models.ManyToManyField('self')
related = models.ManyToManyField("self")
history = AuditlogHistoryField()
@auditlog.register(include_fields=['label'])
@auditlog.register(include_fields=["label"])
class SimpleIncludeModel(models.Model):
"""
A simple model used for register's include_fields kwarg
@ -108,7 +108,7 @@ class SimpleMappingModel(models.Model):
"""
sku = models.CharField(max_length=100)
vtxt = models.CharField(verbose_name='Version', max_length=100)
vtxt = models.CharField(verbose_name="Version", max_length=100)
not_mapped = models.CharField(max_length=100)
history = AuditlogHistoryField()
@ -133,8 +133,8 @@ class AdditionalDataIncludedModel(models.Model):
manager and added to each logentry instance on creation.
"""
object_details = {
'related_model_id': self.related.id,
'related_model_text': self.related.text
"related_model_id": self.related.id,
"related_model_text": self.related.text,
}
return object_details
@ -144,6 +144,7 @@ class DateTimeFieldModel(models.Model):
A model with a DateTimeField, used to test DateTimeField
changes are detected properly.
"""
label = models.CharField(max_length=100)
timestamp = models.DateTimeField()
date = models.DateField()
@ -158,14 +159,15 @@ class ChoicesFieldModel(models.Model):
A model with a CharField restricted to a set of choices.
This model is used to test the changes_display_dict method.
"""
RED = 'r'
YELLOW = 'y'
GREEN = 'g'
RED = "r"
YELLOW = "y"
GREEN = "g"
STATUS_CHOICES = (
(RED, 'Red'),
(YELLOW, 'Yellow'),
(GREEN, 'Green'),
(RED, "Red"),
(YELLOW, "Yellow"),
(GREEN, "Green"),
)
status = models.CharField(max_length=1, choices=STATUS_CHOICES)
@ -191,17 +193,20 @@ class PostgresArrayFieldModel(models.Model):
"""
Test auditlog with Postgres's ArrayField
"""
RED = 'r'
YELLOW = 'y'
GREEN = 'g'
RED = "r"
YELLOW = "y"
GREEN = "g"
STATUS_CHOICES = (
(RED, 'Red'),
(YELLOW, 'Yellow'),
(GREEN, 'Green'),
(RED, "Red"),
(YELLOW, "Yellow"),
(GREEN, "Green"),
)
arrayfield = ArrayField(models.CharField(max_length=1, choices=STATUS_CHOICES), size=3)
arrayfield = ArrayField(
models.CharField(max_length=1, choices=STATUS_CHOICES), size=3
)
history = AuditlogHistoryField()
@ -218,8 +223,8 @@ auditlog.register(ProxyModel)
auditlog.register(RelatedModel)
auditlog.register(ManyRelatedModel)
auditlog.register(ManyRelatedModel.related.through)
auditlog.register(SimpleExcludeModel, exclude_fields=['text'])
auditlog.register(SimpleMappingModel, mapping_fields={'sku': 'Product No.'})
auditlog.register(SimpleExcludeModel, exclude_fields=["text"])
auditlog.register(SimpleMappingModel, mapping_fields={"sku": "Product No."})
auditlog.register(AdditionalDataIncludedModel)
auditlog.register(DateTimeFieldModel)
auditlog.register(ChoicesFieldModel)

View file

@ -5,55 +5,55 @@ import os
DEBUG = True
SECRET_KEY = 'test'
SECRET_KEY = "test"
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.messages',
'django.contrib.sessions',
'django.contrib.admin',
'django.contrib.staticfiles',
'auditlog',
'auditlog_tests',
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.messages",
"django.contrib.sessions",
"django.contrib.admin",
"django.contrib.staticfiles",
"auditlog",
"auditlog_tests",
]
MIDDLEWARE = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'auditlog.middleware.AuditlogMiddleware',
"django.middleware.common.CommonMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"auditlog.middleware.AuditlogMiddleware",
)
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql',
'NAME': os.getenv('TEST_DB_NAME', 'auditlog_tests_db'),
'USER': os.getenv('TEST_DB_USER', 'postgres'),
'PASSWORD': os.getenv('TEST_DB_PASS', ''),
'HOST': os.getenv('TEST_DB_HOST', '127.0.0.1'),
'PORT': os.getenv('TEST_DB_PORT', '5432'),
"default": {
"ENGINE": "django.db.backends.postgresql",
"NAME": os.getenv("TEST_DB_NAME", "auditlog_tests_db"),
"USER": os.getenv("TEST_DB_USER", "postgres"),
"PASSWORD": os.getenv("TEST_DB_PASS", ""),
"HOST": os.getenv("TEST_DB_HOST", "127.0.0.1"),
"PORT": os.getenv("TEST_DB_PORT", "5432"),
}
}
TEMPLATES = [
{
'APP_DIRS': True,
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'OPTIONS': {
'context_processors': [
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
"APP_DIRS": True,
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"OPTIONS": {
"context_processors": [
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
]
},
},
]
STATIC_URL = '/static/'
STATIC_URL = "/static/"
ROOT_URLCONF = 'auditlog_tests.urls'
ROOT_URLCONF = "auditlog_tests.urls"
USE_TZ = True

View file

@ -14,15 +14,28 @@ from dateutil.tz import gettz
from auditlog.middleware import AuditlogMiddleware
from auditlog.models import LogEntry
from auditlog.registry import auditlog
from auditlog_tests.models import SimpleModel, AltPrimaryKeyModel, UUIDPrimaryKeyModel, \
ProxyModel, SimpleIncludeModel, SimpleExcludeModel, SimpleMappingModel, RelatedModel, \
ManyRelatedModel, AdditionalDataIncludedModel, DateTimeFieldModel, ChoicesFieldModel, \
CharfieldTextfieldModel, PostgresArrayFieldModel, NoDeleteHistoryModel
from auditlog_tests.models import (
SimpleModel,
AltPrimaryKeyModel,
UUIDPrimaryKeyModel,
ProxyModel,
SimpleIncludeModel,
SimpleExcludeModel,
SimpleMappingModel,
RelatedModel,
ManyRelatedModel,
AdditionalDataIncludedModel,
DateTimeFieldModel,
ChoicesFieldModel,
CharfieldTextfieldModel,
PostgresArrayFieldModel,
NoDeleteHistoryModel,
)
class SimpleModelTest(TestCase):
def setUp(self):
self.obj = SimpleModel.objects.create(text='I am not difficult.')
self.obj = SimpleModel.objects.create(text="I am not difficult.")
def test_create(self):
"""Creation is logged correctly."""
@ -37,8 +50,12 @@ class SimpleModelTest(TestCase):
except obj.history.DoesNotExist:
self.assertTrue(False, "Log entry exists")
else:
self.assertEqual(history.action, LogEntry.Action.CREATE, msg="Action is 'CREATE'")
self.assertEqual(history.object_repr, str(obj), msg="Representation is equal")
self.assertEqual(
history.action, LogEntry.Action.CREATE, msg="Action is 'CREATE'"
)
self.assertEqual(
history.object_repr, str(obj), msg="Representation is equal"
)
def test_update(self):
"""Updates are logged correctly."""
@ -50,11 +67,18 @@ class SimpleModelTest(TestCase):
obj.save()
# Check for log entries
self.assertTrue(obj.history.filter(action=LogEntry.Action.UPDATE).count() == 1, msg="There is one log entry for 'UPDATE'")
self.assertTrue(
obj.history.filter(action=LogEntry.Action.UPDATE).count() == 1,
msg="There is one log entry for 'UPDATE'",
)
history = obj.history.get(action=LogEntry.Action.UPDATE)
self.assertJSONEqual(history.changes, '{"boolean": ["False", "True"]}', msg="The change is correctly logged")
self.assertJSONEqual(
history.changes,
'{"boolean": ["False", "True"]}',
msg="The change is correctly logged",
)
def test_delete(self):
"""Deletion is logged correctly."""
@ -67,7 +91,15 @@ class SimpleModelTest(TestCase):
obj.delete()
# Check for log entries
self.assertTrue(LogEntry.objects.filter(content_type=history.content_type, object_pk=history.object_pk, action=LogEntry.Action.DELETE).count() == 1, msg="There is one log entry for 'DELETE'")
self.assertTrue(
LogEntry.objects.filter(
content_type=history.content_type,
object_pk=history.object_pk,
action=LogEntry.Action.DELETE,
).count()
== 1,
msg="There is one log entry for 'DELETE'",
)
def test_recreate(self):
SimpleModel.objects.all().delete()
@ -77,12 +109,14 @@ class SimpleModelTest(TestCase):
class AltPrimaryKeyModelTest(SimpleModelTest):
def setUp(self):
self.obj = AltPrimaryKeyModel.objects.create(key=str(datetime.datetime.now()), text='I am strange.')
self.obj = AltPrimaryKeyModel.objects.create(
key=str(datetime.datetime.now()), text="I am strange."
)
class UUIDPrimaryKeyModelModelTest(SimpleModelTest):
def setUp(self):
self.obj = UUIDPrimaryKeyModel.objects.create(text='I am strange.')
self.obj = UUIDPrimaryKeyModel.objects.create(text="I am strange.")
def test_get_for_object(self):
self.obj.boolean = True
@ -94,41 +128,54 @@ class UUIDPrimaryKeyModelModelTest(SimpleModelTest):
self.obj.boolean = True
self.obj.save()
self.assertEqual(LogEntry.objects.get_for_objects(UUIDPrimaryKeyModel.objects.all()).count(), 2)
self.assertEqual(
LogEntry.objects.get_for_objects(UUIDPrimaryKeyModel.objects.all()).count(),
2,
)
class ProxyModelTest(SimpleModelTest):
def setUp(self):
self.obj = ProxyModel.objects.create(text='I am not what you think.')
self.obj = ProxyModel.objects.create(text="I am not what you think.")
class ManyRelatedModelTest(TestCase):
"""
Test the behaviour of a many-to-many relationship.
"""
def setUp(self):
self.obj = ManyRelatedModel.objects.create()
self.rel_obj = ManyRelatedModel.objects.create()
self.obj.related.add(self.rel_obj)
def test_related(self):
self.assertEqual(LogEntry.objects.get_for_objects(self.obj.related.all()).count(), self.rel_obj.history.count())
self.assertEqual(LogEntry.objects.get_for_objects(self.obj.related.all()).first(), self.rel_obj.history.first())
self.assertEqual(
LogEntry.objects.get_for_objects(self.obj.related.all()).count(),
self.rel_obj.history.count(),
)
self.assertEqual(
LogEntry.objects.get_for_objects(self.obj.related.all()).first(),
self.rel_obj.history.first(),
)
class MiddlewareTest(TestCase):
"""
Test the middleware responsible for connecting and disconnecting the signals used in automatic logging.
"""
def setUp(self):
self.middleware = AuditlogMiddleware()
self.factory = RequestFactory()
self.user = User.objects.create_user(username='test', email='test@example.com', password='top_secret')
self.user = User.objects.create_user(
username="test", email="test@example.com", password="top_secret"
)
def test_request_anonymous(self):
"""No actor will be logged when a user is not logged in."""
# Create a request
request = self.factory.get('/')
request = self.factory.get("/")
request.user = AnonymousUser()
# Run middleware
@ -143,7 +190,7 @@ class MiddlewareTest(TestCase):
def test_request(self):
"""The actor will be logged when a user is logged in."""
# Create a request
request = self.factory.get('/')
request = self.factory.get("/")
request.user = self.user
# Run middleware
self.middleware.process_request(request)
@ -157,12 +204,14 @@ class MiddlewareTest(TestCase):
def test_response(self):
"""The signal will be disconnected when the request is processed."""
# Create a request
request = self.factory.get('/')
request = self.factory.get("/")
request.user = self.user
# Run middleware
self.middleware.process_request(request)
self.assertTrue(pre_save.has_listeners(LogEntry)) # The signal should be present before trying to disconnect it.
self.assertTrue(
pre_save.has_listeners(LogEntry)
) # The signal should be present before trying to disconnect it.
self.middleware.process_response(request, HttpResponse())
# Validate result
@ -171,12 +220,14 @@ class MiddlewareTest(TestCase):
def test_exception(self):
"""The signal will be disconnected when an exception is raised."""
# Create a request
request = self.factory.get('/')
request = self.factory.get("/")
request.user = self.user
# Run middleware
self.middleware.process_request(request)
self.assertTrue(pre_save.has_listeners(LogEntry)) # The signal should be present before trying to disconnect it.
self.assertTrue(
pre_save.has_listeners(LogEntry)
) # The signal should be present before trying to disconnect it.
self.middleware.process_exception(request, ValidationError("Test"))
# Validate result
@ -187,17 +238,17 @@ class SimpeIncludeModelTest(TestCase):
"""Log only changes in include_fields"""
def test_register_include_fields(self):
sim = SimpleIncludeModel(label='Include model', text='Looong text')
sim = SimpleIncludeModel(label="Include model", text="Looong text")
sim.save()
self.assertTrue(sim.history.count() == 1, msg="There is one log entry")
# Change label, record
sim.label = 'Changed label'
sim.label = "Changed label"
sim.save()
self.assertTrue(sim.history.count() == 2, msg="There are two log entries")
# Change text, ignore
sim.text = 'Short text'
sim.text = "Short text"
sim.save()
self.assertTrue(sim.history.count() == 2, msg="There are two log entries")
@ -206,17 +257,17 @@ class SimpeExcludeModelTest(TestCase):
"""Log only changes that are not in exclude_fields"""
def test_register_exclude_fields(self):
sem = SimpleExcludeModel(label='Exclude model', text='Looong text')
sem = SimpleExcludeModel(label="Exclude model", text="Looong text")
sem.save()
self.assertTrue(sem.history.count() == 1, msg="There is one log entry")
# Change label, ignore
sem.label = 'Changed label'
sem.label = "Changed label"
sem.save()
self.assertTrue(sem.history.count() == 2, msg="There are two log entries")
# Change text, record
sem.text = 'Short text'
sem.text = "Short text"
sem.save()
self.assertTrue(sem.history.count() == 2, msg="There are two log entries")
@ -225,38 +276,58 @@ class SimpleMappingModelTest(TestCase):
"""Diff displays fields as mapped field names where available through mapping_fields"""
def test_register_mapping_fields(self):
smm = SimpleMappingModel(sku='ASD301301A6', vtxt='2.1.5', not_mapped='Not mapped')
smm = SimpleMappingModel(
sku="ASD301301A6", vtxt="2.1.5", not_mapped="Not mapped"
)
smm.save()
self.assertTrue(smm.history.latest().changes_dict['sku'][1] == 'ASD301301A6',
msg="The diff function retains 'sku' and can be retrieved.")
self.assertTrue(smm.history.latest().changes_dict['not_mapped'][1] == 'Not mapped',
msg="The diff function does not map 'not_mapped' and can be retrieved.")
self.assertTrue(smm.history.latest().changes_display_dict['Product No.'][1] == 'ASD301301A6',
msg="The diff function maps 'sku' as 'Product No.' and can be retrieved.")
self.assertTrue(smm.history.latest().changes_display_dict['Version'][1] == '2.1.5',
msg=("The diff function maps 'vtxt' as 'Version' through verbose_name"
" setting on the model field and can be retrieved."))
self.assertTrue(smm.history.latest().changes_display_dict['not mapped'][1] == 'Not mapped',
msg=("The diff function uses the django default verbose name for 'not_mapped'"
" and can be retrieved."))
self.assertTrue(
smm.history.latest().changes_dict["sku"][1] == "ASD301301A6",
msg="The diff function retains 'sku' and can be retrieved.",
)
self.assertTrue(
smm.history.latest().changes_dict["not_mapped"][1] == "Not mapped",
msg="The diff function does not map 'not_mapped' and can be retrieved.",
)
self.assertTrue(
smm.history.latest().changes_display_dict["Product No."][1]
== "ASD301301A6",
msg="The diff function maps 'sku' as 'Product No.' and can be retrieved.",
)
self.assertTrue(
smm.history.latest().changes_display_dict["Version"][1] == "2.1.5",
msg=(
"The diff function maps 'vtxt' as 'Version' through verbose_name"
" setting on the model field and can be retrieved."
),
)
self.assertTrue(
smm.history.latest().changes_display_dict["not mapped"][1] == "Not mapped",
msg=(
"The diff function uses the django default verbose name for 'not_mapped'"
" and can be retrieved."
),
)
class AdditionalDataModelTest(TestCase):
"""Log additional data if get_additional_data is defined in the model"""
def test_model_without_additional_data(self):
obj_wo_additional_data = SimpleModel.objects.create(text='No additional '
'data')
obj_wo_additional_data = SimpleModel.objects.create(
text="No additional " "data"
)
obj_log_entry = obj_wo_additional_data.history.get()
self.assertIsNone(obj_log_entry.additional_data)
def test_model_with_additional_data(self):
related_model = SimpleModel.objects.create(text='Log my reference')
related_model = SimpleModel.objects.create(text="Log my reference")
obj_with_additional_data = AdditionalDataIncludedModel(
label='Additional data to log entries', related=related_model)
label="Additional data to log entries", related=related_model
)
obj_with_additional_data.save()
self.assertTrue(obj_with_additional_data.history.count() == 1,
msg="There is 1 log entry")
self.assertTrue(
obj_with_additional_data.history.count() == 1, msg="There is 1 log entry"
)
log_entry = obj_with_additional_data.history.get()
# FIXME: Work-around for the fact that additional_data isn't working
# on Django 3.1 correctly (see https://github.com/jazzband/django-auditlog/issues/266)
@ -265,10 +336,14 @@ class AdditionalDataModelTest(TestCase):
else:
extra_data = log_entry.additional_data
self.assertIsNotNone(extra_data)
self.assertTrue(extra_data['related_model_text'] == related_model.text,
msg="Related model's text is logged")
self.assertTrue(extra_data['related_model_id'] == related_model.id,
msg="Related model's id is logged")
self.assertTrue(
extra_data["related_model_text"] == related_model.text,
msg="Related model's text is logged",
)
self.assertTrue(
extra_data["related_model_id"] == related_model.id,
msg="Related model's id is logged",
)
class DateTimeFieldModelTest(TestCase):
@ -281,7 +356,13 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
@ -299,7 +380,13 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
@ -315,7 +402,13 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
@ -331,7 +424,13 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
@ -347,7 +446,13 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
@ -363,7 +468,13 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 12, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.count() == 1, msg="There is one log entry")
@ -379,85 +490,144 @@ class DateTimeFieldModelTest(TestCase):
timestamp = datetime.datetime(2017, 1, 10, 15, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
localized_timestamp = timestamp.astimezone(gettz(settings.TIME_ZONE))
self.assertTrue(dtm.history.latest().changes_display_dict["timestamp"][1] == \
dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
msg=("The datetime should be formatted according to Django's settings for"
" DATETIME_FORMAT"))
self.assertTrue(
dtm.history.latest().changes_display_dict["timestamp"][1]
== dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
msg=(
"The datetime should be formatted according to Django's settings for"
" DATETIME_FORMAT"
),
)
timestamp = timezone.now()
dtm.timestamp = timestamp
dtm.save()
localized_timestamp = timestamp.astimezone(gettz(settings.TIME_ZONE))
self.assertTrue(dtm.history.latest().changes_display_dict["timestamp"][1] == \
dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
msg=("The datetime should be formatted according to Django's settings for"
" DATETIME_FORMAT"))
self.assertTrue(
dtm.history.latest().changes_display_dict["timestamp"][1]
== dateformat.format(localized_timestamp, settings.DATETIME_FORMAT),
msg=(
"The datetime should be formatted according to Django's settings for"
" DATETIME_FORMAT"
),
)
# Change USE_L10N = True
with self.settings(USE_L10N=True, LANGUAGE_CODE='en-GB'):
self.assertTrue(dtm.history.latest().changes_display_dict["timestamp"][1] == \
formats.localize(localized_timestamp),
msg=("The datetime should be formatted according to Django's settings for"
" USE_L10N is True with a different LANGUAGE_CODE."))
with self.settings(USE_L10N=True, LANGUAGE_CODE="en-GB"):
self.assertTrue(
dtm.history.latest().changes_display_dict["timestamp"][1]
== formats.localize(localized_timestamp),
msg=(
"The datetime should be formatted according to Django's settings for"
" USE_L10N is True with a different LANGUAGE_CODE."
),
)
def test_changes_display_dict_date(self):
timestamp = datetime.datetime(2017, 1, 10, 15, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.latest().changes_display_dict["date"][1] == \
dateformat.format(date, settings.DATE_FORMAT),
msg=("The date should be formatted according to Django's settings for"
" DATE_FORMAT unless USE_L10N is True."))
self.assertTrue(
dtm.history.latest().changes_display_dict["date"][1]
== dateformat.format(date, settings.DATE_FORMAT),
msg=(
"The date should be formatted according to Django's settings for"
" DATE_FORMAT unless USE_L10N is True."
),
)
date = datetime.date(2017, 1, 11)
dtm.date = date
dtm.save()
self.assertTrue(dtm.history.latest().changes_display_dict["date"][1] == \
dateformat.format(date, settings.DATE_FORMAT),
msg=("The date should be formatted according to Django's settings for"
" DATE_FORMAT unless USE_L10N is True."))
self.assertTrue(
dtm.history.latest().changes_display_dict["date"][1]
== dateformat.format(date, settings.DATE_FORMAT),
msg=(
"The date should be formatted according to Django's settings for"
" DATE_FORMAT unless USE_L10N is True."
),
)
# Change USE_L10N = True
with self.settings(USE_L10N=True, LANGUAGE_CODE='en-GB'):
self.assertTrue(dtm.history.latest().changes_display_dict["date"][1] == \
formats.localize(date),
msg=("The date should be formatted according to Django's settings for"
" USE_L10N is True with a different LANGUAGE_CODE."))
with self.settings(USE_L10N=True, LANGUAGE_CODE="en-GB"):
self.assertTrue(
dtm.history.latest().changes_display_dict["date"][1]
== formats.localize(date),
msg=(
"The date should be formatted according to Django's settings for"
" USE_L10N is True with a different LANGUAGE_CODE."
),
)
def test_changes_display_dict_time(self):
timestamp = datetime.datetime(2017, 1, 10, 15, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
self.assertTrue(dtm.history.latest().changes_display_dict["time"][1] == \
dateformat.format(time, settings.TIME_FORMAT),
msg=("The time should be formatted according to Django's settings for"
" TIME_FORMAT unless USE_L10N is True."))
self.assertTrue(
dtm.history.latest().changes_display_dict["time"][1]
== dateformat.format(time, settings.TIME_FORMAT),
msg=(
"The time should be formatted according to Django's settings for"
" TIME_FORMAT unless USE_L10N is True."
),
)
time = datetime.time(6, 0)
dtm.time = time
dtm.save()
self.assertTrue(dtm.history.latest().changes_display_dict["time"][1] == \
dateformat.format(time, settings.TIME_FORMAT),
msg=("The time should be formatted according to Django's settings for"
" TIME_FORMAT unless USE_L10N is True."))
self.assertTrue(
dtm.history.latest().changes_display_dict["time"][1]
== dateformat.format(time, settings.TIME_FORMAT),
msg=(
"The time should be formatted according to Django's settings for"
" TIME_FORMAT unless USE_L10N is True."
),
)
# Change USE_L10N = True
with self.settings(USE_L10N=True, LANGUAGE_CODE='en-GB'):
self.assertTrue(dtm.history.latest().changes_display_dict["time"][1] == \
formats.localize(time),
msg=("The time should be formatted according to Django's settings for"
" USE_L10N is True with a different LANGUAGE_CODE."))
with self.settings(USE_L10N=True, LANGUAGE_CODE="en-GB"):
self.assertTrue(
dtm.history.latest().changes_display_dict["time"][1]
== formats.localize(time),
msg=(
"The time should be formatted according to Django's settings for"
" USE_L10N is True with a different LANGUAGE_CODE."
),
)
def test_update_naive_dt(self):
timestamp = datetime.datetime(2017, 1, 10, 15, 0, tzinfo=timezone.utc)
date = datetime.date(2017, 1, 10)
time = datetime.time(12, 0)
dtm = DateTimeFieldModel(label='DateTimeField model', timestamp=timestamp, date=date, time=time, naive_dt=self.now)
dtm = DateTimeFieldModel(
label="DateTimeField model",
timestamp=timestamp,
date=date,
time=time,
naive_dt=self.now,
)
dtm.save()
# Change with naive field doesnt raise error
@ -468,7 +638,7 @@ class DateTimeFieldModelTest(TestCase):
class UnregisterTest(TestCase):
def setUp(self):
auditlog.unregister(SimpleModel)
self.obj = SimpleModel.objects.create(text='No history')
self.obj = SimpleModel.objects.create(text="No history")
def tearDown(self):
# Re-register for future tests
@ -507,32 +677,45 @@ class UnregisterTest(TestCase):
class ChoicesFieldModelTest(TestCase):
def setUp(self):
self.obj = ChoicesFieldModel.objects.create(
status=ChoicesFieldModel.RED,
multiplechoice=[ChoicesFieldModel.RED, ChoicesFieldModel.YELLOW, ChoicesFieldModel.GREEN],
multiplechoice=[
ChoicesFieldModel.RED,
ChoicesFieldModel.YELLOW,
ChoicesFieldModel.GREEN,
],
)
def test_changes_display_dict_single_choice(self):
self.assertTrue(self.obj.history.latest().changes_display_dict["status"][1] == "Red",
msg="The human readable text 'Red' is displayed.")
self.assertTrue(
self.obj.history.latest().changes_display_dict["status"][1] == "Red",
msg="The human readable text 'Red' is displayed.",
)
self.obj.status = ChoicesFieldModel.GREEN
self.obj.save()
self.assertTrue(self.obj.history.latest().changes_display_dict["status"][1] == "Green", msg="The human readable text 'Green' is displayed.")
self.assertTrue(
self.obj.history.latest().changes_display_dict["status"][1] == "Green",
msg="The human readable text 'Green' is displayed.",
)
def test_changes_display_dict_multiplechoice(self):
self.assertTrue(self.obj.history.latest().changes_display_dict["multiplechoice"][1] == "Red, Yellow, Green",
msg="The human readable text 'Red, Yellow, Green' is displayed.")
self.assertTrue(
self.obj.history.latest().changes_display_dict["multiplechoice"][1]
== "Red, Yellow, Green",
msg="The human readable text 'Red, Yellow, Green' is displayed.",
)
self.obj.multiplechoice = ChoicesFieldModel.RED
self.obj.save()
self.assertTrue(self.obj.history.latest().changes_display_dict["multiplechoice"][1] == "Red",
msg="The human readable text 'Red' is displayed.")
self.assertTrue(
self.obj.history.latest().changes_display_dict["multiplechoice"][1]
== "Red",
msg="The human readable text 'Red' is displayed.",
)
class CharfieldTextfieldModelTest(TestCase):
def setUp(self):
self.PLACEHOLDER_LONGCHAR = "s" * 255
self.PLACEHOLDER_LONGTEXTFIELD = "s" * 1000
@ -542,28 +725,38 @@ class CharfieldTextfieldModelTest(TestCase):
)
def test_changes_display_dict_longchar(self):
self.assertTrue(self.obj.history.latest().changes_display_dict["longchar"][1] == \
"{}...".format(self.PLACEHOLDER_LONGCHAR[:140]),
msg="The string should be truncated at 140 characters with an ellipsis at the end.")
self.assertTrue(
self.obj.history.latest().changes_display_dict["longchar"][1]
== "{}...".format(self.PLACEHOLDER_LONGCHAR[:140]),
msg="The string should be truncated at 140 characters with an ellipsis at the end.",
)
SHORTENED_PLACEHOLDER = self.PLACEHOLDER_LONGCHAR[:139]
self.obj.longchar = SHORTENED_PLACEHOLDER
self.obj.save()
self.assertTrue(self.obj.history.latest().changes_display_dict["longchar"][1] == SHORTENED_PLACEHOLDER,
msg="The field should display the entire string because it is less than 140 characters")
self.assertTrue(
self.obj.history.latest().changes_display_dict["longchar"][1]
== SHORTENED_PLACEHOLDER,
msg="The field should display the entire string because it is less than 140 characters",
)
def test_changes_display_dict_longtextfield(self):
self.assertTrue(self.obj.history.latest().changes_display_dict["longtextfield"][1] == \
"{}...".format(self.PLACEHOLDER_LONGTEXTFIELD[:140]),
msg="The string should be truncated at 140 characters with an ellipsis at the end.")
self.assertTrue(
self.obj.history.latest().changes_display_dict["longtextfield"][1]
== "{}...".format(self.PLACEHOLDER_LONGTEXTFIELD[:140]),
msg="The string should be truncated at 140 characters with an ellipsis at the end.",
)
SHORTENED_PLACEHOLDER = self.PLACEHOLDER_LONGTEXTFIELD[:139]
self.obj.longtextfield = SHORTENED_PLACEHOLDER
self.obj.save()
self.assertTrue(self.obj.history.latest().changes_display_dict["longtextfield"][1] == SHORTENED_PLACEHOLDER,
msg="The field should display the entire string because it is less than 140 characters")
self.assertTrue(
self.obj.history.latest().changes_display_dict["longtextfield"][1]
== SHORTENED_PLACEHOLDER,
msg="The field should display the entire string because it is less than 140 characters",
)
class PostgresArrayFieldModelTest(TestCase):
databases = '__all__'
databases = "__all__"
def setUp(self):
self.obj = PostgresArrayFieldModel.objects.create(
@ -575,20 +768,28 @@ class PostgresArrayFieldModelTest(TestCase):
return self.obj.history.latest().changes_display_dict["arrayfield"][1]
def test_changes_display_dict_arrayfield(self):
self.assertTrue(self.latest_array_change == "Red, Green",
msg="The human readable text for the two choices, 'Red, Green' is displayed.")
self.assertTrue(
self.latest_array_change == "Red, Green",
msg="The human readable text for the two choices, 'Red, Green' is displayed.",
)
self.obj.arrayfield = [PostgresArrayFieldModel.GREEN]
self.obj.save()
self.assertTrue(self.latest_array_change == "Green",
msg="The human readable text 'Green' is displayed.")
self.assertTrue(
self.latest_array_change == "Green",
msg="The human readable text 'Green' is displayed.",
)
self.obj.arrayfield = []
self.obj.save()
self.assertTrue(self.latest_array_change == "",
msg="The human readable text '' is displayed.")
self.assertTrue(
self.latest_array_change == "",
msg="The human readable text '' is displayed.",
)
self.obj.arrayfield = [PostgresArrayFieldModel.GREEN]
self.obj.save()
self.assertTrue(self.latest_array_change == "Green",
msg="The human readable text 'Green' is displayed.")
self.assertTrue(
self.latest_array_change == "Green",
msg="The human readable text 'Green' is displayed.",
)
class AdminPanelTest(TestCase):
@ -602,7 +803,7 @@ class AdminPanelTest(TestCase):
cls.user.is_superuser = True
cls.user.is_active = True
cls.user.save()
cls.obj = SimpleModel.objects.create(text='For admin logentry test')
cls.obj = SimpleModel.objects.create(text="For admin logentry test")
def test_auditlog_admin(self):
self.client.login(username=self.username, password=self.password)
@ -611,7 +812,9 @@ class AdminPanelTest(TestCase):
assert res.status_code == 200
res = self.client.get("/admin/auditlog/logentry/add/")
assert res.status_code == 200
res = self.client.get("/admin/auditlog/logentry/{}/".format(log_pk), follow=True)
res = self.client.get(
"/admin/auditlog/logentry/{}/".format(log_pk), follow=True
)
assert res.status_code == 200
res = self.client.get("/admin/auditlog/logentry/{}/delete/".format(log_pk))
assert res.status_code == 200
@ -628,7 +831,7 @@ class NoDeleteHistoryTest(TestCase):
assert LogEntry.objects.all().count() == 2
instance.delete()
entries = LogEntry.objects.order_by('id')
entries = LogEntry.objects.order_by("id")
# The "DELETE" record is always retained
assert LogEntry.objects.all().count() == 1
@ -642,9 +845,9 @@ class NoDeleteHistoryTest(TestCase):
self.assertEqual(LogEntry.objects.all().count(), 2)
instance.delete()
entries = LogEntry.objects.order_by('id')
entries = LogEntry.objects.order_by("id")
self.assertEqual(entries.count(), 3)
self.assertEqual(
list(entries.values_list('action', flat=True)),
[LogEntry.Action.CREATE, LogEntry.Action.UPDATE, LogEntry.Action.DELETE]
list(entries.values_list("action", flat=True)),
[LogEntry.Action.CREATE, LogEntry.Action.UPDATE, LogEntry.Action.DELETE],
)

2
pyproject.toml Normal file
View file

@ -0,0 +1,2 @@
[tool.black]
target-version = ["py35"]

View file

@ -7,7 +7,7 @@ from django.conf import settings
from django.test.utils import get_runner
if __name__ == "__main__":
os.environ['DJANGO_SETTINGS_MODULE'] = 'auditlog_tests.test_settings'
os.environ["DJANGO_SETTINGS_MODULE"] = "auditlog_tests.test_settings"
django.setup()
TestRunner = get_runner(settings)
test_runner = TestRunner()

View file

@ -7,29 +7,31 @@ with open(os.path.join(os.path.dirname(__file__), "README.md"), "r") as readme_f
long_description = readme_file.read()
setup(
name='django-auditlog',
name="django-auditlog",
use_scm_version={"version_scheme": "post-release"},
setup_requires=["setuptools_scm"],
packages=['auditlog', 'auditlog.migrations', 'auditlog.management', 'auditlog.management.commands'],
url='https://github.com/jazzband/django-auditlog',
license='MIT',
author='Jan-Jelle Kester',
description='Audit log app for Django',
long_description=long_description,
long_description_content_type='text/markdown',
install_requires=[
'django-jsonfield>=1.0.0',
'python-dateutil>=2.6.0'
packages=[
"auditlog",
"auditlog.migrations",
"auditlog.management",
"auditlog.management.commands",
],
url="https://github.com/jazzband/django-auditlog",
license="MIT",
author="Jan-Jelle Kester",
description="Audit log app for Django",
long_description=long_description,
long_description_content_type="text/markdown",
install_requires=["django-jsonfield>=1.0.0", "python-dateutil>=2.6.0"],
zip_safe=False,
classifiers=[
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'License :: OSI Approved :: MIT License',
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"License :: OSI Approved :: MIT License",
],
)

View file

@ -3,6 +3,7 @@ envlist =
{py35,py36,py37,py38,py39}-django-22
{py36,py37,py38,py39}-django-{30,31}
py38-docs
py38-qa
[testenv]
commands =
@ -35,6 +36,13 @@ changedir = docs/source
deps = -rdocs/requirements.txt
commands = sphinx-build -W -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
[testenv:py38-qa]
basepython = python3.8
deps =
black
commands =
black --check --diff auditlog auditlog_tests setup.py runtests.py
[gh-actions]
python =
3.5: py35