django-axes/axes/attempts.py
Aleksi Häkli 46fc538193
Add cache handler and refactor tests
Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
2019-02-23 20:19:37 +02:00

134 lines
4 KiB
Python

from logging import getLogger
from django.contrib.auth import get_user_model
from django.db.models import QuerySet
from django.http import HttpRequest
from django.utils.timezone import datetime, now
from axes.conf import settings
from axes.models import AccessAttempt
from axes.utils import (
get_client_ip_address,
get_client_username,
get_client_user_agent,
get_client_parameters,
get_cool_off,
)
log = getLogger(settings.AXES_LOGGER)
def get_cool_off_threshold(attempt_time: datetime = None) -> datetime:
"""
Get threshold for fetching access attempts from the database.
"""
if attempt_time is None:
return now() - get_cool_off()
return attempt_time - get_cool_off()
def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""
username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
filter_kwargs = get_client_parameters(username, ip_address, user_agent)
return AccessAttempt.objects.filter(**filter_kwargs)
def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> QuerySet:
"""
Get valid user attempts that match the given request and credentials.
"""
attempts = filter_user_attempts(request, credentials)
if settings.AXES_COOLOFF_TIME is None:
log.debug('AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured')
return attempts
threshold = get_cool_off_threshold(attempt_time)
log.debug('AXES: Getting access attempts that are newer than %s', threshold)
return attempts.filter(attempt_time__gte=threshold)
def clean_expired_user_attempts(attempt_time: datetime = None) -> int:
"""
Clean expired user attempts from the database.
"""
if settings.AXES_COOLOFF_TIME is None:
log.debug('AXES: Skipping clean for expired access attempts because no AXES_COOLOFF_TIME is configured')
return 0
threshold = get_cool_off_threshold(attempt_time)
count, _ = AccessAttempt.objects.filter(attempt_time__lt=threshold).delete()
log.info('AXES: Cleaned up %s expired access attempts from database that were older than %s', count, threshold)
return count
def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
"""
Reset all user attempts that match the given request and credentials.
"""
attempts = filter_user_attempts(request, credentials)
count, _ = attempts.delete()
log.info('AXES: Reset %s access attempts from database.', count)
return count
def reset(ip: str = None, username: str = None) -> int:
"""
Reset records that match IP or username, and return the count of removed attempts.
This utility method is meant to be used from the CLI or via Python API.
"""
attempts = AccessAttempt.objects.all()
if ip:
attempts = attempts.filter(ip_address=ip)
if username:
attempts = attempts.filter(username=username)
count, _ = attempts.delete()
log.info('AXES: Reset %s access attempts from database.', count)
return count
def is_user_attempt_whitelisted(request: HttpRequest, credentials: dict = None) -> bool:
"""
Check if the given request or credentials refer to a whitelisted username.
A whitelisted user has the magic ``nolockout`` property set.
If the property is unknown or False or the user can not be found,
this implementation fails gracefully and returns True.
"""
username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
username_value = get_client_username(request, credentials)
kwargs = {
username_field: username_value
}
user_model = get_user_model()
try:
user = user_model.objects.get(**kwargs)
return user.nolockout
except (user_model.DoesNotExist, AttributeError):
pass
return False