mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
Merge 546cf4df0c into fdd7b22cd3
This commit is contained in:
commit
725a79ecd1
6 changed files with 263 additions and 3 deletions
|
|
@ -6,7 +6,7 @@ from django.core.checks import ( # pylint: disable=redefined-builtin
|
|||
from django.utils.module_loading import import_string
|
||||
|
||||
from axes.backends import AxesStandaloneBackend
|
||||
from axes.conf import settings
|
||||
from axes.conf import LockoutTier, settings
|
||||
|
||||
|
||||
class Messages:
|
||||
|
|
@ -26,6 +26,14 @@ class Messages:
|
|||
"AXES_LOCKOUT_PARAMETERS does not contain 'ip_address'."
|
||||
" This configuration allows attackers to bypass rate limits by rotating User-Agents or Cookies."
|
||||
)
|
||||
LOCKOUT_TIERS_CONFLICT = (
|
||||
"AXES_LOCKOUT_TIERS is set alongside AXES_COOLOFF_TIME."
|
||||
" When tiers are active, AXES_COOLOFF_TIME is ignored."
|
||||
" Remove AXES_COOLOFF_TIME to silence this warning."
|
||||
)
|
||||
LOCKOUT_TIERS_INVALID = (
|
||||
"AXES_LOCKOUT_TIERS must be a list of LockoutTier instances."
|
||||
)
|
||||
|
||||
|
||||
class Hints:
|
||||
|
|
@ -35,6 +43,8 @@ class Hints:
|
|||
SETTING_DEPRECATED = None
|
||||
CALLABLE_INVALID = None
|
||||
LOCKOUT_PARAMETERS_INVALID = "Add 'ip_address' to AXES_LOCKOUT_PARAMETERS."
|
||||
LOCKOUT_TIERS_CONFLICT = "Remove AXES_COOLOFF_TIME when using AXES_LOCKOUT_TIERS."
|
||||
LOCKOUT_TIERS_INVALID = "Use: AXES_LOCKOUT_TIERS = [LockoutTier(failures=3, cooloff=timedelta(minutes=15)), ...]"
|
||||
|
||||
|
||||
class Codes:
|
||||
|
|
@ -44,6 +54,8 @@ class Codes:
|
|||
SETTING_DEPRECATED = "axes.W004"
|
||||
CALLABLE_INVALID = "axes.W005"
|
||||
LOCKOUT_PARAMETERS_INVALID = "axes.W006"
|
||||
LOCKOUT_TIERS_CONFLICT = "axes.W007"
|
||||
LOCKOUT_TIERS_INVALID = "axes.W008"
|
||||
|
||||
|
||||
@register(Tags.security, Tags.caches, Tags.compatibility)
|
||||
|
|
@ -192,6 +204,41 @@ def axes_lockout_params_check(app_configs, **kwargs): # pylint: disable=unused-
|
|||
return warnings
|
||||
|
||||
|
||||
@register(Tags.security)
|
||||
def axes_lockout_tiers_check(app_configs, **kwargs): # pylint: disable=unused-argument
|
||||
warnings = []
|
||||
tiers = getattr(settings, "AXES_LOCKOUT_TIERS", None)
|
||||
if tiers is None:
|
||||
return warnings
|
||||
|
||||
if not _is_valid_tiers_list(tiers):
|
||||
warnings.append(
|
||||
Warning(
|
||||
msg=Messages.LOCKOUT_TIERS_INVALID,
|
||||
hint=Hints.LOCKOUT_TIERS_INVALID,
|
||||
id=Codes.LOCKOUT_TIERS_INVALID,
|
||||
)
|
||||
)
|
||||
return warnings
|
||||
|
||||
if getattr(settings, "AXES_COOLOFF_TIME", None) is not None:
|
||||
warnings.append(
|
||||
Warning(
|
||||
msg=Messages.LOCKOUT_TIERS_CONFLICT,
|
||||
hint=Hints.LOCKOUT_TIERS_CONFLICT,
|
||||
id=Codes.LOCKOUT_TIERS_CONFLICT,
|
||||
)
|
||||
)
|
||||
|
||||
return warnings
|
||||
|
||||
|
||||
def _is_valid_tiers_list(tiers) -> bool:
|
||||
if not isinstance(tiers, (list, tuple)):
|
||||
return False
|
||||
return all(isinstance(t, LockoutTier) for t in tiers)
|
||||
|
||||
|
||||
@register
|
||||
def axes_conf_check(app_configs, **kwargs): # pylint: disable=unused-argument
|
||||
warnings = []
|
||||
|
|
|
|||
13
axes/conf.py
13
axes/conf.py
|
|
@ -1,3 +1,6 @@
|
|||
from dataclasses import dataclass
|
||||
from datetime import timedelta
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.utils.functional import SimpleLazyObject
|
||||
|
|
@ -16,6 +19,12 @@ class JSONSerializableLazyObject(SimpleLazyObject):
|
|||
return str(self)
|
||||
|
||||
|
||||
@dataclass(frozen=True, order=True)
|
||||
class LockoutTier:
|
||||
failures: int
|
||||
cooloff: timedelta
|
||||
|
||||
|
||||
# disable plugin when set to False
|
||||
settings.AXES_ENABLED = getattr(settings, "AXES_ENABLED", True)
|
||||
|
||||
|
|
@ -108,6 +117,10 @@ settings.AXES_LOCKOUT_URL = getattr(settings, "AXES_LOCKOUT_URL", None)
|
|||
|
||||
settings.AXES_COOLOFF_TIME = getattr(settings, "AXES_COOLOFF_TIME", None)
|
||||
|
||||
# Progressive lockout tiers: list of LockoutTier(failures, cooloff) instances.
|
||||
# When set, overrides AXES_FAILURE_LIMIT and AXES_COOLOFF_TIME.
|
||||
settings.AXES_LOCKOUT_TIERS = getattr(settings, "AXES_LOCKOUT_TIERS", None)
|
||||
|
||||
settings.AXES_USE_ATTEMPT_EXPIRATION = getattr(
|
||||
settings, "AXES_USE_ATTEMPT_EXPIRATION", False
|
||||
)
|
||||
|
|
|
|||
|
|
@ -11,7 +11,7 @@ from django.shortcuts import redirect, render
|
|||
from django.utils.encoding import force_bytes
|
||||
from django.utils.module_loading import import_string
|
||||
|
||||
from axes.conf import settings
|
||||
from axes.conf import LockoutTier, settings
|
||||
from axes.models import AccessBase
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
|
@ -60,9 +60,16 @@ def get_cool_off(request: Optional[HttpRequest] = None) -> Optional[timedelta]:
|
|||
offers a unified _timedelta or None_ representation of that configuration for use with the
|
||||
Axes internal implementations.
|
||||
|
||||
When ``AXES_LOCKOUT_TIERS`` is configured, the cooloff is resolved from the
|
||||
matching tier based on the failure count attached to the request.
|
||||
|
||||
:exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
|
||||
"""
|
||||
|
||||
tier = _resolve_tier_from_request(request)
|
||||
if tier is not None:
|
||||
return tier.cooloff
|
||||
|
||||
cool_off = settings.AXES_COOLOFF_TIME
|
||||
|
||||
if isinstance(cool_off, int):
|
||||
|
|
@ -101,6 +108,31 @@ def get_cool_off_iso8601(delta: timedelta) -> str:
|
|||
return f"P{days_str}"
|
||||
|
||||
|
||||
def get_lockout_tier(failures: int) -> Optional[LockoutTier]:
|
||||
"""Return the highest ``LockoutTier`` threshold met by *failures*."""
|
||||
tiers = settings.AXES_LOCKOUT_TIERS
|
||||
if not tiers:
|
||||
return None
|
||||
sorted_tiers = sorted(tiers, key=lambda t: t.failures)
|
||||
matched = None
|
||||
for tier in sorted_tiers:
|
||||
if failures >= tier.failures:
|
||||
matched = tier
|
||||
return matched
|
||||
|
||||
|
||||
def _resolve_tier_from_request(
|
||||
request: Optional[HttpRequest],
|
||||
) -> Optional[LockoutTier]:
|
||||
"""Extract failure count from *request* and resolve the tier."""
|
||||
if not settings.AXES_LOCKOUT_TIERS or request is None:
|
||||
return None
|
||||
failures = getattr(request, "axes_failures_since_start", None)
|
||||
if failures is None:
|
||||
return None
|
||||
return get_lockout_tier(failures)
|
||||
|
||||
|
||||
def get_attempt_expiration(request: Optional[HttpRequest] = None) -> datetime:
|
||||
"""
|
||||
Get threshold for fetching access attempts from the database.
|
||||
|
|
@ -444,6 +476,11 @@ def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str:
|
|||
|
||||
|
||||
def get_failure_limit(request: HttpRequest, credentials) -> int:
|
||||
tiers = settings.AXES_LOCKOUT_TIERS
|
||||
if tiers:
|
||||
sorted_tiers = sorted(tiers, key=lambda t: t.failures)
|
||||
return sorted_tiers[0].failures
|
||||
|
||||
if callable(settings.AXES_FAILURE_LIMIT):
|
||||
return settings.AXES_FAILURE_LIMIT( # pylint: disable=not-callable
|
||||
request, credentials
|
||||
|
|
@ -456,7 +493,7 @@ def get_failure_limit(request: HttpRequest, credentials) -> int:
|
|||
|
||||
|
||||
def get_lockout_message() -> str:
|
||||
if settings.AXES_COOLOFF_TIME:
|
||||
if settings.AXES_COOLOFF_TIME or settings.AXES_LOCKOUT_TIERS:
|
||||
return settings.AXES_COOLOFF_MESSAGE
|
||||
return settings.AXES_PERMALOCK_MESSAGE
|
||||
|
||||
|
|
@ -488,8 +525,10 @@ def get_lockout_response(
|
|||
)
|
||||
|
||||
status = settings.AXES_HTTP_RESPONSE_CODE
|
||||
failures = getattr(request, "axes_failures_since_start", None) or 0
|
||||
context = {
|
||||
"failure_limit": get_failure_limit(request, credentials),
|
||||
"failure_count": failures,
|
||||
"username": get_client_username(request, credentials) or "",
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -25,6 +25,8 @@ The following ``settings.py`` options are available for customizing Axes behavio
|
|||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes the request as argument. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_LOCKOUT_TIERS | None | A list of ``LockoutTier(failures, cooloff)`` instances that define progressive lockout durations. When set, overrides ``AXES_FAILURE_LIMIT`` and ``AXES_COOLOFF_TIME``. The lowest tier threshold becomes the effective failure limit, and each subsequent tier applies a longer cool-off. Example: ``from datetime import timedelta``, ``from axes.conf import LockoutTier``, ``AXES_LOCKOUT_TIERS = [LockoutTier(failures=3, cooloff=timedelta(minutes=15)), LockoutTier(failures=6, cooloff=timedelta(hours=2)), LockoutTier(failures=10, cooloff=timedelta(days=1))]``. With this configuration: 3 failures → 15 min lockout, 6 failures → 2 h, 10+ failures → 24 h. .. note:: When tiers are active, ``AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT`` still works and will reset the *current tier's* cool-off timer. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_ONLY_ADMIN_SITE | False | If ``True``, lock is only enabled for admin site. Admin site is determined by checking request path against the path of ``"admin:index"`` view. If admin urls are not registered in current urlconf, all requests will not be locked. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_ONLY_USER_FAILURES | False | DEPRECATED: USE ``AXES_LOCKOUT_PARAMETERS`` INSTEAD. If ``True``, only lock based on username, and never lock based on IP if attempts exceed the limit. Otherwise utilize the existing IP and user locking logic. |
|
||||
|
|
|
|||
|
|
@ -1,8 +1,10 @@
|
|||
from django.core.checks import run_checks, Warning # pylint: disable=redefined-builtin
|
||||
from django.test import override_settings, modify_settings
|
||||
from datetime import timedelta
|
||||
|
||||
from axes.backends import AxesStandaloneBackend
|
||||
from axes.checks import Messages, Hints, Codes
|
||||
from axes.conf import LockoutTier
|
||||
from tests.base import AxesTestCase
|
||||
|
||||
|
||||
|
|
@ -150,3 +152,45 @@ class LockoutParametersCheckTestCase(AxesTestCase):
|
|||
id=Codes.LOCKOUT_PARAMETERS_INVALID,
|
||||
)
|
||||
self.assertEqual(warnings, [warning])
|
||||
|
||||
|
||||
class LockoutTiersCheckTestCase(AxesTestCase):
|
||||
SAMPLE_TIERS = [
|
||||
LockoutTier(failures=3, cooloff=timedelta(minutes=15)),
|
||||
LockoutTier(failures=6, cooloff=timedelta(hours=2)),
|
||||
]
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
|
||||
def test_tiers_alone_no_warning(self):
|
||||
warnings = run_checks()
|
||||
self.assertEqual(warnings, [])
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=1)
|
||||
def test_tiers_with_cooloff_time_warns(self):
|
||||
warnings = run_checks()
|
||||
warning = Warning(
|
||||
msg=Messages.LOCKOUT_TIERS_CONFLICT,
|
||||
hint=Hints.LOCKOUT_TIERS_CONFLICT,
|
||||
id=Codes.LOCKOUT_TIERS_CONFLICT,
|
||||
)
|
||||
self.assertIn(warning, warnings)
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS="not a list")
|
||||
def test_tiers_invalid_format_warns(self):
|
||||
warnings = run_checks()
|
||||
warning = Warning(
|
||||
msg=Messages.LOCKOUT_TIERS_INVALID,
|
||||
hint=Hints.LOCKOUT_TIERS_INVALID,
|
||||
id=Codes.LOCKOUT_TIERS_INVALID,
|
||||
)
|
||||
self.assertIn(warning, warnings)
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=[(3, timedelta(minutes=15))])
|
||||
def test_tiers_plain_tuples_warns(self):
|
||||
warnings = run_checks()
|
||||
warning = Warning(
|
||||
msg=Messages.LOCKOUT_TIERS_INVALID,
|
||||
hint=Hints.LOCKOUT_TIERS_INVALID,
|
||||
id=Codes.LOCKOUT_TIERS_INVALID,
|
||||
)
|
||||
self.assertIn(warning, warnings)
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonRes
|
|||
from django.test import RequestFactory, override_settings
|
||||
|
||||
from axes.apps import AppConfig
|
||||
from axes.conf import LockoutTier
|
||||
from axes.helpers import (
|
||||
cleanse_parameters,
|
||||
get_cache_timeout,
|
||||
|
|
@ -18,6 +19,7 @@ from axes.helpers import (
|
|||
get_cool_off,
|
||||
get_cool_off_iso8601,
|
||||
get_lockout_response,
|
||||
get_lockout_tier,
|
||||
is_client_ip_address_blacklisted,
|
||||
is_client_ip_address_whitelisted,
|
||||
is_client_method_whitelisted,
|
||||
|
|
@ -1103,3 +1105,116 @@ class AxesCleanseParamsTestCase(AxesTestCase):
|
|||
self.assertEqual("test_user", cleansed["username"])
|
||||
self.assertEqual("********************", cleansed["password"])
|
||||
self.assertEqual("sensitive", cleansed["other_sensitive_data"])
|
||||
|
||||
|
||||
class AxesLockoutTiersTestCase(AxesTestCase):
|
||||
SAMPLE_TIERS = [
|
||||
LockoutTier(failures=3, cooloff=timedelta(minutes=15)),
|
||||
LockoutTier(failures=6, cooloff=timedelta(hours=2)),
|
||||
LockoutTier(failures=10, cooloff=timedelta(days=1)),
|
||||
]
|
||||
|
||||
# -- get_lockout_tier --
|
||||
|
||||
def test_get_lockout_tier_none_when_not_configured(self):
|
||||
self.assertIsNone(get_lockout_tier(5))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_below_threshold(self):
|
||||
self.assertIsNone(get_lockout_tier(2))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_first_tier(self):
|
||||
tier = get_lockout_tier(3)
|
||||
self.assertEqual(tier, LockoutTier(failures=3, cooloff=timedelta(minutes=15)))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_between_tiers(self):
|
||||
tier = get_lockout_tier(5)
|
||||
self.assertEqual(tier, LockoutTier(failures=3, cooloff=timedelta(minutes=15)))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_second_tier(self):
|
||||
tier = get_lockout_tier(6)
|
||||
self.assertEqual(tier, LockoutTier(failures=6, cooloff=timedelta(hours=2)))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_seven_failures(self):
|
||||
tier = get_lockout_tier(7)
|
||||
self.assertEqual(tier, LockoutTier(failures=6, cooloff=timedelta(hours=2)))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_highest_tier(self):
|
||||
tier = get_lockout_tier(10)
|
||||
self.assertEqual(tier, LockoutTier(failures=10, cooloff=timedelta(days=1)))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_lockout_tier_above_highest(self):
|
||||
tier = get_lockout_tier(100)
|
||||
self.assertEqual(tier, LockoutTier(failures=10, cooloff=timedelta(days=1)))
|
||||
|
||||
# -- get_cool_off with tiers --
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_cool_off_with_tiers(self):
|
||||
self.request.axes_failures_since_start = 6
|
||||
self.assertEqual(get_cool_off(self.request), timedelta(hours=2))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_cool_off_tiers_no_failures_attr(self):
|
||||
"""When request has no failures attribute, returns None (no tier matched)."""
|
||||
request = HttpRequest()
|
||||
self.assertIsNone(get_cool_off(request))
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_cool_off_tiers_below_first_threshold(self):
|
||||
self.request.axes_failures_since_start = 1
|
||||
self.assertIsNone(get_cool_off(self.request))
|
||||
|
||||
# -- get_failure_limit with tiers --
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS)
|
||||
def test_get_failure_limit_returns_lowest_tier(self):
|
||||
from axes.helpers import get_failure_limit
|
||||
|
||||
self.assertEqual(get_failure_limit(self.request, self.credentials), 3)
|
||||
|
||||
@override_settings(
|
||||
AXES_LOCKOUT_TIERS=[
|
||||
LockoutTier(failures=10, cooloff=timedelta(days=1)),
|
||||
LockoutTier(failures=5, cooloff=timedelta(hours=1)),
|
||||
]
|
||||
)
|
||||
def test_get_failure_limit_sorts_tiers(self):
|
||||
from axes.helpers import get_failure_limit
|
||||
|
||||
self.assertEqual(get_failure_limit(self.request, self.credentials), 5)
|
||||
|
||||
# -- get_lockout_message with tiers --
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
|
||||
def test_get_lockout_message_with_tiers(self):
|
||||
from axes.conf import settings
|
||||
from axes.helpers import get_lockout_message
|
||||
|
||||
self.assertEqual(get_lockout_message(), settings.AXES_COOLOFF_MESSAGE)
|
||||
|
||||
# -- get_lockout_response with tiers --
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
|
||||
def test_get_lockout_response_includes_failure_count(self):
|
||||
self.request.axes_failures_since_start = 6
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response.status_code, 429)
|
||||
|
||||
@override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None)
|
||||
def test_get_lockout_response_json_with_tiers(self):
|
||||
self.request.axes_failures_since_start = 3
|
||||
self.request.META["HTTP_X_REQUESTED_WITH"] = "XMLHttpRequest"
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(type(response), JsonResponse)
|
||||
import json
|
||||
|
||||
data = json.loads(response.content)
|
||||
self.assertEqual(data["failure_count"], 3)
|
||||
self.assertIn("cooloff_time", data)
|
||||
|
|
|
|||
Loading…
Reference in a new issue