feat: introduce progressive lockout tiers for dynamic cool-off periods based on failure count.

This commit is contained in:
rodrigo.nogueira 2026-02-22 00:51:19 -03:00
parent e27ce891ea
commit 7eef2f14b3
6 changed files with 263 additions and 8 deletions

View file

@ -6,7 +6,7 @@ from django.core.checks import ( # pylint: disable=redefined-builtin
from django.utils.module_loading import import_string
from axes.backends import AxesStandaloneBackend
from axes.conf import settings
from axes.conf import LockoutTier, settings
class Messages:
@ -26,6 +26,14 @@ class Messages:
"AXES_LOCKOUT_PARAMETERS does not contain 'ip_address'."
" This configuration allows attackers to bypass rate limits by rotating User-Agents or Cookies."
)
LOCKOUT_TIERS_CONFLICT = (
"AXES_LOCKOUT_TIERS is set alongside AXES_COOLOFF_TIME."
" When tiers are active, AXES_COOLOFF_TIME is ignored."
" Remove AXES_COOLOFF_TIME to silence this warning."
)
LOCKOUT_TIERS_INVALID = (
"AXES_LOCKOUT_TIERS must be a list of LockoutTier instances."
)
class Hints:
@ -35,6 +43,10 @@ class Hints:
SETTING_DEPRECATED = None
CALLABLE_INVALID = None
LOCKOUT_PARAMETERS_INVALID = "Add 'ip_address' to AXES_LOCKOUT_PARAMETERS."
LOCKOUT_TIERS_CONFLICT = "Remove AXES_COOLOFF_TIME when using AXES_LOCKOUT_TIERS."
LOCKOUT_TIERS_INVALID = (
"Use: AXES_LOCKOUT_TIERS = [LockoutTier(failures=3, cooloff=timedelta(minutes=15)), ...]"
)
class Codes:
@ -44,6 +56,8 @@ class Codes:
SETTING_DEPRECATED = "axes.W004"
CALLABLE_INVALID = "axes.W005"
LOCKOUT_PARAMETERS_INVALID = "axes.W006"
LOCKOUT_TIERS_CONFLICT = "axes.W007"
LOCKOUT_TIERS_INVALID = "axes.W008"
@register(Tags.security, Tags.caches, Tags.compatibility)
@ -192,6 +206,41 @@ def axes_lockout_params_check(app_configs, **kwargs): # pylint: disable=unused-
return warnings
@register(Tags.security)
def axes_lockout_tiers_check(app_configs, **kwargs): # pylint: disable=unused-argument
warnings = []
tiers = getattr(settings, "AXES_LOCKOUT_TIERS", None)
if tiers is None:
return warnings
if not _is_valid_tiers_list(tiers):
warnings.append(
Warning(
msg=Messages.LOCKOUT_TIERS_INVALID,
hint=Hints.LOCKOUT_TIERS_INVALID,
id=Codes.LOCKOUT_TIERS_INVALID,
)
)
return warnings
if getattr(settings, "AXES_COOLOFF_TIME", None) is not None:
warnings.append(
Warning(
msg=Messages.LOCKOUT_TIERS_CONFLICT,
hint=Hints.LOCKOUT_TIERS_CONFLICT,
id=Codes.LOCKOUT_TIERS_CONFLICT,
)
)
return warnings
def _is_valid_tiers_list(tiers) -> bool:
if not isinstance(tiers, (list, tuple)):
return False
return all(isinstance(t, LockoutTier) for t in tiers)
@register
def axes_conf_check(app_configs, **kwargs): # pylint: disable=unused-argument
warnings = []

View file

@ -1,3 +1,6 @@
from dataclasses import dataclass
from datetime import timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.utils.functional import SimpleLazyObject
@ -16,6 +19,12 @@ class JSONSerializableLazyObject(SimpleLazyObject):
return str(self)
@dataclass(frozen=True, order=True)
class LockoutTier:
failures: int
cooloff: timedelta
# disable plugin when set to False
settings.AXES_ENABLED = getattr(settings, "AXES_ENABLED", True)
@ -108,6 +117,10 @@ settings.AXES_LOCKOUT_URL = getattr(settings, "AXES_LOCKOUT_URL", None)
settings.AXES_COOLOFF_TIME = getattr(settings, "AXES_COOLOFF_TIME", None)
# Progressive lockout tiers: list of LockoutTier(failures, cooloff) instances.
# When set, overrides AXES_FAILURE_LIMIT and AXES_COOLOFF_TIME.
settings.AXES_LOCKOUT_TIERS = getattr(settings, "AXES_LOCKOUT_TIERS", None)
settings.AXES_USE_ATTEMPT_EXPIRATION = getattr(
settings, "AXES_USE_ATTEMPT_EXPIRATION", False
)

View file

@ -1,8 +1,8 @@
from datetime import timedelta, datetime
from datetime import datetime, timedelta
from hashlib import sha256
from logging import getLogger
from string import Template
from typing import Callable, Optional, Type, Union, List
from typing import Callable, List, Optional, Type, Union
from urllib.parse import urlencode
from django.core.cache import BaseCache, caches
@ -11,7 +11,7 @@ from django.shortcuts import redirect, render
from django.utils.encoding import force_bytes
from django.utils.module_loading import import_string
from axes.conf import settings
from axes.conf import LockoutTier, settings
from axes.models import AccessBase
log = getLogger(__name__)
@ -60,9 +60,16 @@ def get_cool_off(request: Optional[HttpRequest] = None) -> Optional[timedelta]:
offers a unified _timedelta or None_ representation of that configuration for use with the
Axes internal implementations.
When ``AXES_LOCKOUT_TIERS`` is configured, the cooloff is resolved from the
matching tier based on the failure count attached to the request.
:exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
"""
tier = _resolve_tier_from_request(request)
if tier is not None:
return tier.cooloff
cool_off = settings.AXES_COOLOFF_TIME
if isinstance(cool_off, int):
@ -101,6 +108,31 @@ def get_cool_off_iso8601(delta: timedelta) -> str:
return f"P{days_str}"
def get_lockout_tier(failures: int) -> Optional[LockoutTier]:
"""Return the matching ``LockoutTier`` for *failures*, or ``None``."""
tiers = settings.AXES_LOCKOUT_TIERS
if not tiers:
return None
sorted_tiers = sorted(tiers, key=lambda t: t.failures)
matched = None
for tier in sorted_tiers:
if failures >= tier.failures:
matched = tier
return matched
def _resolve_tier_from_request(
request: Optional[HttpRequest],
) -> Optional[LockoutTier]:
"""Extract failure count from *request* and resolve the tier."""
if not settings.AXES_LOCKOUT_TIERS or request is None:
return None
failures = getattr(request, "axes_failures_since_start", None)
if failures is None:
return None
return get_lockout_tier(failures)
def get_attempt_expiration(request: Optional[HttpRequest] = None) -> datetime:
"""
Get threshold for fetching access attempts from the database.
@ -444,6 +476,11 @@ def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str:
def get_failure_limit(request: HttpRequest, credentials) -> int:
tiers = settings.AXES_LOCKOUT_TIERS
if tiers:
sorted_tiers = sorted(tiers, key=lambda t: t.failures)
return sorted_tiers[0].failures
if callable(settings.AXES_FAILURE_LIMIT):
return settings.AXES_FAILURE_LIMIT( # pylint: disable=not-callable
request, credentials
@ -456,7 +493,7 @@ def get_failure_limit(request: HttpRequest, credentials) -> int:
def get_lockout_message() -> str:
if settings.AXES_COOLOFF_TIME:
if settings.AXES_COOLOFF_TIME or settings.AXES_LOCKOUT_TIERS:
return settings.AXES_COOLOFF_MESSAGE
return settings.AXES_PERMALOCK_MESSAGE
@ -488,8 +525,10 @@ def get_lockout_response(
)
status = settings.AXES_HTTP_RESPONSE_CODE
failures = getattr(request, "axes_failures_since_start", None) or 0
context = {
"failure_limit": get_failure_limit(request, credentials),
"failure_count": failures,
"username": get_client_username(request, credentials) or "",
}

View file

@ -25,6 +25,8 @@ The following ``settings.py`` options are available for customizing Axes behavio
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes the request as argument. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds |
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| AXES_LOCKOUT_TIERS | None | A list of ``LockoutTier(failures, cooloff)`` instances that define progressive lockout durations. When set, overrides ``AXES_FAILURE_LIMIT`` and ``AXES_COOLOFF_TIME``. The lowest tier threshold becomes the effective failure limit, and each subsequent tier applies a longer cool-off. Example: ``from datetime import timedelta``, ``from axes.conf import LockoutTier``, ``AXES_LOCKOUT_TIERS = [LockoutTier(failures=3, cooloff=timedelta(minutes=15)), LockoutTier(failures=6, cooloff=timedelta(hours=2)), LockoutTier(failures=10, cooloff=timedelta(days=1))]``. With this configuration: 3 failures → 15 min lockout, 6 failures → 2 h, 10+ failures → 24 h. |
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| AXES_ONLY_ADMIN_SITE | False | If ``True``, lock is only enabled for admin site. Admin site is determined by checking request path against the path of ``"admin:index"`` view. If admin urls are not registered in current urlconf, all requests will not be locked. |
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
| AXES_ONLY_USER_FAILURES | False | DEPRECATED: USE ``AXES_LOCKOUT_PARAMETERS`` INSTEAD. If ``True``, only lock based on username, and never lock based on IP if attempts exceed the limit. Otherwise utilize the existing IP and user locking logic. |

View file

@ -1,8 +1,12 @@
from django.core.checks import run_checks, Warning # pylint: disable=redefined-builtin
from django.test import override_settings, modify_settings
from datetime import timedelta
from django.core.checks import (Warning, # pylint: disable=redefined-builtin
run_checks)
from django.test import modify_settings, override_settings
from axes.backends import AxesStandaloneBackend
from axes.checks import Messages, Hints, Codes
from axes.checks import Codes, Hints, Messages
from axes.conf import LockoutTier
from tests.base import AxesTestCase
@ -150,3 +154,45 @@ class LockoutParametersCheckTestCase(AxesTestCase):
id=Codes.LOCKOUT_PARAMETERS_INVALID,
)
self.assertEqual(warnings, [warning])
class LockoutTiersCheckTestCase(AxesTestCase):
SAMPLE_TIERS = [
LockoutTier(failures=3, cooloff=timedelta(minutes=15)),
LockoutTier(failures=6, cooloff=timedelta(hours=2)),
]
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
def test_tiers_alone_no_warning(self):
warnings = run_checks()
self.assertEqual(warnings, [])
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=1)
def test_tiers_with_cooloff_time_warns(self):
warnings = run_checks()
warning = Warning(
msg=Messages.LOCKOUT_TIERS_CONFLICT,
hint=Hints.LOCKOUT_TIERS_CONFLICT,
id=Codes.LOCKOUT_TIERS_CONFLICT,
)
self.assertIn(warning, warnings)
@override_settings(AXES_LOCKOUT_TIERS="not a list")
def test_tiers_invalid_format_warns(self):
warnings = run_checks()
warning = Warning(
msg=Messages.LOCKOUT_TIERS_INVALID,
hint=Hints.LOCKOUT_TIERS_INVALID,
id=Codes.LOCKOUT_TIERS_INVALID,
)
self.assertIn(warning, warnings)
@override_settings(AXES_LOCKOUT_TIERS=[(3, timedelta(minutes=15))])
def test_tiers_plain_tuples_warns(self):
warnings = run_checks()
warning = Warning(
msg=Messages.LOCKOUT_TIERS_INVALID,
hint=Hints.LOCKOUT_TIERS_INVALID,
id=Codes.LOCKOUT_TIERS_INVALID,
)
self.assertIn(warning, warnings)

View file

@ -7,6 +7,7 @@ from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonRes
from django.test import RequestFactory, override_settings
from axes.apps import AppConfig
from axes.conf import LockoutTier
from axes.helpers import (
cleanse_parameters,
get_cache_timeout,
@ -18,6 +19,7 @@ from axes.helpers import (
get_cool_off,
get_cool_off_iso8601,
get_lockout_response,
get_lockout_tier,
is_client_ip_address_blacklisted,
is_client_ip_address_whitelisted,
is_client_method_whitelisted,
@ -1103,3 +1105,107 @@ class AxesCleanseParamsTestCase(AxesTestCase):
self.assertEqual("test_user", cleansed["username"])
self.assertEqual("********************", cleansed["password"])
self.assertEqual("sensitive", cleansed["other_sensitive_data"])
class AxesLockoutTiersTestCase(AxesTestCase):
SAMPLE_TIERS = [
LockoutTier(failures=3, cooloff=timedelta(minutes=15)),
LockoutTier(failures=6, cooloff=timedelta(hours=2)),
LockoutTier(failures=10, cooloff=timedelta(days=1)),
]
# -- get_lockout_tier --
def test_get_lockout_tier_none_when_not_configured(self):
self.assertIsNone(get_lockout_tier(5))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_lockout_tier_below_threshold(self):
self.assertIsNone(get_lockout_tier(2))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_lockout_tier_first_tier(self):
tier = get_lockout_tier(3)
self.assertEqual(tier, LockoutTier(failures=3, cooloff=timedelta(minutes=15)))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_lockout_tier_between_tiers(self):
tier = get_lockout_tier(5)
self.assertEqual(tier, LockoutTier(failures=3, cooloff=timedelta(minutes=15)))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_lockout_tier_second_tier(self):
tier = get_lockout_tier(6)
self.assertEqual(tier, LockoutTier(failures=6, cooloff=timedelta(hours=2)))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_lockout_tier_highest_tier(self):
tier = get_lockout_tier(10)
self.assertEqual(tier, LockoutTier(failures=10, cooloff=timedelta(days=1)))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_lockout_tier_above_highest(self):
tier = get_lockout_tier(100)
self.assertEqual(tier, LockoutTier(failures=10, cooloff=timedelta(days=1)))
# -- get_cool_off with tiers --
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_cool_off_with_tiers(self):
self.request.axes_failures_since_start = 6
self.assertEqual(get_cool_off(self.request), timedelta(hours=2))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_cool_off_tiers_no_failures_attr(self):
"""When request has no failures attribute, returns None (no tier matched)."""
request = HttpRequest()
self.assertIsNone(get_cool_off(request))
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_cool_off_tiers_below_first_threshold(self):
self.request.axes_failures_since_start = 1
self.assertIsNone(get_cool_off(self.request))
# -- get_failure_limit with tiers --
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
def test_get_failure_limit_returns_lowest_tier(self):
from axes.helpers import get_failure_limit
self.assertEqual(get_failure_limit(self.request, self.credentials), 3)
@override_settings(AXES_LOCKOUT_TIERS=[
LockoutTier(failures=10, cooloff=timedelta(days=1)),
LockoutTier(failures=5, cooloff=timedelta(hours=1)),
])
def test_get_failure_limit_sorts_tiers(self):
from axes.helpers import get_failure_limit
self.assertEqual(get_failure_limit(self.request, self.credentials), 5)
# -- get_lockout_message with tiers --
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
def test_get_lockout_message_with_tiers(self):
from axes.helpers import get_lockout_message
from axes.conf import settings
self.assertEqual(get_lockout_message(), settings.AXES_COOLOFF_MESSAGE)
# -- get_lockout_response with tiers --
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
def test_get_lockout_response_includes_failure_count(self):
self.request.axes_failures_since_start = 6
response = get_lockout_response(request=self.request)
self.assertEqual(response.status_code, 429)
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
def test_get_lockout_response_json_with_tiers(self):
self.request.axes_failures_since_start = 3
self.request.META["HTTP_X_REQUESTED_WITH"] = "XMLHttpRequest"
response = get_lockout_response(request=self.request)
self.assertEqual(type(response), JsonResponse)
import json
data = json.loads(response.content)
self.assertEqual(data["failure_count"], 3)
self.assertIn("cooloff_time", data)