diff --git a/axes/checks.py b/axes/checks.py index b16e879..9fa08a4 100644 --- a/axes/checks.py +++ b/axes/checks.py @@ -6,7 +6,7 @@ from django.core.checks import ( # pylint: disable=redefined-builtin from django.utils.module_loading import import_string from axes.backends import AxesStandaloneBackend -from axes.conf import settings +from axes.conf import LockoutTier, settings class Messages: @@ -26,6 +26,14 @@ class Messages: "AXES_LOCKOUT_PARAMETERS does not contain 'ip_address'." " This configuration allows attackers to bypass rate limits by rotating User-Agents or Cookies." ) + LOCKOUT_TIERS_CONFLICT = ( + "AXES_LOCKOUT_TIERS is set alongside AXES_COOLOFF_TIME." + " When tiers are active, AXES_COOLOFF_TIME is ignored." + " Remove AXES_COOLOFF_TIME to silence this warning." + ) + LOCKOUT_TIERS_INVALID = ( + "AXES_LOCKOUT_TIERS must be a list of LockoutTier instances." + ) class Hints: @@ -35,6 +43,10 @@ class Hints: SETTING_DEPRECATED = None CALLABLE_INVALID = None LOCKOUT_PARAMETERS_INVALID = "Add 'ip_address' to AXES_LOCKOUT_PARAMETERS." + LOCKOUT_TIERS_CONFLICT = "Remove AXES_COOLOFF_TIME when using AXES_LOCKOUT_TIERS." + LOCKOUT_TIERS_INVALID = ( + "Use: AXES_LOCKOUT_TIERS = [LockoutTier(failures=3, cooloff=timedelta(minutes=15)), ...]" + ) class Codes: @@ -44,6 +56,8 @@ class Codes: SETTING_DEPRECATED = "axes.W004" CALLABLE_INVALID = "axes.W005" LOCKOUT_PARAMETERS_INVALID = "axes.W006" + LOCKOUT_TIERS_CONFLICT = "axes.W007" + LOCKOUT_TIERS_INVALID = "axes.W008" @register(Tags.security, Tags.caches, Tags.compatibility) @@ -192,6 +206,41 @@ def axes_lockout_params_check(app_configs, **kwargs): # pylint: disable=unused- return warnings +@register(Tags.security) +def axes_lockout_tiers_check(app_configs, **kwargs): # pylint: disable=unused-argument + warnings = [] + tiers = getattr(settings, "AXES_LOCKOUT_TIERS", None) + if tiers is None: + return warnings + + if not _is_valid_tiers_list(tiers): + warnings.append( + Warning( + msg=Messages.LOCKOUT_TIERS_INVALID, + hint=Hints.LOCKOUT_TIERS_INVALID, + id=Codes.LOCKOUT_TIERS_INVALID, + ) + ) + return warnings + + if getattr(settings, "AXES_COOLOFF_TIME", None) is not None: + warnings.append( + Warning( + msg=Messages.LOCKOUT_TIERS_CONFLICT, + hint=Hints.LOCKOUT_TIERS_CONFLICT, + id=Codes.LOCKOUT_TIERS_CONFLICT, + ) + ) + + return warnings + + +def _is_valid_tiers_list(tiers) -> bool: + if not isinstance(tiers, (list, tuple)): + return False + return all(isinstance(t, LockoutTier) for t in tiers) + + @register def axes_conf_check(app_configs, **kwargs): # pylint: disable=unused-argument warnings = [] diff --git a/axes/conf.py b/axes/conf.py index e908401..1f7dd85 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -1,3 +1,6 @@ +from dataclasses import dataclass +from datetime import timedelta + from django.conf import settings from django.contrib.auth import get_user_model from django.utils.functional import SimpleLazyObject @@ -16,6 +19,12 @@ class JSONSerializableLazyObject(SimpleLazyObject): return str(self) +@dataclass(frozen=True, order=True) +class LockoutTier: + failures: int + cooloff: timedelta + + # disable plugin when set to False settings.AXES_ENABLED = getattr(settings, "AXES_ENABLED", True) @@ -108,6 +117,10 @@ settings.AXES_LOCKOUT_URL = getattr(settings, "AXES_LOCKOUT_URL", None) settings.AXES_COOLOFF_TIME = getattr(settings, "AXES_COOLOFF_TIME", None) +# Progressive lockout tiers: list of LockoutTier(failures, cooloff) instances. +# When set, overrides AXES_FAILURE_LIMIT and AXES_COOLOFF_TIME. +settings.AXES_LOCKOUT_TIERS = getattr(settings, "AXES_LOCKOUT_TIERS", None) + settings.AXES_USE_ATTEMPT_EXPIRATION = getattr( settings, "AXES_USE_ATTEMPT_EXPIRATION", False ) diff --git a/axes/helpers.py b/axes/helpers.py index a7ccf60..01005e2 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -1,8 +1,8 @@ -from datetime import timedelta, datetime +from datetime import datetime, timedelta from hashlib import sha256 from logging import getLogger from string import Template -from typing import Callable, Optional, Type, Union, List +from typing import Callable, List, Optional, Type, Union from urllib.parse import urlencode from django.core.cache import BaseCache, caches @@ -11,7 +11,7 @@ from django.shortcuts import redirect, render from django.utils.encoding import force_bytes from django.utils.module_loading import import_string -from axes.conf import settings +from axes.conf import LockoutTier, settings from axes.models import AccessBase log = getLogger(__name__) @@ -60,9 +60,16 @@ def get_cool_off(request: Optional[HttpRequest] = None) -> Optional[timedelta]: offers a unified _timedelta or None_ representation of that configuration for use with the Axes internal implementations. + When ``AXES_LOCKOUT_TIERS`` is configured, the cooloff is resolved from the + matching tier based on the failure count attached to the request. + :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. """ + tier = _resolve_tier_from_request(request) + if tier is not None: + return tier.cooloff + cool_off = settings.AXES_COOLOFF_TIME if isinstance(cool_off, int): @@ -101,6 +108,31 @@ def get_cool_off_iso8601(delta: timedelta) -> str: return f"P{days_str}" +def get_lockout_tier(failures: int) -> Optional[LockoutTier]: + """Return the matching ``LockoutTier`` for *failures*, or ``None``.""" + tiers = settings.AXES_LOCKOUT_TIERS + if not tiers: + return None + sorted_tiers = sorted(tiers, key=lambda t: t.failures) + matched = None + for tier in sorted_tiers: + if failures >= tier.failures: + matched = tier + return matched + + +def _resolve_tier_from_request( + request: Optional[HttpRequest], +) -> Optional[LockoutTier]: + """Extract failure count from *request* and resolve the tier.""" + if not settings.AXES_LOCKOUT_TIERS or request is None: + return None + failures = getattr(request, "axes_failures_since_start", None) + if failures is None: + return None + return get_lockout_tier(failures) + + def get_attempt_expiration(request: Optional[HttpRequest] = None) -> datetime: """ Get threshold for fetching access attempts from the database. @@ -444,6 +476,11 @@ def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str: def get_failure_limit(request: HttpRequest, credentials) -> int: + tiers = settings.AXES_LOCKOUT_TIERS + if tiers: + sorted_tiers = sorted(tiers, key=lambda t: t.failures) + return sorted_tiers[0].failures + if callable(settings.AXES_FAILURE_LIMIT): return settings.AXES_FAILURE_LIMIT( # pylint: disable=not-callable request, credentials @@ -456,7 +493,7 @@ def get_failure_limit(request: HttpRequest, credentials) -> int: def get_lockout_message() -> str: - if settings.AXES_COOLOFF_TIME: + if settings.AXES_COOLOFF_TIME or settings.AXES_LOCKOUT_TIERS: return settings.AXES_COOLOFF_MESSAGE return settings.AXES_PERMALOCK_MESSAGE @@ -488,8 +525,10 @@ def get_lockout_response( ) status = settings.AXES_HTTP_RESPONSE_CODE + failures = getattr(request, "axes_failures_since_start", None) or 0 context = { "failure_limit": get_failure_limit(request, credentials), + "failure_count": failures, "username": get_client_username(request, credentials) or "", } diff --git a/docs/4_configuration.rst b/docs/4_configuration.rst index 36c4af5..d8202be 100644 --- a/docs/4_configuration.rst +++ b/docs/4_configuration.rst @@ -25,6 +25,8 @@ The following ``settings.py`` options are available for customizing Axes behavio +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes the request as argument. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds | +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ +| AXES_LOCKOUT_TIERS | None | A list of ``LockoutTier(failures, cooloff)`` instances that define progressive lockout durations. When set, overrides ``AXES_FAILURE_LIMIT`` and ``AXES_COOLOFF_TIME``. The lowest tier threshold becomes the effective failure limit, and each subsequent tier applies a longer cool-off. Example: ``from datetime import timedelta``, ``from axes.conf import LockoutTier``, ``AXES_LOCKOUT_TIERS = [LockoutTier(failures=3, cooloff=timedelta(minutes=15)), LockoutTier(failures=6, cooloff=timedelta(hours=2)), LockoutTier(failures=10, cooloff=timedelta(days=1))]``. With this configuration: 3 failures → 15 min lockout, 6 failures → 2 h, 10+ failures → 24 h. | ++------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | AXES_ONLY_ADMIN_SITE | False | If ``True``, lock is only enabled for admin site. Admin site is determined by checking request path against the path of ``"admin:index"`` view. If admin urls are not registered in current urlconf, all requests will not be locked. | +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | AXES_ONLY_USER_FAILURES | False | DEPRECATED: USE ``AXES_LOCKOUT_PARAMETERS`` INSTEAD. If ``True``, only lock based on username, and never lock based on IP if attempts exceed the limit. Otherwise utilize the existing IP and user locking logic. | diff --git a/tests/test_checks.py b/tests/test_checks.py index 1f5fdaa..d641ee1 100644 --- a/tests/test_checks.py +++ b/tests/test_checks.py @@ -1,8 +1,12 @@ -from django.core.checks import run_checks, Warning # pylint: disable=redefined-builtin -from django.test import override_settings, modify_settings +from datetime import timedelta + +from django.core.checks import (Warning, # pylint: disable=redefined-builtin + run_checks) +from django.test import modify_settings, override_settings from axes.backends import AxesStandaloneBackend -from axes.checks import Messages, Hints, Codes +from axes.checks import Codes, Hints, Messages +from axes.conf import LockoutTier from tests.base import AxesTestCase @@ -150,3 +154,45 @@ class LockoutParametersCheckTestCase(AxesTestCase): id=Codes.LOCKOUT_PARAMETERS_INVALID, ) self.assertEqual(warnings, [warning]) + + +class LockoutTiersCheckTestCase(AxesTestCase): + SAMPLE_TIERS = [ + LockoutTier(failures=3, cooloff=timedelta(minutes=15)), + LockoutTier(failures=6, cooloff=timedelta(hours=2)), + ] + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None) + def test_tiers_alone_no_warning(self): + warnings = run_checks() + self.assertEqual(warnings, []) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=1) + def test_tiers_with_cooloff_time_warns(self): + warnings = run_checks() + warning = Warning( + msg=Messages.LOCKOUT_TIERS_CONFLICT, + hint=Hints.LOCKOUT_TIERS_CONFLICT, + id=Codes.LOCKOUT_TIERS_CONFLICT, + ) + self.assertIn(warning, warnings) + + @override_settings(AXES_LOCKOUT_TIERS="not a list") + def test_tiers_invalid_format_warns(self): + warnings = run_checks() + warning = Warning( + msg=Messages.LOCKOUT_TIERS_INVALID, + hint=Hints.LOCKOUT_TIERS_INVALID, + id=Codes.LOCKOUT_TIERS_INVALID, + ) + self.assertIn(warning, warnings) + + @override_settings(AXES_LOCKOUT_TIERS=[(3, timedelta(minutes=15))]) + def test_tiers_plain_tuples_warns(self): + warnings = run_checks() + warning = Warning( + msg=Messages.LOCKOUT_TIERS_INVALID, + hint=Hints.LOCKOUT_TIERS_INVALID, + id=Codes.LOCKOUT_TIERS_INVALID, + ) + self.assertIn(warning, warnings) diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 584cfc4..c047491 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -7,6 +7,7 @@ from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonRes from django.test import RequestFactory, override_settings from axes.apps import AppConfig +from axes.conf import LockoutTier from axes.helpers import ( cleanse_parameters, get_cache_timeout, @@ -18,6 +19,7 @@ from axes.helpers import ( get_cool_off, get_cool_off_iso8601, get_lockout_response, + get_lockout_tier, is_client_ip_address_blacklisted, is_client_ip_address_whitelisted, is_client_method_whitelisted, @@ -1103,3 +1105,107 @@ class AxesCleanseParamsTestCase(AxesTestCase): self.assertEqual("test_user", cleansed["username"]) self.assertEqual("********************", cleansed["password"]) self.assertEqual("sensitive", cleansed["other_sensitive_data"]) + + + +class AxesLockoutTiersTestCase(AxesTestCase): + SAMPLE_TIERS = [ + LockoutTier(failures=3, cooloff=timedelta(minutes=15)), + LockoutTier(failures=6, cooloff=timedelta(hours=2)), + LockoutTier(failures=10, cooloff=timedelta(days=1)), + ] + + # -- get_lockout_tier -- + + def test_get_lockout_tier_none_when_not_configured(self): + self.assertIsNone(get_lockout_tier(5)) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_lockout_tier_below_threshold(self): + self.assertIsNone(get_lockout_tier(2)) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_lockout_tier_first_tier(self): + tier = get_lockout_tier(3) + self.assertEqual(tier, LockoutTier(failures=3, cooloff=timedelta(minutes=15))) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_lockout_tier_between_tiers(self): + tier = get_lockout_tier(5) + self.assertEqual(tier, LockoutTier(failures=3, cooloff=timedelta(minutes=15))) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_lockout_tier_second_tier(self): + tier = get_lockout_tier(6) + self.assertEqual(tier, LockoutTier(failures=6, cooloff=timedelta(hours=2))) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_lockout_tier_highest_tier(self): + tier = get_lockout_tier(10) + self.assertEqual(tier, LockoutTier(failures=10, cooloff=timedelta(days=1))) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_lockout_tier_above_highest(self): + tier = get_lockout_tier(100) + self.assertEqual(tier, LockoutTier(failures=10, cooloff=timedelta(days=1))) + + # -- get_cool_off with tiers -- + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_cool_off_with_tiers(self): + self.request.axes_failures_since_start = 6 + self.assertEqual(get_cool_off(self.request), timedelta(hours=2)) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_cool_off_tiers_no_failures_attr(self): + """When request has no failures attribute, returns None (no tier matched).""" + request = HttpRequest() + self.assertIsNone(get_cool_off(request)) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_cool_off_tiers_below_first_threshold(self): + self.request.axes_failures_since_start = 1 + self.assertIsNone(get_cool_off(self.request)) + + # -- get_failure_limit with tiers -- + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS) + def test_get_failure_limit_returns_lowest_tier(self): + from axes.helpers import get_failure_limit + self.assertEqual(get_failure_limit(self.request, self.credentials), 3) + + @override_settings(AXES_LOCKOUT_TIERS=[ + LockoutTier(failures=10, cooloff=timedelta(days=1)), + LockoutTier(failures=5, cooloff=timedelta(hours=1)), + ]) + def test_get_failure_limit_sorts_tiers(self): + from axes.helpers import get_failure_limit + self.assertEqual(get_failure_limit(self.request, self.credentials), 5) + + # -- get_lockout_message with tiers -- + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None) + def test_get_lockout_message_with_tiers(self): + from axes.helpers import get_lockout_message + from axes.conf import settings + self.assertEqual(get_lockout_message(), settings.AXES_COOLOFF_MESSAGE) + + # -- get_lockout_response with tiers -- + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None) + def test_get_lockout_response_includes_failure_count(self): + self.request.axes_failures_since_start = 6 + response = get_lockout_response(request=self.request) + self.assertEqual(response.status_code, 429) + + @override_settings(AXES_LOCKOUT_TIERS=SAMPLE_TIERS, AXES_COOLOFF_TIME=None) + def test_get_lockout_response_json_with_tiers(self): + self.request.axes_failures_since_start = 3 + self.request.META["HTTP_X_REQUESTED_WITH"] = "XMLHttpRequest" + response = get_lockout_response(request=self.request) + self.assertEqual(type(response), JsonResponse) + import json + data = json.loads(response.content) + self.assertEqual(data["failure_count"], 3) + self.assertIn("cooloff_time", data) +