Refactor cache backend to use cache.incr method for request tracking

The old cache.set method has problems with correctness as well as
performance on higher traffic sites where there are
multiple parallel web servers running at the same time
which can overwrite each others shared cache
This commit is contained in:
Aleksi Häkli 2023-04-28 18:07:09 +03:00
parent 9924077a2a
commit c3cfb5150a
2 changed files with 16 additions and 10 deletions

View file

@ -6,7 +6,7 @@ from axes.handlers.base import AxesBaseHandler, AbstractAxesHandler
from axes.helpers import (
get_cache,
get_cache_timeout,
get_client_cache_key,
get_client_cache_keys,
get_client_str,
get_client_username,
get_credentials,
@ -110,7 +110,18 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler):
log.info("AXES: Login failed from whitelisted client %s.", client_str)
return
failures_since_start = 1 + self.get_failures(request, credentials)
cache_keys = get_client_cache_keys(request, credentials)
cache_timeout = get_cache_timeout()
failures = []
for cache_key in cache_keys:
added = self.cache.add(key=cache_key, value=1, timeout=cache_timeout)
if added:
failures.append(1)
else:
failures.append(self.cache.incr(key=cache_key, delta=1))
self.cache.touch(key=cache_key, timeout=cache_timeout)
failures_since_start = max(failures)
request.axes_failures_since_start = failures_since_start
if failures_since_start > 1:
@ -126,11 +137,6 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler):
client_str,
)
cache_keys = get_client_cache_keys(request, credentials)
for cache_key in cache_keys:
failures = self.cache.get(cache_key, default=0)
self.cache.set(cache_key, failures + 1, get_cache_timeout())
if (
settings.AXES_LOCK_OUT_AT_FAILURE
and failures_since_start >= get_failure_limit(request, credentials)

View file

@ -496,12 +496,12 @@ class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase):
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
)
@patch.object(cache, "set")
def test_user_login_failed_with_none_username(self, cache_set):
@patch.object(cache, "add")
def test_user_login_failed_with_none_username(self, cache_add):
credentials = {"username": None, "password": "test"}
sender = MagicMock()
AxesProxyHandler.user_login_failed(sender, credentials, self.request)
self.assertTrue(cache_set.called)
self.assertTrue(cache_add.called)
@override_settings(AXES_HANDLER="axes.handlers.dummy.AxesDummyHandler")