Refactor attempt and handler implementations

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
Aleksi Häkli 2019-02-13 20:39:44 +02:00
parent 820ecca236
commit 6d83901031
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
2 changed files with 89 additions and 76 deletions

View file

@ -46,7 +46,7 @@ def get_filter_kwargs(username: str, ip_address: str, user_agent: str) -> Ordere
return query
def query_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""
@ -87,34 +87,39 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str:
return cache_key
def get_user_attempts(request: HttpRequest, credentials: dict = None):
force_reload = False
attempts = query_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
"""
Get valid user attempts and delete expired attempts which have cool offs in the past.
"""
attempts = filter_user_attempts(request, credentials)
# If settings.AXES_COOLOFF_TIME is not configured return the attempts
cool_off = get_cool_off()
if cool_off is None:
return attempts
if cool_off:
for attempt in attempts:
if attempt.attempt_time + cool_off < timezone.now():
attempt.delete()
force_reload = True
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
get_axes_cache().set(
cache_hash_key, failures_cached - 1, cache_timeout
)
# Else AccessAttempts that have expired need to be cleaned up from the database
num_deleted, _ = attempts.filter(attempt_time__lte=timezone.now() - cool_off).delete()
if not num_deleted:
return attempts
# If objects were deleted, we need to update the queryset to reflect this,
# so force a reload.
if force_reload:
attempts = query_user_attempts(request, credentials)
# If there deletions the cache needs to be updated
cache_key = get_cache_key(request, credentials)
num_failures_cached = get_axes_cache().get(cache_key)
if num_failures_cached is not None:
get_axes_cache().set(
cache_key,
num_failures_cached - num_deleted,
get_cache_timeout(),
)
return attempts
# AccessAttempts need to be refreshed from the database because of the delete before returning them
return attempts.all()
def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
attempts = query_user_attempts(request, credentials)
attempts = filter_user_attempts(request, credentials)
count, _ = attempts.delete()
return count
@ -194,8 +199,8 @@ def is_already_locked(request: HttpRequest, credentials: dict = None) -> bool:
1. Is the request HTTP method _whitelisted_? If it is, return ``False``.
2. Is the request IP address _blacklisted_? If it is, return ``True``.
3. Does the request or given credentials refer to a whitelisted user? If it does, return ``False``.
4. Does the request exceed the configured maximum attempt limit? If it does, return ``True``.
3. Is the request user _whitelisted_? If it is, return ``False``.
4. Is the request failure count over the attempt limit? If it is, return ``True``.
Refer to the function source code for the exact implementation.
"""
@ -209,19 +214,21 @@ def is_already_locked(request: HttpRequest, credentials: dict = None) -> bool:
if not is_user_lockable(request, credentials):
return False
if not settings.AXES_LOCK_OUT_AT_FAILURE:
return False
# Check failure statistics against cache
cache_hash_key = get_cache_key(request, credentials)
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
return (
failures_cached >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE
)
num_failures_cached = get_axes_cache().get(cache_hash_key)
for attempt in get_user_attempts(request, credentials):
if (
attempt.failures_since_start >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE
):
return True
# Do not hit the database if we have an answer in the cache
if num_failures_cached is not None:
return num_failures_cached >= settings.AXES_FAILURE_LIMIT
return False
# Check failure statistics against database
attempts = get_user_attempts(request, credentials)
failures = attempts.filter(
failures_since_start__gte=settings.AXES_FAILURE_LIMIT,
)
return failures.exists()

View file

@ -1,18 +1,18 @@
from logging import getLogger
from django.db.models import Max
from django.utils.timezone import now
from axes.attempts import get_cache_key
from axes.attempts import get_cache_key, is_already_locked
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.attempts import reset_user_attempts
from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.utils import get_client_str
from axes.utils import get_client_str, get_client_user_agent
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
@ -36,40 +36,49 @@ class AxesHandler: # pylint: disable=too-many-locals
log.warning('AxesHandler.user_login_failed does not function without a request.')
return
ip_address = get_client_ip(request)
username = get_client_username(request, credentials)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip_address = get_client_ip(request)
user_agent = get_client_user_agent(request)
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
client_str = get_client_str(username, ip_address, user_agent, path_info)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
log.info('Login failed from whitelisted IP %s.', ip_address)
return
failures = 0
attempts = get_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
cache_key = get_cache_key(request, credentials)
num_failures_cached = get_axes_cache().get(cache_key)
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
if num_failures_cached:
failures = num_failures_cached
elif attempts:
failures = attempts.aggregate(
Max('failures_since_start'),
)['failures_since_start__max']
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
failures = 0
# add a failed attempt for this user
failures += 1
get_axes_cache().set(cache_hash_key, failures, cache_timeout)
get_axes_cache().set(
cache_key,
failures,
get_cache_timeout(),
)
# has already attempted, update the info
if attempts:
# Update existing attempt information but do not touch the username, ip_address, or user_agent fields,
# because attackers can request the site with multiple different usernames, addresses, or programs.
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
template = '{}\n---------\n{}'
attempt.get_data = template.format(
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data = template.format(
attempt.post_data,
query2str(request.POST)
)
@ -81,18 +90,18 @@ class AxesHandler: # pylint: disable=too-many-locals
log.info(
'AXES: Repeated login failure by %s. Count = %d of %d',
get_client_str(username, ip_address, user_agent, path_info),
client_str,
failures,
settings.AXES_FAILURE_LIMIT,
)
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
# used in counting failures is handled elsewhere, so we just record # everything here.
AccessAttempt.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
ip_address=ip_address,
user_agent=user_agent,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=http_accept,
@ -102,22 +111,15 @@ class AxesHandler: # pylint: disable=too-many-locals
log.info(
'AXES: New login failure by %s. Creating access record.',
get_client_str(username, ip_address, user_agent, path_info),
client_str,
)
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request, credentials)
):
if is_already_locked(request, credentials):
log.warning(
'AXES: Locked out %s after repeated login failures.',
get_client_str(username, ip_address, user_agent, path_info),
client_str,
)
# send signal when someone is locked out.
user_locked_out.send(
'axes',
request=request,
@ -135,13 +137,14 @@ class AxesHandler: # pylint: disable=too-many-locals
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
user_agent = get_client_user_agent(request)
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
client_str = get_client_str(username, ip_address, user_agent, path_info)
log.info(
'AXES: Successful login by %s.',
get_client_str(username, ip_address, user_agent, path_info),
client_str,
)
if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
@ -159,7 +162,7 @@ class AxesHandler: # pylint: disable=too-many-locals
log.info(
'AXES: Deleted %d failed login attempts by %s.',
count,
get_client_str(username, ip_address, user_agent, path_info),
client_str,
)
def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
@ -182,11 +185,14 @@ class AxesHandler: # pylint: disable=too-many-locals
Update cache after saving AccessAttempts.
"""
cache_hash_key = get_cache_key(instance)
cache_key = get_cache_key(instance)
if not get_axes_cache().get(cache_hash_key):
cache_timeout = get_cache_timeout()
get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout)
if not get_axes_cache().get(cache_key):
get_axes_cache().set(
cache_key,
instance.failures_since_start,
get_cache_timeout(),
)
def post_delete_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument
"""