Compare commits

..

No commits in common. "master" and "4.3.0" have entirely different histories.

61 changed files with 744 additions and 1905 deletions

View file

@ -6,10 +6,10 @@ jobs:
docs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: '3.12'
cache: 'pip'

View file

@ -11,12 +11,12 @@ jobs:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
with:
fetch-depth: 0
- name: Set up Python
uses: actions/setup-python@v6
uses: actions/setup-python@v5
- name: Install dependencies
run: |
@ -30,7 +30,7 @@ jobs:
- name: Upload packages to Jazzband
if: github.event_name == 'push' && startsWith(github.ref, 'refs/tags')
uses: pypa/gh-action-pypi-publish@v1.13.0
uses: pypa/gh-action-pypi-publish@v1.12.3
with:
user: jazzband
password: ${{ secrets.JAZZBAND_RELEASE_KEY }}

View file

@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 1
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- uses: chartboost/ruff-action@v1
with:
version: 0.5.0
@ -17,7 +17,7 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 1
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- uses: chartboost/ruff-action@v1
with:
version: 0.5.0
@ -28,13 +28,13 @@ jobs:
fail-fast: false
max-parallel: 5
matrix:
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13', '3.14']
python-version: ['3.8', '3.9', '3.10', '3.11', '3.12', '3.13']
steps:
- uses: actions/checkout@v6
- uses: actions/checkout@v4
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v6
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}
cache: 'pip'

View file

@ -5,13 +5,13 @@ repos:
- id: python-check-blanket-noqa
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v6.0.0
rev: v5.0.0
hooks:
- id: check-merge-conflict
- id: check-yaml
- repo: https://github.com/asottile/pyupgrade
rev: v3.21.2
rev: v3.19.1
hooks:
- id: pyupgrade
args: [ --py38-plus ]

View file

@ -14,5 +14,3 @@ sphinx:
python:
install:
- requirements: docs/requirements.txt
- method: pip
path: .

View file

@ -32,7 +32,6 @@ Merijn Bertels <merijn.bertels@gmail.com>
Omer Katz <omer.drow@gmail.com>
Petr Knap <dev@petrknap.cz>
Philip Neustrom <philipn@gmail.com>
Philipp Thumfart <philipp@thumfart.eu>
Pierre-Olivier Marec <pomarec@free.fr>
Roman Krejcik <farin@farin.cz>
Silvan Spross <silvan.spross@gmail.com>

View file

@ -1,3 +1,46 @@
# Django Constance Code of Conduct
# Code of Conduct
The django-constance project utilizes the [Django Commons Code of Conduct](https://github.com/django-commons/membership/blob/main/CODE_OF_CONDUCT.md).
As contributors and maintainers of the Jazzband projects, and in the interest of
fostering an open and welcoming community, we pledge to respect all people who
contribute through reporting issues, posting feature requests, updating documentation,
submitting pull requests or patches, and other activities.
We are committed to making participation in the Jazzband a harassment-free experience
for everyone, regardless of the level of experience, gender, gender identity and
expression, sexual orientation, disability, personal appearance, body size, race,
ethnicity, age, religion, or nationality.
Examples of unacceptable behavior by participants include:
- The use of sexualized language or imagery
- Personal attacks
- Trolling or insulting/derogatory comments
- Public or private harassment
- Publishing other's private information, such as physical or electronic addresses,
without explicit permission
- Other unethical or unprofessional conduct
The Jazzband roadies have the right and responsibility to remove, edit, or reject
comments, commits, code, wiki edits, issues, and other contributions that are not
aligned to this Code of Conduct, or to ban temporarily or permanently any contributor
for other behaviors that they deem inappropriate, threatening, offensive, or harmful.
By adopting this Code of Conduct, the roadies commit themselves to fairly and
consistently applying these principles to every aspect of managing the jazzband
projects. Roadies who do not follow or enforce the Code of Conduct may be permanently
removed from the Jazzband roadies.
This code of conduct applies both within project spaces and in public spaces when an
individual is representing the project or its community.
Instances of abusive, harassing, or otherwise unacceptable behavior may be reported by
contacting the roadies at `roadies@jazzband.co`. All complaints will be reviewed and
investigated and will result in a response that is deemed necessary and appropriate to
the circumstances. Roadies are obligated to maintain confidentiality with regard to the
reporter of an incident.
This Code of Conduct is adapted from the [Contributor Covenant][homepage], version
1.3.0, available at [https://contributor-covenant.org/version/1/3/0/][version]
[homepage]: https://contributor-covenant.org
[version]: https://contributor-covenant.org/version/1/3/0/

View file

@ -1,4 +1,3 @@
import json
from collections import OrderedDict
from datetime import date
from datetime import datetime
@ -26,7 +25,7 @@ config = LazyConfig()
class ConstanceAdmin(admin.ModelAdmin):
change_list_template = "admin/constance/change_list.html"
change_list_template = 'admin/constance/change_list.html'
change_list_form = ConstanceForm
def __init__(self, model, admin_site):
@ -34,10 +33,10 @@ class ConstanceAdmin(admin.ModelAdmin):
super().__init__(model, admin_site)
def get_urls(self):
info = f"{self.model._meta.app_label}_{self.model._meta.module_name}"
info = f'{self.model._meta.app_label}_{self.model._meta.module_name}'
return [
path("", self.admin_site.admin_view(self.changelist_view), name=f"{info}_changelist"),
path("", self.admin_site.admin_view(self.changelist_view), name=f"{info}_add"),
path('', self.admin_site.admin_view(self.changelist_view), name=f'{info}_changelist'),
path('', self.admin_site.admin_view(self.changelist_view), name=f'{info}_add'),
]
def get_config_value(self, name, options, form, initial):
@ -53,28 +52,23 @@ class ConstanceAdmin(admin.ModelAdmin):
form_field = form[name]
config_value = {
"name": name,
"default": localize(default),
"raw_default": default,
"help_text": _(help_text),
"value": localize(value),
"modified": localize(value) != localize(default),
"form_field": form_field,
"is_date": isinstance(default, date),
"is_datetime": isinstance(default, datetime),
"is_checkbox": isinstance(form_field.field.widget, forms.CheckboxInput),
"is_multi_select": isinstance(
form_field.field.widget, (forms.SelectMultiple, forms.CheckboxSelectMultiple)
),
"is_file": isinstance(form_field.field.widget, forms.FileInput),
'name': name,
'default': localize(default),
'raw_default': default,
'help_text': _(help_text),
'value': localize(value),
'modified': localize(value) != localize(default),
'form_field': form_field,
'is_date': isinstance(default, date),
'is_datetime': isinstance(default, datetime),
'is_checkbox': isinstance(form_field.field.widget, forms.CheckboxInput),
'is_file': isinstance(form_field.field.widget, forms.FileInput),
}
if config_value["is_multi_select"]:
config_value["json_default"] = json.dumps(default if isinstance(default, list) else [default])
if field_type and field_type in settings.ADDITIONAL_FIELDS:
serialized_default = form[name].field.prepare_value(default)
config_value["default"] = serialized_default
config_value["raw_default"] = serialized_default
config_value["value"] = form[name].field.prepare_value(value)
config_value['default'] = serialized_default
config_value['raw_default'] = serialized_default
config_value['value'] = form[name].field.prepare_value(value)
return config_value
@ -91,27 +85,26 @@ class ConstanceAdmin(admin.ModelAdmin):
initial = get_values()
form_cls = self.get_changelist_form(request)
form = form_cls(initial=initial, request=request)
if request.method == "POST" and request.user.has_perm("constance.change_config"):
if request.method == 'POST' and request.user.has_perm('constance.change_config'):
form = form_cls(data=request.POST, files=request.FILES, initial=initial, request=request)
if form.is_valid():
form.save()
messages.add_message(request, messages.SUCCESS, _("Live settings updated successfully."))
return HttpResponseRedirect(".")
messages.add_message(request, messages.ERROR, _("Failed to update live settings."))
context = {
**self.admin_site.each_context(request),
**(extra_context or {}),
"config_values": [],
"title": self.model._meta.app_config.verbose_name,
"app_label": "constance",
"opts": self.model._meta,
"form": form,
"media": self.media + form.media,
"icon_type": "svg",
"django_version": get_version(),
}
messages.add_message(request, messages.SUCCESS, _('Live settings updated successfully.'))
return HttpResponseRedirect('.')
messages.add_message(request, messages.ERROR, _('Failed to update live settings.'))
context = dict(
self.admin_site.each_context(request),
config_values=[],
title=self.model._meta.app_config.verbose_name,
app_label='constance',
opts=self.model._meta,
form=form,
media=self.media + form.media,
icon_type='svg',
django_version=get_version(),
)
for name, options in settings.CONFIG.items():
context["config_values"].append(self.get_config_value(name, options, form, initial))
context['config_values'].append(self.get_config_value(name, options, form, initial))
if settings.CONFIG_FIELDSETS:
if isinstance(settings.CONFIG_FIELDSETS, dict):
@ -119,11 +112,11 @@ class ConstanceAdmin(admin.ModelAdmin):
else:
fieldset_items = settings.CONFIG_FIELDSETS
context["fieldsets"] = []
context['fieldsets'] = []
for fieldset_title, fieldset_data in fieldset_items:
if isinstance(fieldset_data, dict):
fields_list = fieldset_data["fields"]
collapse = fieldset_data.get("collapse", False)
fields_list = fieldset_data['fields']
collapse = fieldset_data.get('collapse', False)
else:
fields_list = fieldset_data
collapse = False
@ -131,8 +124,8 @@ class ConstanceAdmin(admin.ModelAdmin):
absent_fields = [field for field in fields_list if field not in settings.CONFIG]
if any(absent_fields):
raise ValueError(
"CONSTANCE_CONFIG_FIELDSETS contains field(s) that does not exist(s): {}".format(
", ".join(absent_fields)
'CONSTANCE_CONFIG_FIELDSETS contains field(s) that does not exist(s): {}'.format(
', '.join(absent_fields)
)
)
@ -142,16 +135,16 @@ class ConstanceAdmin(admin.ModelAdmin):
options = settings.CONFIG.get(name)
if options:
config_values.append(self.get_config_value(name, options, form, initial))
fieldset_context = {"title": fieldset_title, "config_values": config_values}
fieldset_context = {'title': fieldset_title, 'config_values': config_values}
if collapse:
fieldset_context["collapse"] = True
context["fieldsets"].append(fieldset_context)
fieldset_context['collapse'] = True
context['fieldsets'].append(fieldset_context)
if not isinstance(settings.CONFIG_FIELDSETS, (OrderedDict, tuple)):
context["fieldsets"].sort(key=itemgetter("title"))
context['fieldsets'].sort(key=itemgetter('title'))
if not isinstance(settings.CONFIG, OrderedDict):
context["config_values"].sort(key=itemgetter("name"))
context['config_values'].sort(key=itemgetter('name'))
request.current_app = self.admin_site.name
return TemplateResponse(request, self.change_list_template, context)
@ -169,11 +162,11 @@ class ConstanceAdmin(admin.ModelAdmin):
class Config:
class Meta:
app_label = "constance"
object_name = "Config"
app_label = 'constance'
object_name = 'Config'
concrete_model = None
model_name = module_name = "config"
verbose_name_plural = _("config")
model_name = module_name = 'config'
verbose_name_plural = _('config')
abstract = False
swapped = False
is_composite_pk = False
@ -182,7 +175,7 @@ class Config:
return False
def get_change_permission(self):
return f"change_{self.model_name}"
return f'change_{self.model_name}'
@property
def app_config(self):
@ -190,11 +183,11 @@ class Config:
@property
def label(self):
return f"{self.app_label}.{self.object_name}"
return f'{self.app_label}.{self.object_name}'
@property
def label_lower(self):
return f"{self.app_label}.{self.model_name}"
return f'{self.app_label}.{self.model_name}'
_meta = Meta()

View file

@ -6,9 +6,9 @@ from constance.checks import check_fieldsets
class ConstanceConfig(AppConfig):
name = "constance"
verbose_name = _("Constance")
default_auto_field = "django.db.models.AutoField"
name = 'constance'
verbose_name = _('Constance')
default_auto_field = 'django.db.models.AutoField'
def ready(self):
checks.register(check_fieldsets, "constance")
checks.register(check_fieldsets, 'constance')

View file

@ -1,50 +1,21 @@
"""Defines the base constance backend."""
from abc import ABC
from abc import abstractmethod
class Backend(ABC):
@abstractmethod
class Backend:
def get(self, key):
"""
Get the key from the backend store and return the value.
Return None if not found.
"""
...
raise NotImplementedError
@abstractmethod
async def aget(self, key):
"""
Get the key from the backend store and return the value.
Return None if not found.
"""
...
@abstractmethod
def mget(self, keys):
"""
Get the keys from the backend store and return a dict mapping
each found key to its value. Return an empty dict if no keys
are provided or none are found.
Get the keys from the backend store and return a list of the values.
Return an empty list if not found.
"""
...
raise NotImplementedError
@abstractmethod
async def amget(self, keys):
"""
Get the keys from the backend store and return a dict mapping
each found key to its value. Return an empty dict if no keys
are provided or none are found.
"""
...
@abstractmethod
def set(self, key, value):
"""Add the value to the backend store given the key."""
...
@abstractmethod
async def aset(self, key, value):
"""Add the value to the backend store given the key."""
...
raise NotImplementedError

View file

@ -22,7 +22,7 @@ class DatabaseBackend(Backend):
self._model = Constance
self._prefix = settings.DATABASE_PREFIX
self._autofill_timeout = settings.DATABASE_CACHE_AUTOFILL_TIMEOUT
self._autofill_cachekey = "autofilled"
self._autofill_cachekey = 'autofilled'
if self._model._meta.app_config is None:
raise ImproperlyConfigured(
@ -34,9 +34,9 @@ class DatabaseBackend(Backend):
self._cache = caches[settings.DATABASE_CACHE_BACKEND]
if isinstance(self._cache, LocMemCache):
raise ImproperlyConfigured(
"The CONSTANCE_DATABASE_CACHE_BACKEND setting refers to a "
'The CONSTANCE_DATABASE_CACHE_BACKEND setting refers to a '
f"subclass of Django's local-memory backend ({settings.DATABASE_CACHE_BACKEND!r}). Please "
"set it to a backend that supports cross-process caching."
'set it to a backend that supports cross-process caching.'
)
else:
self._cache = None
@ -45,7 +45,7 @@ class DatabaseBackend(Backend):
post_save.connect(self.clear, sender=self._model)
def add_prefix(self, key):
return f"{self._prefix}{key}"
return f'{self._prefix}{key}'
def autofill(self):
if not self._autofill_timeout or not self._cache:
@ -53,23 +53,22 @@ class DatabaseBackend(Backend):
full_cachekey = self.add_prefix(self._autofill_cachekey)
if self._cache.get(full_cachekey):
return
autofill_values = {full_cachekey: 1}
for key, value in self.mget(settings.CONFIG).items():
autofill_values = {}
autofill_values[full_cachekey] = 1
for key, value in self.mget(settings.CONFIG):
autofill_values[self.add_prefix(key)] = value
self._cache.set_many(autofill_values, timeout=self._autofill_timeout)
def mget(self, keys):
result = {}
if not keys:
return result
return
keys = {self.add_prefix(key): key for key in keys}
try:
stored = self._model._default_manager.filter(key__in=keys)
for const in stored:
result[keys[const.key]] = loads(const.value)
yield keys[const.key], loads(const.value)
except (OperationalError, ProgrammingError):
pass
return result
def get(self, key):
key = self.add_prefix(key)
@ -80,53 +79,13 @@ class DatabaseBackend(Backend):
self.autofill()
value = self._cache.get(key)
if value is None:
match = self._model._default_manager.filter(key=key).only("value").first()
match = self._model._default_manager.filter(key=key).first()
if match:
value = loads(match.value)
if self._cache:
self._cache.add(key, value)
return value
async def aget(self, key):
from asgiref.sync import sync_to_async
prefixed_key = self.add_prefix(key)
value = None
if self._cache:
value = await self._cache.aget(prefixed_key)
if value is None:
await sync_to_async(self.autofill, thread_sensitive=True)()
value = await self._cache.aget(prefixed_key)
if value is None:
match = await self._model._default_manager.filter(key=prefixed_key).only("value").afirst()
if match:
value = loads(match.value)
if self._cache:
await self._cache.aadd(prefixed_key, value)
return value
async def amget(self, keys):
if not keys:
return {}
prefixed_keys_map = {self.add_prefix(key): key for key in keys}
results = {}
if self._cache:
cache_results = await self._cache.aget_many(prefixed_keys_map.keys())
for prefixed_key, value in cache_results.items():
results[prefixed_keys_map[prefixed_key]] = value
missing_prefixed_keys = [k for k in prefixed_keys_map if prefixed_keys_map[k] not in results]
if missing_prefixed_keys:
try:
async for const in self._model._default_manager.filter(key__in=missing_prefixed_keys):
results[prefixed_keys_map[const.key]] = loads(const.value)
except (OperationalError, ProgrammingError):
pass
return results
def set(self, key, value):
key = self.add_prefix(key)
created = False
@ -152,7 +111,7 @@ class DatabaseBackend(Backend):
if not created:
old_value = loads(constance.value)
constance.value = dumps(value)
constance.save(update_fields=["value"])
constance.save(update_fields=['value'])
else:
old_value = None
@ -161,13 +120,6 @@ class DatabaseBackend(Backend):
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
async def aset(self, key, value):
from asgiref.sync import sync_to_async
# We use sync_to_async because Django's transaction.atomic() and database connections are thread-local.
# This ensures the operation runs in the correct database thread until native async transactions are supported.
return await sync_to_async(self.set, thread_sensitive=True)(key, value)
def clear(self, sender, instance, created, **kwargs):
if self._cache and not created:
keys = [self.add_prefix(k) for k in settings.CONFIG]

View file

@ -19,28 +19,19 @@ class MemoryBackend(Backend):
with self._lock:
return self._storage.get(key)
async def aget(self, key):
# Memory operations are fast enough that we don't need true async here
return self.get(key)
def mget(self, keys):
if not keys:
return {}
return None
result = []
with self._lock:
return {key: self._storage[key] for key in keys if key in self._storage}
async def amget(self, keys):
if not keys:
return {}
with self._lock:
return {key: self._storage[key] for key in keys if key in self._storage}
for key in keys:
value = self._storage.get(key)
if value is not None:
result.append((key, value))
return result
def set(self, key, value):
with self._lock:
old_value = self._storage.get(key)
self._storage[key] = value
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
async def aset(self, key, value):
# Memory operations are fast enough that we don't need true async here
self.set(key, value)

View file

@ -1,4 +1,3 @@
import asyncio
from threading import RLock
from time import monotonic
@ -18,45 +17,20 @@ class RedisBackend(Backend):
super().__init__()
self._prefix = settings.REDIS_PREFIX
connection_cls = settings.REDIS_CONNECTION_CLASS
async_connection_cls = settings.REDIS_ASYNC_CONNECTION_CLASS
if connection_cls:
if connection_cls is not None:
self._rd = utils.import_module_attr(connection_cls)()
else:
try:
import redis
except ImportError:
raise ImproperlyConfigured("The Redis backend requires redis-py to be installed.") from None
raise ImproperlyConfigured('The Redis backend requires redis-py to be installed.') from None
if isinstance(settings.REDIS_CONNECTION, str):
self._rd = redis.from_url(settings.REDIS_CONNECTION)
else:
if isinstance(settings.REDIS_CONNECTION, str):
self._rd = redis.from_url(settings.REDIS_CONNECTION)
else:
self._rd = redis.Redis(**settings.REDIS_CONNECTION)
if async_connection_cls:
self._ard = utils.import_module_attr(async_connection_cls)()
else:
try:
import redis.asyncio as aredis
except ImportError:
# We set this to none instead of raising an error to indicate that async support is not available
# without breaking existing sync usage.
self._ard = None
else:
if isinstance(settings.REDIS_CONNECTION, str):
self._ard = aredis.from_url(settings.REDIS_CONNECTION)
else:
self._ard = aredis.Redis(**settings.REDIS_CONNECTION)
self._rd = redis.Redis(**settings.REDIS_CONNECTION)
def add_prefix(self, key):
return f"{self._prefix}{key}"
def _check_async_support(self):
if self._ard is None:
raise ImproperlyConfigured(
"Async support for the Redis backend requires redis>=4.2.0 "
"or a custom CONSTANCE_REDIS_ASYNC_CONNECTION_CLASS to be configured."
)
return f'{self._prefix}{key}'
def get(self, key):
value = self._rd.get(self.add_prefix(key))
@ -64,50 +38,23 @@ class RedisBackend(Backend):
return loads(value)
return None
async def aget(self, key):
self._check_async_support()
value = await self._ard.get(self.add_prefix(key))
if value:
return loads(value)
return None
def mget(self, keys):
if not keys:
return {}
return
prefixed_keys = [self.add_prefix(key) for key in keys]
return {key: loads(value) for key, value in zip(keys, self._rd.mget(prefixed_keys)) if value}
async def amget(self, keys):
if not keys:
return {}
self._check_async_support()
prefixed_keys = [self.add_prefix(key) for key in keys]
values = await self._ard.mget(prefixed_keys)
return {key: loads(value) for key, value in zip(keys, values) if value}
for key, value in zip(keys, self._rd.mget(prefixed_keys)):
if value:
yield key, loads(value)
def set(self, key, value):
old_value = self.get(key)
self._rd.set(self.add_prefix(key), dumps(value))
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
async def _aset_internal(self, key, value, old_value):
"""
Internal set operation. Separated to allow subclasses to provide old_value
without going through self.aget() which may have locking behavior.
"""
self._check_async_support()
await self._ard.set(self.add_prefix(key), dumps(value))
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
async def aset(self, key, value):
old_value = await self.aget(key)
await self._aset_internal(key, value, old_value)
class CachingRedisBackend(RedisBackend):
_sentinel = object()
_lock = RLock()
_async_lock = None # Lazy-initialized asyncio.Lock
def __init__(self):
super().__init__()
@ -115,12 +62,6 @@ class CachingRedisBackend(RedisBackend):
self._cache = {}
self._sentinel = object()
def _get_async_lock(self):
# Lazily create the asyncio lock to avoid issues with event loops
if self._async_lock is None:
self._async_lock = asyncio.Lock()
return self._async_lock
def _has_expired(self, value):
return value[0] <= monotonic()
@ -138,82 +79,15 @@ class CachingRedisBackend(RedisBackend):
return value[1]
async def _aget_unlocked(self, key):
"""
Get value with cache support but without acquiring lock.
Caller must already hold the lock.
"""
value = self._cache.get(key, self._sentinel)
if value is self._sentinel or self._has_expired(value):
new_value = await super().aget(key)
self._cache_value(key, new_value)
return new_value
return value[1]
async def aget(self, key):
value = self._cache.get(key, self._sentinel)
if value is self._sentinel or self._has_expired(value):
async with self._get_async_lock():
# Double-check after acquiring lock, then delegate to unlocked version
return await self._aget_unlocked(key)
return value[1]
def set(self, key, value):
with self._lock:
super().set(key, value)
self._cache_value(key, value)
async def aset(self, key, value):
async with self._get_async_lock():
# Use unlocked version since we already hold the lock
old_value = await self._aget_unlocked(key)
# Use internal method to avoid lock recursion (super().aset calls self.aget)
await self._aset_internal(key, value, old_value)
self._cache_value(key, value)
def mget(self, keys):
if not keys:
return {}
result = {}
return
for key in keys:
value = self.get(key)
if value is not None:
result[key] = value
return result
async def amget(self, keys):
if not keys:
return {}
results = {}
missing_keys = []
# First, check the local cache for all keys
for key in keys:
value = self._cache.get(key, self._sentinel)
if value is not self._sentinel and not self._has_expired(value):
results[key] = value[1]
else:
missing_keys.append(key)
# Fetch missing keys from Redis
if missing_keys:
async with self._get_async_lock():
# Re-check cache for keys that might have been fetched while waiting for lock
still_missing = []
for key in missing_keys:
value = self._cache.get(key, self._sentinel)
if value is not self._sentinel and not self._has_expired(value):
results[key] = value[1]
else:
still_missing.append(key)
if still_missing:
fetched = await super().amget(still_missing)
for key, value in fetched.items():
self._cache_value(key, value)
results[key] = value
return results
yield key, value

View file

@ -1,149 +1,31 @@
import asyncio
import warnings
from . import settings
from . import utils
class AsyncValueProxy:
def __init__(self, key, config, default):
self._key = key
self._config = config
self._default = default
self._value = None
self._fetched = False
def __await__(self):
return self._get_value().__await__()
async def _get_value(self):
if not self._fetched:
result = await self._config._backend.aget(self._key)
if result is None:
result = self._default
await self._config.aset(self._key, result)
self._value = result
self._fetched = True
return self._value
def _get_sync_value(self):
warnings.warn(
f"Synchronous access to Constance setting '{self._key}' inside an async loop. "
f"Use 'await config.{self._key}' instead.",
RuntimeWarning,
stacklevel=3,
)
return self._config._get_sync_value(self._key, self._default)
def __str__(self):
return str(self._get_sync_value())
def __repr__(self):
return repr(self._get_sync_value())
def __int__(self):
return int(self._get_sync_value())
def __float__(self):
return float(self._get_sync_value())
def __bool__(self):
return bool(self._get_sync_value())
def __eq__(self, other):
return self._get_sync_value() == other
def __ne__(self, other):
return self._get_sync_value() != other
def __lt__(self, other):
return self._get_sync_value() < other
def __le__(self, other):
return self._get_sync_value() <= other
def __gt__(self, other):
return self._get_sync_value() > other
def __ge__(self, other):
return self._get_sync_value() >= other
def __getitem__(self, key):
return self._get_sync_value()[key]
def __iter__(self):
return iter(self._get_sync_value())
def __len__(self):
return len(self._get_sync_value())
def __contains__(self, item):
return item in self._get_sync_value()
def __hash__(self):
return hash(self._get_sync_value())
def __add__(self, other):
return self._get_sync_value() + other
def __sub__(self, other):
return self._get_sync_value() - other
def __mul__(self, other):
return self._get_sync_value() * other
def __truediv__(self, other):
return self._get_sync_value() / other
class Config:
"""The global config wrapper that handles the backend."""
def __init__(self):
super().__setattr__("_backend", utils.import_module_attr(settings.BACKEND)())
def _get_sync_value(self, key, default):
result = self._backend.get(key)
if result is None:
result = default
setattr(self, key, default)
return result
super().__setattr__('_backend', utils.import_module_attr(settings.BACKEND)())
def __getattr__(self, key):
if key == "_backend":
return super().__getattribute__(key)
try:
if len(settings.CONFIG[key]) not in (2, 3):
raise AttributeError(key)
default = settings.CONFIG[key][0]
except KeyError as e:
raise AttributeError(key) from e
try:
asyncio.get_running_loop()
except RuntimeError:
return self._get_sync_value(key, default)
return AsyncValueProxy(key, self, default)
result = self._backend.get(key)
if result is None:
result = default
setattr(self, key, default)
return result
return result
def __setattr__(self, key, value):
if key == "_backend":
super().__setattr__(key, value)
return
if key not in settings.CONFIG:
raise AttributeError(key)
self._backend.set(key, value)
return
async def aset(self, key, value):
if key not in settings.CONFIG:
raise AttributeError(key)
await self._backend.aset(key, value)
async def amget(self, keys):
backend_values = await self._backend.amget(keys)
# Merge with defaults like utils.get_values_for_keys
default_initial = {name: settings.CONFIG[name][0] for name in keys if name in settings.CONFIG}
return dict(default_initial, **backend_values)
def __dir__(self):
return settings.CONFIG.keys()

View file

@ -14,22 +14,22 @@ def check_fieldsets(*args, **kwargs) -> list[CheckMessage]:
errors = []
if hasattr(settings, "CONFIG_FIELDSETS") and settings.CONFIG_FIELDSETS:
if hasattr(settings, 'CONFIG_FIELDSETS') and settings.CONFIG_FIELDSETS:
missing_keys, extra_keys = get_inconsistent_fieldnames()
if missing_keys:
check = checks.Warning(
_("CONSTANCE_CONFIG_FIELDSETS is missing field(s) that exists in CONSTANCE_CONFIG."),
hint=", ".join(sorted(missing_keys)),
obj="settings.CONSTANCE_CONFIG",
id="constance.E001",
_('CONSTANCE_CONFIG_FIELDSETS is missing field(s) that exists in CONSTANCE_CONFIG.'),
hint=', '.join(sorted(missing_keys)),
obj='settings.CONSTANCE_CONFIG',
id='constance.E001',
)
errors.append(check)
if extra_keys:
check = checks.Warning(
_("CONSTANCE_CONFIG_FIELDSETS contains extra field(s) that does not exist in CONFIG."),
hint=", ".join(sorted(extra_keys)),
obj="settings.CONSTANCE_CONFIG",
id="constance.E002",
_('CONSTANCE_CONFIG_FIELDSETS contains extra field(s) that does not exist in CONFIG.'),
hint=', '.join(sorted(extra_keys)),
obj='settings.CONSTANCE_CONFIG',
id='constance.E002',
)
errors.append(check)
return errors
@ -53,8 +53,8 @@ def get_inconsistent_fieldnames() -> tuple[set, set]:
for _fieldset_title, fields_list in fieldset_items:
# fields_list can be a dictionary, when a fieldset is defined as collapsible
# https://django-constance.readthedocs.io/en/latest/#fieldsets-collapsing
if isinstance(fields_list, dict) and "fields" in fields_list:
fields_list = fields_list["fields"]
if isinstance(fields_list, dict) and 'fields' in fields_list:
fields_list = fields_list['fields']
unique_field_names.update(fields_list)
if not unique_field_names:
return unique_field_names, unique_field_names

View file

@ -14,7 +14,7 @@ from typing import TypeVar
logger = logging.getLogger(__name__)
DEFAULT_DISCRIMINATOR = "default"
DEFAULT_DISCRIMINATOR = 'default'
class JSONEncoder(json.JSONEncoder):
@ -24,11 +24,11 @@ class JSONEncoder(json.JSONEncoder):
for discriminator, (t, _, encoder) in _codecs.items():
if isinstance(o, t):
return _as(discriminator, encoder(o))
raise TypeError(f"Object of type {o.__class__.__name__} is not JSON serializable")
raise TypeError(f'Object of type {o.__class__.__name__} is not JSON serializable')
def _as(discriminator: str, v: Any) -> dict[str, Any]:
return {"__type__": discriminator, "__value__": v}
return {'__type__': discriminator, '__value__': v}
def dumps(obj, _dumps=json.dumps, cls=JSONEncoder, default_kwargs=None, **kwargs):
@ -44,7 +44,7 @@ def loads(s, _loads=json.loads, *, first_level=True, **kwargs):
"""Deserialize json string to object."""
if first_level:
return _loads(s, object_hook=object_hook, **kwargs)
if isinstance(s, dict) and "__type__" not in s and "__value__" not in s:
if isinstance(s, dict) and '__type__' not in s and '__value__' not in s:
return {k: loads(v, first_level=False) for k, v in s.items()}
if isinstance(s, list):
return list(loads(v, first_level=False) for v in s)
@ -53,20 +53,20 @@ def loads(s, _loads=json.loads, *, first_level=True, **kwargs):
def object_hook(o: dict) -> Any:
"""Hook function to perform custom deserialization."""
if o.keys() == {"__type__", "__value__"}:
if o["__type__"] == DEFAULT_DISCRIMINATOR:
return o["__value__"]
codec = _codecs.get(o["__type__"])
if o.keys() == {'__type__', '__value__'}:
if o['__type__'] == DEFAULT_DISCRIMINATOR:
return o['__value__']
codec = _codecs.get(o['__type__'])
if not codec:
raise ValueError(f"Unsupported type: {o['__type__']}")
return codec[1](o["__value__"])
if "__type__" not in o and "__value__" not in o:
raise ValueError(f'Unsupported type: {o["__type__"]}')
return codec[1](o['__value__'])
if '__type__' not in o and '__value__' not in o:
return o
logger.error("Cannot deserialize object: %s", o)
raise ValueError(f"Invalid object: {o}")
logger.error('Cannot deserialize object: %s', o)
raise ValueError(f'Invalid object: {o}')
T = TypeVar("T")
T = TypeVar('T')
class Encoder(Protocol[T]):
@ -79,9 +79,9 @@ class Decoder(Protocol[T]):
def register_type(t: type[T], discriminator: str, encoder: Encoder[T], decoder: Decoder[T]):
if not discriminator:
raise ValueError("Discriminator must be specified")
raise ValueError('Discriminator must be specified')
if _codecs.get(discriminator) or discriminator == DEFAULT_DISCRIMINATOR:
raise ValueError(f"Type with discriminator {discriminator} is already registered")
raise ValueError(f'Type with discriminator {discriminator} is already registered')
_codecs[discriminator] = (t, decoder, encoder)
@ -90,12 +90,12 @@ _codecs: dict[str, tuple[type, Decoder, Encoder]] = {}
def _register_default_types():
# NOTE: datetime should be registered before date, because datetime is also instance of date.
register_type(datetime, "datetime", datetime.isoformat, datetime.fromisoformat)
register_type(date, "date", lambda o: o.isoformat(), lambda o: datetime.fromisoformat(o).date())
register_type(time, "time", lambda o: o.isoformat(), time.fromisoformat)
register_type(Decimal, "decimal", str, Decimal)
register_type(uuid.UUID, "uuid", lambda o: o.hex, uuid.UUID)
register_type(timedelta, "timedelta", lambda o: o.total_seconds(), lambda o: timedelta(seconds=o))
register_type(datetime, 'datetime', datetime.isoformat, datetime.fromisoformat)
register_type(date, 'date', lambda o: o.isoformat(), lambda o: datetime.fromisoformat(o).date())
register_type(time, 'time', lambda o: o.isoformat(), time.fromisoformat)
register_type(Decimal, 'decimal', str, Decimal)
register_type(uuid.UUID, 'uuid', lambda o: o.hex, uuid.UUID)
register_type(timedelta, 'timedelta', lambda o: o.total_seconds(), lambda o: timedelta(seconds=o))
_register_default_types()

View file

@ -12,4 +12,4 @@ def config(request):
)
"""
return {"config": constance.config}
return {'config': constance.config}

View file

@ -25,27 +25,27 @@ from .checks import get_inconsistent_fieldnames
config = LazyConfig()
NUMERIC_WIDGET = forms.TextInput(attrs={"size": 10})
NUMERIC_WIDGET = forms.TextInput(attrs={'size': 10})
INTEGER_LIKE = (fields.IntegerField, {"widget": NUMERIC_WIDGET})
INTEGER_LIKE = (fields.IntegerField, {'widget': NUMERIC_WIDGET})
STRING_LIKE = (
fields.CharField,
{
"widget": forms.Textarea(attrs={"rows": 3}),
"required": False,
'widget': forms.Textarea(attrs={'rows': 3}),
'required': False,
},
)
FIELDS = {
bool: (fields.BooleanField, {"required": False}),
bool: (fields.BooleanField, {'required': False}),
int: INTEGER_LIKE,
Decimal: (fields.DecimalField, {"widget": NUMERIC_WIDGET}),
Decimal: (fields.DecimalField, {'widget': NUMERIC_WIDGET}),
str: STRING_LIKE,
datetime: (fields.SplitDateTimeField, {"widget": widgets.AdminSplitDateTime}),
timedelta: (fields.DurationField, {"widget": widgets.AdminTextInputWidget}),
date: (fields.DateField, {"widget": widgets.AdminDateWidget}),
time: (fields.TimeField, {"widget": widgets.AdminTimeWidget}),
float: (fields.FloatField, {"widget": NUMERIC_WIDGET}),
datetime: (fields.SplitDateTimeField, {'widget': widgets.AdminSplitDateTime}),
timedelta: (fields.DurationField, {'widget': widgets.AdminTextInputWidget}),
date: (fields.DateField, {'widget': widgets.AdminDateWidget}),
time: (fields.TimeField, {'widget': widgets.AdminTimeWidget}),
float: (fields.FloatField, {'widget': NUMERIC_WIDGET}),
}
@ -58,12 +58,12 @@ def parse_additional_fields(fields):
field[0] = import_string(field[0])
if "widget" in field[1]:
klass = import_string(field[1]["widget"])
field[1]["widget"] = klass(**(field[1].get("widget_kwargs", {}) or {}))
if 'widget' in field[1]:
klass = import_string(field[1]['widget'])
field[1]['widget'] = klass(**(field[1].get('widget_kwargs', {}) or {}))
if "widget_kwargs" in field[1]:
del field[1]["widget_kwargs"]
if 'widget_kwargs' in field[1]:
del field[1]['widget_kwargs']
fields[key] = field
@ -80,7 +80,7 @@ class ConstanceForm(forms.Form):
super().__init__(*args, initial=initial, **kwargs)
version_hash = hashlib.sha256()
only_view = request and not request.user.has_perm("constance.change_config")
only_view = request and not request.user.has_perm('constance.change_config')
if only_view:
messages.warning(
request,
@ -94,13 +94,13 @@ class ConstanceForm(forms.Form):
if config_type not in settings.ADDITIONAL_FIELDS and not isinstance(default, config_type):
raise ImproperlyConfigured(
_(
"Default value type must be "
"equal to declared config "
"parameter type. Please fix "
"the default value of "
'Default value type must be '
'equal to declared config '
'parameter type. Please fix '
'the default value of '
"'%(name)s'."
)
% {"name": name}
% {'name': name}
)
else:
config_type = type(default)
@ -109,19 +109,19 @@ class ConstanceForm(forms.Form):
raise ImproperlyConfigured(
_(
"Constance doesn't support "
"config values of the type "
"%(config_type)s. Please fix "
'config values of the type '
'%(config_type)s. Please fix '
"the value of '%(name)s'."
)
% {"config_type": config_type, "name": name}
% {'config_type': config_type, 'name': name}
)
field_class, kwargs = FIELDS[config_type]
if only_view:
kwargs["disabled"] = True
kwargs['disabled'] = True
self.fields[name] = field_class(label=name, **kwargs)
version_hash.update(smart_bytes(initial.get(name, "")))
self.initial["version"] = version_hash.hexdigest()
version_hash.update(smart_bytes(initial.get(name, '')))
self.initial['version'] = version_hash.hexdigest()
def save(self):
for file_field in self.files:
@ -142,14 +142,18 @@ class ConstanceForm(forms.Form):
setattr(config, name, new)
def clean_version(self):
value = self.cleaned_data["version"]
value = self.cleaned_data['version']
if settings.IGNORE_ADMIN_VERSION_CHECK:
return value
if value != self.initial["version"]:
if value != self.initial['version']:
raise forms.ValidationError(
_("The settings have been modified by someone else. Please reload the form and resubmit your changes.")
_(
'The settings have been modified '
'by someone else. Please reload the '
'form and resubmit your changes.'
)
)
return value
@ -162,7 +166,7 @@ class ConstanceForm(forms.Form):
missing_keys, extra_keys = get_inconsistent_fieldnames()
if missing_keys or extra_keys:
raise forms.ValidationError(
_("CONSTANCE_CONFIG_FIELDSETS is missing field(s) that exists in CONSTANCE_CONFIG.")
_('CONSTANCE_CONFIG_FIELDSETS is missing field(s) that exists in CONSTANCE_CONFIG.')
)
return cleaned_data

View file

@ -26,36 +26,36 @@ def _set_constance_value(key, value):
class Command(BaseCommand):
help = _("Get/Set In-database config settings handled by Constance")
help = _('Get/Set In-database config settings handled by Constance')
GET = "get"
SET = "set"
LIST = "list"
REMOVE_STALE_KEYS = "remove_stale_keys"
GET = 'get'
SET = 'set'
LIST = 'list'
REMOVE_STALE_KEYS = 'remove_stale_keys'
def add_arguments(self, parser):
subparsers = parser.add_subparsers(dest="command")
subparsers.add_parser(self.LIST, help="list all Constance keys and their values")
subparsers = parser.add_subparsers(dest='command')
subparsers.add_parser(self.LIST, help='list all Constance keys and their values')
parser_get = subparsers.add_parser(self.GET, help="get the value of a Constance key")
parser_get.add_argument("key", help="name of the key to get", metavar="KEY")
parser_get = subparsers.add_parser(self.GET, help='get the value of a Constance key')
parser_get.add_argument('key', help='name of the key to get', metavar='KEY')
parser_set = subparsers.add_parser(self.SET, help="set the value of a Constance key")
parser_set.add_argument("key", help="name of the key to set", metavar="KEY")
parser_set = subparsers.add_parser(self.SET, help='set the value of a Constance key')
parser_set.add_argument('key', help='name of the key to set', metavar='KEY')
# use nargs='+' so that we pass a list to MultiValueField (eg SplitDateTimeField)
parser_set.add_argument("value", help="value to set", metavar="VALUE", nargs="+")
parser_set.add_argument('value', help='value to set', metavar='VALUE', nargs='+')
subparsers.add_parser(
self.REMOVE_STALE_KEYS,
help="delete all Constance keys and their values if they are not in settings.CONSTANCE_CONFIG (stale keys)",
help='delete all Constance keys and their values if they are not in settings.CONSTANCE_CONFIG (stale keys)',
)
def handle(self, command, key=None, value=None, *args, **options):
if command == self.GET:
try:
self.stdout.write(str(getattr(config, key)), ending="\n")
self.stdout.write(str(getattr(config, key)), ending='\n')
except AttributeError as e:
raise CommandError(f"{key} is not defined in settings.CONSTANCE_CONFIG") from e
raise CommandError(f'{key} is not defined in settings.CONSTANCE_CONFIG') from e
elif command == self.SET:
try:
if len(value) == 1:
@ -63,22 +63,21 @@ class Command(BaseCommand):
value = value[0]
_set_constance_value(key, value)
except KeyError as e:
raise CommandError(f"{key} is not defined in settings.CONSTANCE_CONFIG") from e
raise CommandError(f'{key} is not defined in settings.CONSTANCE_CONFIG') from e
except ValidationError as e:
raise CommandError(", ".join(e)) from e
raise CommandError(', '.join(e)) from e
elif command == self.LIST:
for k, v in get_values().items():
self.stdout.write(f"{k}\t{v}", ending="\n")
self.stdout.write(f'{k}\t{v}', ending='\n')
elif command == self.REMOVE_STALE_KEYS:
prefix = getattr(settings, "CONSTANCE_DATABASE_PREFIX", "")
actual_keys = [f"{prefix}{key}" for key in settings.CONSTANCE_CONFIG]
actual_keys = settings.CONSTANCE_CONFIG.keys()
stale_records = Constance.objects.exclude(key__in=actual_keys)
if stale_records:
self.stdout.write("The following record will be deleted:", ending="\n")
self.stdout.write('The following record will be deleted:', ending='\n')
else:
self.stdout.write("There are no stale records in the database.", ending="\n")
self.stdout.write('There are no stale records in the database.', ending='\n')
for stale_record in stale_records:
self.stdout.write(f"{stale_record.key}\t{stale_record.value}", ending="\n")
self.stdout.write(f'{stale_record.key}\t{stale_record.value}', ending='\n')
stale_records.delete()
else:
raise CommandError("Invalid command")
raise CommandError('Invalid command')

View file

@ -9,16 +9,16 @@ class Migration(migrations.Migration):
operations = [
migrations.CreateModel(
name="Constance",
name='Constance',
fields=[
("id", models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name="ID")),
("key", models.CharField(max_length=255, unique=True)),
("value", models.TextField(blank=True, editable=False, null=True)),
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('key', models.CharField(max_length=255, unique=True)),
('value', models.TextField(blank=True, editable=False, null=True)),
],
options={
"verbose_name": "constance",
"verbose_name_plural": "constances",
"permissions": [("change_config", "Can change config"), ("view_config", "Can view config")],
'verbose_name': 'constance',
'verbose_name_plural': 'constances',
'permissions': [('change_config', 'Can change config'), ('view_config', 'Can view config')],
},
),
]

View file

@ -12,19 +12,19 @@ def _migrate_from_old_table(apps, schema_editor) -> None:
On new installations just ignore error that table does not exist.
"""
connection = schema_editor.connection
quoted_string = ", ".join([connection.ops.quote_name(item) for item in ["id", "key", "value"]])
old_table_name = "constance_config"
quoted_string = ', '.join([connection.ops.quote_name(item) for item in ['id', 'key', 'value']])
old_table_name = 'constance_config'
with connection.cursor() as cursor:
if old_table_name not in connection.introspection.table_names():
logger.info("Old table does not exist, skipping")
logger.info('Old table does not exist, skipping')
return
cursor.execute(
f"INSERT INTO constance_constance ( {quoted_string} ) SELECT {quoted_string} FROM {old_table_name}", # noqa: S608
f'INSERT INTO constance_constance ( {quoted_string} ) SELECT {quoted_string} FROM {old_table_name}', # noqa: S608
[],
)
cursor.execute(f"DROP TABLE {old_table_name}", [])
cursor.execute(f'DROP TABLE {old_table_name}', [])
Constance = apps.get_model("constance", "Constance")
Constance = apps.get_model('constance', 'Constance')
sequence_sql = connection.ops.sequence_reset_sql(no_style(), [Constance])
with connection.cursor() as cursor:
for sql in sequence_sql:
@ -32,7 +32,7 @@ def _migrate_from_old_table(apps, schema_editor) -> None:
class Migration(migrations.Migration):
dependencies = [("constance", "0001_initial")]
dependencies = [('constance', '0001_initial')]
atomic = False

View file

@ -15,7 +15,7 @@ logger = logging.getLogger(__name__)
def is_already_migrated(value):
try:
data = json.loads(value)
if isinstance(data, dict) and set(data.keys()) == {"__type__", "__value__"}:
if isinstance(data, dict) and set(data.keys()) == {'__type__', '__value__'}:
return True
except (json.JSONDecodeError, TypeError, UnicodeDecodeError):
return False
@ -23,22 +23,19 @@ def is_already_migrated(value):
def import_module_attr(path):
package, module = path.rsplit(".", 1)
package, module = path.rsplit('.', 1)
return getattr(import_module(package), module)
def migrate_pickled_data(apps, schema_editor) -> None: # pragma: no cover
Constance = apps.get_model("constance", "Constance")
Constance = apps.get_model('constance', 'Constance')
for constance in Constance.objects.exclude(value=None):
if not is_already_migrated(constance.value):
constance.value = dumps(pickle.loads(b64decode(constance.value.encode()))) # noqa: S301
constance.save(update_fields=["value"])
constance.save(update_fields=['value'])
if settings.BACKEND in (
"constance.backends.redisd.RedisBackend",
"constance.backends.redisd.CachingRedisBackend",
):
if settings.BACKEND in ('constance.backends.redisd.RedisBackend', 'constance.backends.redisd.CachingRedisBackend'):
import redis
_prefix = settings.REDIS_PREFIX
@ -52,7 +49,7 @@ def migrate_pickled_data(apps, schema_editor) -> None: # pragma: no cover
_rd = redis.Redis(**settings.REDIS_CONNECTION)
redis_migrated_data = {}
for key in settings.CONFIG:
prefixed_key = f"{_prefix}{key}"
prefixed_key = f'{_prefix}{key}'
value = _rd.get(prefixed_key)
if value is not None and not is_already_migrated(value):
redis_migrated_data[prefixed_key] = dumps(pickle.loads(value)) # noqa: S301
@ -61,7 +58,7 @@ def migrate_pickled_data(apps, schema_editor) -> None: # pragma: no cover
class Migration(migrations.Migration):
dependencies = [("constance", "0002_migrate_from_old_table")]
dependencies = [('constance', '0002_migrate_from_old_table')]
operations = [
migrations.RunPython(migrate_pickled_data),

View file

@ -7,11 +7,11 @@ class Constance(models.Model):
value = models.TextField(null=True, blank=True, editable=False)
class Meta:
verbose_name = _("constance")
verbose_name_plural = _("constances")
verbose_name = _('constance')
verbose_name_plural = _('constances')
permissions = [
("change_config", "Can change config"),
("view_config", "Can view config"),
('change_config', 'Can change config'),
('view_config', 'Can view config'),
]
def __str__(self):

View file

@ -1,31 +1,29 @@
from django.conf import settings
BACKEND = getattr(settings, "CONSTANCE_BACKEND", "constance.backends.redisd.RedisBackend")
BACKEND = getattr(settings, 'CONSTANCE_BACKEND', 'constance.backends.redisd.RedisBackend')
CONFIG = getattr(settings, "CONSTANCE_CONFIG", {})
CONFIG = getattr(settings, 'CONSTANCE_CONFIG', {})
CONFIG_FIELDSETS = getattr(settings, "CONSTANCE_CONFIG_FIELDSETS", {})
CONFIG_FIELDSETS = getattr(settings, 'CONSTANCE_CONFIG_FIELDSETS', {})
ADDITIONAL_FIELDS = getattr(settings, "CONSTANCE_ADDITIONAL_FIELDS", {})
ADDITIONAL_FIELDS = getattr(settings, 'CONSTANCE_ADDITIONAL_FIELDS', {})
FILE_ROOT = getattr(settings, "CONSTANCE_FILE_ROOT", "")
FILE_ROOT = getattr(settings, 'CONSTANCE_FILE_ROOT', '')
DATABASE_CACHE_BACKEND = getattr(settings, "CONSTANCE_DATABASE_CACHE_BACKEND", None)
DATABASE_CACHE_BACKEND = getattr(settings, 'CONSTANCE_DATABASE_CACHE_BACKEND', None)
DATABASE_CACHE_AUTOFILL_TIMEOUT = getattr(settings, "CONSTANCE_DATABASE_CACHE_AUTOFILL_TIMEOUT", 60 * 60 * 24)
DATABASE_CACHE_AUTOFILL_TIMEOUT = getattr(settings, 'CONSTANCE_DATABASE_CACHE_AUTOFILL_TIMEOUT', 60 * 60 * 24)
DATABASE_PREFIX = getattr(settings, "CONSTANCE_DATABASE_PREFIX", "")
DATABASE_PREFIX = getattr(settings, 'CONSTANCE_DATABASE_PREFIX', '')
REDIS_PREFIX = getattr(settings, "CONSTANCE_REDIS_PREFIX", "constance:")
REDIS_PREFIX = getattr(settings, 'CONSTANCE_REDIS_PREFIX', 'constance:')
REDIS_CACHE_TIMEOUT = getattr(settings, "CONSTANCE_REDIS_CACHE_TIMEOUT", 60)
REDIS_CACHE_TIMEOUT = getattr(settings, 'CONSTANCE_REDIS_CACHE_TIMEOUT', 60)
REDIS_CONNECTION_CLASS = getattr(settings, "CONSTANCE_REDIS_CONNECTION_CLASS", None)
REDIS_CONNECTION_CLASS = getattr(settings, 'CONSTANCE_REDIS_CONNECTION_CLASS', None)
REDIS_ASYNC_CONNECTION_CLASS = getattr(settings, "CONSTANCE_REDIS_ASYNC_CONNECTION_CLASS", None)
REDIS_CONNECTION = getattr(settings, 'CONSTANCE_REDIS_CONNECTION', {})
REDIS_CONNECTION = getattr(settings, "CONSTANCE_REDIS_CONNECTION", {})
SUPERUSER_ONLY = getattr(settings, 'CONSTANCE_SUPERUSER_ONLY', True)
SUPERUSER_ONLY = getattr(settings, "CONSTANCE_SUPERUSER_ONLY", True)
IGNORE_ADMIN_VERSION_CHECK = getattr(settings, "CONSTANCE_IGNORE_ADMIN_VERSION_CHECK", False)
IGNORE_ADMIN_VERSION_CHECK = getattr(settings, 'CONSTANCE_IGNORE_ADMIN_VERSION_CHECK', False)

View file

@ -12,17 +12,6 @@
if (fieldType === 'checkbox') {
field.prop('checked', this.dataset.default === 'true');
} else if (fieldType === 'multi-select') {
const defaults = JSON.parse(this.dataset.default);
const stringDefaults = defaults.map(function(v) { return String(v); });
// CheckboxSelectMultiple: individual checkboxes inside a wrapper
field.find('input[type="checkbox"]').each(function() {
$(this).prop('checked', stringDefaults.indexOf($(this).val()) !== -1);
});
// SelectMultiple: <select multiple> element
field.find('option').each(function() {
$(this).prop('selected', stringDefaults.indexOf($(this).val()) !== -1);
});
} else if (fieldType === 'date') {
const defaultDate = new Date(this.dataset.default * 1000);
$('#' + this.dataset.fieldId).val(defaultDate.strftime(get_format('DATE_INPUT_FORMATS')[0]));

View file

@ -49,13 +49,10 @@
{% if fieldsets %}
{% for fieldset in fieldsets %}
<fieldset class="module{% if fieldset.collapse %} collapse{% endif %}">
{% if django_version >= "5.1" and fieldset.collapse %}<details><summary>{% endif %}
<h2 class="fieldset-heading">{{ fieldset.title }}</h2>
{% if django_version >= "5.1" and fieldset.collapse %}</summary>{% endif %}
<h2>{{ fieldset.title }}</h2>
{% with config_values=fieldset.config_values %}
{% include "admin/constance/includes/results_list.html" %}
{% endwith %}
{% if django_version >= "5.1" and fieldset.collapse %}</details>{% endif %}
</fieldset>
{% endfor %}
{% else %}

View file

@ -31,14 +31,12 @@
data-field-id="{{ item.form_field.auto_id }}"
data-field-type="{% spaceless %}
{% if item.is_checkbox %}checkbox
{% elif item.is_multi_select %}multi-select
{% elif item.is_datetime %}datetime
{% elif item.is_date %}date
{% endif %}
{% endspaceless %}"
data-default="{% spaceless %}
{% if item.is_checkbox %}{% if item.raw_default %} true {% else %} false {% endif %}
{% elif item.is_multi_select %}{{ item.json_default }}
{% elif item.is_date %}{{ item.raw_default|date:"U" }}
{% elif item.is_datetime %}{{ item.raw_default|date:"U" }}
{% else %}{{ item.default }}

View file

@ -1,3 +1,3 @@
from .unittest import override_config # pragma: no cover
__all__ = ["override_config"]
__all__ = ['override_config']

View file

@ -14,16 +14,16 @@ from constance import config as constance_config
@pytest.hookimpl(trylast=True)
def pytest_configure(config): # pragma: no cover
"""Register override_config marker."""
config.addinivalue_line("markers", ("override_config(**kwargs): mark test to override django-constance config"))
config.addinivalue_line('markers', ('override_config(**kwargs): mark test to override django-constance config'))
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_call(item): # pragma: no cover
"""Validate constance override marker params. Run test with overridden config."""
marker = item.get_closest_marker("override_config")
marker = item.get_closest_marker('override_config')
if marker is not None:
if marker.args:
pytest.fail("Constance override can not not accept positional args")
pytest.fail('Constance override can not not accept positional args')
with override_config(**marker.kwargs):
yield
else:
@ -59,7 +59,7 @@ class override_config(ContextDecorator):
self.disable()
@pytest.fixture(name="override_config")
@pytest.fixture(name='override_config')
def _override_config():
"""Make override_config available as a function fixture."""
return override_config

View file

@ -1,12 +1,11 @@
from functools import wraps
from django import VERSION as DJANGO_VERSION
from django.test import SimpleTestCase
from django.test.utils import override_settings
from constance import config
__all__ = ("override_config",)
__all__ = ('override_config',)
class override_config(override_settings):
@ -24,7 +23,7 @@ class override_config(override_settings):
"""Modify the decorated function to override config values."""
if isinstance(test_func, type):
if not issubclass(test_func, SimpleTestCase):
raise Exception("Only subclasses of Django SimpleTestCase can be decorated with override_config")
raise Exception('Only subclasses of Django SimpleTestCase can be decorated with override_config')
return self.modify_test_case(test_func)
@wraps(test_func)
@ -45,20 +44,9 @@ class override_config(override_settings):
original_pre_setup = test_case._pre_setup
original_post_teardown = test_case._post_teardown
if DJANGO_VERSION < (5, 2):
def _pre_setup(inner_self):
self.enable()
original_pre_setup(inner_self)
else:
@classmethod
def _pre_setup(cls):
# NOTE: Django 5.2 turned this as a classmethod
# https://github.com/django/django/pull/18514/files
self.enable()
original_pre_setup()
def _pre_setup(inner_self):
self.enable()
original_pre_setup(inner_self)
def _post_teardown(inner_self):
original_post_teardown(inner_self)

View file

@ -7,7 +7,7 @@ config = LazyConfig()
def import_module_attr(path):
package, module = path.rsplit(".", 1)
package, module = path.rsplit('.', 1)
return getattr(import_module(package), module)
@ -19,17 +19,7 @@ def get_values():
# First load a mapping between config name and default value
default_initial = ((name, options[0]) for name, options in settings.CONFIG.items())
# Then update the mapping with actually values from the backend
return dict(default_initial, **config._backend.mget(settings.CONFIG))
async def aget_values():
"""
Get dictionary of values from the backend asynchronously
:return:
"""
default_initial = {name: options[0] for name, options in settings.CONFIG.items()}
backend_values = await config.amget(settings.CONFIG.keys())
return dict(default_initial, **backend_values)
return dict(default_initial, **dict(config._backend.mget(settings.CONFIG)))
def get_values_for_keys(keys):
@ -41,7 +31,7 @@ def get_values_for_keys(keys):
:raises AttributeError: If any key is not found in the configuration.
"""
if not isinstance(keys, (list, tuple, set)):
raise TypeError("keys must be a list, tuple, or set of strings")
raise TypeError('keys must be a list, tuple, or set of strings')
# Prepare default initial mapping
default_initial = {name: options[0] for name, options in settings.CONFIG.items() if name in keys}
@ -52,25 +42,4 @@ def get_values_for_keys(keys):
raise AttributeError(f'"{", ".join(missing_keys)}" keys not found in configuration.')
# Merge default values and backend values, prioritizing backend values
return dict(default_initial, **config._backend.mget(keys))
async def aget_values_for_keys(keys):
"""
Retrieve values for specified keys from the backend asynchronously.
:param keys: List of keys to retrieve.
:return: Dictionary with values for the specified keys.
:raises AttributeError: If any key is not found in the configuration.
"""
if not isinstance(keys, (list, tuple, set)):
raise TypeError("keys must be a list, tuple, or set of strings")
default_initial = {name: options[0] for name, options in settings.CONFIG.items() if name in keys}
missing_keys = [key for key in keys if key not in default_initial]
if missing_keys:
raise AttributeError(f'"{", ".join(missing_keys)}" keys not found in configuration.')
backend_values = await config.amget(keys)
return dict(default_initial, **backend_values)
return dict(default_initial, **dict(config._backend.mget(keys)))

View file

@ -16,7 +16,7 @@ v4.0.0 (2024/08/21)
* Migrate JavaScript to ES2015
* Fix documentation build
* Add linters and formatters (using `ruff`)
* Add support for Django 5.1 and 5.2
* Prepare for Django 5.1 support
* Migrate from `setup.py` to `pyproject.toml`
* Bump `tox`
* Declare support for Python 3.12

View file

@ -4,73 +4,62 @@
# https://www.sphinx-doc.org/en/master/usage/configuration.html
import os
import re
import sys
from datetime import datetime
def get_version():
# Try to get version from installed package metadata
try:
from importlib.metadata import PackageNotFoundError
from importlib.metadata import version
return version("django-constance")
except (ImportError, PackageNotFoundError):
pass
# Fall back to setuptools_scm generated version file
try:
from constance._version import __version__
return __version__
except ImportError:
pass
return "0.0.0"
with open('../pyproject.toml') as f:
for line in f:
match = re.match(r'version = "(.*)"', line)
if match:
return match.group(1)
return '0.0.0'
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "tests.settings")
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'tests.settings')
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
sys.path.insert(0, os.path.abspath("extensions"))
sys.path.insert(0, os.path.abspath(".."))
sys.path.insert(0, os.path.abspath('extensions'))
sys.path.insert(0, os.path.abspath('..'))
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = "django-constance"
project_copyright = datetime.now().year.__str__() + ", Jazzband"
project = 'django-constance'
project_copyright = datetime.now().year.__str__() + ', Jazzband'
# The full version, including alpha/beta/rc tags
release = get_version()
# The short X.Y version
version = ".".join(release.split(".")[:3])
version = '.'.join(release.split('.')[:3])
# -- General configuration ------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
"sphinx.ext.intersphinx",
"sphinx.ext.todo",
"sphinx_search.extension",
"settings",
'sphinx.ext.intersphinx',
'sphinx.ext.todo',
'sphinx_search.extension',
'settings',
]
templates_path = ["_templates"]
source_suffix = ".rst"
root_doc = "index"
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
pygments_style = "sphinx"
html_last_updated_fmt = ""
templates_path = ['_templates']
source_suffix = '.rst'
root_doc = 'index'
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
pygments_style = 'sphinx'
html_last_updated_fmt = ''
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = "sphinx_rtd_theme"
html_static_path = ["_static"]
htmlhelp_basename = "django-constancedoc"
html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']
htmlhelp_basename = 'django-constancedoc'
# -- Options for LaTeX output ---------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-latex-output
@ -78,31 +67,31 @@ htmlhelp_basename = "django-constancedoc"
latex_elements = {}
latex_documents = [
("index", "django-constance.tex", "django-constance Documentation", "Jazzband", "manual"),
('index', 'django-constance.tex', 'django-constance Documentation', 'Jazzband', 'manual'),
]
# -- Options for manual page output ---------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-manual-page-output
man_pages = [("index", "django-constance", "django-constance Documentation", ["Jazzband"], 1)]
man_pages = [('index', 'django-constance', 'django-constance Documentation', ['Jazzband'], 1)]
# -- Options for Texinfo output -------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-texinfo-output
texinfo_documents = [
(
"index",
"django-constance",
"django-constance Documentation",
"Jazzband",
"django-constance",
"One line description of project.",
"Miscellaneous",
'index',
'django-constance',
'django-constance Documentation',
'Jazzband',
'django-constance',
'One line description of project.',
'Miscellaneous',
),
]
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {
"python": ("https://docs.python.org/3", None),
"django": ("https://docs.djangoproject.com/en/dev/", "https://docs.djangoproject.com/en/dev/_objects/"),
'python': ('https://docs.python.org/3', None),
'django': ('https://docs.djangoproject.com/en/dev/', 'https://docs.djangoproject.com/en/dev/_objects/'),
}

View file

@ -1,6 +1,6 @@
def setup(app):
app.add_crossref_type(
directivename="setting",
rolename="setting",
indextemplate="pair: %s; setting",
directivename='setting',
rolename='setting',
indextemplate='pair: %s; setting',
)

View file

@ -89,7 +89,7 @@ Custom fields
You can set the field type with the third value in the ``CONSTANCE_CONFIG`` tuple.
The value can be one of the supported types or a string matching a key in your :setting:`CONSTANCE_ADDITIONAL_FIELDS`
The value can be one of the supported types or a string matching a key in your :setting:``CONSTANCE_ADDITIONAL_FIELDS``
The supported types are:
@ -105,10 +105,6 @@ The supported types are:
* ``list``
* ``dict``
.. note::
To be able to use ``list`` and ``dict`` you need to set a widget and form field for these types as it is ambiguous what types shall be stored in the collection object.
You can do so with :setting:`CONSTANCE_ADDITIONAL_FIELDS` as explained below.
For example, to force a value to be handled as a string:
.. code-block:: python
@ -116,7 +112,7 @@ For example, to force a value to be handled as a string:
'THE_ANSWER': (42, 'Answer to the Ultimate Question of Life, '
'The Universe, and Everything', str),
Custom field types are supported using the dictionary :setting:`CONSTANCE_ADDITIONAL_FIELDS`.
Custom field types are supported using the dictionary :setting:``CONSTANCE_ADDITIONAL_FIELDS``.
This is a mapping between a field label and a sequence (list or tuple). The first item in the sequence is the string
path of a field class, and the (optional) second item is a dictionary used to configure the field.
@ -169,21 +165,6 @@ Images and files are uploaded to ``MEDIA_ROOT`` by default. You can specify a su
This will result in files being placed in ``media/constance`` within your ``BASE_DIR``. You can use deeper nesting in this setting (e.g. ``constance/images``) but other relative path components (e.g. ``../``) will be rejected.
In case you want to store a list of ``int`` values in the constance config, a working setup is
.. code-block:: python
CONSTANCE_ADDITIONAL_FIELDS = {
list: ["django.forms.fields.JSONField", {"widget": "django.forms.Textarea"}],
}
CONSTANCE_CONFIG = {
'KEY': ([0, 10, 20], 'A list of integers', list),
}
Make sure to use the ``JSONField`` for this purpose as user input in the admin page may be understood and saved as ``str`` otherwise.
Ordered Fields in Django Admin
------------------------------
@ -292,36 +273,6 @@ object and accessing the variables with attribute lookups::
if config.THE_ANSWER == 42:
answer_the_question()
Asynchronous usage
^^^^^^^^^^^^^^^^^^
If you are using Django's asynchronous features (like async views), you can ``await`` the settings directly on the standard ``config`` object::
from constance import config
async def my_async_view(request):
# Accessing settings is awaitable
if await config.THE_ANSWER == 42:
return await answer_the_question_async()
async def update_settings():
# Updating settings asynchronously
await config.aset('THE_ANSWER', 43)
# Bulk retrieval is supported as well
values = await config.amget(['THE_ANSWER', 'SITE_NAME'])
Performance and Safety
~~~~~~~~~~~~~~~~~~~~~~
While synchronous access (e.g., ``config.THE_ANSWER``) still works inside async views for some backends, it is highly discouraged:
* **Blocking:** Synchronous access blocks the event loop, reducing the performance of your entire application.
* **Safety Guards:** For the Database backend, Django's safety guards will raise a ``SynchronousOnlyOperation`` error if you attempt to access a setting synchronously from an async thread.
* **Automatic Detection:** Constance will emit a ``RuntimeWarning`` if it detects synchronous access inside an asynchronous event loop, helping you identify and fix these performance bottlenecks.
For peak performance, especially with the Redis backend, always use the ``await`` syntax which leverages native asynchronous drivers.
Django templates
^^^^^^^^^^^^^^^^

View file

@ -7,10 +7,10 @@ class Migration(migrations.Migration):
operations = [
migrations.CreateModel(
name="Brand",
name='Brand',
fields=[
("id", models.AutoField(verbose_name="ID", serialize=False, auto_created=True, primary_key=True)),
("name", models.CharField(max_length=75)),
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name', models.CharField(max_length=75)),
],
),
]

View file

@ -7,23 +7,23 @@ class Migration(migrations.Migration):
operations = [
migrations.CreateModel(
name="Shelf",
name='Shelf',
fields=[
("id", models.AutoField(verbose_name="ID", serialize=False, auto_created=True, primary_key=True)),
("name", models.CharField(max_length=75)),
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name', models.CharField(max_length=75)),
],
options={
"verbose_name_plural": "shelves",
'verbose_name_plural': 'shelves',
},
),
migrations.CreateModel(
name="Supply",
name='Supply',
fields=[
("id", models.AutoField(verbose_name="ID", serialize=False, auto_created=True, primary_key=True)),
("name", models.CharField(max_length=75)),
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('name', models.CharField(max_length=75)),
],
options={
"verbose_name_plural": "supplies",
'verbose_name_plural': 'supplies',
},
),
]

View file

@ -5,11 +5,11 @@ class Shelf(models.Model):
name = models.CharField(max_length=75)
class Meta:
verbose_name_plural = "shelves"
verbose_name_plural = 'shelves'
class Supply(models.Model):
name = models.CharField(max_length=75)
class Meta:
verbose_name_plural = "supplies"
verbose_name_plural = 'supplies'

View file

@ -13,7 +13,7 @@ class JsonField(fields.CharField):
def widget_attrs(self, widget: widgets.Widget):
attrs = super().widget_attrs(widget)
attrs["rows"] = self.rows
attrs['rows'] = self.rows
return attrs
def to_python(self, value):

View file

@ -21,7 +21,7 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SITE_ID = 1
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = "hdx64#m+lnc_0ffoyehbk&7gk1&*9uar$pcfcm-%$km#p0$k=6"
SECRET_KEY = 'hdx64#m+lnc_0ffoyehbk&7gk1&*9uar$pcfcm-%$km#p0$k=6'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
@ -32,122 +32,122 @@ ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = (
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.sites",
"django.contrib.messages",
"django.contrib.staticfiles",
"cheeseshop.apps.catalog",
"cheeseshop.apps.storage",
"constance",
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.staticfiles',
'cheeseshop.apps.catalog',
'cheeseshop.apps.storage',
'constance',
)
MIDDLEWARE = (
"django.middleware.common.CommonMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
)
ROOT_URLCONF = "cheeseshop.urls"
ROOT_URLCONF = 'cheeseshop.urls'
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = "cheeseshop.wsgi.application"
WSGI_APPLICATION = 'cheeseshop.wsgi.application'
# Database
# https://docs.djangoproject.com/en/4.1/ref/settings/#databases
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": "/tmp/cheeseshop.db",
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': '/tmp/cheeseshop.db',
}
}
CONSTANCE_REDIS_CONNECTION = {
"host": "localhost",
"port": 6379,
"db": 0,
'host': 'localhost',
'port': 6379,
'db': 0,
}
CONSTANCE_ADDITIONAL_FIELDS = {
"yes_no_null_select": [
"django.forms.fields.ChoiceField",
{"widget": "django.forms.Select", "choices": ((None, "-----"), ("yes", "Yes"), ("no", "No"))},
'yes_no_null_select': [
'django.forms.fields.ChoiceField',
{'widget': 'django.forms.Select', 'choices': ((None, '-----'), ('yes', 'Yes'), ('no', 'No'))},
],
"email": ("django.forms.fields.EmailField",),
"json_field": ["cheeseshop.fields.JsonField"],
"image_field": ["django.forms.ImageField", {}],
'email': ('django.forms.fields.EmailField',),
'json_field': ['cheeseshop.fields.JsonField'],
'image_field': ['django.forms.ImageField', {}],
}
CONSTANCE_CONFIG = {
"BANNER": ("The National Cheese Emporium", "name of the shop"),
"OWNER": ("Mr. Henry Wensleydale", "owner of the shop"),
"OWNER_EMAIL": ("henry@example.com", "contact email for owner", "email"),
"MUSICIANS": (4, "number of musicians inside the shop"),
"DATE_ESTABLISHED": (date(1972, 11, 30), "the shop's first opening"),
"MY_SELECT_KEY": ("yes", "select yes or no", "yes_no_null_select"),
"MULTILINE": ("Line one\nLine two", "multiline string"),
"JSON_DATA": (
{"a": 1_000, "b": "test", "max": 30_000_000},
"Some test data for json",
"json_field",
'BANNER': ('The National Cheese Emporium', 'name of the shop'),
'OWNER': ('Mr. Henry Wensleydale', 'owner of the shop'),
'OWNER_EMAIL': ('henry@example.com', 'contact email for owner', 'email'),
'MUSICIANS': (4, 'number of musicians inside the shop'),
'DATE_ESTABLISHED': (date(1972, 11, 30), "the shop's first opening"),
'MY_SELECT_KEY': ('yes', 'select yes or no', 'yes_no_null_select'),
'MULTILINE': ('Line one\nLine two', 'multiline string'),
'JSON_DATA': (
{'a': 1_000, 'b': 'test', 'max': 30_000_000},
'Some test data for json',
'json_field',
),
"LOGO": (
"",
"Logo image file",
"image_field",
'LOGO': (
'',
'Logo image file',
'image_field',
),
}
CONSTANCE_CONFIG_FIELDSETS = {
"Cheese shop general info": [
"BANNER",
"OWNER",
"OWNER_EMAIL",
"MUSICIANS",
"DATE_ESTABLISHED",
"LOGO",
'Cheese shop general info': [
'BANNER',
'OWNER',
'OWNER_EMAIL',
'MUSICIANS',
'DATE_ESTABLISHED',
'LOGO',
],
"Awkward test settings": ["MY_SELECT_KEY", "MULTILINE", "JSON_DATA"],
'Awkward test settings': ['MY_SELECT_KEY', 'MULTILINE', 'JSON_DATA'],
}
CONSTANCE_BACKEND = "constance.backends.database.DatabaseBackend"
CONSTANCE_BACKEND = 'constance.backends.database.DatabaseBackend'
CACHES = {
"default": {
"BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
"LOCATION": "127.0.0.1:11211",
'default': {
'BACKEND': 'django.core.cache.backends.memcached.PyMemcacheCache',
'LOCATION': '127.0.0.1:11211',
}
}
CONSTANCE_DATABASE_CACHE_BACKEND = "default"
CONSTANCE_DATABASE_CACHE_BACKEND = 'default'
# Internationalization
# https://docs.djangoproject.com/en/4.1/topics/i18n/
LANGUAGE_CODE = "en-us"
LANGUAGE_CODE = 'en-us'
TIME_ZONE = "America/Chicago"
TIME_ZONE = 'America/Chicago'
USE_I18N = True
@ -159,12 +159,12 @@ USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/4.1/howto/static-files/
STATIC_URL = "/static/"
STATIC_URL = '/static/'
MEDIA_URL = "/media/"
MEDIA_URL = '/media/'
MEDIA_ROOT = os.path.join(BASE_DIR, "media")
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
CONSTANCE_FILE_ROOT = "constance"
CONSTANCE_FILE_ROOT = 'constance'
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
DEFAULT_AUTO_FIELD = 'django.db.models.AutoField'

View file

@ -6,7 +6,7 @@ from django.urls import re_path
admin.autodiscover()
urlpatterns = [
re_path("admin/", admin.site.urls),
re_path('admin/', admin.site.urls),
]
if settings.DEBUG:

View file

@ -2,6 +2,6 @@ import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cheeseshop.settings")
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cheeseshop.settings')
application = get_wsgi_application()

View file

@ -2,8 +2,8 @@
import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cheeseshop.settings")
if __name__ == '__main__':
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'cheeseshop.settings')
from django.core.management import execute_from_command_line

View file

@ -19,8 +19,6 @@ classifiers = [
"Framework :: Django",
"Framework :: Django :: 4.2",
"Framework :: Django :: 5.0",
"Framework :: Django :: 5.1",
"Framework :: Django :: 5.2",
"Intended Audience :: Developers",
"License :: OSI Approved :: BSD License",
"Natural Language :: English",
@ -52,9 +50,6 @@ documentation = "https://django-constance.readthedocs.io/en/latest/"
repository = "https://github.com/jazzband/django-constance/"
changelog = "https://github.com/jazzband/django-constance/releases/"
[tool.setuptools]
license-files = [] # see https://github.com/pypa/twine/issues/1216#issuecomment-2609745412
[tool.setuptools.packages.find]
include = ["constance*"]
@ -66,6 +61,7 @@ line-length = 120
indent-width = 4
[tool.ruff.format]
quote-style = "single"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"

View file

@ -1,15 +1,13 @@
from django.test import TestCase
from django.test import TransactionTestCase
from constance import settings
from constance.base import Config
from tests.storage import StorageTestsMixin
class TestDatabase(StorageTestsMixin, TestCase):
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = "constance.backends.database.DatabaseBackend"
settings.BACKEND = 'constance.backends.database.DatabaseBackend'
super().setUp()
def test_database_queries(self):
@ -27,122 +25,3 @@ class TestDatabase(StorageTestsMixin, TestCase):
def tearDown(self):
settings.BACKEND = self.old_backend
class TestDatabaseWithCache(StorageTestsMixin, TestCase):
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = "constance.backends.database.DatabaseBackend"
self.old_cache_backend = settings.DATABASE_CACHE_BACKEND
settings.DATABASE_CACHE_BACKEND = "default"
super().setUp()
self.config._backend._cache.clear()
def test_database_queries(self):
# Read and set to default value
with self.assertNumQueries(6):
self.assertEqual(self.config.INT_VALUE, 1)
# Read again
with self.assertNumQueries(0):
self.assertEqual(self.config.INT_VALUE, 1)
# Set value
with self.assertNumQueries(3):
self.config.INT_VALUE = 15
def tearDown(self):
settings.BACKEND = self.old_backend
settings.DATABASE_CACHE_BACKEND = self.old_cache_backend
class TestDatabaseAsync(TransactionTestCase):
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = "constance.backends.database.DatabaseBackend"
self.config = Config()
def tearDown(self):
settings.BACKEND = self.old_backend
async def test_aget_returns_none_for_missing_key(self):
result = await self.config._backend.aget("INT_VALUE")
self.assertIsNone(result)
async def test_aget_returns_value(self):
await self.config._backend.aset("INT_VALUE", 42)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 42)
async def test_aset_stores_value(self):
await self.config._backend.aset("INT_VALUE", 99)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 99)
async def test_amget_returns_empty_for_no_keys(self):
result = await self.config._backend.amget([])
self.assertEqual(result, {})
async def test_amget_returns_values(self):
await self.config._backend.aset("INT_VALUE", 10)
await self.config._backend.aset("BOOL_VALUE", value=True)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
class TestDatabaseWithCacheAsync(TransactionTestCase):
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = "constance.backends.database.DatabaseBackend"
self.old_cache_backend = settings.DATABASE_CACHE_BACKEND
settings.DATABASE_CACHE_BACKEND = "default"
self.config = Config()
self.config._backend._cache.clear()
def tearDown(self):
settings.BACKEND = self.old_backend
settings.DATABASE_CACHE_BACKEND = self.old_cache_backend
async def test_aget_returns_none_for_missing_key(self):
result = await self.config._backend.aget("INT_VALUE")
self.assertIsNone(result)
async def test_aget_returns_value_from_cache(self):
# First set a value using async
await self.config._backend.aset("INT_VALUE", 42)
# Clear cache and re-fetch to test aget path
self.config._backend._cache.clear()
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 42)
async def test_aget_populates_cache(self):
await self.config._backend.aset("INT_VALUE", 42)
self.config._backend._cache.clear()
# aget should populate the cache
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 42)
async def test_aset_stores_value(self):
await self.config._backend.aset("INT_VALUE", 99)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 99)
async def test_amget_returns_empty_for_no_keys(self):
result = await self.config._backend.amget([])
self.assertEqual(result, {})
async def test_amget_returns_values(self):
await self.config._backend.aset("INT_VALUE", 10)
await self.config._backend.aset("BOOL_VALUE", value=True)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
async def test_amget_uses_cache(self):
# Set values using async and ensure they're cached
await self.config._backend.aset("INT_VALUE", 10)
await self.config._backend.aset("BOOL_VALUE", value=True)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result["INT_VALUE"], 10)
self.assertEqual(result["BOOL_VALUE"], True)

View file

@ -1,58 +1,16 @@
from django.test import TestCase
from django.test import TransactionTestCase
from constance import settings
from constance.base import Config
from tests.storage import StorageTestsMixin
class TestMemory(StorageTestsMixin, TestCase):
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = "constance.backends.memory.MemoryBackend"
settings.BACKEND = 'constance.backends.memory.MemoryBackend'
super().setUp()
self.config._backend._storage = {}
def tearDown(self):
self.config._backend._storage = {}
settings.BACKEND = self.old_backend
def test_mget_empty_keys(self):
result = self.config._backend.mget([])
self.assertEqual(result, {})
class TestMemoryAsync(TransactionTestCase):
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = "constance.backends.memory.MemoryBackend"
self.config = Config()
self.config._backend._storage = {}
def tearDown(self):
self.config._backend._storage = {}
settings.BACKEND = self.old_backend
async def test_aget_returns_none_for_missing_key(self):
result = await self.config._backend.aget("INT_VALUE")
self.assertIsNone(result)
async def test_aget_returns_value(self):
self.config._backend.set("INT_VALUE", 42)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 42)
async def test_aset_stores_value(self):
await self.config._backend.aset("INT_VALUE", 99)
result = self.config._backend.get("INT_VALUE")
self.assertEqual(result, 99)
async def test_amget_returns_empty_for_no_keys(self):
result = await self.config._backend.amget([])
self.assertEqual(result, {})
async def test_amget_returns_values(self):
self.config._backend.set("INT_VALUE", 10)
self.config._backend.set("BOOL_VALUE", value=True)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})

View file

@ -1,17 +1,11 @@
from unittest import mock
from django.core.exceptions import ImproperlyConfigured
from django.test import TestCase
from django.test import TransactionTestCase
from constance import settings
from constance.backends.redisd import RedisBackend
from constance.base import Config
from tests.storage import StorageTestsMixin
class TestRedis(StorageTestsMixin, TestCase):
_BACKEND = "constance.backends.redisd.RedisBackend"
_BACKEND = 'constance.backends.redisd.RedisBackend'
def setUp(self):
self.old_backend = settings.BACKEND
@ -23,239 +17,6 @@ class TestRedis(StorageTestsMixin, TestCase):
self.config._backend._rd.clear()
settings.BACKEND = self.old_backend
def test_mget_empty_keys(self):
result = self.config._backend.mget([])
self.assertEqual(result, {})
class TestCachingRedis(TestRedis):
_BACKEND = "constance.backends.redisd.CachingRedisBackend"
def test_mget_empty_keys(self):
result = self.config._backend.mget([])
self.assertEqual(result, {})
class TestRedisAsync(TransactionTestCase):
_BACKEND = "constance.backends.redisd.RedisBackend"
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = self._BACKEND
self.config = Config()
self.config._backend._rd.clear()
def tearDown(self):
self.config._backend._rd.clear()
settings.BACKEND = self.old_backend
async def test_aget_returns_none_for_missing_key(self):
result = await self.config._backend.aget("INT_VALUE")
self.assertIsNone(result)
async def test_aget_returns_value(self):
self.config._backend.set("INT_VALUE", 42)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 42)
async def test_aset_stores_value(self):
await self.config._backend.aset("INT_VALUE", 99)
result = self.config._backend.get("INT_VALUE")
self.assertEqual(result, 99)
async def test_amget_returns_empty_for_no_keys(self):
result = await self.config._backend.amget([])
self.assertEqual(result, {})
async def test_amget_returns_values(self):
self.config._backend.set("INT_VALUE", 10)
self.config._backend.set("BOOL_VALUE", value=True)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
async def test_amget_skips_missing_keys(self):
self.config._backend.set("INT_VALUE", 10)
result = await self.config._backend.amget(["INT_VALUE", "MISSING_KEY"])
self.assertEqual(result, {"INT_VALUE": 10})
class TestCachingRedisAsync(TransactionTestCase):
_BACKEND = "constance.backends.redisd.CachingRedisBackend"
def setUp(self):
self.old_backend = settings.BACKEND
settings.BACKEND = self._BACKEND
self.config = Config()
self.config._backend._rd.clear()
self.config._backend._cache.clear()
def tearDown(self):
self.config._backend._rd.clear()
self.config._backend._cache.clear()
settings.BACKEND = self.old_backend
async def test_aget_caches_value(self):
# First set a value via sync
self.config._backend.set("INT_VALUE", 42)
# Clear the in-memory cache
self.config._backend._cache.clear()
# Async get should fetch and cache
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 42)
# Verify it's cached
self.assertIn("INT_VALUE", self.config._backend._cache)
async def test_aget_returns_cached_value(self):
# Manually set cache
from time import monotonic
timeout = self.config._backend._timeout
self.config._backend._cache["INT_VALUE"] = (monotonic() + timeout, 100)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 100)
async def test_aget_refreshes_expired_cache(self):
from time import monotonic
# Set expired cache
self.config._backend._cache["INT_VALUE"] = (monotonic() - 10, 100)
# Set different value in redis using proper codec format
self.config._backend._rd.set(
self.config._backend.add_prefix("INT_VALUE"),
b'{"__type__": "default", "__value__": 200}',
)
result = await self.config._backend.aget("INT_VALUE")
self.assertEqual(result, 200)
async def test_aset_updates_cache(self):
await self.config._backend.aset("INT_VALUE", 55)
# Verify cache is updated
self.assertIn("INT_VALUE", self.config._backend._cache)
self.assertEqual(self.config._backend._cache["INT_VALUE"][1], 55)
async def test_amget_returns_empty_for_no_keys(self):
result = await self.config._backend.amget([])
self.assertEqual(result, {})
async def test_amget_returns_cached_values(self):
from time import monotonic
timeout = self.config._backend._timeout
self.config._backend._cache["INT_VALUE"] = (monotonic() + timeout, 10)
self.config._backend._cache["BOOL_VALUE"] = (monotonic() + timeout, True)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
async def test_amget_fetches_missing_keys(self):
from time import monotonic
timeout = self.config._backend._timeout
# One key cached, one in Redis only
self.config._backend._cache["INT_VALUE"] = (monotonic() + timeout, 10)
self.config._backend._rd.set(
self.config._backend.add_prefix("BOOL_VALUE"),
b'{"__type__": "default", "__value__": true}',
)
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(result["INT_VALUE"], 10)
self.assertEqual(result["BOOL_VALUE"], True)
async def test_amget_refreshes_expired_keys(self):
from time import monotonic
# Set expired cache
self.config._backend._cache["INT_VALUE"] = (monotonic() - 10, 100)
# Set different value in redis using proper codec format
self.config._backend._rd.set(
self.config._backend.add_prefix("INT_VALUE"),
b'{"__type__": "default", "__value__": 200}',
)
result = await self.config._backend.amget(["INT_VALUE"])
self.assertEqual(result["INT_VALUE"], 200)
class TestRedisBackendInit(TestCase):
"""Tests for RedisBackend.__init__ client initialization paths."""
def setUp(self):
self.old_conn_cls = settings.REDIS_CONNECTION_CLASS
self.old_async_conn_cls = settings.REDIS_ASYNC_CONNECTION_CLASS
self.old_conn = settings.REDIS_CONNECTION
def tearDown(self):
settings.REDIS_CONNECTION_CLASS = self.old_conn_cls
settings.REDIS_ASYNC_CONNECTION_CLASS = self.old_async_conn_cls
settings.REDIS_CONNECTION = self.old_conn
def test_no_redis_package_raises_improperly_configured(self):
settings.REDIS_CONNECTION_CLASS = None
settings.REDIS_ASYNC_CONNECTION_CLASS = "tests.redis_mockup.AsyncConnection"
with mock.patch.dict("sys.modules", {"redis": None}), self.assertRaises(ImproperlyConfigured):
RedisBackend()
def test_sync_redis_from_url_with_string_connection(self):
settings.REDIS_CONNECTION_CLASS = None
settings.REDIS_ASYNC_CONNECTION_CLASS = "tests.redis_mockup.AsyncConnection"
settings.REDIS_CONNECTION = "redis://localhost:6379/0"
mock_redis = mock.MagicMock()
with mock.patch.dict("sys.modules", {"redis": mock_redis, "redis.asyncio": mock_redis.asyncio}):
backend = RedisBackend()
mock_redis.from_url.assert_called_once_with("redis://localhost:6379/0")
self.assertEqual(backend._rd, mock_redis.from_url.return_value)
def test_sync_redis_with_dict_connection(self):
settings.REDIS_CONNECTION_CLASS = None
settings.REDIS_ASYNC_CONNECTION_CLASS = "tests.redis_mockup.AsyncConnection"
settings.REDIS_CONNECTION = {"host": "localhost", "port": 6379}
mock_redis = mock.MagicMock()
with mock.patch.dict("sys.modules", {"redis": mock_redis, "redis.asyncio": mock_redis.asyncio}):
backend = RedisBackend()
mock_redis.Redis.assert_called_once_with(host="localhost", port=6379)
self.assertEqual(backend._rd, mock_redis.Redis.return_value)
def test_async_redis_not_available_sets_ard_none(self):
settings.REDIS_CONNECTION_CLASS = "tests.redis_mockup.Connection"
settings.REDIS_ASYNC_CONNECTION_CLASS = None
mock_redis = mock.MagicMock()
# Simulate redis.asyncio not being available
with mock.patch.dict("sys.modules", {"redis": mock_redis, "redis.asyncio": None}):
backend = RedisBackend()
self.assertIsNone(backend._ard)
def test_async_redis_from_url_with_string_connection(self):
settings.REDIS_CONNECTION_CLASS = "tests.redis_mockup.Connection"
settings.REDIS_ASYNC_CONNECTION_CLASS = None
settings.REDIS_CONNECTION = "redis://localhost:6379/0"
mock_aredis = mock.MagicMock()
mock_redis = mock.MagicMock()
mock_redis.asyncio = mock_aredis
with mock.patch.dict("sys.modules", {"redis": mock_redis, "redis.asyncio": mock_aredis}):
backend = RedisBackend()
mock_aredis.from_url.assert_called_once_with("redis://localhost:6379/0")
self.assertEqual(backend._ard, mock_aredis.from_url.return_value)
def test_async_redis_with_dict_connection(self):
settings.REDIS_CONNECTION_CLASS = "tests.redis_mockup.Connection"
settings.REDIS_ASYNC_CONNECTION_CLASS = None
settings.REDIS_CONNECTION = {"host": "localhost", "port": 6379}
mock_aredis = mock.MagicMock()
mock_redis = mock.MagicMock()
mock_redis.asyncio = mock_aredis
with mock.patch.dict("sys.modules", {"redis": mock_redis, "redis.asyncio": mock_aredis}):
backend = RedisBackend()
mock_aredis.Redis.assert_called_once_with(host="localhost", port=6379)
self.assertEqual(backend._ard, mock_aredis.Redis.return_value)
def test_check_async_support_raises_when_ard_is_none(self):
backend = RedisBackend()
backend._ard = None
with self.assertRaises(ImproperlyConfigured):
backend._check_async_support()
_BACKEND = 'constance.backends.redisd.CachingRedisBackend'

View file

@ -1,25 +0,0 @@
from django.core.cache.backends.base import BaseCache
from django.core.cache.backends.locmem import LocMemCache
class Cache(BaseCache):
def __init__(self, name, params):
self._cache = LocMemCache(name, params)
self.add = self._cache.add
self.delete = self._cache.delete
self.set = self._cache.set
self.get = self._cache.get
self.clear = self._cache.clear
self.set_many = self._cache.set_many
self.get_many = self._cache.get_many
self.delete_many = self._cache.delete_many
# Async methods for DatabaseBackend.aget() support
async def aget(self, key, default=None, version=None):
return self.get(key, default, version)
async def aget_many(self, keys, version=None):
return self.get_many(keys, version)
async def aadd(self, key, value, timeout=None, version=None):
return self.add(key, value, timeout, version)

View file

@ -1,28 +1,6 @@
# Shared storage so sync (Connection) and async (AsyncConnection) instances
# operate on the same underlying data, just like a real Redis server would.
_shared_store = {}
class Connection:
class Connection(dict):
def set(self, key, value):
_shared_store[key] = value
def get(self, key, default=None):
return _shared_store.get(key, default)
self[key] = value
def mget(self, keys):
return [_shared_store.get(key) for key in keys]
def clear(self):
_shared_store.clear()
class AsyncConnection:
async def set(self, key, value):
_shared_store[key] = value
async def get(self, key):
return _shared_store.get(key)
async def mget(self, keys):
return [_shared_store.get(key) for key in keys]
return [self.get(key) for key in keys]

View file

@ -4,115 +4,107 @@ from datetime import time
from datetime import timedelta
from decimal import Decimal
SECRET_KEY = "cheese"
SECRET_KEY = 'cheese'
MIDDLEWARE = (
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.auth.middleware.SessionAuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
)
DATABASE_ENGINE = "sqlite3"
DATABASE_ENGINE = 'sqlite3'
DATABASES = {
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": ":memory:",
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
},
"secondary": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": ":memory:",
'secondary': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
},
}
CACHES = {
"default": {
"BACKEND": "tests.cache_mockup.Cache",
"LOCATION": "locmem",
}
}
INSTALLED_APPS = (
"django.contrib.admin",
"django.contrib.staticfiles",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"constance",
"constance.backends.database",
'django.contrib.admin',
'django.contrib.staticfiles',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'constance',
'constance.backends.database',
)
ROOT_URLCONF = "tests.urls"
ROOT_URLCONF = 'tests.urls'
CONSTANCE_REDIS_CONNECTION_CLASS = "tests.redis_mockup.Connection"
CONSTANCE_REDIS_ASYNC_CONNECTION_CLASS = "tests.redis_mockup.AsyncConnection"
CONSTANCE_REDIS_CONNECTION_CLASS = 'tests.redis_mockup.Connection'
CONSTANCE_ADDITIONAL_FIELDS = {
"yes_no_null_select": [
"django.forms.fields.ChoiceField",
{"widget": "django.forms.Select", "choices": ((None, "-----"), ("yes", "Yes"), ("no", "No"))},
'yes_no_null_select': [
'django.forms.fields.ChoiceField',
{'widget': 'django.forms.Select', 'choices': ((None, '-----'), ('yes', 'Yes'), ('no', 'No'))},
],
# note this intentionally uses a tuple so that we can test immutable
"email": ("django.forms.fields.EmailField",),
"array": ["django.forms.fields.CharField", {"widget": "django.forms.Textarea"}],
"json": ["django.forms.fields.CharField", {"widget": "django.forms.Textarea"}],
'email': ('django.forms.fields.EmailField',),
'array': ['django.forms.fields.CharField', {'widget': 'django.forms.Textarea'}],
'json': ['django.forms.fields.CharField', {'widget': 'django.forms.Textarea'}],
}
USE_TZ = True
CONSTANCE_CONFIG = {
"INT_VALUE": (1, "some int"),
"BOOL_VALUE": (True, "true or false"),
"STRING_VALUE": ("Hello world", "greetings"),
"DECIMAL_VALUE": (Decimal("0.1"), "the first release version"),
"DATETIME_VALUE": (datetime(2010, 8, 23, 11, 29, 24), "time of the first commit"),
"FLOAT_VALUE": (3.1415926536, "PI"),
"DATE_VALUE": (date(2010, 12, 24), "Merry Chrismas"),
"TIME_VALUE": (time(23, 59, 59), "And happy New Year"),
"TIMEDELTA_VALUE": (timedelta(days=1, hours=2, minutes=3), "Interval"),
"CHOICE_VALUE": ("yes", "select yes or no", "yes_no_null_select"),
"LINEBREAK_VALUE": ("Spam spam", "eggs\neggs"),
"EMAIL_VALUE": ("test@example.com", "An email", "email"),
"LIST_VALUE": ([1, "1", date(2019, 1, 1)], "A list", "array"),
"JSON_VALUE": (
'INT_VALUE': (1, 'some int'),
'BOOL_VALUE': (True, 'true or false'),
'STRING_VALUE': ('Hello world', 'greetings'),
'DECIMAL_VALUE': (Decimal('0.1'), 'the first release version'),
'DATETIME_VALUE': (datetime(2010, 8, 23, 11, 29, 24), 'time of the first commit'),
'FLOAT_VALUE': (3.1415926536, 'PI'),
'DATE_VALUE': (date(2010, 12, 24), 'Merry Chrismas'),
'TIME_VALUE': (time(23, 59, 59), 'And happy New Year'),
'TIMEDELTA_VALUE': (timedelta(days=1, hours=2, minutes=3), 'Interval'),
'CHOICE_VALUE': ('yes', 'select yes or no', 'yes_no_null_select'),
'LINEBREAK_VALUE': ('Spam spam', 'eggs\neggs'),
'EMAIL_VALUE': ('test@example.com', 'An email', 'email'),
'LIST_VALUE': ([1, '1', date(2019, 1, 1)], 'A list', 'array'),
'JSON_VALUE': (
{
"key": "value",
"key2": 2,
"key3": [1, 2, 3],
"key4": {"key": "value"},
"key5": date(2019, 1, 1),
"key6": None,
'key': 'value',
'key2': 2,
'key3': [1, 2, 3],
'key4': {'key': 'value'},
'key5': date(2019, 1, 1),
'key6': None,
},
"A JSON object",
"json",
'A JSON object',
'json',
),
}
DEBUG = True
STATIC_ROOT = "./static/"
STATIC_ROOT = './static/'
STATIC_URL = "/static/"
STATIC_URL = '/static/'
TEMPLATES = [
{
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.i18n",
"django.template.context_processors.request",
"django.template.context_processors.static",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
"constance.context_processors.config",
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.i18n',
'django.template.context_processors.request',
'django.template.context_processors.static',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
'constance.context_processors.config',
],
},
},

View file

@ -16,60 +16,60 @@ class StorageTestsMixin:
def test_store(self):
self.assertEqual(self.config.INT_VALUE, 1)
self.assertEqual(self.config.BOOL_VALUE, True)
self.assertEqual(self.config.STRING_VALUE, "Hello world")
self.assertEqual(self.config.DECIMAL_VALUE, Decimal("0.1"))
self.assertEqual(self.config.STRING_VALUE, 'Hello world')
self.assertEqual(self.config.DECIMAL_VALUE, Decimal('0.1'))
self.assertEqual(self.config.DATETIME_VALUE, datetime(2010, 8, 23, 11, 29, 24))
self.assertEqual(self.config.FLOAT_VALUE, 3.1415926536)
self.assertEqual(self.config.DATE_VALUE, date(2010, 12, 24))
self.assertEqual(self.config.TIME_VALUE, time(23, 59, 59))
self.assertEqual(self.config.TIMEDELTA_VALUE, timedelta(days=1, hours=2, minutes=3))
self.assertEqual(self.config.CHOICE_VALUE, "yes")
self.assertEqual(self.config.EMAIL_VALUE, "test@example.com")
self.assertEqual(self.config.LIST_VALUE, [1, "1", date(2019, 1, 1)])
self.assertEqual(self.config.CHOICE_VALUE, 'yes')
self.assertEqual(self.config.EMAIL_VALUE, 'test@example.com')
self.assertEqual(self.config.LIST_VALUE, [1, '1', date(2019, 1, 1)])
self.assertEqual(
self.config.JSON_VALUE,
{
"key": "value",
"key2": 2,
"key3": [1, 2, 3],
"key4": {"key": "value"},
"key5": date(2019, 1, 1),
"key6": None,
'key': 'value',
'key2': 2,
'key3': [1, 2, 3],
'key4': {'key': 'value'},
'key5': date(2019, 1, 1),
'key6': None,
},
)
# set values
self.config.INT_VALUE = 100
self.config.BOOL_VALUE = False
self.config.STRING_VALUE = "Beware the weeping angel"
self.config.DECIMAL_VALUE = Decimal("1.2")
self.config.STRING_VALUE = 'Beware the weeping angel'
self.config.DECIMAL_VALUE = Decimal('1.2')
self.config.DATETIME_VALUE = datetime(1977, 10, 2)
self.config.FLOAT_VALUE = 2.718281845905
self.config.DATE_VALUE = date(2001, 12, 20)
self.config.TIME_VALUE = time(1, 59, 0)
self.config.TIMEDELTA_VALUE = timedelta(days=2, hours=3, minutes=4)
self.config.CHOICE_VALUE = "no"
self.config.EMAIL_VALUE = "foo@bar.com"
self.config.CHOICE_VALUE = 'no'
self.config.EMAIL_VALUE = 'foo@bar.com'
self.config.LIST_VALUE = [1, date(2020, 2, 2)]
self.config.JSON_VALUE = {"key": "OK"}
self.config.JSON_VALUE = {'key': 'OK'}
# read again
self.assertEqual(self.config.INT_VALUE, 100)
self.assertEqual(self.config.BOOL_VALUE, False)
self.assertEqual(self.config.STRING_VALUE, "Beware the weeping angel")
self.assertEqual(self.config.DECIMAL_VALUE, Decimal("1.2"))
self.assertEqual(self.config.STRING_VALUE, 'Beware the weeping angel')
self.assertEqual(self.config.DECIMAL_VALUE, Decimal('1.2'))
self.assertEqual(self.config.DATETIME_VALUE, datetime(1977, 10, 2))
self.assertEqual(self.config.FLOAT_VALUE, 2.718281845905)
self.assertEqual(self.config.DATE_VALUE, date(2001, 12, 20))
self.assertEqual(self.config.TIME_VALUE, time(1, 59, 0))
self.assertEqual(self.config.TIMEDELTA_VALUE, timedelta(days=2, hours=3, minutes=4))
self.assertEqual(self.config.CHOICE_VALUE, "no")
self.assertEqual(self.config.EMAIL_VALUE, "foo@bar.com")
self.assertEqual(self.config.CHOICE_VALUE, 'no')
self.assertEqual(self.config.EMAIL_VALUE, 'foo@bar.com')
self.assertEqual(self.config.LIST_VALUE, [1, date(2020, 2, 2)])
self.assertEqual(self.config.JSON_VALUE, {"key": "OK"})
self.assertEqual(self.config.JSON_VALUE, {'key': 'OK'})
def test_nonexistent(self):
self.assertRaises(AttributeError, getattr, self.config, "NON_EXISTENT")
self.assertRaises(AttributeError, getattr, self.config, 'NON_EXISTENT')
with self.assertRaises(AttributeError):
self.config.NON_EXISTENT = 1
@ -77,15 +77,15 @@ class StorageTestsMixin:
def test_missing_values(self):
# set some values and leave out others
self.config.BOOL_VALUE = False
self.config.DECIMAL_VALUE = Decimal("1.2")
self.config.DECIMAL_VALUE = Decimal('1.2')
self.config.DATETIME_VALUE = datetime(1977, 10, 2)
self.config.DATE_VALUE = date(2001, 12, 20)
self.config.TIME_VALUE = time(1, 59, 0)
self.assertEqual(self.config.INT_VALUE, 1) # this should be the default value
self.assertEqual(self.config.BOOL_VALUE, False)
self.assertEqual(self.config.STRING_VALUE, "Hello world") # this should be the default value
self.assertEqual(self.config.DECIMAL_VALUE, Decimal("1.2"))
self.assertEqual(self.config.STRING_VALUE, 'Hello world') # this should be the default value
self.assertEqual(self.config.DECIMAL_VALUE, Decimal('1.2'))
self.assertEqual(self.config.DATETIME_VALUE, datetime(1977, 10, 2))
self.assertEqual(self.config.FLOAT_VALUE, 3.1415926536) # this should be the default value
self.assertEqual(self.config.DATE_VALUE, date(2001, 12, 20))
@ -96,13 +96,13 @@ class StorageTestsMixin:
# Check corner cases such as falsy values
self.config.INT_VALUE = 0
self.config.BOOL_VALUE = False
self.config.STRING_VALUE = ""
self.config.STRING_VALUE = ''
values = self.config._backend.mget(settings.CONFIG)
self.assertEqual(values["INT_VALUE"], 0)
self.assertEqual(values["BOOL_VALUE"], False)
self.assertEqual(values["STRING_VALUE"], "")
values = dict(self.config._backend.mget(settings.CONFIG))
self.assertEqual(values['INT_VALUE'], 0)
self.assertEqual(values['BOOL_VALUE'], False)
self.assertEqual(values['STRING_VALUE'], '')
def test_backend_does_not_return_none_values(self):
result = self.config._backend.mget(settings.CONFIG)
result = dict(self.config._backend.mget(settings.CONFIG))
self.assertEqual(result, {})

View file

@ -23,196 +23,196 @@ class TestAdmin(TestCase):
def setUp(self):
super().setUp()
self.rf = RequestFactory()
self.superuser = User.objects.create_superuser("admin", "nimda", "a@a.cz")
self.normaluser = User.objects.create_user("normal", "nimda", "b@b.cz")
self.superuser = User.objects.create_superuser('admin', 'nimda', 'a@a.cz')
self.normaluser = User.objects.create_user('normal', 'nimda', 'b@b.cz')
self.normaluser.is_staff = True
self.normaluser.save()
self.options = admin.site._registry[self.model]
def test_changelist(self):
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
self.assertEqual(response.status_code, 200)
def test_custom_auth(self):
settings.SUPERUSER_ONLY = False
self.client.login(username="normal", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='normal', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.normaluser
self.assertRaises(PermissionDenied, self.options.changelist_view, request, {})
self.assertFalse(request.user.has_perm("constance.change_config"))
self.assertFalse(request.user.has_perm('constance.change_config'))
# reload user to reset permission cache
request = self.rf.get("/admin/constance/config/")
request = self.rf.get('/admin/constance/config/')
request.user = User.objects.get(pk=self.normaluser.pk)
request.user.user_permissions.add(Permission.objects.get(codename="change_config"))
self.assertTrue(request.user.has_perm("constance.change_config"))
request.user.user_permissions.add(Permission.objects.get(codename='change_config'))
self.assertTrue(request.user.has_perm('constance.change_config'))
response = self.options.changelist_view(request, {})
self.assertEqual(response.status_code, 200)
def test_linebreaks(self):
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
self.assertContains(response, "LINEBREAK_VALUE")
self.assertContains(response, linebreaksbr("eggs\neggs"))
self.assertContains(response, 'LINEBREAK_VALUE')
self.assertContains(response, linebreaksbr('eggs\neggs'))
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
'constance.settings.CONFIG_FIELDSETS',
{
"Numbers": ("INT_VALUE",),
"Text": ("STRING_VALUE",),
'Numbers': ('INT_VALUE',),
'Text': ('STRING_VALUE',),
},
)
def test_fieldset_headers(self):
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
self.assertContains(response, "Numbers</h2>")
self.assertContains(response, "Text</h2>")
self.assertContains(response, '<h2>Numbers</h2>')
self.assertContains(response, '<h2>Text</h2>')
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
'constance.settings.CONFIG_FIELDSETS',
(
("Numbers", ("INT_VALUE",)),
("Text", ("STRING_VALUE",)),
('Numbers', ('INT_VALUE',)),
('Text', ('STRING_VALUE',)),
),
)
def test_fieldset_tuple(self):
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
self.assertContains(response, "Numbers</h2>")
self.assertContains(response, "Text</h2>")
self.assertContains(response, '<h2>Numbers</h2>')
self.assertContains(response, '<h2>Text</h2>')
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
'constance.settings.CONFIG_FIELDSETS',
{
"Numbers": {
"fields": (
"INT_VALUE",
"DECIMAL_VALUE",
'Numbers': {
'fields': (
'INT_VALUE',
'DECIMAL_VALUE',
),
"collapse": True,
'collapse': True,
},
"Text": {
"fields": (
"STRING_VALUE",
"LINEBREAK_VALUE",
'Text': {
'fields': (
'STRING_VALUE',
'LINEBREAK_VALUE',
),
"collapse": True,
'collapse': True,
},
},
)
def test_collapsed_fieldsets(self):
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
self.assertContains(response, "module collapse")
self.assertContains(response, 'module collapse')
@mock.patch("constance.settings.CONFIG_FIELDSETS", {"FieldSetOne": ("INT_VALUE",)})
@mock.patch('constance.settings.CONFIG_FIELDSETS', {'FieldSetOne': ('INT_VALUE',)})
@mock.patch(
"constance.settings.CONFIG",
'constance.settings.CONFIG',
{
"INT_VALUE": (1, "some int"),
'INT_VALUE': (1, 'some int'),
},
)
@mock.patch("constance.settings.IGNORE_ADMIN_VERSION_CHECK", True)
@mock.patch("constance.forms.ConstanceForm.save", lambda _: None)
@mock.patch("constance.forms.ConstanceForm.is_valid", lambda _: True)
@mock.patch('constance.settings.IGNORE_ADMIN_VERSION_CHECK', True)
@mock.patch('constance.forms.ConstanceForm.save', lambda _: None)
@mock.patch('constance.forms.ConstanceForm.is_valid', lambda _: True)
def test_submit(self):
"""
Test that submitting the admin page results in an http redirect when
everything is in order.
"""
initial_value = {"INT_VALUE": settings.CONFIG["INT_VALUE"][0]}
initial_value = {'INT_VALUE': settings.CONFIG['INT_VALUE'][0]}
self.client.login(username="admin", password="nimda")
self.client.login(username='admin', password='nimda')
request = self.rf.post(
"/admin/constance/config/",
'/admin/constance/config/',
data={
**initial_value,
"version": "123",
'version': '123',
},
)
request.user = self.superuser
request._dont_enforce_csrf_checks = True
with mock.patch("django.contrib.messages.add_message") as mock_message, mock.patch.object(
ConstanceForm, "__init__", **initial_value, return_value=None
with mock.patch('django.contrib.messages.add_message') as mock_message, mock.patch.object(
ConstanceForm, '__init__', **initial_value, return_value=None
) as mock_form:
response = self.options.changelist_view(request, {})
mock_form.assert_called_with(data=request.POST, files=request.FILES, initial=initial_value, request=request)
mock_message.assert_called_with(request, 25, _("Live settings updated successfully."))
mock_message.assert_called_with(request, 25, _('Live settings updated successfully.'))
self.assertIsInstance(response, HttpResponseRedirect)
@mock.patch("constance.settings.CONFIG_FIELDSETS", {"FieldSetOne": ("MULTILINE",)})
@mock.patch('constance.settings.CONFIG_FIELDSETS', {'FieldSetOne': ('MULTILINE',)})
@mock.patch(
"constance.settings.CONFIG",
'constance.settings.CONFIG',
{
"MULTILINE": ("Hello\nWorld", "multiline value"),
'MULTILINE': ('Hello\nWorld', 'multiline value'),
},
)
@mock.patch("constance.settings.IGNORE_ADMIN_VERSION_CHECK", True)
@mock.patch('constance.settings.IGNORE_ADMIN_VERSION_CHECK', True)
def test_newlines_normalization(self):
self.client.login(username="admin", password="nimda")
self.client.login(username='admin', password='nimda')
request = self.rf.post(
"/admin/constance/config/",
'/admin/constance/config/',
data={
"MULTILINE": "Hello\r\nWorld",
"version": "123",
'MULTILINE': 'Hello\r\nWorld',
'version': '123',
},
)
request.user = self.superuser
request._dont_enforce_csrf_checks = True
with mock.patch("django.contrib.messages.add_message"):
with mock.patch('django.contrib.messages.add_message'):
response = self.options.changelist_view(request, {})
self.assertIsInstance(response, HttpResponseRedirect)
self.assertEqual(get_values()["MULTILINE"], "Hello\nWorld")
self.assertEqual(get_values()['MULTILINE'], 'Hello\nWorld')
@mock.patch(
"constance.settings.CONFIG",
'constance.settings.CONFIG',
{
"DATETIME_VALUE": (datetime(2019, 8, 7, 18, 40, 0), "some naive datetime"),
'DATETIME_VALUE': (datetime(2019, 8, 7, 18, 40, 0), 'some naive datetime'),
},
)
@mock.patch("constance.settings.IGNORE_ADMIN_VERSION_CHECK", True)
@mock.patch("tests.redis_mockup.Connection.set", mock.MagicMock())
@mock.patch('constance.settings.IGNORE_ADMIN_VERSION_CHECK', True)
@mock.patch('tests.redis_mockup.Connection.set', mock.MagicMock())
def test_submit_aware_datetime(self):
"""
Test that submitting the admin page results in an http redirect when
everything is in order.
"""
request = self.rf.post(
"/admin/constance/config/",
'/admin/constance/config/',
data={
"DATETIME_VALUE_0": "2019-08-07",
"DATETIME_VALUE_1": "19:17:01",
"version": "123",
'DATETIME_VALUE_0': '2019-08-07',
'DATETIME_VALUE_1': '19:17:01',
'version': '123',
},
)
request.user = self.superuser
request._dont_enforce_csrf_checks = True
with mock.patch("django.contrib.messages.add_message"):
with mock.patch('django.contrib.messages.add_message'):
response = self.options.changelist_view(request, {})
self.assertIsInstance(response, HttpResponseRedirect)
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
'constance.settings.CONFIG_FIELDSETS',
{
"Numbers": ("INT_VALUE",),
"Text": ("STRING_VALUE",),
'Numbers': ('INT_VALUE',),
'Text': ('STRING_VALUE',),
},
)
def test_inconsistent_fieldset_submit(self):
@ -220,107 +220,51 @@ class TestAdmin(TestCase):
Test that the admin page warns users if the CONFIG_FIELDSETS setting
doesn't account for every field in CONFIG.
"""
self.client.login(username="admin", password="nimda")
request = self.rf.post("/admin/constance/config/", data=None)
self.client.login(username='admin', password='nimda')
request = self.rf.post('/admin/constance/config/', data=None)
request.user = self.superuser
request._dont_enforce_csrf_checks = True
with mock.patch("django.contrib.messages.add_message"):
with mock.patch('django.contrib.messages.add_message'):
response = self.options.changelist_view(request, {})
self.assertContains(response, "is missing field(s)")
self.assertContains(response, 'is missing field(s)')
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
'constance.settings.CONFIG_FIELDSETS',
{
"Fieldsets": (
"STRING_VALUE",
"INT_VALUE",
'Fieldsets': (
'STRING_VALUE',
'INT_VALUE',
),
},
)
def test_fieldset_ordering_1(self):
"""Ordering of inner list should be preserved."""
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
response.render()
content_str = response.content.decode()
self.assertGreater(content_str.find("INT_VALUE"), content_str.find("STRING_VALUE"))
self.assertGreater(content_str.find('INT_VALUE'), content_str.find('STRING_VALUE'))
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
'constance.settings.CONFIG_FIELDSETS',
{
"Fieldsets": (
"INT_VALUE",
"STRING_VALUE",
'Fieldsets': (
'INT_VALUE',
'STRING_VALUE',
),
},
)
def test_fieldset_ordering_2(self):
"""Ordering of inner list should be preserved."""
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
self.client.login(username='admin', password='nimda')
request = self.rf.get('/admin/constance/config/')
request.user = self.superuser
response = self.options.changelist_view(request, {})
response.render()
content_str = response.content.decode()
self.assertGreater(content_str.find("STRING_VALUE"), content_str.find("INT_VALUE"))
@mock.patch(
"constance.settings.ADDITIONAL_FIELDS",
{
"language_select": [
"django.forms.fields.TypedMultipleChoiceField",
{
"widget": "django.forms.CheckboxSelectMultiple",
"choices": (("en", "English"), ("de", "German"), ("fr", "French")),
"coerce": str,
},
],
},
)
@mock.patch(
"constance.settings.CONFIG",
{
"LANGUAGES": (["en", "de"], "Supported languages", "language_select"),
},
)
def test_reset_to_default_multi_select(self):
"""
Test that multi-select config values render with data-field-type='multi-select'
and a JSON-encoded data-default attribute.
"""
# Re-parse additional fields so the mock is picked up by the form
from constance.forms import FIELDS
from constance.forms import parse_additional_fields
FIELDS.update(
parse_additional_fields(
{
"language_select": [
"django.forms.fields.TypedMultipleChoiceField",
{
"widget": "django.forms.CheckboxSelectMultiple",
"choices": (("en", "English"), ("de", "German"), ("fr", "French")),
"coerce": str,
},
]
}
)
)
try:
self.client.login(username="admin", password="nimda")
request = self.rf.get("/admin/constance/config/")
request.user = self.superuser
response = self.options.changelist_view(request, {})
response.render()
content = response.content.decode()
self.assertIn('data-field-type="multi-select"', content)
self.assertIn('data-default="[&quot;en&quot;, &quot;de&quot;]"', content)
finally:
# Clean up FIELDS to avoid leaking into other tests
FIELDS.pop("language_select", None)
self.assertGreater(content_str.find('STRING_VALUE'), content_str.find('INT_VALUE'))
def test_labels(self):
self.assertEqual(type(self.model._meta.label), str)

View file

@ -1,198 +0,0 @@
import warnings
from django.test import TransactionTestCase
from constance import config
from constance import utils
class AsyncTestCase(TransactionTestCase):
async def test_async_get(self):
# Accessing an attribute on config should be awaitable when in async context
val = await config.INT_VALUE
self.assertEqual(val, 1)
async def test_async_set(self):
await config.aset("INT_VALUE", 42)
val = await config.INT_VALUE
self.assertEqual(val, 42)
# Verify sync access also works (and emits warning)
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
sync_val = int(config.INT_VALUE)
self.assertEqual(sync_val, 42)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_amget(self):
values = await config.amget(["INT_VALUE", "BOOL_VALUE"])
self.assertEqual(values["INT_VALUE"], 1)
self.assertEqual(values["BOOL_VALUE"], True)
async def test_sync_math_in_async_loop(self):
# Accessing math should work but emit warning
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
res = config.INT_VALUE + 10
# Note: res will be 42 + 10 if test_async_set ran before, or 1 + 10 if not.
# TransactionTestCase should reset state, but let's be careful.
# config.INT_VALUE defaults to 1.
self.assertEqual(res, 11 if res < 50 else 52)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_utils_aget_values(self):
values = await utils.aget_values()
self.assertIn("INT_VALUE", values)
self.assertIn("BOOL_VALUE", values)
self.assertEqual(values["INT_VALUE"], 1)
async def test_utils_aget_values_for_keys(self):
values = await utils.aget_values_for_keys(["INT_VALUE"])
self.assertEqual(len(values), 1)
self.assertEqual(values["INT_VALUE"], 1)
async def test_bool_proxy(self):
# BOOL_VALUE is True by default
self.assertTrue(config.BOOL_VALUE)
async def test_int_proxy(self):
await config.aset("INT_VALUE", 1)
self.assertEqual(int(config.INT_VALUE), 1)
async def test_container_proxy(self):
# LIST_VALUE is [1, "1", date(2019, 1, 1)] by default
self.assertEqual(config.LIST_VALUE[0], 1)
self.assertEqual(len(config.LIST_VALUE), 3)
self.assertIn(1, config.LIST_VALUE)
self.assertEqual(next(iter(config.LIST_VALUE)), 1)
class AsyncValueProxyTestCase(TransactionTestCase):
"""Tests for AsyncValueProxy dunder methods in async context."""
async def test_str_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = str(config.STRING_VALUE)
self.assertEqual(result, "Hello world")
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_repr_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = repr(config.STRING_VALUE)
self.assertEqual(result, "'Hello world'")
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_float_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = float(config.FLOAT_VALUE)
self.assertAlmostEqual(result, 3.1415926536)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_eq_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE == 1
self.assertTrue(result)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_ne_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE != 2
self.assertTrue(result)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_lt_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE < 10
self.assertTrue(result)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_le_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE <= 1
self.assertTrue(result)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_gt_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE > 0
self.assertTrue(result)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_ge_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE >= 1
self.assertTrue(result)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_hash_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = hash(config.INT_VALUE)
self.assertEqual(result, hash(1))
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_sub_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE - 1
self.assertEqual(result, 0)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_mul_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE * 5
self.assertEqual(result, 5)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_truediv_proxy(self):
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
result = config.INT_VALUE / 1
self.assertEqual(result, 1.0)
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
async def test_aset_invalid_key(self):
with self.assertRaises(AttributeError):
await config.aset("INVALID_KEY", 42)
class AsyncUtilsTestCase(TransactionTestCase):
"""Tests for async utility functions."""
async def test_aget_values_for_keys_invalid_type(self):
with self.assertRaises(TypeError):
await utils.aget_values_for_keys("key1")
async def test_aget_values_for_keys_missing_key(self):
with self.assertRaises(AttributeError) as ctx:
await utils.aget_values_for_keys(["INVALID_KEY"])
self.assertIn("INVALID_KEY", str(ctx.exception))
async def test_aget_values_for_keys_empty(self):
result = await utils.aget_values_for_keys([])
self.assertEqual(result, {})
class ConfigBaseTestCase(TransactionTestCase):
"""Tests for Config class edge cases."""
def test_config_dir(self):
# Test __dir__ method
keys = dir(config)
self.assertIn("INT_VALUE", keys)
self.assertIn("BOOL_VALUE", keys)
def test_access_backend_attribute(self):
# Test accessing _backend attribute in sync context
backend = config._backend
self.assertIsNotNone(backend)

View file

@ -8,7 +8,7 @@ from constance.checks import get_inconsistent_fieldnames
class ChecksTestCase(TestCase):
@mock.patch("constance.settings.CONFIG_FIELDSETS", {"Set1": settings.CONFIG.keys()})
@mock.patch('constance.settings.CONFIG_FIELDSETS', {'Set1': settings.CONFIG.keys()})
def test_get_inconsistent_fieldnames_none(self):
"""
Test that get_inconsistent_fieldnames returns an empty data and no checks fail
@ -19,8 +19,8 @@ class ChecksTestCase(TestCase):
self.assertFalse(extra_keys)
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
{"Set1": list(settings.CONFIG.keys())[:-1]},
'constance.settings.CONFIG_FIELDSETS',
{'Set1': list(settings.CONFIG.keys())[:-1]},
)
def test_get_inconsistent_fieldnames_for_missing_keys(self):
"""
@ -33,8 +33,8 @@ class ChecksTestCase(TestCase):
self.assertEqual(1, len(check_fieldsets()))
@mock.patch(
"constance.settings.CONFIG_FIELDSETS",
{"Set1": [*settings.CONFIG.keys(), "FORGOTTEN_KEY"]},
'constance.settings.CONFIG_FIELDSETS',
{'Set1': [*settings.CONFIG.keys(), 'FORGOTTEN_KEY']},
)
def test_get_inconsistent_fieldnames_for_extra_keys(self):
"""
@ -46,7 +46,7 @@ class ChecksTestCase(TestCase):
self.assertTrue(extra_keys)
self.assertEqual(1, len(check_fieldsets()))
@mock.patch("constance.settings.CONFIG_FIELDSETS", {})
@mock.patch('constance.settings.CONFIG_FIELDSETS', {})
def test_check_fieldsets(self):
"""check_fieldsets should not output warning if CONFIG_FIELDSETS is not defined."""
del settings.CONFIG_FIELDSETS

View file

@ -7,7 +7,6 @@ from django.conf import settings
from django.core.management import CommandError
from django.core.management import call_command
from django.test import TransactionTestCase
from django.test import override_settings
from django.utils import timezone
from django.utils.encoding import smart_str
@ -21,10 +20,10 @@ class CliTestCase(TransactionTestCase):
def test_help(self):
with contextlib.suppress(SystemExit):
call_command("constance", "--help")
call_command('constance', '--help')
def test_list(self):
call_command("constance", "list", stdout=self.out)
call_command('constance', 'list', stdout=self.out)
self.assertEqual(
set(self.out.getvalue().splitlines()),
@ -52,16 +51,16 @@ class CliTestCase(TransactionTestCase):
)
def test_get(self):
call_command("constance", *(["get", "EMAIL_VALUE"]), stdout=self.out)
call_command('constance', *('get EMAIL_VALUE'.split()), stdout=self.out)
self.assertEqual(self.out.getvalue().strip(), "test@example.com")
self.assertEqual(self.out.getvalue().strip(), 'test@example.com')
def test_set(self):
call_command("constance", *(["set", "EMAIL_VALUE", "blah@example.com"]), stdout=self.out)
call_command('constance', *('set EMAIL_VALUE blah@example.com'.split()), stdout=self.out)
self.assertEqual(config.EMAIL_VALUE, "blah@example.com")
self.assertEqual(config.EMAIL_VALUE, 'blah@example.com')
call_command("constance", *("set", "DATETIME_VALUE", "2011-09-24", "12:30:25"), stdout=self.out)
call_command('constance', *('set', 'DATETIME_VALUE', '2011-09-24', '12:30:25'), stdout=self.out)
expected = datetime(2011, 9, 24, 12, 30, 25)
if settings.USE_TZ:
@ -71,70 +70,50 @@ class CliTestCase(TransactionTestCase):
def test_get_invalid_name(self):
self.assertRaisesMessage(
CommandError,
"NOT_A_REAL_CONFIG is not defined in settings.CONSTANCE_CONFIG",
'NOT_A_REAL_CONFIG is not defined in settings.CONSTANCE_CONFIG',
call_command,
"constance",
"get",
"NOT_A_REAL_CONFIG",
'constance',
'get',
'NOT_A_REAL_CONFIG',
)
def test_set_invalid_name(self):
self.assertRaisesMessage(
CommandError,
"NOT_A_REAL_CONFIG is not defined in settings.CONSTANCE_CONFIG",
'NOT_A_REAL_CONFIG is not defined in settings.CONSTANCE_CONFIG',
call_command,
"constance",
"set",
"NOT_A_REAL_CONFIG",
"foo",
'constance',
'set',
'NOT_A_REAL_CONFIG',
'foo',
)
def test_set_invalid_value(self):
self.assertRaisesMessage(
CommandError,
"Enter a valid email address.",
'Enter a valid email address.',
call_command,
"constance",
"set",
"EMAIL_VALUE",
"not a valid email",
'constance',
'set',
'EMAIL_VALUE',
'not a valid email',
)
def test_set_invalid_multi_value(self):
self.assertRaisesMessage(
CommandError,
"Enter a list of values.",
'Enter a list of values.',
call_command,
"constance",
"set",
"DATETIME_VALUE",
"2011-09-24 12:30:25",
'constance',
'set',
'DATETIME_VALUE',
'2011-09-24 12:30:25',
)
def test_delete_stale_records(self):
self._populate_database_with_default_values()
initial_count = Constance.objects.count()
Constance.objects.create(key="STALE_KEY", value=None)
call_command("constance", "remove_stale_keys", stdout=self.out)
Constance.objects.create(key='STALE_KEY', value=None)
call_command('constance', 'remove_stale_keys', stdout=self.out)
self.assertEqual(Constance.objects.count(), initial_count, msg=self.out)
@override_settings(
CONSTANCE_DATABASE_PREFIX="constance:",
)
def test_delete_stale_records_respects_prefix(self):
self._populate_database_with_default_values()
initial_count = Constance.objects.count()
call_command("constance", "remove_stale_keys", stdout=self.out)
self.assertEqual(Constance.objects.count(), initial_count, msg=self.out)
def _populate_database_with_default_values(self):
"""
Helper function to populate the database with default values defined
in settings since that's not done automatically at startup
"""
for key, (value, *_) in settings.CONSTANCE_CONFIG.items():
Constance.objects.create(key=f"{getattr(settings, 'CONSTANCE_DATABASE_PREFIX', '')}{key}", value=value)

View file

@ -16,16 +16,16 @@ class TestJSONSerialization(TestCase):
self.datetime = datetime(2023, 10, 5, 15, 30, 0)
self.date = date(2023, 10, 5)
self.time = time(15, 30, 0)
self.decimal = Decimal("10.5")
self.uuid = uuid.UUID("12345678123456781234567812345678")
self.string = "test"
self.decimal = Decimal('10.5')
self.uuid = uuid.UUID('12345678123456781234567812345678')
self.string = 'test'
self.integer = 42
self.float = 3.14
self.boolean = True
self.none = None
self.timedelta = timedelta(days=1, hours=2, minutes=3)
self.list = [1, 2, self.date]
self.dict = {"key": self.date, "key2": 1}
self.dict = {'key': self.date, 'key2': 1}
def test_serializes_and_deserializes_default_types(self):
self.assertEqual(dumps(self.datetime), '{"__type__": "datetime", "__value__": "2023-10-05T15:30:00"}')
@ -65,18 +65,18 @@ class TestJSONSerialization(TestCase):
self.assertEqual(t, loads(dumps(t)))
def test_invalid_deserialization(self):
with self.assertRaisesRegex(ValueError, "Expecting value"):
loads("THIS_IS_NOT_RIGHT")
with self.assertRaisesRegex(ValueError, "Invalid object"):
with self.assertRaisesRegex(ValueError, 'Expecting value'):
loads('THIS_IS_NOT_RIGHT')
with self.assertRaisesRegex(ValueError, 'Invalid object'):
loads('{"__type__": "THIS_IS_NOT_RIGHT", "__value__": "test", "THIS_IS_NOT_RIGHT": "THIS_IS_NOT_RIGHT"}')
with self.assertRaisesRegex(ValueError, "Unsupported type"):
with self.assertRaisesRegex(ValueError, 'Unsupported type'):
loads('{"__type__": "THIS_IS_NOT_RIGHT", "__value__": "test"}')
def test_handles_unknown_type(self):
class UnknownType:
pass
with self.assertRaisesRegex(TypeError, "Object of type UnknownType is not JSON serializable"):
with self.assertRaisesRegex(TypeError, 'Object of type UnknownType is not JSON serializable'):
dumps(UnknownType())
def test_custom_type_serialization(self):
@ -84,25 +84,25 @@ class TestJSONSerialization(TestCase):
def __init__(self, value):
self.value = value
register_type(CustomType, "custom", lambda o: o.value, lambda o: CustomType(o))
custom_data = CustomType("test")
register_type(CustomType, 'custom', lambda o: o.value, lambda o: CustomType(o))
custom_data = CustomType('test')
json_data = dumps(custom_data)
self.assertEqual(json_data, '{"__type__": "custom", "__value__": "test"}')
deserialized_data = loads(json_data)
self.assertTrue(isinstance(deserialized_data, CustomType))
self.assertEqual(deserialized_data.value, "test")
self.assertEqual(deserialized_data.value, 'test')
def test_register_known_type(self):
with self.assertRaisesRegex(ValueError, "Discriminator must be specified"):
register_type(int, "", lambda o: o.value, lambda o: int(o))
with self.assertRaisesRegex(ValueError, "Type with discriminator default is already registered"):
register_type(int, "default", lambda o: o.value, lambda o: int(o))
register_type(int, "new_custom_type", lambda o: o.value, lambda o: int(o))
with self.assertRaisesRegex(ValueError, "Type with discriminator new_custom_type is already registered"):
register_type(int, "new_custom_type", lambda o: o.value, lambda o: int(o))
with self.assertRaisesRegex(ValueError, 'Discriminator must be specified'):
register_type(int, '', lambda o: o.value, lambda o: int(o))
with self.assertRaisesRegex(ValueError, 'Type with discriminator default is already registered'):
register_type(int, 'default', lambda o: o.value, lambda o: int(o))
register_type(int, 'new_custom_type', lambda o: o.value, lambda o: int(o))
with self.assertRaisesRegex(ValueError, 'Type with discriminator new_custom_type is already registered'):
register_type(int, 'new_custom_type', lambda o: o.value, lambda o: int(o))
def test_nested_collections(self):
data = {"key": [[[[{"key": self.date}]]]]}
data = {'key': [[[[{'key': self.date}]]]]}
self.assertEqual(
dumps(data),
(

View file

@ -8,16 +8,16 @@ class TestForm(TestCase):
def test_form_field_types(self):
f = ConstanceForm({})
self.assertIsInstance(f.fields["INT_VALUE"], fields.IntegerField)
self.assertIsInstance(f.fields["BOOL_VALUE"], fields.BooleanField)
self.assertIsInstance(f.fields["STRING_VALUE"], fields.CharField)
self.assertIsInstance(f.fields["DECIMAL_VALUE"], fields.DecimalField)
self.assertIsInstance(f.fields["DATETIME_VALUE"], fields.SplitDateTimeField)
self.assertIsInstance(f.fields["TIMEDELTA_VALUE"], fields.DurationField)
self.assertIsInstance(f.fields["FLOAT_VALUE"], fields.FloatField)
self.assertIsInstance(f.fields["DATE_VALUE"], fields.DateField)
self.assertIsInstance(f.fields["TIME_VALUE"], fields.TimeField)
self.assertIsInstance(f.fields['INT_VALUE'], fields.IntegerField)
self.assertIsInstance(f.fields['BOOL_VALUE'], fields.BooleanField)
self.assertIsInstance(f.fields['STRING_VALUE'], fields.CharField)
self.assertIsInstance(f.fields['DECIMAL_VALUE'], fields.DecimalField)
self.assertIsInstance(f.fields['DATETIME_VALUE'], fields.SplitDateTimeField)
self.assertIsInstance(f.fields['TIMEDELTA_VALUE'], fields.DurationField)
self.assertIsInstance(f.fields['FLOAT_VALUE'], fields.FloatField)
self.assertIsInstance(f.fields['DATE_VALUE'], fields.DateField)
self.assertIsInstance(f.fields['TIME_VALUE'], fields.TimeField)
# from CONSTANCE_ADDITIONAL_FIELDS
self.assertIsInstance(f.fields["CHOICE_VALUE"], fields.ChoiceField)
self.assertIsInstance(f.fields["EMAIL_VALUE"], fields.EmailField)
self.assertIsInstance(f.fields['CHOICE_VALUE'], fields.ChoiceField)
self.assertIsInstance(f.fields['EMAIL_VALUE'], fields.EmailField)

View file

@ -42,10 +42,10 @@ try:
"""Assert that the class decorator changes config.BOOL_VALUE."""
assert not config.BOOL_VALUE
@pytest.mark.override_config(BOOL_VALUE="True")
@pytest.mark.override_config(BOOL_VALUE='True')
def test_override_config_on_overridden_value(self):
"""Ensure that method mark decorator changes already overridden value for class."""
assert config.BOOL_VALUE == "True"
assert config.BOOL_VALUE == 'True'
def test_fixture_override_config(override_config):
"""
@ -66,7 +66,7 @@ except ImportError:
class PytestTests(unittest.TestCase):
def setUp(self):
self.skipTest("Skip all pytest tests when using unittest")
self.skipTest('Skip all pytest tests when using unittest')
def test_do_not_skip_silently(self):
"""If no at least one test present, unittest silently skips module."""

View file

@ -11,32 +11,28 @@ from constance.utils import get_values_for_keys
class UtilsTestCase(TestCase):
def test_set_value_validation(self):
self.assertRaisesMessage(ValidationError, "Enter a whole number.", _set_constance_value, "INT_VALUE", "foo")
self.assertRaisesMessage(ValidationError, 'Enter a whole number.', _set_constance_value, 'INT_VALUE', 'foo')
self.assertRaisesMessage(
ValidationError,
"Enter a valid email address.",
_set_constance_value,
"EMAIL_VALUE",
"not a valid email",
ValidationError, 'Enter a valid email address.', _set_constance_value, 'EMAIL_VALUE', 'not a valid email'
)
self.assertRaisesMessage(
ValidationError,
"Enter a valid date.",
'Enter a valid date.',
_set_constance_value,
"DATETIME_VALUE",
'DATETIME_VALUE',
(
"2000-00-00",
"99:99:99",
'2000-00-00',
'99:99:99',
),
)
self.assertRaisesMessage(
ValidationError,
"Enter a valid time.",
'Enter a valid time.',
_set_constance_value,
"DATETIME_VALUE",
'DATETIME_VALUE',
(
"2016-01-01",
"99:99:99",
'2016-01-01',
'99:99:99',
),
)
@ -44,37 +40,37 @@ class UtilsTestCase(TestCase):
self.assertEqual(
get_values(),
{
"FLOAT_VALUE": 3.1415926536,
"BOOL_VALUE": True,
"EMAIL_VALUE": "test@example.com",
"INT_VALUE": 1,
"CHOICE_VALUE": "yes",
"TIME_VALUE": datetime.time(23, 59, 59),
"DATE_VALUE": datetime.date(2010, 12, 24),
"TIMEDELTA_VALUE": datetime.timedelta(days=1, hours=2, minutes=3),
"LINEBREAK_VALUE": "Spam spam",
"DECIMAL_VALUE": Decimal("0.1"),
"STRING_VALUE": "Hello world",
"DATETIME_VALUE": datetime.datetime(2010, 8, 23, 11, 29, 24),
"LIST_VALUE": [1, "1", datetime.date(2019, 1, 1)],
"JSON_VALUE": {
"key": "value",
"key2": 2,
"key3": [1, 2, 3],
"key4": {"key": "value"},
"key5": datetime.date(2019, 1, 1),
"key6": None,
'FLOAT_VALUE': 3.1415926536,
'BOOL_VALUE': True,
'EMAIL_VALUE': 'test@example.com',
'INT_VALUE': 1,
'CHOICE_VALUE': 'yes',
'TIME_VALUE': datetime.time(23, 59, 59),
'DATE_VALUE': datetime.date(2010, 12, 24),
'TIMEDELTA_VALUE': datetime.timedelta(days=1, hours=2, minutes=3),
'LINEBREAK_VALUE': 'Spam spam',
'DECIMAL_VALUE': Decimal('0.1'),
'STRING_VALUE': 'Hello world',
'DATETIME_VALUE': datetime.datetime(2010, 8, 23, 11, 29, 24),
'LIST_VALUE': [1, '1', datetime.date(2019, 1, 1)],
'JSON_VALUE': {
'key': 'value',
'key2': 2,
'key3': [1, 2, 3],
'key4': {'key': 'value'},
'key5': datetime.date(2019, 1, 1),
'key6': None,
},
},
)
def test_get_values_for_keys(self):
self.assertEqual(
get_values_for_keys(["BOOL_VALUE", "CHOICE_VALUE", "LINEBREAK_VALUE"]),
get_values_for_keys(['BOOL_VALUE', 'CHOICE_VALUE', 'LINEBREAK_VALUE']),
{
"BOOL_VALUE": True,
"CHOICE_VALUE": "yes",
"LINEBREAK_VALUE": "Spam spam",
'BOOL_VALUE': True,
'CHOICE_VALUE': 'yes',
'LINEBREAK_VALUE': 'Spam spam',
},
)
@ -87,9 +83,9 @@ class UtilsTestCase(TestCase):
AttributeError,
'"OLD_VALUE, BOLD_VALUE" keys not found in configuration.',
get_values_for_keys,
["BOOL_VALUE", "OLD_VALUE", "BOLD_VALUE"],
['BOOL_VALUE', 'OLD_VALUE', 'BOLD_VALUE'],
)
def test_get_values_for_keys_invalid_input_type(self):
with self.assertRaises(TypeError):
get_values_for_keys("key1")
get_values_for_keys('key1')

View file

@ -2,5 +2,5 @@ from django.contrib import admin
from django.urls import path
urlpatterns = [
path("admin/", admin.site.urls),
path('admin/', admin.site.urls),
]

View file

@ -4,8 +4,7 @@ envlist =
py{38,39,310,311,312}-dj{42}-{unittest,pytest,checkmigrations}
py{310,311,312}-dj{50}-{unittest,pytest,checkmigrations}
py{310,311,312,313}-dj{51}-{unittest,pytest,checkmigrations}
py{310,311,312,313,314}-dj{52}-{unittest,pytest,checkmigrations}
py{312,313,314}-dj{main}-{unittest,pytest,checkmigrations}
py{310,311,312,313}-dj{main}-{unittest,pytest,checkmigrations}
skip_missing_interpreters = True
[testenv]
@ -15,7 +14,6 @@ deps =
dj42: django>=4.2,<4.3
dj50: django>=5.0,<5.1
dj51: django>=5.1,<5.2
dj52: django>=5.2,<5.3
djmain: https://github.com/django/django/archive/main.tar.gz
pytest: pytest
pytest: pytest-cov
@ -42,4 +40,3 @@ python =
3.11: py311
3.12: py312
3.13: py313
3.14: py314