mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
feat: pass username to AXES_COOLOFF_TIME callback
If the AXES_COOLOFF_TIME is a callable or path to a callable taking an argument, pass the username to it. This should enable users to customize the cool off to be user dependant, and possibly implement a growing cool-off time: - First lockout cools off after 5 mins - Second one after 10 mins - etc...
This commit is contained in:
parent
3fa7fce3ad
commit
2fb4c81243
5 changed files with 51 additions and 11 deletions
|
|
@ -113,7 +113,7 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler):
|
|||
return
|
||||
|
||||
cache_keys = get_client_cache_keys(request, credentials)
|
||||
cache_timeout = get_cache_timeout()
|
||||
cache_timeout = get_cache_timeout(username)
|
||||
failures = []
|
||||
for cache_key in cache_keys:
|
||||
added = self.cache.add(key=cache_key, value=1, timeout=cache_timeout)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
import functools
|
||||
import inspect
|
||||
from datetime import timedelta
|
||||
from hashlib import sha256
|
||||
from logging import getLogger
|
||||
|
|
@ -32,7 +34,7 @@ def get_cache() -> BaseCache:
|
|||
return caches[getattr(settings, "AXES_CACHE", "default")]
|
||||
|
||||
|
||||
def get_cache_timeout() -> Optional[int]:
|
||||
def get_cache_timeout(username: Optional[str] = None) -> Optional[int]:
|
||||
"""
|
||||
Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME.
|
||||
|
||||
|
|
@ -43,21 +45,22 @@ def get_cache_timeout() -> Optional[int]:
|
|||
for use with the Django cache backends.
|
||||
"""
|
||||
|
||||
cool_off = get_cool_off()
|
||||
cool_off = get_cool_off(username)
|
||||
if cool_off is None:
|
||||
return None
|
||||
return int(cool_off.total_seconds())
|
||||
|
||||
|
||||
def get_cool_off() -> Optional[timedelta]:
|
||||
def get_cool_off(username: Optional[str] = None) -> Optional[timedelta]:
|
||||
"""
|
||||
Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME.
|
||||
|
||||
The return value is either None or timedelta.
|
||||
|
||||
Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer/float of hours,
|
||||
and this function offers a unified _timedelta or None_ representation of that configuration
|
||||
for use with the Axes internal implementations.
|
||||
Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, integer/float of hours,
|
||||
a path to a callable or a callable taking zero or 1 argument (the username). This function
|
||||
offers a unified _timedelta or None_ representation of that configuration for use with the
|
||||
Axes internal implementations.
|
||||
|
||||
:exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
|
||||
"""
|
||||
|
|
@ -69,13 +72,22 @@ def get_cool_off() -> Optional[timedelta]:
|
|||
if isinstance(cool_off, float):
|
||||
return timedelta(minutes=cool_off * 60)
|
||||
if isinstance(cool_off, str):
|
||||
return import_string(cool_off)()
|
||||
cool_off_func = import_string(cool_off)
|
||||
return _maybe_partial(cool_off_func, username)()
|
||||
if callable(cool_off):
|
||||
return cool_off() # pylint: disable=not-callable
|
||||
return _maybe_partial(cool_off, username)() # pylint: disable=not-callable
|
||||
|
||||
return cool_off
|
||||
|
||||
|
||||
def _maybe_partial(func: Callable, username: Optional[str] = None):
|
||||
"""Bind the given username to the function if it accepts a single argument."""
|
||||
sig = inspect.signature(func)
|
||||
if len(sig.parameters) == 1:
|
||||
return functools.partial(func, username)
|
||||
return func
|
||||
|
||||
|
||||
def get_cool_off_iso8601(delta: timedelta) -> str:
|
||||
"""
|
||||
Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs.
|
||||
|
|
|
|||
|
|
@ -53,7 +53,9 @@ class AccessAttempt(AccessBase):
|
|||
|
||||
class AccessLog(AccessBase):
|
||||
logout_time = models.DateTimeField(_("Logout Time"), null=True, blank=True)
|
||||
session_hash = models.CharField(_("Session key hash (sha256)"), default="", blank=True, max_length=64)
|
||||
session_hash = models.CharField(
|
||||
_("Session key hash (sha256)"), default="", blank=True, max_length=64
|
||||
)
|
||||
|
||||
def __str__(self):
|
||||
return f"Access Log for {self.username} @ {self.attempt_time}"
|
||||
|
|
|
|||
|
|
@ -23,7 +23,7 @@ The following ``settings.py`` options are available for customizing Axes behavio
|
|||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_LOCK_OUT_AT_FAILURE | True | After the number of allowed login attempts are exceeded, should we lock out this IP (and optional user agent)? |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes no arguments. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds |
|
||||
| AXES_COOLOFF_TIME | None | If set, defines a period of inactivity after which old failed login attempts will be cleared. Can be set to a Python timedelta object, an integer, a float, a callable, or a string path to a callable which takes the username as argument. If an integer or float, will be interpreted as a number of hours: ``AXES_COOLOFF_TIME = 2`` 2 hours, ``AXES_COOLOFF_TIME = 2.0`` 2 hours, 120 minutes, ``AXES_COOLOFF_TIME = 1.7`` 1.7 hours, 102 minutes, 6120 seconds |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_ONLY_ADMIN_SITE | False | If ``True``, lock is only enabled for admin site. Admin site is determined by checking request path against the path of ``"admin:index"`` view. If admin urls are not registered in current urlconf, all requests will not be locked. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
|
|
|||
|
|
@ -59,6 +59,32 @@ class CacheTestCase(AxesTestCase):
|
|||
def test_get_cache_timeout_none(self):
|
||||
self.assertEqual(get_cache_timeout(), None)
|
||||
|
||||
def test_get_increasing_cache_timeout(self):
|
||||
user_durations = {
|
||||
"ben": timedelta(minutes=5),
|
||||
"jen": timedelta(minutes=10),
|
||||
}
|
||||
|
||||
def _callback(username):
|
||||
previous_duration = user_durations.get(username, timedelta())
|
||||
user_durations[username] = previous_duration + timedelta(minutes=5)
|
||||
return user_durations[username]
|
||||
|
||||
with override_settings(AXES_COOLOFF_TIME=_callback):
|
||||
with self.subTest("no username"):
|
||||
self.assertEqual(get_cache_timeout(), 300)
|
||||
|
||||
with self.subTest("ben"):
|
||||
self.assertEqual(get_cache_timeout("ben"), 600)
|
||||
self.assertEqual(get_cache_timeout("ben"), 900)
|
||||
self.assertEqual(get_cache_timeout("ben"), 1200)
|
||||
|
||||
with self.subTest("jen"):
|
||||
self.assertEqual(get_cache_timeout("jen"), 900)
|
||||
|
||||
with self.subTest("james"):
|
||||
self.assertEqual(get_cache_timeout("james"), 300)
|
||||
|
||||
|
||||
class TimestampTestCase(AxesTestCase):
|
||||
def test_iso8601(self):
|
||||
|
|
|
|||
Loading…
Reference in a new issue