mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
106 lines
3.1 KiB
Python
106 lines
3.1 KiB
Python
from logging import getLogger
|
|
from typing import List, Optional
|
|
|
|
from django.db.models import QuerySet
|
|
from django.http import HttpRequest
|
|
from django.utils.timezone import datetime, now
|
|
|
|
from axes.conf import settings
|
|
from axes.helpers import get_client_username, get_client_parameters, get_cool_off
|
|
from axes.models import AccessAttempt
|
|
|
|
log = getLogger(__name__)
|
|
|
|
|
|
def get_cool_off_threshold(attempt_time: Optional[datetime] = None) -> datetime:
|
|
"""
|
|
Get threshold for fetching access attempts from the database.
|
|
"""
|
|
|
|
cool_off = get_cool_off()
|
|
if cool_off is None:
|
|
raise TypeError(
|
|
"Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None"
|
|
)
|
|
|
|
if attempt_time is None:
|
|
return now() - cool_off
|
|
return attempt_time - cool_off
|
|
|
|
|
|
def filter_user_attempts(
|
|
request: HttpRequest, credentials: Optional[dict] = None
|
|
) -> List[QuerySet]:
|
|
"""
|
|
Return a list querysets of AccessAttempts that match the given request and credentials.
|
|
"""
|
|
|
|
username = get_client_username(request, credentials)
|
|
|
|
filter_kwargs_list = get_client_parameters(
|
|
username, request.axes_ip_address, request.axes_user_agent, request, credentials
|
|
)
|
|
attempts_list = [
|
|
AccessAttempt.objects.filter(**filter_kwargs)
|
|
for filter_kwargs in filter_kwargs_list
|
|
]
|
|
return attempts_list
|
|
|
|
|
|
def get_user_attempts(
|
|
request: HttpRequest, credentials: Optional[dict] = None
|
|
) -> List[QuerySet]:
|
|
"""
|
|
Get list of querysets with valid user attempts that match the given request and credentials.
|
|
"""
|
|
|
|
attempts_list = filter_user_attempts(request, credentials)
|
|
|
|
if settings.AXES_COOLOFF_TIME is None:
|
|
log.debug(
|
|
"AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured"
|
|
)
|
|
return attempts_list
|
|
|
|
threshold = get_cool_off_threshold(request.axes_attempt_time)
|
|
log.debug("AXES: Getting access attempts that are newer than %s", threshold)
|
|
return [attempts.filter(attempt_time__gte=threshold) for attempts in attempts_list]
|
|
|
|
|
|
def clean_expired_user_attempts(attempt_time: Optional[datetime] = None) -> int:
|
|
"""
|
|
Clean expired user attempts from the database.
|
|
"""
|
|
|
|
if settings.AXES_COOLOFF_TIME is None:
|
|
log.debug(
|
|
"AXES: Skipping clean for expired access attempts because no AXES_COOLOFF_TIME is configured"
|
|
)
|
|
return 0
|
|
|
|
threshold = get_cool_off_threshold(attempt_time)
|
|
count, _ = AccessAttempt.objects.filter(attempt_time__lt=threshold).delete()
|
|
log.info(
|
|
"AXES: Cleaned up %s expired access attempts from database that were older than %s",
|
|
count,
|
|
threshold,
|
|
)
|
|
return count
|
|
|
|
|
|
def reset_user_attempts(
|
|
request: HttpRequest, credentials: Optional[dict] = None
|
|
) -> int:
|
|
"""
|
|
Reset all user attempts that match the given request and credentials.
|
|
"""
|
|
|
|
attempts_list = filter_user_attempts(request, credentials)
|
|
|
|
count = 0
|
|
for attempts in attempts_list:
|
|
_count, _ = attempts.delete()
|
|
count += _count
|
|
log.info("AXES: Reset %s access attempts from database.", count)
|
|
|
|
return count
|