update library models and logic

This commit is contained in:
dishantsethi 2024-02-10 15:21:20 +05:30
parent 1b27abe0d2
commit 71bef1b280
11 changed files with 76 additions and 284 deletions

View file

@ -1,3 +1,3 @@
from importlib.metadata import version
__version__ = version("django-auditlog")
__version__ = version("django-auditlog-bklynhlth")

View file

@ -4,34 +4,30 @@ from django.contrib import admin
from django.contrib.auth import get_user_model
from django.utils.translation import gettext_lazy as _
from auditlog.filters import CIDFilter, ResourceTypeFilter
from auditlog.filters import ResourceTypeFilter
from auditlog.mixins import LogEntryAdminMixin
from auditlog.models import LogEntry
@admin.register(LogEntry)
class LogEntryAdmin(admin.ModelAdmin, LogEntryAdminMixin):
list_select_related = ["content_type", "actor"]
list_select_related = ["content_type"]
list_display = [
"created",
"resource_url",
"action",
"msg_short",
"user_url",
"cid_url",
]
search_fields = [
"timestamp",
"object_repr",
"changes",
"actor__first_name",
"actor__last_name",
f"actor__{get_user_model().USERNAME_FIELD}",
]
list_filter = ["action", ResourceTypeFilter, CIDFilter]
list_filter = ["action", ResourceTypeFilter]
readonly_fields = ["created", "resource_url", "action", "user_url", "msg"]
fieldsets = [
(None, {"fields": ["created", "user_url", "resource_url", "cid"]}),
(None, {"fields": ["created", "user_url", "resource_url"]}),
(_("Changes"), {"fields": ["action", "msg"]}),
]

View file

@ -1,71 +0,0 @@
from contextvars import ContextVar
from typing import Optional
from django.conf import settings
from django.http import HttpRequest
from django.utils.module_loading import import_string
correlation_id = ContextVar("auditlog_correlation_id", default=None)
def set_cid(request: Optional[HttpRequest] = None) -> None:
"""
A function to read the cid from a request.
If the header is not in the request, then we set it to `None`.
Note: we look for the value of `AUDITLOG_CID_HEADER` in `request.headers` and in `request.META`.
This function doesn't do anything if the user is supplying their own `AUDITLOG_CID_GETTER`.
:param request: The request to get the cid from.
:return: None
"""
if settings.AUDITLOG_CID_GETTER:
return
cid = None
header = settings.AUDITLOG_CID_HEADER
if header and request:
if header in request.headers:
cid = request.headers.get(header)
elif header in request.META:
cid = request.META.get(header)
# Ideally, this line should be nested inside the if statement.
# However, because the tests do not run requests in multiple threads,
# we have to always set the value of the cid,
# even if the request does not have the header present,
# in which case it will be set to None
correlation_id.set(cid)
def _get_cid() -> Optional[str]:
return correlation_id.get()
def get_cid() -> Optional[str]:
"""
Calls the cid getter function based on `settings.AUDITLOG_CID_GETTER`
If the setting value is:
* None: then it calls the default getter (which retrieves the value set in `set_cid`)
* callable: then it calls the function
* type(str): then it imports the function and then call it
The result is then returned to the caller.
If your custom getter does not depend on `set_header()`,
then we recommend setting `settings.AUDITLOG_CID_GETTER` to `None`.
:return: The correlation ID
"""
method = settings.AUDITLOG_CID_GETTER
if not method:
return _get_cid()
if callable(method):
return method()
return import_string(method)()

View file

@ -12,59 +12,6 @@ auditlog_value = ContextVar("auditlog_value")
auditlog_disabled = ContextVar("auditlog_disabled", default=False)
@contextlib.contextmanager
def set_actor(actor, remote_addr=None):
"""Connect a signal receiver with current user attached."""
# Initialize thread local storage
context_data = {
"signal_duid": ("set_actor", time.time()),
"remote_addr": remote_addr,
}
auditlog_value.set(context_data)
# Connect signal for automatic logging
set_actor = partial(_set_actor, user=actor, signal_duid=context_data["signal_duid"])
pre_save.connect(
set_actor,
sender=LogEntry,
dispatch_uid=context_data["signal_duid"],
weak=False,
)
try:
yield
finally:
try:
auditlog = auditlog_value.get()
except LookupError:
pass
else:
pre_save.disconnect(sender=LogEntry, dispatch_uid=auditlog["signal_duid"])
def _set_actor(user, sender, instance, signal_duid, **kwargs):
"""Signal receiver with extra 'user' and 'signal_duid' kwargs.
This function becomes a valid signal receiver when it is curried with the actor and a dispatch id.
"""
try:
auditlog = auditlog_value.get()
except LookupError:
pass
else:
if signal_duid != auditlog["signal_duid"]:
return
auth_user_model = get_user_model()
if (
sender == LogEntry
and isinstance(user, auth_user_model)
and instance.actor is None
):
instance.actor = user
instance.remote_addr = auditlog["remote_addr"]
@contextlib.contextmanager
def disable_auditlog():
token = auditlog_disabled.set(True)

View file

@ -15,19 +15,3 @@ class ResourceTypeFilter(SimpleListFilter):
if self.value() is None:
return queryset
return queryset.filter(content_type_id=self.value())
class CIDFilter(SimpleListFilter):
title = _("Correlation ID")
parameter_name = "cid"
def lookups(self, request, model_admin):
return []
def has_output(self):
return True
def queryset(self, request, queryset):
if self.value() is None:
return queryset
return queryset.filter(cid=self.value())

View file

@ -1,9 +1,3 @@
from django.contrib.auth import get_user_model
from auditlog.cid import set_cid
from auditlog.context import set_actor
class AuditlogMiddleware:
"""
Middleware to couple the request's user to log items. This is accomplished by currying the
@ -12,36 +6,3 @@ class AuditlogMiddleware:
def __init__(self, get_response=None):
self.get_response = get_response
@staticmethod
def _get_remote_addr(request):
# In case there is no proxy, return the original address
if not request.headers.get("X-Forwarded-For"):
return request.META.get("REMOTE_ADDR")
# In case of proxy, set 'original' address
remote_addr: str = request.headers.get("X-Forwarded-For").split(",")[0]
# Remove port number from remote_addr
if "." in remote_addr and ":" in remote_addr: # IPv4 with port (`x.x.x.x:x`)
remote_addr = remote_addr.split(":")[0]
elif "[" in remote_addr: # IPv6 with port (`[:::]:x`)
remote_addr = remote_addr[1:].split("]")[0]
return remote_addr
@staticmethod
def _get_actor(request):
user = getattr(request, "user", None)
if isinstance(user, get_user_model()) and user.is_authenticated:
return user
return None
def __call__(self, request):
remote_addr = self._get_remote_addr(request)
user = self._get_actor(request)
set_cid(request)
with set_actor(actor=user, remote_addr=remote_addr):
return self.get_response(request)

View file

@ -0,0 +1,67 @@
# Generated by Django 3.2.19 on 2024-02-10 09:34
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("auditlog", "0015_alter_logentry_changes"),
]
operations = [
migrations.RemoveField(
model_name="logentry",
name="actor",
),
migrations.RemoveField(
model_name="logentry",
name="additional_data",
),
migrations.RemoveField(
model_name="logentry",
name="cid",
),
migrations.RemoveField(
model_name="logentry",
name="object_id",
),
migrations.RemoveField(
model_name="logentry",
name="remote_addr",
),
migrations.RemoveField(
model_name="logentry",
name="serialized_data",
),
migrations.AddField(
model_name="logentry",
name="database_name",
field=models.CharField(
default="RDS", max_length=255, verbose_name="database name"
),
),
migrations.AddField(
model_name="logentry",
name="developer_name",
field=models.CharField(
default="willisaplication",
max_length=255,
verbose_name="developer name",
),
),
migrations.AlterField(
model_name="logentry",
name="action",
field=models.PositiveSmallIntegerField(
choices=[
("create", "create"),
("update", "update"),
("delete", "delete"),
("access", "access"),
],
db_index=True,
verbose_name="action",
),
),
]

View file

@ -19,7 +19,6 @@ MAX = 75
class LogEntryAdminMixin:
request: HttpRequest
CID_TITLE = _("Click to filter by records with this correlation id")
@admin.display(description=_("Created"))
def created(self, obj):
@ -29,15 +28,6 @@ class LogEntryAdminMixin:
@admin.display(description=_("User"))
def user_url(self, obj):
if obj.actor:
app_label, model = settings.AUTH_USER_MODEL.split(".")
viewname = f"admin:{app_label}_{model.lower()}_change"
try:
link = urlresolvers.reverse(viewname, args=[obj.actor.pk])
except NoReverseMatch:
return "%s" % (obj.actor)
return format_html('<a href="{}">{}</a>', link, obj.actor)
return "system"
@admin.display(description=_("Resource"))
@ -45,7 +35,7 @@ class LogEntryAdminMixin:
app_label, model = obj.content_type.app_label, obj.content_type.model
viewname = f"admin:{app_label}_{model}_change"
try:
args = [obj.object_pk] if obj.object_id is None else [obj.object_id]
args = [obj.object_pk]
link = urlresolvers.reverse(viewname, args=args)
except NoReverseMatch:
return obj.object_repr
@ -118,15 +108,6 @@ class LogEntryAdminMixin:
return mark_safe("".join(msg))
@admin.display(description="Correlation ID")
def cid_url(self, obj):
cid = obj.cid
if cid:
url = self._add_query_parameter("cid", cid)
return format_html(
'<a href="{}" title="{}">{}</a>', url, self.CID_TITLE, cid
)
def _format_header(self, *labels):
return format_html(
"".join(["<tr>", "<th>{}</th>" * len(labels), "</tr>"]), *labels

View file

@ -46,7 +46,6 @@ class LogEntryManager(models.Manager):
:return: The new log entry or `None` if there were no changes.
:rtype: LogEntry
"""
from auditlog.cid import get_cid
changes = kwargs.get("changes", None)
pk = self._get_pk_value(instance)
@ -61,19 +60,8 @@ class LogEntryManager(models.Manager):
except ObjectDoesNotExist:
object_repr = DEFAULT_OBJECT_REPR
kwargs.setdefault("object_repr", object_repr)
kwargs.setdefault(
"serialized_data", self._get_serialized_data_or_none(instance)
)
if isinstance(pk, int):
kwargs.setdefault("object_id", pk)
get_additional_data = getattr(instance, "get_additional_data", None)
if callable(get_additional_data):
kwargs.setdefault("additional_data", get_additional_data())
# set correlation id
kwargs.setdefault("cid", get_cid())
return self.create(**kwargs)
return None
@ -94,7 +82,6 @@ class LogEntryManager(models.Manager):
:return: The new log entry or `None` if there were no changes.
:rtype: LogEntry
"""
from auditlog.cid import get_cid
pk = self._get_pk_value(instance)
if changed_queryset:
@ -109,13 +96,6 @@ class LogEntryManager(models.Manager):
kwargs.setdefault("object_repr", object_repr)
kwargs.setdefault("action", LogEntry.Action.UPDATE)
if isinstance(pk, int):
kwargs.setdefault("object_id", pk)
get_additional_data = getattr(instance, "get_additional_data", None)
if callable(get_additional_data):
kwargs.setdefault("additional_data", get_additional_data())
objects = [smart_str(instance) for instance in changed_queryset]
kwargs["changes"] = {
field_name: {
@ -125,7 +105,6 @@ class LogEntryManager(models.Manager):
}
}
kwargs.setdefault("cid", get_cid())
return self.create(**kwargs)
return None
@ -147,7 +126,7 @@ class LogEntryManager(models.Manager):
pk = self._get_pk_value(instance)
if isinstance(pk, int):
return self.filter(content_type=content_type, object_id=pk)
return self.filter(content_type=content_type)
else:
return self.filter(content_type=content_type, object_pk=smart_str(pk))
@ -169,11 +148,7 @@ class LogEntryManager(models.Manager):
)
if isinstance(primary_keys[0], int):
return (
self.filter(content_type=content_type)
.filter(Q(object_id__in=primary_keys))
.distinct()
)
return self.filter(content_type=content_type).distinct()
elif isinstance(queryset.model._meta.pk, models.UUIDField):
primary_keys = [smart_str(pk) for pk in primary_keys]
return (
@ -221,32 +196,6 @@ class LogEntryManager(models.Manager):
pk = self._get_pk_value(pk)
return pk
def _get_serialized_data_or_none(self, instance):
from auditlog.registry import auditlog
opts = auditlog.get_serialize_options(instance.__class__)
if not opts["serialize_data"]:
return None
model_fields = auditlog.get_model_fields(instance.__class__)
kwargs = opts.get("serialize_kwargs", {})
if opts["serialize_auditlog_fields_only"]:
kwargs.setdefault(
"fields", self._get_applicable_model_fields(instance, model_fields)
)
instance_copy = self._get_copy_with_python_typed_fields(instance)
data = dict(
json.loads(serializers.serialize("json", (instance_copy,), **kwargs))[0]
)
mask_fields = model_fields["mask_fields"]
if mask_fields:
data = self._mask_serialized_fields(data, mask_fields)
return data
def _get_copy_with_python_typed_fields(self, instance):
"""
Attempt to create copy of instance and coerce types on instance fields
@ -536,11 +485,6 @@ class AuditlogHistoryField(GenericRelation):
def __init__(self, pk_indexable=True, delete_related=False, **kwargs):
kwargs["to"] = LogEntry
if pk_indexable:
kwargs["object_id_field"] = "object_id"
else:
kwargs["object_id_field"] = "object_pk"
kwargs["content_type_field"] = "content_type"
self.delete_related = delete_related
super().__init__(**kwargs)

View file

@ -1,2 +1 @@
def get_cid():
return "my custom get_cid"

View file

@ -100,10 +100,6 @@ class ManyRelatedModel(models.Model):
history = AuditlogHistoryField(delete_related=True)
def get_additional_data(self):
related = self.related.first()
return {"related_model_id": related.id if related else None}
class ManyRelatedOtherModel(models.Model):
"""
@ -172,18 +168,6 @@ class AdditionalDataIncludedModel(models.Model):
history = AuditlogHistoryField(delete_related=True)
def get_additional_data(self):
"""
Returns JSON that captures a snapshot of additional details of the
model instance. This method, if defined, is accessed by auditlog
manager and added to each logentry instance on creation.
"""
object_details = {
"related_model_id": self.related.id,
"related_model_text": self.related.text,
}
return object_details
class DateTimeFieldModel(models.Model):
"""