custom_fields_callbacks

This commit is contained in:
Arjuna Del Toso 2024-10-28 21:52:17 -04:00
parent ceae2f9b4a
commit 4f4b3af7fe
2 changed files with 97 additions and 20 deletions

View file

@ -74,6 +74,7 @@ class LogEntryManager(models.Manager):
# set correlation id
kwargs.setdefault("cid", get_cid())
return self.create(**kwargs)
return None
@ -149,7 +150,9 @@ class LogEntryManager(models.Manager):
if isinstance(pk, int):
return self.filter(content_type=content_type, object_id=pk)
else:
return self.filter(content_type=content_type, object_pk=smart_str(pk))
return self.filter(
content_type=content_type, object_pk=smart_str(pk)
)
def get_for_objects(self, queryset):
"""
@ -236,13 +239,38 @@ class LogEntryManager(models.Manager):
if opts["serialize_auditlog_fields_only"]:
kwargs.setdefault(
"fields", self._get_applicable_model_fields(instance, model_fields)
"fields",
self._get_applicable_model_fields(instance, model_fields),
)
instance_copy = self._get_copy_with_python_typed_fields(instance)
custom_fields_data = {}
if opts["serialize_fields_callbacks"]:
for field_name, callback in opts[
"serialize_fields_callbacks"
].items():
custom_fields_data[field_name] = callback(
instance_copy, field_name
)
if custom_fields_data:
all_fields = [f.name for f in instance._meta.fields]
custom_fields = custom_fields_data.keys()
kwargs["fields"] = [
f
for f in all_fields
if f not in custom_fields_data
and f not in model_fields["exclude_fields"]
]
data = dict(
json.loads(serializers.serialize("json", (instance_copy,), **kwargs))[0]
json.loads(
serializers.serialize("json", (instance_copy,), **kwargs)
)[0]
)
if custom_fields_data:
data["fields"].update(custom_fields_data)
mask_fields = model_fields["mask_fields"]
if mask_fields:
@ -283,7 +311,9 @@ class LogEntryManager(models.Manager):
if not include_fields and not exclude_fields:
return all_field_names
return list(set(include_fields or all_field_names).difference(exclude_fields))
return list(
set(include_fields or all_field_names).difference(exclude_fields)
)
def _mask_serialized_fields(
self, data: Dict[str, Any], mask_fields: List[str]
@ -352,7 +382,9 @@ class LogEntry(models.Model):
action = models.PositiveSmallIntegerField(
choices=Action.choices, verbose_name=_("action"), db_index=True
)
changes_text = models.TextField(blank=True, verbose_name=_("change message"))
changes_text = models.TextField(
blank=True, verbose_name=_("change message")
)
changes = models.JSONField(null=True, verbose_name=_("change message"))
actor = models.ForeignKey(
to=settings.AUTH_USER_MODEL,
@ -471,11 +503,16 @@ class LogEntry(models.Model):
if type(value) is [].__class__:
values_display.append(
", ".join(
[choices_dict.get(val, "None") for val in value]
[
choices_dict.get(val, "None")
for val in value
]
)
)
else:
values_display.append(choices_dict.get(value, "None"))
values_display.append(
choices_dict.get(value, "None")
)
except Exception:
values_display.append(choices_dict.get(value, "None"))
else:
@ -486,7 +523,11 @@ class LogEntry(models.Model):
continue
for value in values:
# handle case where field is a datetime, date, or time type
if field_type in ["DateTimeField", "DateField", "TimeField"]:
if field_type in [
"DateTimeField",
"DateField",
"TimeField",
]:
try:
value = parser.parse(value)
if field_type == "DateField":
@ -495,12 +536,16 @@ class LogEntry(models.Model):
value = value.time()
elif field_type == "DateTimeField":
value = value.replace(tzinfo=timezone.utc)
value = value.astimezone(gettz(settings.TIME_ZONE))
value = value.astimezone(
gettz(settings.TIME_ZONE)
)
value = formats.localize(value)
except ValueError:
pass
elif field_type in ["ForeignKey", "OneToOneField"]:
value = self._get_changes_display_for_fk_field(field, value)
value = self._get_changes_display_for_fk_field(
field, value
)
# check if length is longer than 140 and truncate with ellipsis
if len(value) > 140:

View file

@ -49,7 +49,12 @@ class AuditlogModelRegistry:
m2m: bool = True,
custom: Optional[Dict[ModelSignal, Callable]] = None,
):
from auditlog.receivers import log_access, log_create, log_delete, log_update
from auditlog.receivers import (
log_access,
log_create,
log_delete,
log_update,
)
self._registry = {}
self._signals = {}
@ -79,6 +84,7 @@ class AuditlogModelRegistry:
serialize_data: bool = False,
serialize_kwargs: Optional[Dict[str, Any]] = None,
serialize_auditlog_fields_only: bool = False,
serialize_fields_callbacks: Optional[Dict[str, Callable]] = None,
):
"""
Register a model with auditlog. Auditlog will then track mutations on this model's instances.
@ -106,8 +112,14 @@ class AuditlogModelRegistry:
m2m_fields = set()
if serialize_kwargs is None:
serialize_kwargs = {}
if serialize_fields_callbacks is None:
serialize_fields_callbacks = []
if (serialize_kwargs or serialize_auditlog_fields_only) and not serialize_data:
if (
serialize_kwargs
or serialize_auditlog_fields_only
or serialize_fields_callbacks
) and not serialize_data:
raise AuditLogRegistrationError(
"Serializer options were given but the 'serialize_data' option is not "
"set. Did you forget to set serialized_data to True?"
@ -130,6 +142,7 @@ class AuditlogModelRegistry:
"serialize_data": serialize_data,
"serialize_kwargs": serialize_kwargs,
"serialize_auditlog_fields_only": serialize_auditlog_fields_only,
"serialize_fields_callbacks": serialize_fields_callbacks,
}
self._connect_signals(cls)
@ -187,6 +200,9 @@ class AuditlogModelRegistry:
"serialize_auditlog_fields_only": bool(
self._registry[model]["serialize_auditlog_fields_only"]
),
"serialize_fields_callbacks": dict(
self._registry[model]["serialize_fields_callbacks"]
),
}
def _connect_signals(self, model):
@ -256,7 +272,9 @@ class AuditlogModelRegistry:
]
return exclude_models
def _register_models(self, models: Iterable[Union[str, Dict[str, Any]]]) -> None:
def _register_models(
self, models: Iterable[Union[str, Dict[str, Any]]]
) -> None:
models = copy.deepcopy(models)
for model in models:
if isinstance(model, str):
@ -279,10 +297,16 @@ class AuditlogModelRegistry:
Register models from settings variables
"""
if not isinstance(settings.AUDITLOG_INCLUDE_ALL_MODELS, bool):
raise TypeError("Setting 'AUDITLOG_INCLUDE_ALL_MODELS' must be a boolean")
raise TypeError(
"Setting 'AUDITLOG_INCLUDE_ALL_MODELS' must be a boolean"
)
if not isinstance(settings.AUDITLOG_DISABLE_ON_RAW_SAVE, bool):
raise TypeError("Setting 'AUDITLOG_DISABLE_ON_RAW_SAVE' must be a boolean")
if not isinstance(settings.AUDITLOG_EXCLUDE_TRACKING_MODELS, (list, tuple)):
raise TypeError(
"Setting 'AUDITLOG_DISABLE_ON_RAW_SAVE' must be a boolean"
)
if not isinstance(
settings.AUDITLOG_EXCLUDE_TRACKING_MODELS, (list, tuple)
):
raise TypeError(
"Setting 'AUDITLOG_EXCLUDE_TRACKING_MODELS' must be a list or tuple"
)
@ -305,12 +329,16 @@ class AuditlogModelRegistry:
"setting 'AUDITLOG_INCLUDE_ALL_MODELS' must be set to 'True'"
)
if not isinstance(settings.AUDITLOG_INCLUDE_TRACKING_MODELS, (list, tuple)):
if not isinstance(
settings.AUDITLOG_INCLUDE_TRACKING_MODELS, (list, tuple)
):
raise TypeError(
"Setting 'AUDITLOG_INCLUDE_TRACKING_MODELS' must be a list or tuple"
)
if not isinstance(settings.AUDITLOG_EXCLUDE_TRACKING_FIELDS, (list, tuple)):
if not isinstance(
settings.AUDITLOG_EXCLUDE_TRACKING_FIELDS, (list, tuple)
):
raise TypeError(
"Setting 'AUDITLOG_EXCLUDE_TRACKING_FIELDS' must be a list or tuple"
)
@ -346,7 +374,9 @@ class AuditlogModelRegistry:
continue
m2m_fields = [
m.name for m in meta.get_fields() if isinstance(m, ManyToManyField)
m.name
for m in meta.get_fields()
if isinstance(m, ManyToManyField)
]
exclude_fields = [
@ -356,7 +386,9 @@ class AuditlogModelRegistry:
]
self.register(
model=model, m2m_fields=m2m_fields, exclude_fields=exclude_fields
model=model,
m2m_fields=m2m_fields,
exclude_fields=exclude_fields,
)
self._register_models(settings.AUDITLOG_INCLUDE_TRACKING_MODELS)