From 2fb4c81243e472999db3d654b51ec2feb243cc30 Mon Sep 17 00:00:00 2001 From: Bruno Alla Date: Wed, 14 Aug 2024 10:57:05 +0200 Subject: [PATCH] feat: pass username to AXES_COOLOFF_TIME callback If the AXES_COOLOFF_TIME is a callable or path to a callable taking an argument, pass the username to it. This should enable users to customize the cool off to be user dependant, and possibly implement a growing cool-off time: - First lockout cools off after 5 mins - Second one after 10 mins - etc... --- axes/handlers/cache.py | 2 +- axes/helpers.py | 28 ++++++++++++++++++++-------- axes/models.py | 4 +++- docs/4_configuration.rst | 2 +- tests/test_helpers.py | 26 ++++++++++++++++++++++++++ 5 files changed, 51 insertions(+), 11 deletions(-) diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index a797f39..07e6794 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -113,7 +113,7 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler): return cache_keys = get_client_cache_keys(request, credentials) - cache_timeout = get_cache_timeout() + cache_timeout = get_cache_timeout(username) failures = [] for cache_key in cache_keys: added = self.cache.add(key=cache_key, value=1, timeout=cache_timeout) diff --git a/axes/helpers.py b/axes/helpers.py index 78cc668..00d2b25 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -1,3 +1,5 @@ +import functools +import inspect from datetime import timedelta from hashlib import sha256 from logging import getLogger @@ -32,7 +34,7 @@ def get_cache() -> BaseCache: return caches[getattr(settings, "AXES_CACHE", "default")] -def get_cache_timeout() -> Optional[int]: +def get_cache_timeout(username: Optional[str] = None) -> Optional[int]: """ Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. @@ -43,21 +45,22 @@ def get_cache_timeout() -> Optional[int]: for use with the Django cache backends. """ - cool_off = get_cool_off() + cool_off = get_cool_off(username) if cool_off is None: return None return int(cool_off.total_seconds()) -def get_cool_off() -> Optional[timedelta]: +def get_cool_off(username: Optional[str] = None) -> Optional[timedelta]: """ Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. The return value is either None or timedelta. - Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer/float of hours, - and this function offers a unified _timedelta or None_ representation of that configuration - for use with the Axes internal implementations. + Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, integer/float of hours, + a path to a callable or a callable taking zero or 1 argument (the username). This function + offers a unified _timedelta or None_ representation of that configuration for use with the + Axes internal implementations. :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. """ @@ -69,13 +72,22 @@ def get_cool_off() -> Optional[timedelta]: if isinstance(cool_off, float): return timedelta(minutes=cool_off * 60) if isinstance(cool_off, str): - return import_string(cool_off)() + cool_off_func = import_string(cool_off) + return _maybe_partial(cool_off_func, username)() if callable(cool_off): - return cool_off() # pylint: disable=not-callable + return _maybe_partial(cool_off, username)() # pylint: disable=not-callable return cool_off +def _maybe_partial(func: Callable, username: Optional[str] = None): + """Bind the given username to the function if it accepts a single argument.""" + sig = inspect.signature(func) + if len(sig.parameters) == 1: + return functools.partial(func, username) + return func + + def get_cool_off_iso8601(delta: timedelta) -> str: """ Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs. diff --git a/axes/models.py b/axes/models.py index 9c8a7da..690aea6 100644 --- a/axes/models.py +++ b/axes/models.py @@ -53,7 +53,9 @@ class AccessAttempt(AccessBase): class AccessLog(AccessBase): logout_time = models.DateTimeField(_("Logout Time"), null=True, blank=True) - session_hash = models.CharField(_("Session key hash (sha256)"), default="", blank=True, max_length=64) + session_hash = models.CharField( + _("Session key hash (sha256)"), default="", blank=True, max_length=64 + ) def __str__(self): return f"Access Log for {self.username} @ {self.attempt_time}" diff --git a/docs/4_configuration.rst b/docs/4_configuration.rst index 1a60594..03849ce 100644 --- a/docs/4_configuration.rst +++ b/docs/4_configuration.rst @@ -23,7 +23,7 @@ The following ``settings.py`` options are available for customizing Axes behavio +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | AXES_LOCK_OUT_AT_FAILURE | True | After the number of allowed login attempts are exceeded, should we lock out this IP (and optional user agent)? | +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ -| AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes no arguments. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds | +| AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes the username as argument. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds | +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ | AXES_ONLY_ADMIN_SITE | False | If ``True``, lock is only enabled for admin site. Admin site is determined by checking request path against the path of ``"admin:index"`` view. If admin urls are not registered in current urlconf, all requests will not be locked. | +------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+ diff --git a/tests/test_helpers.py b/tests/test_helpers.py index 0201fbb..82550b6 100644 --- a/tests/test_helpers.py +++ b/tests/test_helpers.py @@ -59,6 +59,32 @@ class CacheTestCase(AxesTestCase): def test_get_cache_timeout_none(self): self.assertEqual(get_cache_timeout(), None) + def test_get_increasing_cache_timeout(self): + user_durations = { + "ben": timedelta(minutes=5), + "jen": timedelta(minutes=10), + } + + def _callback(username): + previous_duration = user_durations.get(username, timedelta()) + user_durations[username] = previous_duration + timedelta(minutes=5) + return user_durations[username] + + with override_settings(AXES_COOLOFF_TIME=_callback): + with self.subTest("no username"): + self.assertEqual(get_cache_timeout(), 300) + + with self.subTest("ben"): + self.assertEqual(get_cache_timeout("ben"), 600) + self.assertEqual(get_cache_timeout("ben"), 900) + self.assertEqual(get_cache_timeout("ben"), 1200) + + with self.subTest("jen"): + self.assertEqual(get_cache_timeout("jen"), 900) + + with self.subTest("james"): + self.assertEqual(get_cache_timeout("james"), 300) + class TimestampTestCase(AxesTestCase): def test_iso8601(self):