mirror of
https://github.com/Hopiu/django.git
synced 2026-04-21 15:24:50 +00:00
PathInfo values are ostensibly static over the lifetime of the object for which they're requested, so the data can be memoized, quickly amortising the cost over the process' duration.
552 lines
19 KiB
Python
552 lines
19 KiB
Python
import datetime
|
|
import decimal
|
|
import json
|
|
from collections import defaultdict
|
|
|
|
from django.core.exceptions import FieldDoesNotExist
|
|
from django.db import models, router
|
|
from django.db.models.constants import LOOKUP_SEP
|
|
from django.db.models.deletion import Collector
|
|
from django.forms.utils import pretty_name
|
|
from django.urls import NoReverseMatch, reverse
|
|
from django.utils import formats, timezone
|
|
from django.utils.html import format_html
|
|
from django.utils.regex_helper import _lazy_re_compile
|
|
from django.utils.text import capfirst
|
|
from django.utils.translation import ngettext, override as translation_override
|
|
|
|
QUOTE_MAP = {i: '_%02X' % i for i in b'":/_#?;@&=+$,"[]<>%\n\\'}
|
|
UNQUOTE_MAP = {v: chr(k) for k, v in QUOTE_MAP.items()}
|
|
UNQUOTE_RE = _lazy_re_compile('_(?:%s)' % '|'.join([x[1:] for x in UNQUOTE_MAP]))
|
|
|
|
|
|
class FieldIsAForeignKeyColumnName(Exception):
|
|
"""A field is a foreign key attname, i.e. <FK>_id."""
|
|
pass
|
|
|
|
|
|
def lookup_spawns_duplicates(opts, lookup_path):
|
|
"""
|
|
Return True if the given lookup path spawns duplicates.
|
|
"""
|
|
lookup_fields = lookup_path.split(LOOKUP_SEP)
|
|
# Go through the fields (following all relations) and look for an m2m.
|
|
for field_name in lookup_fields:
|
|
if field_name == 'pk':
|
|
field_name = opts.pk.name
|
|
try:
|
|
field = opts.get_field(field_name)
|
|
except FieldDoesNotExist:
|
|
# Ignore query lookups.
|
|
continue
|
|
else:
|
|
if hasattr(field, 'path_infos'):
|
|
# This field is a relation; update opts to follow the relation.
|
|
path_info = field.path_infos
|
|
opts = path_info[-1].to_opts
|
|
if any(path.m2m for path in path_info):
|
|
# This field is a m2m relation so duplicates must be
|
|
# handled.
|
|
return True
|
|
return False
|
|
|
|
|
|
def prepare_lookup_value(key, value):
|
|
"""
|
|
Return a lookup value prepared to be used in queryset filtering.
|
|
"""
|
|
# if key ends with __in, split parameter into separate values
|
|
if key.endswith('__in'):
|
|
value = value.split(',')
|
|
# if key ends with __isnull, special case '' and the string literals 'false' and '0'
|
|
elif key.endswith('__isnull'):
|
|
value = value.lower() not in ('', 'false', '0')
|
|
return value
|
|
|
|
|
|
def quote(s):
|
|
"""
|
|
Ensure that primary key values do not confuse the admin URLs by escaping
|
|
any '/', '_' and ':' and similarly problematic characters.
|
|
Similar to urllib.parse.quote(), except that the quoting is slightly
|
|
different so that it doesn't get automatically unquoted by the web browser.
|
|
"""
|
|
return s.translate(QUOTE_MAP) if isinstance(s, str) else s
|
|
|
|
|
|
def unquote(s):
|
|
"""Undo the effects of quote()."""
|
|
return UNQUOTE_RE.sub(lambda m: UNQUOTE_MAP[m[0]], s)
|
|
|
|
|
|
def flatten(fields):
|
|
"""
|
|
Return a list which is a single level of flattening of the original list.
|
|
"""
|
|
flat = []
|
|
for field in fields:
|
|
if isinstance(field, (list, tuple)):
|
|
flat.extend(field)
|
|
else:
|
|
flat.append(field)
|
|
return flat
|
|
|
|
|
|
def flatten_fieldsets(fieldsets):
|
|
"""Return a list of field names from an admin fieldsets structure."""
|
|
field_names = []
|
|
for name, opts in fieldsets:
|
|
field_names.extend(
|
|
flatten(opts['fields'])
|
|
)
|
|
return field_names
|
|
|
|
|
|
def get_deleted_objects(objs, request, admin_site):
|
|
"""
|
|
Find all objects related to ``objs`` that should also be deleted. ``objs``
|
|
must be a homogeneous iterable of objects (e.g. a QuerySet).
|
|
|
|
Return a nested list of strings suitable for display in the
|
|
template with the ``unordered_list`` filter.
|
|
"""
|
|
try:
|
|
obj = objs[0]
|
|
except IndexError:
|
|
return [], {}, set(), []
|
|
else:
|
|
using = router.db_for_write(obj._meta.model)
|
|
collector = NestedObjects(using=using)
|
|
collector.collect(objs)
|
|
perms_needed = set()
|
|
|
|
def format_callback(obj):
|
|
model = obj.__class__
|
|
has_admin = model in admin_site._registry
|
|
opts = obj._meta
|
|
|
|
no_edit_link = '%s: %s' % (capfirst(opts.verbose_name), obj)
|
|
|
|
if has_admin:
|
|
if not admin_site._registry[model].has_delete_permission(request, obj):
|
|
perms_needed.add(opts.verbose_name)
|
|
try:
|
|
admin_url = reverse('%s:%s_%s_change'
|
|
% (admin_site.name,
|
|
opts.app_label,
|
|
opts.model_name),
|
|
None, (quote(obj.pk),))
|
|
except NoReverseMatch:
|
|
# Change url doesn't exist -- don't display link to edit
|
|
return no_edit_link
|
|
|
|
# Display a link to the admin page.
|
|
return format_html('{}: <a href="{}">{}</a>',
|
|
capfirst(opts.verbose_name),
|
|
admin_url,
|
|
obj)
|
|
else:
|
|
# Don't display link to edit, because it either has no
|
|
# admin or is edited inline.
|
|
return no_edit_link
|
|
|
|
to_delete = collector.nested(format_callback)
|
|
|
|
protected = [format_callback(obj) for obj in collector.protected]
|
|
model_count = {model._meta.verbose_name_plural: len(objs) for model, objs in collector.model_objs.items()}
|
|
|
|
return to_delete, model_count, perms_needed, protected
|
|
|
|
|
|
class NestedObjects(Collector):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.edges = {} # {from_instance: [to_instances]}
|
|
self.protected = set()
|
|
self.model_objs = defaultdict(set)
|
|
|
|
def add_edge(self, source, target):
|
|
self.edges.setdefault(source, []).append(target)
|
|
|
|
def collect(self, objs, source=None, source_attr=None, **kwargs):
|
|
for obj in objs:
|
|
if source_attr and not source_attr.endswith('+'):
|
|
related_name = source_attr % {
|
|
'class': source._meta.model_name,
|
|
'app_label': source._meta.app_label,
|
|
}
|
|
self.add_edge(getattr(obj, related_name), obj)
|
|
else:
|
|
self.add_edge(None, obj)
|
|
self.model_objs[obj._meta.model].add(obj)
|
|
try:
|
|
return super().collect(objs, source_attr=source_attr, **kwargs)
|
|
except models.ProtectedError as e:
|
|
self.protected.update(e.protected_objects)
|
|
except models.RestrictedError as e:
|
|
self.protected.update(e.restricted_objects)
|
|
|
|
def related_objects(self, related_model, related_fields, objs):
|
|
qs = super().related_objects(related_model, related_fields, objs)
|
|
return qs.select_related(*[related_field.name for related_field in related_fields])
|
|
|
|
def _nested(self, obj, seen, format_callback):
|
|
if obj in seen:
|
|
return []
|
|
seen.add(obj)
|
|
children = []
|
|
for child in self.edges.get(obj, ()):
|
|
children.extend(self._nested(child, seen, format_callback))
|
|
if format_callback:
|
|
ret = [format_callback(obj)]
|
|
else:
|
|
ret = [obj]
|
|
if children:
|
|
ret.append(children)
|
|
return ret
|
|
|
|
def nested(self, format_callback=None):
|
|
"""
|
|
Return the graph as a nested list.
|
|
"""
|
|
seen = set()
|
|
roots = []
|
|
for root in self.edges.get(None, ()):
|
|
roots.extend(self._nested(root, seen, format_callback))
|
|
return roots
|
|
|
|
def can_fast_delete(self, *args, **kwargs):
|
|
"""
|
|
We always want to load the objects into memory so that we can display
|
|
them to the user in confirm page.
|
|
"""
|
|
return False
|
|
|
|
|
|
def model_format_dict(obj):
|
|
"""
|
|
Return a `dict` with keys 'verbose_name' and 'verbose_name_plural',
|
|
typically for use with string formatting.
|
|
|
|
`obj` may be a `Model` instance, `Model` subclass, or `QuerySet` instance.
|
|
"""
|
|
if isinstance(obj, (models.Model, models.base.ModelBase)):
|
|
opts = obj._meta
|
|
elif isinstance(obj, models.query.QuerySet):
|
|
opts = obj.model._meta
|
|
else:
|
|
opts = obj
|
|
return {
|
|
'verbose_name': opts.verbose_name,
|
|
'verbose_name_plural': opts.verbose_name_plural,
|
|
}
|
|
|
|
|
|
def model_ngettext(obj, n=None):
|
|
"""
|
|
Return the appropriate `verbose_name` or `verbose_name_plural` value for
|
|
`obj` depending on the count `n`.
|
|
|
|
`obj` may be a `Model` instance, `Model` subclass, or `QuerySet` instance.
|
|
If `obj` is a `QuerySet` instance, `n` is optional and the length of the
|
|
`QuerySet` is used.
|
|
"""
|
|
if isinstance(obj, models.query.QuerySet):
|
|
if n is None:
|
|
n = obj.count()
|
|
obj = obj.model
|
|
d = model_format_dict(obj)
|
|
singular, plural = d["verbose_name"], d["verbose_name_plural"]
|
|
return ngettext(singular, plural, n or 0)
|
|
|
|
|
|
def lookup_field(name, obj, model_admin=None):
|
|
opts = obj._meta
|
|
try:
|
|
f = _get_non_gfk_field(opts, name)
|
|
except (FieldDoesNotExist, FieldIsAForeignKeyColumnName):
|
|
# For non-field values, the value is either a method, property or
|
|
# returned via a callable.
|
|
if callable(name):
|
|
attr = name
|
|
value = attr(obj)
|
|
elif hasattr(model_admin, name) and name != '__str__':
|
|
attr = getattr(model_admin, name)
|
|
value = attr(obj)
|
|
else:
|
|
attr = getattr(obj, name)
|
|
if callable(attr):
|
|
value = attr()
|
|
else:
|
|
value = attr
|
|
f = None
|
|
else:
|
|
attr = None
|
|
value = getattr(obj, name)
|
|
return f, attr, value
|
|
|
|
|
|
def _get_non_gfk_field(opts, name):
|
|
"""
|
|
For historical reasons, the admin app relies on GenericForeignKeys as being
|
|
"not found" by get_field(). This could likely be cleaned up.
|
|
|
|
Reverse relations should also be excluded as these aren't attributes of the
|
|
model (rather something like `foo_set`).
|
|
"""
|
|
field = opts.get_field(name)
|
|
if (field.is_relation and
|
|
# Generic foreign keys OR reverse relations
|
|
((field.many_to_one and not field.related_model) or field.one_to_many)):
|
|
raise FieldDoesNotExist()
|
|
|
|
# Avoid coercing <FK>_id fields to FK
|
|
if field.is_relation and not field.many_to_many and hasattr(field, 'attname') and field.attname == name:
|
|
raise FieldIsAForeignKeyColumnName()
|
|
|
|
return field
|
|
|
|
|
|
def label_for_field(name, model, model_admin=None, return_attr=False, form=None):
|
|
"""
|
|
Return a sensible label for a field name. The name can be a callable,
|
|
property (but not created with @property decorator), or the name of an
|
|
object's attribute, as well as a model field. If return_attr is True, also
|
|
return the resolved attribute (which could be a callable). This will be
|
|
None if (and only if) the name refers to a field.
|
|
"""
|
|
attr = None
|
|
try:
|
|
field = _get_non_gfk_field(model._meta, name)
|
|
try:
|
|
label = field.verbose_name
|
|
except AttributeError:
|
|
# field is likely a ForeignObjectRel
|
|
label = field.related_model._meta.verbose_name
|
|
except FieldDoesNotExist:
|
|
if name == "__str__":
|
|
label = str(model._meta.verbose_name)
|
|
attr = str
|
|
else:
|
|
if callable(name):
|
|
attr = name
|
|
elif hasattr(model_admin, name):
|
|
attr = getattr(model_admin, name)
|
|
elif hasattr(model, name):
|
|
attr = getattr(model, name)
|
|
elif form and name in form.fields:
|
|
attr = form.fields[name]
|
|
else:
|
|
message = "Unable to lookup '%s' on %s" % (name, model._meta.object_name)
|
|
if model_admin:
|
|
message += " or %s" % model_admin.__class__.__name__
|
|
if form:
|
|
message += " or %s" % form.__class__.__name__
|
|
raise AttributeError(message)
|
|
|
|
if hasattr(attr, "short_description"):
|
|
label = attr.short_description
|
|
elif (isinstance(attr, property) and
|
|
hasattr(attr, "fget") and
|
|
hasattr(attr.fget, "short_description")):
|
|
label = attr.fget.short_description
|
|
elif callable(attr):
|
|
if attr.__name__ == "<lambda>":
|
|
label = "--"
|
|
else:
|
|
label = pretty_name(attr.__name__)
|
|
else:
|
|
label = pretty_name(name)
|
|
except FieldIsAForeignKeyColumnName:
|
|
label = pretty_name(name)
|
|
attr = name
|
|
|
|
if return_attr:
|
|
return (label, attr)
|
|
else:
|
|
return label
|
|
|
|
|
|
def help_text_for_field(name, model):
|
|
help_text = ""
|
|
try:
|
|
field = _get_non_gfk_field(model._meta, name)
|
|
except (FieldDoesNotExist, FieldIsAForeignKeyColumnName):
|
|
pass
|
|
else:
|
|
if hasattr(field, 'help_text'):
|
|
help_text = field.help_text
|
|
return help_text
|
|
|
|
|
|
def display_for_field(value, field, empty_value_display):
|
|
from django.contrib.admin.templatetags.admin_list import _boolean_icon
|
|
|
|
if getattr(field, 'flatchoices', None):
|
|
return dict(field.flatchoices).get(value, empty_value_display)
|
|
# BooleanField needs special-case null-handling, so it comes before the
|
|
# general null test.
|
|
elif isinstance(field, models.BooleanField):
|
|
return _boolean_icon(value)
|
|
elif value is None:
|
|
return empty_value_display
|
|
elif isinstance(field, models.DateTimeField):
|
|
return formats.localize(timezone.template_localtime(value))
|
|
elif isinstance(field, (models.DateField, models.TimeField)):
|
|
return formats.localize(value)
|
|
elif isinstance(field, models.DecimalField):
|
|
return formats.number_format(value, field.decimal_places)
|
|
elif isinstance(field, (models.IntegerField, models.FloatField)):
|
|
return formats.number_format(value)
|
|
elif isinstance(field, models.FileField) and value:
|
|
return format_html('<a href="{}">{}</a>', value.url, value)
|
|
elif isinstance(field, models.JSONField) and value:
|
|
try:
|
|
return json.dumps(value, ensure_ascii=False, cls=field.encoder)
|
|
except TypeError:
|
|
return display_for_value(value, empty_value_display)
|
|
else:
|
|
return display_for_value(value, empty_value_display)
|
|
|
|
|
|
def display_for_value(value, empty_value_display, boolean=False):
|
|
from django.contrib.admin.templatetags.admin_list import _boolean_icon
|
|
|
|
if boolean:
|
|
return _boolean_icon(value)
|
|
elif value is None:
|
|
return empty_value_display
|
|
elif isinstance(value, bool):
|
|
return str(value)
|
|
elif isinstance(value, datetime.datetime):
|
|
return formats.localize(timezone.template_localtime(value))
|
|
elif isinstance(value, (datetime.date, datetime.time)):
|
|
return formats.localize(value)
|
|
elif isinstance(value, (int, decimal.Decimal, float)):
|
|
return formats.number_format(value)
|
|
elif isinstance(value, (list, tuple)):
|
|
return ', '.join(str(v) for v in value)
|
|
else:
|
|
return str(value)
|
|
|
|
|
|
class NotRelationField(Exception):
|
|
pass
|
|
|
|
|
|
def get_model_from_relation(field):
|
|
if hasattr(field, 'path_infos'):
|
|
return field.path_infos[-1].to_opts.model
|
|
else:
|
|
raise NotRelationField
|
|
|
|
|
|
def reverse_field_path(model, path):
|
|
""" Create a reversed field path.
|
|
|
|
E.g. Given (Order, "user__groups"),
|
|
return (Group, "user__order").
|
|
|
|
Final field must be a related model, not a data field.
|
|
"""
|
|
reversed_path = []
|
|
parent = model
|
|
pieces = path.split(LOOKUP_SEP)
|
|
for piece in pieces:
|
|
field = parent._meta.get_field(piece)
|
|
# skip trailing data field if extant:
|
|
if len(reversed_path) == len(pieces) - 1: # final iteration
|
|
try:
|
|
get_model_from_relation(field)
|
|
except NotRelationField:
|
|
break
|
|
|
|
# Field should point to another model
|
|
if field.is_relation and not (field.auto_created and not field.concrete):
|
|
related_name = field.related_query_name()
|
|
parent = field.remote_field.model
|
|
else:
|
|
related_name = field.field.name
|
|
parent = field.related_model
|
|
reversed_path.insert(0, related_name)
|
|
return (parent, LOOKUP_SEP.join(reversed_path))
|
|
|
|
|
|
def get_fields_from_path(model, path):
|
|
""" Return list of Fields given path relative to model.
|
|
|
|
e.g. (ModelX, "user__groups__name") -> [
|
|
<django.db.models.fields.related.ForeignKey object at 0x...>,
|
|
<django.db.models.fields.related.ManyToManyField object at 0x...>,
|
|
<django.db.models.fields.CharField object at 0x...>,
|
|
]
|
|
"""
|
|
pieces = path.split(LOOKUP_SEP)
|
|
fields = []
|
|
for piece in pieces:
|
|
if fields:
|
|
parent = get_model_from_relation(fields[-1])
|
|
else:
|
|
parent = model
|
|
fields.append(parent._meta.get_field(piece))
|
|
return fields
|
|
|
|
|
|
def construct_change_message(form, formsets, add):
|
|
"""
|
|
Construct a JSON structure describing changes from a changed object.
|
|
Translations are deactivated so that strings are stored untranslated.
|
|
Translation happens later on LogEntry access.
|
|
"""
|
|
# Evaluating `form.changed_data` prior to disabling translations is required
|
|
# to avoid fields affected by localization from being included incorrectly,
|
|
# e.g. where date formats differ such as MM/DD/YYYY vs DD/MM/YYYY.
|
|
changed_data = form.changed_data
|
|
with translation_override(None):
|
|
# Deactivate translations while fetching verbose_name for form
|
|
# field labels and using `field_name`, if verbose_name is not provided.
|
|
# Translations will happen later on LogEntry access.
|
|
changed_field_labels = _get_changed_field_labels_from_form(form, changed_data)
|
|
|
|
change_message = []
|
|
if add:
|
|
change_message.append({'added': {}})
|
|
elif form.changed_data:
|
|
change_message.append({'changed': {'fields': changed_field_labels}})
|
|
if formsets:
|
|
with translation_override(None):
|
|
for formset in formsets:
|
|
for added_object in formset.new_objects:
|
|
change_message.append({
|
|
'added': {
|
|
'name': str(added_object._meta.verbose_name),
|
|
'object': str(added_object),
|
|
}
|
|
})
|
|
for changed_object, changed_fields in formset.changed_objects:
|
|
change_message.append({
|
|
'changed': {
|
|
'name': str(changed_object._meta.verbose_name),
|
|
'object': str(changed_object),
|
|
'fields': _get_changed_field_labels_from_form(formset.forms[0], changed_fields),
|
|
}
|
|
})
|
|
for deleted_object in formset.deleted_objects:
|
|
change_message.append({
|
|
'deleted': {
|
|
'name': str(deleted_object._meta.verbose_name),
|
|
'object': str(deleted_object),
|
|
}
|
|
})
|
|
return change_message
|
|
|
|
|
|
def _get_changed_field_labels_from_form(form, changed_data):
|
|
changed_field_labels = []
|
|
for field_name in changed_data:
|
|
try:
|
|
verbose_field_name = form.fields[field_name].label or field_name
|
|
except KeyError:
|
|
verbose_field_name = field_name
|
|
changed_field_labels.append(str(verbose_field_name))
|
|
return changed_field_labels
|