Use redis cache in get_approx_account_lockouts_from_login_attempts

This commit is contained in:
Yurii Parfinenko 2026-01-14 13:34:34 -05:00
parent 37e5dd3123
commit 785130bbd6
No known key found for this signature in database
GPG key ID: C8AB91A72E804C3B
2 changed files with 38 additions and 6 deletions

View file

@ -1,6 +1,7 @@
from datetime import timedelta
from defender import config
from defender.connection import get_redis_connection
from .models import AccessAttempt
from django.db.models import Q
from django.utils import timezone
@ -19,6 +20,14 @@ def store_login_attempt(
login_valid=login_valid,
)
def get_approx_lockouts_cache_key(ip_address, username):
"""get cache key for approximate number of account lockouts"""
return "{0}:approx_lockouts:ip:{1}:user:{2}".format(
config.CACHE_PREFIX, ip_address or "", username.lower() if username else ""
)
def get_approx_account_lockouts_from_login_attempts(ip_address=None, username=None):
"""Get the approximate number of account lockouts in a period of ACCESS_ATTEMPT_EXPIRATION hours.
This is approximate because we do not consider the time between these failed
@ -31,10 +40,6 @@ def get_approx_account_lockouts_from_login_attempts(ip_address=None, username=No
Returns:
int: The minimum of the count of logged failure attempts and the length of the LOCKOUT_COOLOFF_TIMES - 1, or 0 dependant on either configuration or argument parameters (ie. both ip_address and username being None).
"""
# TODO: Possibly add logic to temporarily store this info in the cache
# to help mitigate any potential performance impact this could have.
if not config.STORE_ACCESS_ATTEMPTS or not (ip_address or username):
# If we're not storing login attempts OR both ip_address and username are
# None we should return 0.
@ -57,4 +62,18 @@ def get_approx_account_lockouts_from_login_attempts(ip_address=None, username=No
# conditions, we're in an inappropriate context.
raise Exception("Invalid state requested")
return AccessAttempt.objects.filter(q).count() // failure_limit
cache_key = get_approx_lockouts_cache_key(ip_address, username)
redis_client = get_redis_connection()
cached_value = redis_client.get(cache_key)
if cached_value is not None:
return int(cached_value)
lockouts = AccessAttempt.objects.filter(q).count() // failure_limit
# Cache for the same window length used by the DB query.
cache_ttl = int(config.ACCESS_ATTEMPT_EXPIRATION * 60 * 60)
redis_client.set(cache_key, int(lockouts), cache_ttl)
return lockouts

View file

@ -13,7 +13,7 @@ from django.urls import reverse
import redis
from defender.data import get_approx_account_lockouts_from_login_attempts
from defender.data import get_approx_account_lockouts_from_login_attempts, get_approx_lockouts_cache_key
from . import utils
from . import config
@ -964,6 +964,19 @@ class AccessAttemptTest(DefenderTestCase):
def test_approx_account_lockout_count_default_case_invalid_args_pt2(self):
with self.assertRaises(Exception):
get_approx_account_lockouts_from_login_attempts(username=VALID_USERNAME)
def test_approx_account_lockout_uses_redis_cache(self):
get_approx_account_lockouts_from_login_attempts(
ip_address="127.0.0.1", username=VALID_USERNAME
)
redis_client = get_redis_connection()
cached_value = redis_client.get(
get_approx_lockouts_cache_key(
ip_address="127.0.0.1", username=VALID_USERNAME
)
)
self.assertIsNone(cached_value)
class SignalTest(DefenderTestCase):