mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
Compare commits
29 commits
e4842012fd
...
a54a655398
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
a54a655398 | ||
|
|
051323166d | ||
|
|
359843c60f | ||
|
|
a9e9704318 | ||
|
|
fdd7b22cd3 | ||
|
|
a5d14cd630 | ||
|
|
2a31c0133f | ||
|
|
4624eed684 | ||
|
|
e27ce891ea | ||
|
|
c3dcd1ba51 | ||
|
|
41ebdc3063 | ||
|
|
31c69dbea5 | ||
|
|
bdd0c9546a | ||
|
|
4b77eb69ee | ||
|
|
5acae054b4 | ||
|
|
d59a289407 | ||
|
|
23ee2fca44 | ||
|
|
4ea615811b | ||
|
|
b4fb3088b4 | ||
|
|
6c8feada83 | ||
|
|
b441ccd5fc | ||
|
|
1d9964be16 | ||
|
|
60e3cceb1d | ||
|
|
8f5e9965d8 | ||
|
|
cf0be90f11 | ||
|
|
d033b70235 | ||
|
|
b14e861631 | ||
|
|
6703b66f17 | ||
|
|
95a8043341 |
19 changed files with 284 additions and 98 deletions
22
CHANGES.rst
22
CHANGES.rst
|
|
@ -2,6 +2,28 @@
|
|||
Changes
|
||||
=======
|
||||
|
||||
8.3.1 (2026-02-11)
|
||||
------------------
|
||||
|
||||
- Fix configuration JSON serialization errors for Celery.
|
||||
[aleksihakli]
|
||||
|
||||
8.3.0 (2026-02-09)
|
||||
------------------
|
||||
|
||||
- Remove deprecated pkg_resources in favour of new importlib.
|
||||
[hugovk]
|
||||
|
||||
8.2.0 (2026-02-06)
|
||||
------------------
|
||||
|
||||
- Fix AttributeError when optional settings are undefined.
|
||||
[rodrigo.nogueira]
|
||||
- Fix circular import with custom user models.
|
||||
[rodrigo.nogueira]
|
||||
- Add unit tests for security check W006.
|
||||
[shayanTaki]
|
||||
|
||||
8.1.0 (2025-12-19)
|
||||
------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -19,8 +19,10 @@ class IsLockedOutFilter(admin.SimpleListFilter):
|
|||
|
||||
def queryset(self, request, queryset):
|
||||
if self.value() == "yes":
|
||||
return queryset.filter(failures_since_start__gte=settings.AXES_FAILURE_LIMIT)
|
||||
elif self.value() == "no":
|
||||
return queryset.filter(
|
||||
failures_since_start__gte=settings.AXES_FAILURE_LIMIT
|
||||
)
|
||||
if self.value() == "no":
|
||||
return queryset.filter(failures_since_start__lt=settings.AXES_FAILURE_LIMIT)
|
||||
return queryset
|
||||
|
||||
|
|
@ -34,9 +36,9 @@ class AccessAttemptAdmin(admin.ModelAdmin):
|
|||
"path_info",
|
||||
"failures_since_start",
|
||||
]
|
||||
|
||||
|
||||
if settings.AXES_USE_ATTEMPT_EXPIRATION:
|
||||
list_display.append('expiration')
|
||||
list_display.append("expiration")
|
||||
|
||||
list_filter = ["attempt_time", "path_info"]
|
||||
|
||||
|
|
@ -44,14 +46,17 @@ class AccessAttemptAdmin(admin.ModelAdmin):
|
|||
# This will only add the status field if AXES_FAILURE_LIMIT is set to a positive integer
|
||||
# Because callable failure limit requires scope of request object
|
||||
list_display.append("status")
|
||||
list_filter.append(IsLockedOutFilter)
|
||||
list_filter.append(IsLockedOutFilter) # type: ignore[arg-type]
|
||||
|
||||
search_fields = ["ip_address", "username", "user_agent", "path_info"]
|
||||
|
||||
date_hierarchy = "attempt_time"
|
||||
|
||||
fieldsets = (
|
||||
(None, {"fields": ("username", "path_info", "failures_since_start", "expiration")}),
|
||||
(
|
||||
None,
|
||||
{"fields": ("username", "path_info", "failures_since_start", "expiration")},
|
||||
),
|
||||
(_("Form Data"), {"fields": ("get_data", "post_data")}),
|
||||
(_("Meta Data"), {"fields": ("user_agent", "ip_address", "http_accept")}),
|
||||
)
|
||||
|
|
@ -69,10 +74,10 @@ class AccessAttemptAdmin(admin.ModelAdmin):
|
|||
"expiration",
|
||||
]
|
||||
|
||||
actions = ['cleanup_expired_attempts']
|
||||
actions = ["cleanup_expired_attempts"]
|
||||
|
||||
@admin.action(description=_('Clean up expired attempts'))
|
||||
def cleanup_expired_attempts(self, request, queryset):
|
||||
@admin.action(description=_("Clean up expired attempts"))
|
||||
def cleanup_expired_attempts(self, request, queryset): # noqa
|
||||
count = self.handler.clean_expired_user_attempts(request=request)
|
||||
self.message_user(request, _(f"Cleaned up {count} expired access attempts."))
|
||||
|
||||
|
|
@ -85,10 +90,15 @@ class AccessAttemptAdmin(admin.ModelAdmin):
|
|||
|
||||
def expiration(self, obj: AccessAttempt):
|
||||
return obj.expiration.expires_at if hasattr(obj, "expiration") else _("Not set")
|
||||
|
||||
|
||||
def status(self, obj: AccessAttempt):
|
||||
return f"{settings.AXES_FAILURE_LIMIT - obj.failures_since_start} "+_("Attempt Remaining") if \
|
||||
obj.failures_since_start < settings.AXES_FAILURE_LIMIT else _("Locked Out")
|
||||
return (
|
||||
f"{settings.AXES_FAILURE_LIMIT - obj.failures_since_start} "
|
||||
+ _("Attempt Remaining")
|
||||
if obj.failures_since_start < settings.AXES_FAILURE_LIMIT
|
||||
else _("Locked Out")
|
||||
)
|
||||
|
||||
|
||||
class AccessLogAdmin(admin.ModelAdmin):
|
||||
list_display = (
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ def get_cool_off_threshold(request: Optional[HttpRequest] = None) -> datetime:
|
|||
"Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None"
|
||||
)
|
||||
|
||||
attempt_time = request.axes_attempt_time
|
||||
attempt_time = request.axes_attempt_time # type: ignore[union-attr]
|
||||
if attempt_time is None:
|
||||
return now() - cool_off
|
||||
return attempt_time - cool_off
|
||||
|
|
|
|||
|
|
@ -207,7 +207,7 @@ def axes_conf_check(app_configs, **kwargs): # pylint: disable=unused-argument
|
|||
]
|
||||
|
||||
for callable_setting in callable_settings:
|
||||
value = getattr(settings, callable_setting)
|
||||
value = getattr(settings, callable_setting, None)
|
||||
if not is_valid_callable(value):
|
||||
warnings.append(
|
||||
Warning(
|
||||
|
|
@ -235,4 +235,4 @@ def is_valid_callable(value) -> bool:
|
|||
except ImportError:
|
||||
return False
|
||||
|
||||
return True
|
||||
return True
|
||||
|
|
|
|||
32
axes/conf.py
32
axes/conf.py
|
|
@ -1,7 +1,21 @@
|
|||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils.functional import SimpleLazyObject
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
|
||||
class JSONSerializableLazyObject(SimpleLazyObject):
|
||||
"""
|
||||
Celery/Kombu config inspection may JSON-encode Django settings.
|
||||
Provide a JSON-friendly representation for lazy values.
|
||||
|
||||
Fixes jazzband/django-axes#1391
|
||||
"""
|
||||
|
||||
def __json__(self):
|
||||
return str(self)
|
||||
|
||||
|
||||
# disable plugin when set to False
|
||||
settings.AXES_ENABLED = getattr(settings, "AXES_ENABLED", True)
|
||||
|
||||
|
|
@ -42,9 +56,16 @@ settings.AXES_ONLY_ADMIN_SITE = getattr(settings, "AXES_ONLY_ADMIN_SITE", False)
|
|||
# show Axes logs in admin
|
||||
settings.AXES_ENABLE_ADMIN = getattr(settings, "AXES_ENABLE_ADMIN", True)
|
||||
|
||||
|
||||
# use a specific username field to retrieve from login POST data
|
||||
def _get_username_field_default():
|
||||
return get_user_model().USERNAME_FIELD
|
||||
|
||||
|
||||
settings.AXES_USERNAME_FORM_FIELD = getattr(
|
||||
settings, "AXES_USERNAME_FORM_FIELD", get_user_model().USERNAME_FIELD
|
||||
settings,
|
||||
"AXES_USERNAME_FORM_FIELD",
|
||||
JSONSerializableLazyObject(_get_username_field_default),
|
||||
)
|
||||
|
||||
# use a specific password field to retrieve from login POST data
|
||||
|
|
@ -87,7 +108,9 @@ settings.AXES_LOCKOUT_URL = getattr(settings, "AXES_LOCKOUT_URL", None)
|
|||
|
||||
settings.AXES_COOLOFF_TIME = getattr(settings, "AXES_COOLOFF_TIME", None)
|
||||
|
||||
settings.AXES_USE_ATTEMPT_EXPIRATION = getattr(settings, "AXES_USE_ATTEMPT_EXPIRATION", False)
|
||||
settings.AXES_USE_ATTEMPT_EXPIRATION = getattr(
|
||||
settings, "AXES_USE_ATTEMPT_EXPIRATION", False
|
||||
)
|
||||
|
||||
settings.AXES_VERBOSE = getattr(settings, "AXES_VERBOSE", settings.AXES_ENABLED)
|
||||
|
||||
|
|
@ -137,6 +160,11 @@ settings.AXES_CLIENT_STR_CALLABLE = getattr(settings, "AXES_CLIENT_STR_CALLABLE"
|
|||
# set the HTTP response code given by too many requests
|
||||
settings.AXES_HTTP_RESPONSE_CODE = getattr(settings, "AXES_HTTP_RESPONSE_CODE", 429)
|
||||
|
||||
# if True, set Retry-After header for lockout responses with cool off configured
|
||||
settings.AXES_ENABLE_RETRY_AFTER_HEADER = getattr(
|
||||
settings, "AXES_ENABLE_RETRY_AFTER_HEADER", False
|
||||
)
|
||||
|
||||
# If True, a failed login attempt during lockout will reset the cool off period
|
||||
settings.AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT = getattr(
|
||||
settings, "AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT", True
|
||||
|
|
|
|||
|
|
@ -21,7 +21,12 @@ from axes.helpers import (
|
|||
get_query_str,
|
||||
get_attempt_expiration,
|
||||
)
|
||||
from axes.models import AccessAttempt, AccessAttemptExpiration, AccessFailureLog, AccessLog
|
||||
from axes.models import (
|
||||
AccessAttempt,
|
||||
AccessAttemptExpiration,
|
||||
AccessFailureLog,
|
||||
AccessLog,
|
||||
)
|
||||
from axes.signals import user_locked_out
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
|
@ -223,15 +228,17 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
|
|||
if settings.AXES_USE_ATTEMPT_EXPIRATION:
|
||||
if not hasattr(attempt, "expiration") or attempt.expiration is None:
|
||||
log.debug(
|
||||
"AXES: Creating new AccessAttemptExpiration for %s", client_str
|
||||
"AXES: Creating new AccessAttemptExpiration for %s",
|
||||
client_str,
|
||||
)
|
||||
attempt.expiration = AccessAttemptExpiration.objects.create(
|
||||
access_attempt=attempt,
|
||||
expires_at=get_attempt_expiration(request)
|
||||
expires_at=get_attempt_expiration(request),
|
||||
)
|
||||
else:
|
||||
attempt.expiration.expires_at = max(
|
||||
get_attempt_expiration(request), attempt.expiration.expires_at
|
||||
get_attempt_expiration(request),
|
||||
attempt.expiration.expires_at,
|
||||
)
|
||||
attempt.expiration.save()
|
||||
|
||||
|
|
@ -365,7 +372,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
|
|||
return attempts_list
|
||||
|
||||
def get_user_attempts(
|
||||
self, request: HttpRequest, credentials: Optional[dict] = None
|
||||
self, request: HttpRequest, credentials: Optional[dict] = None # noqa
|
||||
) -> List[QuerySet]:
|
||||
"""
|
||||
Get list of querysets with valid user attempts that match the given request and credentials.
|
||||
|
|
@ -386,7 +393,9 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
|
|||
]
|
||||
|
||||
def clean_expired_user_attempts(
|
||||
self, request: Optional[HttpRequest] = None, credentials: Optional[dict] = None
|
||||
self,
|
||||
request: Optional[HttpRequest] = None,
|
||||
credentials: Optional[dict] = None, # noqa
|
||||
) -> int:
|
||||
"""
|
||||
Clean expired user attempts from the database.
|
||||
|
|
@ -400,7 +409,9 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
|
|||
|
||||
if settings.AXES_USE_ATTEMPT_EXPIRATION:
|
||||
threshold = timezone.now()
|
||||
count, _ = AccessAttempt.objects.filter(expiration__expires_at__lte=threshold).delete()
|
||||
count, _ = AccessAttempt.objects.filter(
|
||||
expiration__expires_at__lte=threshold
|
||||
).delete()
|
||||
log.info(
|
||||
"AXES: Cleaned up %s expired access attempts from database that expiry were older than %s",
|
||||
count,
|
||||
|
|
@ -408,7 +419,9 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
|
|||
)
|
||||
else:
|
||||
threshold = get_cool_off_threshold(request)
|
||||
count, _ = AccessAttempt.objects.filter(attempt_time__lte=threshold).delete()
|
||||
count, _ = AccessAttempt.objects.filter(
|
||||
attempt_time__lte=threshold
|
||||
).delete()
|
||||
log.info(
|
||||
"AXES: Cleaned up %s expired access attempts from database that were older than %s",
|
||||
count,
|
||||
|
|
|
|||
|
|
@ -112,7 +112,7 @@ def get_attempt_expiration(request: Optional[HttpRequest] = None) -> datetime:
|
|||
"Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None"
|
||||
)
|
||||
|
||||
attempt_time = request.axes_attempt_time
|
||||
attempt_time = request.axes_attempt_time # type: ignore[union-attr]
|
||||
if attempt_time is None:
|
||||
return datetime.now() + cool_off
|
||||
return attempt_time + cool_off
|
||||
|
|
@ -164,7 +164,7 @@ def get_client_username(
|
|||
log.debug(
|
||||
"Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD"
|
||||
)
|
||||
return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)
|
||||
return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) # type: ignore[return-value]
|
||||
|
||||
log.debug(
|
||||
"Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD"
|
||||
|
|
@ -461,12 +461,6 @@ def get_lockout_message() -> str:
|
|||
return settings.AXES_PERMALOCK_MESSAGE
|
||||
|
||||
|
||||
def _set_retry_after_header(response: HttpResponse, request: HttpRequest) -> None:
|
||||
cool_off = get_cool_off(request)
|
||||
if cool_off is not None:
|
||||
response["Retry-After"] = str(int(cool_off.total_seconds()))
|
||||
|
||||
|
||||
def get_lockout_response(
|
||||
request: HttpRequest,
|
||||
original_response: Optional[HttpResponse] = None,
|
||||
|
|
@ -519,15 +513,10 @@ def get_lockout_response(
|
|||
json_response["Access-Control-Allow-Headers"] = (
|
||||
"Origin, Content-Type, Accept, Authorization, x-requested-with"
|
||||
)
|
||||
_set_retry_after_header(json_response, request)
|
||||
return json_response
|
||||
|
||||
if settings.AXES_LOCKOUT_TEMPLATE:
|
||||
response = render(
|
||||
request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status
|
||||
)
|
||||
_set_retry_after_header(response, request)
|
||||
return response
|
||||
return render(request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status)
|
||||
|
||||
if settings.AXES_LOCKOUT_URL:
|
||||
lockout_url = settings.AXES_LOCKOUT_URL
|
||||
|
|
@ -535,9 +524,7 @@ def get_lockout_response(
|
|||
url = f"{lockout_url}?{query_string}"
|
||||
return redirect(url)
|
||||
|
||||
response = HttpResponse(get_lockout_message(), status=status)
|
||||
_set_retry_after_header(response, request)
|
||||
return response
|
||||
return HttpResponse(get_lockout_message(), status=status)
|
||||
|
||||
|
||||
def is_ip_address_in_whitelist(ip_address: str) -> bool:
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from asgiref.sync import iscoroutinefunction, markcoroutinefunction, sync_to_asy
|
|||
from django.conf import settings
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
|
||||
from axes.helpers import get_lockout_response
|
||||
from axes.helpers import get_cool_off, get_lockout_response
|
||||
|
||||
|
||||
class AxesMiddleware:
|
||||
|
|
@ -39,6 +39,22 @@ class AxesMiddleware:
|
|||
if iscoroutinefunction(self.get_response):
|
||||
markcoroutinefunction(self)
|
||||
|
||||
@staticmethod
|
||||
def _set_retry_after_header(
|
||||
response: HttpResponse, request: HttpRequest
|
||||
) -> HttpResponse:
|
||||
if not settings.AXES_ENABLE_RETRY_AFTER_HEADER:
|
||||
return response
|
||||
|
||||
if settings.AXES_LOCKOUT_CALLABLE or settings.AXES_LOCKOUT_URL:
|
||||
return response
|
||||
|
||||
cool_off = get_cool_off(request)
|
||||
if cool_off is not None:
|
||||
response["Retry-After"] = str(int(cool_off.total_seconds()))
|
||||
|
||||
return response
|
||||
|
||||
def __call__(self, request: HttpRequest) -> HttpResponse:
|
||||
# Exit out to async mode, if needed
|
||||
if iscoroutinefunction(self):
|
||||
|
|
@ -49,6 +65,7 @@ class AxesMiddleware:
|
|||
if getattr(request, "axes_locked_out", None):
|
||||
credentials = getattr(request, "axes_credentials", None)
|
||||
response = get_lockout_response(request, response, credentials) # type: ignore
|
||||
response = self._set_retry_after_header(response, request)
|
||||
|
||||
return response
|
||||
|
||||
|
|
@ -60,6 +77,9 @@ class AxesMiddleware:
|
|||
credentials = getattr(request, "axes_credentials", None)
|
||||
response = await sync_to_async(
|
||||
get_lockout_response, thread_sensitive=True
|
||||
)(request, credentials) # type: ignore
|
||||
)(
|
||||
request, response, credentials
|
||||
) # type: ignore
|
||||
response = self._set_retry_after_header(response, request)
|
||||
|
||||
return response
|
||||
|
|
|
|||
|
|
@ -68,9 +68,12 @@ class AccessAttemptExpiration(models.Model):
|
|||
verbose_name = _("access attempt expiration")
|
||||
verbose_name_plural = _("access attempt expirations")
|
||||
|
||||
|
||||
class AccessLog(AccessBase):
|
||||
logout_time = models.DateTimeField(_("Logout Time"), null=True, blank=True)
|
||||
session_hash = models.CharField(_("Session key hash (sha256)"), default="", blank=True, max_length=64)
|
||||
session_hash = models.CharField(
|
||||
_("Session key hash (sha256)"), default="", blank=True, max_length=64
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return f"Access Log for {self.username} @ {self.attempt_time}"
|
||||
|
|
|
|||
|
|
@ -81,17 +81,18 @@ The following ``settings.py`` options are available for customizing Axes behavio
|
|||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_HTTP_RESPONSE_CODE | 429 | Sets the http response code returned when ``AXES_FAILURE_LIMIT`` is reached. For example: ``AXES_HTTP_RESPONSE_CODE = 403`` |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_ENABLE_RETRY_AFTER_HEADER | False | If ``True``, ``AxesMiddleware`` sets the ``Retry-After`` HTTP header on lockout responses when ``AXES_COOLOFF_TIME`` is configured. Set to ``False`` to disable this header. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT | True | If ``True``, a failed login attempt during lockout will reset the cool off period. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_LOCKOUT_PARAMETERS | ["ip_address"] | A list of parameters that Axes uses to lock out users. It can also be callable, which takes an http request or AccesAttempt object and credentials and returns a list of parameters. Each parameter can be a string (a single parameter) or a list of strings (a combined parameter). For example, if you configure ``AXES_LOCKOUT_PARAMETERS = ["ip_address", ["username", "user_agent"]]``, axes will block clients by ip and/or username and user agent combination. See :ref:`customizing-lockout-parameters` for more details. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
.. note::
|
||||
When ``AXES_COOLOFF_TIME`` is configured, lockout responses automatically include a
|
||||
``Retry-After`` HTTP header (`RFC 7231 <https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3>`_)
|
||||
with the cool-off duration in seconds. This applies to JSON, template-rendered, and
|
||||
plain-text lockout responses, but not to redirects (``AXES_LOCKOUT_URL``) or custom
|
||||
callables (``AXES_LOCKOUT_CALLABLE``).
|
||||
If ``AXES_ENABLE_RETRY_AFTER_HEADER`` is enabled and ``AXES_COOLOFF_TIME`` is configured,
|
||||
``AxesMiddleware`` adds a ``Retry-After`` HTTP header (`RFC 7231 <https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3>`_)
|
||||
with the cool-off duration in seconds. This header is not added for redirects
|
||||
(``AXES_LOCKOUT_URL``) or custom lockout responses (``AXES_LOCKOUT_CALLABLE``).
|
||||
|
||||
The configuration option precedences for the access attempt monitoring are:
|
||||
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ Example ``AXES_LOCKOUT_PARAMETERS`` configuration:
|
|||
|
||||
AXES_LOCKOUT_PARAMETERS = ["ip_address", ["username", "user_agent"]]
|
||||
|
||||
This way, axes will lock out users using ip_address and/or combination of username and user agent
|
||||
This way, axes will lock out users using ip_address or combination of username and user_agent
|
||||
|
||||
Example of callable ``AXES_LOCKOUT_PARAMETERS``:
|
||||
|
||||
|
|
@ -213,7 +213,7 @@ Example of callable ``AXES_LOCKOUT_PARAMETERS``:
|
|||
|
||||
AXES_LOCKOUT_PARAMETERS = "example.utils.get_lockout_parameters"
|
||||
|
||||
This way, if client ip_address is localhost, axes will lockout client only by username. In other case, axes will lockout client by username and/or ip_address.
|
||||
This way, if client ip_address is localhost, axes will lockout client only by username. In other case, axes will lockout client by username or ip_address.
|
||||
|
||||
Customizing client ip address lookups
|
||||
-------------------------------------
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ More information on the configuration options is available at:
|
|||
"""
|
||||
|
||||
# import sphinx_rtd_theme
|
||||
from pkg_resources import get_distribution
|
||||
from importlib.metadata import version as get_version
|
||||
|
||||
import django
|
||||
from django.conf import settings
|
||||
|
|
@ -43,7 +43,7 @@ copyright = "2016, Jazzband"
|
|||
author = "Jazzband"
|
||||
|
||||
# The full version, including alpha/beta/rc tags.
|
||||
release = get_distribution("django-axes").version
|
||||
release = get_version("django-axes")
|
||||
|
||||
# The short X.Y version.
|
||||
version = ".".join(release.split(".")[:2])
|
||||
|
|
|
|||
2
mypy.ini
2
mypy.ini
|
|
@ -1,5 +1,5 @@
|
|||
[mypy]
|
||||
python_version = 3.12
|
||||
python_version = 3.14
|
||||
ignore_missing_imports = True
|
||||
|
||||
[mypy-axes.migrations.*]
|
||||
|
|
|
|||
|
|
@ -13,8 +13,8 @@ envlist =
|
|||
py{310,311,312}-dj42
|
||||
py{310,311,312,313}-dj52
|
||||
py{312,313,314}-dj60
|
||||
py312-djmain
|
||||
py312-djqa
|
||||
py314-djmain
|
||||
py314-djqa
|
||||
|
||||
[gh-actions]
|
||||
python =
|
||||
|
|
@ -36,9 +36,9 @@ DJANGO =
|
|||
[testenv]
|
||||
deps =
|
||||
-r requirements.txt
|
||||
dj42: django>=4.2,<5
|
||||
dj52: django>=5.2,<6
|
||||
dj60: django>=6.0,<7
|
||||
dj42: django>=4.2,<4.3
|
||||
dj52: django>=5.2,<5.3
|
||||
dj60: django>=6.0,<6.1
|
||||
djmain: https://github.com/django/django/archive/main.tar.gz
|
||||
usedevelop = true
|
||||
commands = pytest
|
||||
|
|
@ -51,10 +51,11 @@ ignore_errors =
|
|||
djmain: True
|
||||
|
||||
# QA runs type checks, linting, and code formatting checks
|
||||
[testenv:py312-djqa]
|
||||
[testenv:py314-djqa]
|
||||
stoponfail = false
|
||||
deps = -r requirements.txt
|
||||
commands =
|
||||
mypy axes
|
||||
prospector
|
||||
black -t py312 --check --diff axes
|
||||
prospector axes
|
||||
black --check --diff axes
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -1,12 +1,12 @@
|
|||
-e .
|
||||
black==25.11.0
|
||||
coverage==7.13.0
|
||||
black==26.3.1
|
||||
coverage==7.13.4
|
||||
django-ipware>=3
|
||||
mypy==1.19.1
|
||||
prospector==1.17.3
|
||||
prospector==1.18.0
|
||||
pytest-cov==7.0.0
|
||||
pytest-django==4.11.1
|
||||
pytest-django==4.12.0
|
||||
pytest-subtests==0.15.0
|
||||
pytest==9.0.2
|
||||
sphinx_rtd_theme==3.0.2
|
||||
tox==4.32.0
|
||||
sphinx_rtd_theme==3.1.0
|
||||
tox==4.49.1
|
||||
|
|
|
|||
|
|
@ -129,3 +129,24 @@ class ConfCheckTestCase(AxesTestCase):
|
|||
def test_valid_callable(self):
|
||||
warnings = run_checks()
|
||||
self.assertEqual(warnings, [])
|
||||
|
||||
def test_missing_settings_no_error(self):
|
||||
warnings = run_checks()
|
||||
self.assertEqual(warnings, [])
|
||||
|
||||
|
||||
class LockoutParametersCheckTestCase(AxesTestCase):
|
||||
@override_settings(AXES_LOCKOUT_PARAMETERS=["ip_address", "username"])
|
||||
def test_valid_configuration(self):
|
||||
warnings = run_checks()
|
||||
self.assertEqual(warnings, [])
|
||||
|
||||
@override_settings(AXES_LOCKOUT_PARAMETERS=["username", "user_agent"])
|
||||
def test_invalid_configuration(self):
|
||||
warnings = run_checks()
|
||||
warning = Warning(
|
||||
msg=Messages.LOCKOUT_PARAMETERS_INVALID,
|
||||
hint=Hints.LOCKOUT_PARAMETERS_INVALID,
|
||||
id=Codes.LOCKOUT_PARAMETERS_INVALID,
|
||||
)
|
||||
self.assertEqual(warnings, [warning])
|
||||
|
|
|
|||
45
tests/test_conf.py
Normal file
45
tests/test_conf.py
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
from django.test import TestCase
|
||||
from django.utils.functional import SimpleLazyObject
|
||||
|
||||
|
||||
class ConfTestCase(TestCase):
|
||||
def test_axes_username_form_field_uses_lazy_evaluation(self):
|
||||
"""
|
||||
Test that AXES_USERNAME_FORM_FIELD uses SimpleLazyObject for lazy evaluation.
|
||||
This prevents circular import issues with custom user models (issue #1280).
|
||||
"""
|
||||
from axes.conf import settings
|
||||
|
||||
# Verify that AXES_USERNAME_FORM_FIELD is a SimpleLazyObject if not overridden
|
||||
# This is only the case when the setting is not explicitly defined
|
||||
username_field = settings.AXES_USERNAME_FORM_FIELD
|
||||
|
||||
# The actual type depends on whether AXES_USERNAME_FORM_FIELD was overridden
|
||||
# If it's using the default, it should be a SimpleLazyObject
|
||||
# If overridden in settings, it could be a plain string
|
||||
# Either way, it should be usable as a string
|
||||
|
||||
# Force evaluation and verify it works
|
||||
username_field_str = str(username_field)
|
||||
|
||||
# Should get the default USERNAME_FIELD from the user model
|
||||
# For the test suite, this is "username"
|
||||
self.assertIsInstance(username_field_str, str)
|
||||
self.assertTrue(len(username_field_str) > 0)
|
||||
|
||||
def test_axes_username_form_field_evaluates_correctly(self):
|
||||
"""
|
||||
Test that when AXES_USERNAME_FORM_FIELD is accessed, it correctly
|
||||
resolves to the user model's USERNAME_FIELD.
|
||||
"""
|
||||
from django.contrib.auth import get_user_model
|
||||
from axes.conf import settings
|
||||
|
||||
# Get the expected value
|
||||
expected_username_field = get_user_model().USERNAME_FIELD
|
||||
|
||||
# Get the actual value from axes settings
|
||||
actual_username_field = str(settings.AXES_USERNAME_FORM_FIELD)
|
||||
|
||||
# They should match
|
||||
self.assertEqual(actual_username_field, expected_username_field)
|
||||
|
|
@ -946,34 +946,6 @@ class LockoutResponseTestCase(AxesTestCase):
|
|||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(type(response), HttpResponse)
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2)
|
||||
def test_get_lockout_response_retry_after_header(self):
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response["Retry-After"], "7200")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=None)
|
||||
def test_get_lockout_response_retry_after_no_cooloff(self):
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2)
|
||||
def test_get_lockout_response_retry_after_json(self):
|
||||
self.request.META["HTTP_X_REQUESTED_WITH"] = "XMLHttpRequest"
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response["Retry-After"], "7200")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2, AXES_LOCKOUT_TEMPLATE="example.html")
|
||||
@patch("axes.helpers.render")
|
||||
def test_get_lockout_response_retry_after_template(self, mock_render):
|
||||
mock_render.return_value = HttpResponse(status=429)
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response["Retry-After"], "7200")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2, AXES_LOCKOUT_URL="https://example.com")
|
||||
def test_get_lockout_response_retry_after_redirect_absent(self):
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
|
||||
def mock_get_cool_off_str(req):
|
||||
return timedelta(seconds=30)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
from datetime import timedelta
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, HttpRequest
|
||||
from django.test import override_settings
|
||||
|
|
@ -10,6 +12,10 @@ def get_username(request, credentials: dict) -> str:
|
|||
return credentials.get(settings.AXES_USERNAME_FORM_FIELD)
|
||||
|
||||
|
||||
def get_custom_lockout_response(request, original_response, credentials):
|
||||
return HttpResponse(status=429)
|
||||
|
||||
|
||||
class MiddlewareTestCase(AxesTestCase):
|
||||
STATUS_SUCCESS = 200
|
||||
STATUS_LOCKOUT = 429
|
||||
|
|
@ -33,11 +39,68 @@ class MiddlewareTestCase(AxesTestCase):
|
|||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertEqual(response.status_code, self.STATUS_LOCKOUT)
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_ENABLE_RETRY_AFTER_HEADER=True,
|
||||
)
|
||||
def test_lockout_response_sets_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertEqual(response["Retry-After"], "120")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=None)
|
||||
def test_lockout_response_without_cooloff_does_not_set_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_ENABLE_RETRY_AFTER_HEADER=False,
|
||||
)
|
||||
def test_lockout_response_respects_retry_after_toggle(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_LOCKOUT_URL="https://example.com",
|
||||
)
|
||||
def test_lockout_redirect_response_does_not_set_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_LOCKOUT_CALLABLE="tests.test_middleware.get_custom_lockout_response",
|
||||
)
|
||||
def test_lockout_callable_response_does_not_set_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(AXES_USERNAME_CALLABLE="tests.test_middleware.get_username")
|
||||
def test_lockout_response_with_axes_callable_username(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
request.axes_credentials = {settings.AXES_USERNAME_FORM_FIELD: 'username'}
|
||||
request.axes_credentials = {settings.AXES_USERNAME_FORM_FIELD: "username"}
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue