mirror of
https://github.com/jazzband/django-defender.git
synced 2026-03-16 22:10:32 +00:00
Some checks failed
Test / build (3.10, 5) (push) Has been cancelled
Test / build (3.10, 6) (push) Has been cancelled
Test / build (3.10, 7) (push) Has been cancelled
Test / build (3.11, 5) (push) Has been cancelled
Test / build (3.11, 6) (push) Has been cancelled
Test / build (3.11, 7) (push) Has been cancelled
Test / build (3.12, 5) (push) Has been cancelled
Test / build (3.12, 6) (push) Has been cancelled
Test / build (3.12, 7) (push) Has been cancelled
Test / build (3.13, 5) (push) Has been cancelled
Test / build (3.13, 6) (push) Has been cancelled
Test / build (3.13, 7) (push) Has been cancelled
Test / build (3.9, 5) (push) Has been cancelled
Test / build (3.9, 6) (push) Has been cancelled
Test / build (3.9, 7) (push) Has been cancelled
* Use redis cache in `get_approx_account_lockouts_from_login_attempts`
* Use django_redis in CI
* Add `django_redis` and `redis` to requirements.txt
* Fix an issue detected by tests: clear redis cache upon block reset
* Remove the unnecessary `if`
76 lines
3.1 KiB
Python
76 lines
3.1 KiB
Python
from datetime import timedelta
|
|
|
|
from defender import config
|
|
from defender.connection import get_redis_connection
|
|
from .models import AccessAttempt
|
|
from django.db.models import Q
|
|
from django.utils import timezone
|
|
|
|
|
|
def store_login_attempt(
    user_agent, ip_address, username, http_accept, path_info, login_valid
):
    """Record one login attempt as a new ``AccessAttempt`` row.

    Every argument is passed through unchanged to the model field of the
    same name.
    """
    attempt_fields = {
        "user_agent": user_agent,
        "ip_address": ip_address,
        "username": username,
        "http_accept": http_accept,
        "path_info": path_info,
        "login_valid": login_valid,
    }
    AccessAttempt.objects.create(**attempt_fields)
|
|
|
|
|
|
def get_approx_lockouts_cache_key(ip_address, username):
    """Build the cache key for the approximate account-lockout count.

    The key is made of ``config.CACHE_PREFIX``, the IP address and the
    lower-cased username. A falsy IP address or username contributes an
    empty string to the key.
    """
    ip_part = ip_address if ip_address else ""
    user_part = username.lower() if username else ""
    key_template = "{0}:approx_lockouts:ip:{1}:user:{2}"
    return key_template.format(config.CACHE_PREFIX, ip_part, user_part)
|
|
|
|
|
|
def get_approx_account_lockouts_from_login_attempts(ip_address=None, username=None):
    """Estimate how many lockouts the stored access attempts amount to.

    Counts the ``AccessAttempt`` rows recorded within the last
    ``ACCESS_ATTEMPT_EXPIRATION`` hours that match the selected filter and
    integer-divides that count by the applicable failure limit. The figure is
    approximate: the time between attempts is ignored, and the query itself
    does not filter on ``login_valid``.

    Filter and limit are selected as follows, first match wins:

    * both ``ip_address`` and ``username`` given, ``LOCKOUT_BY_IP_USERNAME``
      enabled and neither IP nor username lockout disabled: filter on both,
      divide by ``FAILURE_LIMIT``;
    * ``ip_address`` given and IP lockout not disabled: filter on the IP
      address, divide by ``IP_FAILURE_LIMIT``;
    * ``username`` given and username lockout not disabled: filter on the
      username, divide by ``USERNAME_FAILURE_LIMIT``.

    The computed value is stored in redis under the key returned by
    ``get_approx_lockouts_cache_key`` with an expiry argument of 60
    (seconds, assuming the redis-py ``set(name, value, ex)`` signature).
    While a cached value exists it is returned without querying the database.

    Args:
        ip_address (str, optional): IP address to search for. Defaults to None.
        username (str, optional): Username to search for. Defaults to None.

    Returns:
        int: 0 when ``STORE_ACCESS_ATTEMPTS`` is off or both arguments are
        falsy; otherwise the cached value, or the attempt count
        integer-divided by the failure limit. No upper cap is applied here.

    Raises:
        Exception: "Invalid state requested" when none of the cases above
        applies for the given arguments and configuration.
    """
    # Nothing to count when attempts are not stored or no identifier was given.
    if not (config.STORE_ACCESS_ATTEMPTS and (ip_address or username)):
        return 0

    window_start = timezone.now() - timedelta(hours=config.ACCESS_ATTEMPT_EXPIRATION)
    attempt_filter = Q(attempt_time__gte=window_start)
    attempts_per_lockout = config.FAILURE_LIMIT

    if (
        ip_address
        and username
        and config.LOCKOUT_BY_IP_USERNAME
        and not (config.DISABLE_IP_LOCKOUT or config.DISABLE_USERNAME_LOCKOUT)
    ):
        # Combined lockout: keep the default FAILURE_LIMIT, match on both fields.
        attempt_filter &= Q(ip_address=ip_address)
        attempt_filter &= Q(username=username)
    elif ip_address and not config.DISABLE_IP_LOCKOUT:
        attempts_per_lockout = config.IP_FAILURE_LIMIT
        attempt_filter &= Q(ip_address=ip_address)
    elif username and not config.DISABLE_USERNAME_LOCKOUT:
        attempts_per_lockout = config.USERNAME_FAILURE_LIMIT
        attempt_filter &= Q(username=username)
    else:
        # None of the supported argument/configuration combinations applies.
        raise Exception("Invalid state requested")

    cache_key = get_approx_lockouts_cache_key(ip_address, username)
    redis_client = get_redis_connection()

    cached_count = redis_client.get(cache_key)
    if cached_count is None:
        # Cache miss: compute from the database and remember the result.
        lockouts = (
            AccessAttempt.objects.filter(attempt_filter).count()
            // attempts_per_lockout
        )
        redis_client.set(cache_key, int(lockouts), 60)
        return lockouts

    return int(cached_count)
|