From 3bece1aaaa90bdb5aab86e26a163543ab59dd532 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksi=20Ha=CC=88kli?= Date: Sun, 3 Mar 2019 21:56:57 +0200 Subject: [PATCH] Set Axes request attributes in middleware MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes #415 Signed-off-by: Aleksi Häkli --- axes/attempts.py | 20 +++----- axes/backends.py | 3 +- axes/conf.py | 3 -- axes/decorators.py | 4 +- axes/handlers/base.py | 22 ++++----- axes/handlers/cache.py | 35 ++++++-------- axes/handlers/database.py | 93 ++++++++++++++++---------------------- axes/handlers/proxy.py | 13 +++--- axes/helpers.py | 47 ++++++++----------- axes/middleware.py | 22 +++++++-- axes/request.py | 19 ++++++++ axes/tests/base.py | 19 +++++++- axes/tests/test_logging.py | 10 ---- axes/tests/test_utils.py | 33 ++++++++------ docs/4_configuration.rst | 1 - 15 files changed, 174 insertions(+), 170 deletions(-) create mode 100644 axes/request.py diff --git a/axes/attempts.py b/axes/attempts.py index 3308f62..e3e1904 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -2,15 +2,13 @@ from logging import getLogger from django.contrib.auth import get_user_model from django.db.models import QuerySet -from django.http import HttpRequest from django.utils.timezone import datetime, now from axes.conf import settings +from axes.request import AxesHttpRequest from axes.models import AccessAttempt from axes.helpers import ( - get_client_ip_address, get_client_username, - get_client_user_agent, get_client_parameters, get_cool_off, ) @@ -29,21 +27,19 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime: return attempt_time - get_cool_off() -def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: +def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet: """ Return a queryset of AccessAttempts that match the given request and credentials. """ username = get_client_username(request, credentials) - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - filter_kwargs = get_client_parameters(username, ip_address, user_agent) + filter_kwargs = get_client_parameters(username, request.axes_ip_address, request.axes_user_agent) return AccessAttempt.objects.filter(**filter_kwargs) -def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> QuerySet: +def get_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet: """ Get valid user attempts that match the given request and credentials. """ @@ -54,7 +50,7 @@ def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_ti log.debug('AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured') return attempts - threshold = get_cool_off_threshold(attempt_time) + threshold = get_cool_off_threshold(request.axes_attempt_time) log.debug('AXES: Getting access attempts that are newer than %s', threshold) return attempts.filter(attempt_time__gte=threshold) @@ -74,7 +70,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int: return count -def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int: +def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> int: """ Reset all user attempts that match the given request and credentials. """ @@ -87,9 +83,7 @@ def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int: return count - - -def is_user_attempt_whitelisted(request: HttpRequest, credentials: dict = None) -> bool: +def is_user_attempt_whitelisted(request: AxesHttpRequest, credentials: dict = None) -> bool: """ Check if the given request or credentials refer to a whitelisted username. diff --git a/axes/backends.py b/axes/backends.py index 2745374..bcf8ae1 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -3,6 +3,7 @@ from django.contrib.auth.backends import ModelBackend from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_credentials, get_lockout_message +from axes.request import AxesHttpRequest class AxesBackend(ModelBackend): @@ -10,7 +11,7 @@ class AxesBackend(ModelBackend): Authentication backend that forbids login attempts for locked out users. """ - def authenticate(self, request, username=None, password=None, **kwargs): + def authenticate(self, request: AxesHttpRequest, username: str = None, password: str = None, **kwargs): """ Check user lock out status and raises PermissionDenied if user is not allowed to log in. diff --git a/axes/conf.py b/axes/conf.py index b405204..37875a0 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -69,9 +69,6 @@ class AxesAppConf(AppConf): IP_BLACKLIST = None - # if no attribute is set by your backend, a value is calculated dynamically with the ipware package - CLIENT_IP_ATTRIBUTE = 'axes_client_ip' - # message to show when locked out and have cooloff enabled COOLOFF_MESSAGE = _('Account locked: too many login attempts. Please try again later') diff --git a/axes/decorators.py b/axes/decorators.py index 1656a6f..2e2411a 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -2,10 +2,11 @@ from functools import wraps from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_lockout_response +from axes.request import AxesHttpRequest def axes_dispatch(func): - def inner(request, *args, **kwargs): + def inner(request: AxesHttpRequest, *args, **kwargs): if AxesProxyHandler.is_allowed(request): return func(request, *args, **kwargs) @@ -22,5 +23,4 @@ def axes_form_invalid(func): return get_lockout_response(self.request) - return inner diff --git a/axes/handlers/base.py b/axes/handlers/base.py index 275ab67..17896c9 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -1,12 +1,10 @@ -from django.http import HttpRequest -from django.utils.timezone import datetime - from axes.conf import settings from axes.helpers import ( is_client_ip_address_blacklisted, is_client_ip_address_whitelisted, is_client_method_whitelisted, ) +from axes.request import AxesHttpRequest class AxesBaseHandler: # pylint: disable=unused-argument @@ -28,7 +26,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument and define the class to be used with ``settings.AXES_HANDLER = 'dotted.full.path.to.YourClass'``. """ - def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool: + def is_allowed(self, request: AxesHttpRequest, credentials: dict = None) -> bool: """ Check if the user is allowed to access or use given functionality such as a login view or authentication. @@ -54,17 +52,17 @@ class AxesBaseHandler: # pylint: disable=unused-argument return True - def user_login_failed(self, sender, credentials: dict, request: HttpRequest = None, **kwargs): + def user_login_failed(self, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs): """ Handle the Django user_login_failed authentication signal. """ - def user_logged_in(self, sender, request: HttpRequest, user, **kwargs): + def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): """ Handle the Django user_logged_in authentication signal. """ - def user_logged_out(self, sender, request: HttpRequest, user, **kwargs): + def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): """ Handle the Django user_logged_out authentication signal. """ @@ -79,7 +77,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument Handle the Axes AccessAttempt object post delete signal. """ - def is_blacklisted(self, request: HttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_blacklisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument """ Check if the request or given credentials are blacklisted from access. """ @@ -89,7 +87,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument return False - def is_whitelisted(self, request: HttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_whitelisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument """ Check if the request or given credentials are whitelisted for access. """ @@ -102,17 +100,17 @@ class AxesBaseHandler: # pylint: disable=unused-argument return False - def is_locked(self, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> bool: + def is_locked(self, request: AxesHttpRequest, credentials: dict = None) -> bool: """ Check if the request or given credentials are locked. """ if settings.AXES_LOCK_OUT_AT_FAILURE: - return self.get_failures(request, credentials, attempt_time) >= settings.AXES_FAILURE_LIMIT + return self.get_failures(request, credentials) >= settings.AXES_FAILURE_LIMIT return False - def get_failures(self, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> int: + def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: """ Check the number of failures associated to the given request and credentials. """ diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index ff63b1e..14f68b5 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -2,17 +2,15 @@ from logging import getLogger from axes.conf import settings from axes.exceptions import AxesSignalPermissionDenied +from axes.request import AxesHttpRequest from axes.handlers.base import AxesBaseHandler from axes.signals import user_locked_out from axes.helpers import ( get_cache, get_cache_timeout, get_client_cache_key, - get_client_ip_address, - get_client_path_info, get_client_str, get_client_username, - get_client_user_agent, get_credentials, ) @@ -28,11 +26,17 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals self.cache = get_cache() self.cache_timeout = get_cache_timeout() - def get_failures(self, request, credentials=None, attempt_time=None) -> int: + def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: cache_key = get_client_cache_key(request, credentials) return self.cache.get(cache_key, default=0) - def user_login_failed(self, sender, credentials, request=None, **kwargs): # pylint: disable=too-many-locals + def user_login_failed( + self, + sender, + credentials: dict, + request: AxesHttpRequest = None, + **kwargs + ): # pylint: disable=too-many-locals """ When user login fails, save attempt record in cache and lock user out if necessary. @@ -44,10 +48,7 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals return username = get_client_username(request, credentials) - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - path_info = get_client_path_info(request) - client_str = get_client_str(username, ip_address, user_agent, path_info) + client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) if self.is_whitelisted(request, credentials): log.info('AXES: Login failed from whitelisted client %s.', client_str) @@ -78,22 +79,19 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals 'axes', request=request, username=username, - ip_address=ip_address, + ip_address=request.axes_ip_address, ) raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') - def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ username = user.get_username() credentials = get_credentials(username) - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - path_info = get_client_path_info(request) - client_str = get_client_str(username, ip_address, user_agent, path_info) + client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) log.info('AXES: Successful login by %s.', client_str) @@ -103,11 +101,8 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals self.cache.delete(cache_key) log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str) - def user_logged_out(self, sender, request, user, **kwargs): + def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): username = user.get_username() - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - path_info = get_client_path_info(request) - client_str = get_client_str(username, ip_address, user_agent, path_info) + client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) log.info('AXES: Successful logout by %s.', client_str) diff --git a/axes/handlers/database.py b/axes/handlers/database.py index aa6f24f..26af79f 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -2,7 +2,6 @@ from logging import getLogger from django.db.models import Max, Value from django.db.models.functions import Concat -from django.utils.timezone import now from axes.attempts import ( clean_expired_user_attempts, @@ -13,15 +12,12 @@ from axes.attempts import ( from axes.conf import settings from axes.exceptions import AxesSignalPermissionDenied from axes.handlers.base import AxesBaseHandler +from axes.request import AxesHttpRequest from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out from axes.helpers import ( - get_client_ip_address, - get_client_path_info, - get_client_http_accept, get_client_str, get_client_username, - get_client_user_agent, get_credentials, get_query_str, ) @@ -35,38 +31,38 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals Signal handler implementation that records user login attempts to database and locks users out if necessary. """ - def get_failures(self, request, credentials=None, attempt_time=None) -> int: - attempts = get_user_attempts(request, credentials, attempt_time) + def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + attempts = get_user_attempts(request, credentials) return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0 - def is_locked(self, request, credentials=None, attempt_time=None): + def is_locked(self, request: AxesHttpRequest, credentials: dict = None): if is_user_attempt_whitelisted(request, credentials): return False - return super().is_locked(request, credentials, attempt_time) + return super().is_locked(request, credentials) - def user_login_failed(self, sender, credentials, request=None, **kwargs): # pylint: disable=too-many-locals + def user_login_failed( + self, + sender, + credentials: dict, + request: AxesHttpRequest = None, + **kwargs + ): # pylint: disable=too-many-locals """ When user login fails, save AccessAttempt record in database and lock user out if necessary. :raises AxesSignalPermissionDenied: if user should be locked out. """ - attempt_time = now() - - # 1. database query: Clean up expired user attempts from the database before logging new attempts - clean_expired_user_attempts(attempt_time) - if request is None: log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.') return + # 1. database query: Clean up expired user attempts from the database before logging new attempts + clean_expired_user_attempts(request.axes_attempt_time) + username = get_client_username(request, credentials) - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - path_info = get_client_path_info(request) - http_accept = get_client_http_accept(request) - client_str = get_client_str(username, ip_address, user_agent, path_info) + client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) get_data = get_query_str(request.GET) post_data = get_query_str(request.POST) @@ -76,7 +72,7 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals return # 2. database query: Calculate the current maximum failure number from the existing attempts - failures_since_start = 1 + self.get_failures(request, credentials, attempt_time) + failures_since_start = 1 + self.get_failures(request, credentials) # 3. database query: Insert or update access records with the new failure data if failures_since_start > 1: @@ -93,14 +89,14 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals separator = '\n---------\n' - attempts = get_user_attempts(request, credentials, attempt_time) + attempts = get_user_attempts(request, credentials) attempts.update( get_data=Concat('get_data', Value(separator + get_data)), post_data=Concat('post_data', Value(separator + post_data)), - http_accept=http_accept, - path_info=path_info, + http_accept=request.axes_http_accept, + path_info=request.axes_path_info, failures_since_start=failures_since_start, - attempt_time=attempt_time, + attempt_time=request.axes_attempt_time, ) else: # Record failed attempt with all the relevant information. @@ -114,14 +110,14 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals AccessAttempt.objects.create( username=username, - ip_address=ip_address, - user_agent=user_agent, + ip_address=request.axes_ip_address, + user_agent=request.axes_user_agent, get_data=get_data, post_data=post_data, - http_accept=http_accept, - path_info=path_info, + http_accept=request.axes_http_accept, + path_info=request.axes_path_info, failures_since_start=failures_since_start, - attempt_time=attempt_time, + attempt_time=request.axes_attempt_time, ) if failures_since_start >= settings.AXES_FAILURE_LIMIT: @@ -131,28 +127,22 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals 'axes', request=request, username=username, - ip_address=ip_address, + ip_address=request.axes_ip_address, ) raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') - def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ - attempt_time = now() - # 1. database query: Clean up expired user attempts from the database - clean_expired_user_attempts(attempt_time) + clean_expired_user_attempts(request.axes_attempt_time) username = user.get_username() credentials = get_credentials(username) - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - path_info = get_client_path_info(request) - http_accept = get_client_http_accept(request) - client_str = get_client_str(username, ip_address, user_agent, path_info) + client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) log.info('AXES: Successful login by %s.', client_str) @@ -160,11 +150,11 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals # 2. database query: Insert new access logs with login time AccessLog.objects.create( username=username, - ip_address=ip_address, - user_agent=user_agent, - http_accept=http_accept, - path_info=path_info, - attempt_time=attempt_time, + ip_address=request.axes_ip_address, + user_agent=request.axes_user_agent, + http_accept=request.axes_http_accept, + path_info=request.axes_path_info, + attempt_time=request.axes_attempt_time, trusted=True, ) @@ -173,21 +163,16 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals count = reset_user_attempts(request, credentials) log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str) - def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument """ When user logs out, update the AccessLog related to the user. """ - attempt_time = now() - # 1. database query: Clean up expired user attempts from the database - clean_expired_user_attempts(attempt_time) + clean_expired_user_attempts(request.axes_attempt_time) username = user.get_username() - ip_address = get_client_ip_address(request) - user_agent = get_client_user_agent(request) - path_info = get_client_path_info(request) - client_str = get_client_str(username, ip_address, user_agent, path_info) + client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) log.info('AXES: Successful logout by %s.', client_str) @@ -197,5 +182,5 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals username=username, logout_time__isnull=True, ).update( - logout_time=attempt_time, + logout_time=request.axes_attempt_time, ) diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py index 81a3fc0..ea88b9e 100644 --- a/axes/handlers/proxy.py +++ b/axes/handlers/proxy.py @@ -1,11 +1,10 @@ from logging import getLogger -from django.http import HttpRequest from django.utils.module_loading import import_string -from django.utils.timezone import datetime from axes.conf import settings from axes.handlers.base import AxesBaseHandler +from axes.request import AxesHttpRequest log = getLogger(settings.AXES_LOGGER) @@ -37,23 +36,23 @@ class AxesProxyHandler(AxesBaseHandler): return cls.implementation @classmethod - def is_locked(cls, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> bool: + def is_locked(cls, request: AxesHttpRequest, credentials: dict = None) -> bool: return cls.get_implementation().is_locked(request, credentials) @classmethod - def is_allowed(cls, request: HttpRequest, credentials: dict = None) -> bool: + def is_allowed(cls, request: AxesHttpRequest, credentials: dict = None) -> bool: return cls.get_implementation().is_allowed(request, credentials) @classmethod - def user_login_failed(cls, sender, credentials: dict, request: HttpRequest = None, **kwargs): + def user_login_failed(cls, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs): return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs) @classmethod - def user_logged_in(cls, sender, request: HttpRequest, user, **kwargs): + def user_logged_in(cls, sender, request: AxesHttpRequest, user, **kwargs): return cls.get_implementation().user_logged_in(sender, request, user, **kwargs) @classmethod - def user_logged_out(cls, sender, request: HttpRequest, user, **kwargs): + def user_logged_out(cls, sender, request: AxesHttpRequest, user, **kwargs): return cls.get_implementation().user_logged_out(sender, request, user, **kwargs) @classmethod diff --git a/axes/helpers.py b/axes/helpers.py index 1ef0d22..5af1c93 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -1,17 +1,19 @@ from collections import OrderedDict from datetime import timedelta from hashlib import md5 +from ipaddress import ip_address from logging import getLogger from typing import Any, Optional, Type, Union from django.core.cache import caches, BaseCache -from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse, QueryDict +from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonResponse, QueryDict from django.shortcuts import render from django.utils.module_loading import import_string import ipware.ip2 from axes.conf import settings +from axes.request import AxesHttpRequest log = getLogger(__name__) @@ -105,7 +107,7 @@ def get_credentials(username: str = None, **kwargs) -> dict: return credentials -def get_client_username(request: HttpRequest, credentials: dict = None) -> str: +def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> str: """ Resolve client username from the given request or credentials if supplied. @@ -148,20 +150,15 @@ def get_client_ip_address(request: HttpRequest) -> str: that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information. """ - client_ip_attribute = settings.AXES_CLIENT_IP_ATTRIBUTE + client_ip_address, _ = ipware.ip2.get_client_ip( + request, + proxy_order=settings.AXES_PROXY_ORDER, + proxy_count=settings.AXES_PROXY_COUNT, + proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, + request_header_order=settings.AXES_META_PRECEDENCE_ORDER, + ) - if not hasattr(request, client_ip_attribute): - client_ip, _ = ipware.ip2.get_client_ip( - request, - proxy_order=settings.AXES_PROXY_ORDER, - proxy_count=settings.AXES_PROXY_COUNT, - proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, - request_header_order=settings.AXES_META_PRECEDENCE_ORDER, - ) - - setattr(request, client_ip_attribute, client_ip) - - return getattr(request, client_ip_attribute) + return str(ip_address(client_ip_address)) def get_client_user_agent(request: HttpRequest) -> str: @@ -271,7 +268,7 @@ def get_lockout_message() -> str: return settings.AXES_PERMALOCK_MESSAGE -def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse: +def get_lockout_response(request: AxesHttpRequest, credentials: dict = None) -> HttpResponse: status = 403 context = { 'failure_limit': settings.AXES_FAILURE_LIMIT, @@ -323,39 +320,35 @@ def is_ip_address_in_blacklist(ip_address: str) -> bool: return ip_address in settings.AXES_IP_BLACKLIST -def is_client_ip_address_whitelisted(request: HttpRequest): +def is_client_ip_address_whitelisted(request: AxesHttpRequest): """ Check if the given request refers to a whitelisted IP. """ - ip_address = get_client_ip_address(request) - - if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address): + if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address): return True - if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(ip_address): + if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address): return True return False -def is_client_ip_address_blacklisted(request: HttpRequest) -> bool: +def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool: """ Check if the given request refers to a blacklisted IP. """ - ip_address = get_client_ip_address(request) - - if is_ip_address_in_blacklist(ip_address): + if is_ip_address_in_blacklist(request.axes_ip_address): return True - if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(ip_address): + if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(request.axes_ip_address): return True return False -def is_client_method_whitelisted(request: HttpRequest) -> bool: +def is_client_method_whitelisted(request: AxesHttpRequest) -> bool: """ Check if the given request uses a whitelisted method. """ diff --git a/axes/middleware.py b/axes/middleware.py index 79c1f94..1bbd13b 100644 --- a/axes/middleware.py +++ b/axes/middleware.py @@ -1,5 +1,15 @@ +from django.http import HttpRequest +from django.utils.timezone import now + from axes.exceptions import AxesSignalPermissionDenied -from axes.helpers import get_lockout_response +from axes.helpers import ( + get_client_ip_address, + get_client_user_agent, + get_client_path_info, + get_client_http_accept, + get_lockout_response, +) +from axes.request import AxesHttpRequest class AxesMiddleware: @@ -21,10 +31,16 @@ class AxesMiddleware: def __init__(self, get_response): self.get_response = get_response - def __call__(self, request): + def __call__(self, request: HttpRequest): + request.axes_attempt_time = now() + request.axes_ip_address = get_client_ip_address(request) + request.axes_user_agent = get_client_user_agent(request) + request.axes_path_info = get_client_path_info(request) + request.axes_http_accept = get_client_http_accept(request) + return self.get_response(request) - def process_exception(self, request, exception): # pylint: disable=inconsistent-return-statements + def process_exception(self, request: AxesHttpRequest, exception): # pylint: disable=inconsistent-return-statements """ Exception handler that processes exceptions raised by the axes signal handler when request fails with login. diff --git a/axes/request.py b/axes/request.py new file mode 100644 index 0000000..158fdc0 --- /dev/null +++ b/axes/request.py @@ -0,0 +1,19 @@ +from datetime import datetime # noqa + +from django.http import HttpRequest + + +class AxesHttpRequest(HttpRequest): + """ + Type definition for the HTTP request Axes uses. + """ + + def __init__(self): + super().__init__() + + # TODO: Move attribute definitions to class level in Python 3.6+ + self.axes_attempt_time = None # type: datetime + self.axes_ip_address = None # type: str + self.axes_user_agent = None # type: str + self.axes_path_info = None # type: str + self.axes_http_accept = None # type: str diff --git a/axes/tests/base.py b/axes/tests/base.py index a2259ff..fe0b391 100644 --- a/axes/tests/base.py +++ b/axes/tests/base.py @@ -6,11 +6,20 @@ from django.contrib.auth import get_user_model from django.http import HttpRequest from django.test import TestCase from django.urls import reverse +from django.utils.timezone import now from axes.utils import reset from axes.conf import settings -from axes.helpers import get_cache, get_cool_off, get_credentials -from axes.models import AccessLog, AccessAttempt +from axes.helpers import ( + get_cache, + get_client_http_accept, + get_client_ip_address, + get_client_path_info, + get_client_user_agent, + get_cool_off, + get_credentials, +) +from axes.models import AccessAttempt class AxesTestCase(TestCase): @@ -61,6 +70,12 @@ class AxesTestCase(TestCase): self.request.META['HTTP_USER_AGENT'] = self.user_agent self.request.META['PATH_INFO'] = self.path_info + self.request.axes_attempt_time = now() + self.request.axes_ip_address = get_client_ip_address(self.request) + self.request.axes_user_agent = get_client_user_agent(self.request) + self.request.axes_path_info = get_client_path_info(self.request) + self.request.axes_http_accept = get_client_http_accept(self.request) + self.credentials = get_credentials(self.username) def tearDown(self): diff --git a/axes/tests/test_logging.py b/axes/tests/test_logging.py index 1a4bccd..5e76959 100644 --- a/axes/tests/test_logging.py +++ b/axes/tests/test_logging.py @@ -49,16 +49,6 @@ class AppsTestCase(AxesTestCase): class AccessLogTestCase(AxesTestCase): - def test_authenticate_invalid_parameters(self): - """ - Test that logging is not done if an attempt to authenticate with a custom authentication backend fails. - """ - - request = HttpRequest() - request.META['REMOTE_ADDR'] = '127.0.0.1' - authenticate(request=request, foo='bar') - self.assertEqual(AccessLog.objects.all().count(), 0) - def test_access_log_on_logout(self): """ Test a valid logout and make sure the logout_time is updated. diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 7361d9d..42324fd 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -2,7 +2,7 @@ from datetime import timedelta from hashlib import md5 from unittest.mock import patch -from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpResponse +from django.http import JsonResponse, HttpResponseRedirect, HttpResponse from django.test import override_settings, RequestFactory from axes import get_version @@ -20,7 +20,9 @@ from axes.helpers import ( is_client_ip_address_whitelisted, is_ip_address_in_blacklist, is_ip_address_in_whitelist, - is_client_method_whitelisted) + is_client_method_whitelisted, +) +from axes.request import AxesHttpRequest class VersionTestCase(AxesTestCase): @@ -331,7 +333,7 @@ class UsernameTestCase(AxesTestCase): def test_default_get_client_username(self): expected = 'test-username' - request = HttpRequest() + request = AxesHttpRequest() request.POST['username'] = expected actual = get_client_username(request) @@ -343,7 +345,7 @@ class UsernameTestCase(AxesTestCase): expected = 'test-username' expected_in_credentials = 'test-credentials-username' - request = HttpRequest() + request = AxesHttpRequest() request.POST['username'] = expected credentials = { 'username': expected_in_credentials @@ -363,7 +365,7 @@ class UsernameTestCase(AxesTestCase): expected = 'prefixed-' + provided provided_in_credentials = 'test-credentials-username' - request = HttpRequest() + request = AxesHttpRequest() request.POST['username'] = provided credentials = {'username': provided_in_credentials} @@ -381,7 +383,7 @@ class UsernameTestCase(AxesTestCase): provided_in_credentials = 'test-credentials-username' expected_in_credentials = 'prefixed-' + provided_in_credentials - request = HttpRequest() + request = AxesHttpRequest() request.POST['username'] = provided credentials = {'username': provided_in_credentials} @@ -391,40 +393,41 @@ class UsernameTestCase(AxesTestCase): @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover def test_get_client_username(self): - self.assertEqual(get_client_username(HttpRequest(), {}), 'example') + self.assertEqual(get_client_username(AxesHttpRequest(), {}), 'example') @override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover def test_get_client_username_invalid_callable_too_few_arguments(self): with self.assertRaises(TypeError): - get_client_username(HttpRequest(), {}) + get_client_username(AxesHttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover def test_get_client_username_invalid_callable_too_many_arguments(self): with self.assertRaises(TypeError): - get_client_username(HttpRequest(), {}) + get_client_username(AxesHttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE=True) def test_get_client_username_not_callable(self): with self.assertRaises(TypeError): - get_client_username(HttpRequest(), {}) + get_client_username(AxesHttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username') def test_get_client_username_str(self): self.assertEqual( - get_client_username(HttpRequest(), {}), + get_client_username(AxesHttpRequest(), {}), 'username', ) -def get_username(request: HttpRequest, credentials: dict) -> str: +def get_username(request: AxesHttpRequest, credentials: dict) -> str: return 'username' class IPWhitelistTestCase(AxesTestCase): def setUp(self): - self.request = HttpRequest() + self.request = AxesHttpRequest() self.request.method = 'POST' self.request.META['REMOTE_ADDR'] = '127.0.0.1' + self.request.axes_ip_address = '127.0.0.1' @override_settings(AXES_IP_WHITELIST=None) def test_ip_in_whitelist_none(self): @@ -480,7 +483,7 @@ class IPWhitelistTestCase(AxesTestCase): class MethodWhitelistTestCase(AxesTestCase): def setUp(self): - self.request = HttpRequest() + self.request = AxesHttpRequest() self.request.method = 'GET' @override_settings(AXES_NEVER_LOCKOUT_GET=True) @@ -494,7 +497,7 @@ class MethodWhitelistTestCase(AxesTestCase): class LockoutResponseTestCase(AxesTestCase): def setUp(self): - self.request = HttpRequest() + self.request = AxesHttpRequest() @override_settings(AXES_COOLOFF_TIME=42) def test_get_lockout_response_cool_off(self): diff --git a/docs/4_configuration.rst b/docs/4_configuration.rst index 7f3a16d..fc67ad0 100644 --- a/docs/4_configuration.rst +++ b/docs/4_configuration.rst @@ -66,7 +66,6 @@ The following ``settings.py`` options are available for customizing Axes behavio Default: ``False`` * ``AXES_NEVER_LOCKOUT_WHITELIST``: If ``True``, users can always login from whitelisted IP addresses. Default: ``False`` -* ``AXES_CLIENT_IP_ATTRIBUTE``: A string that is used to lookup and set client IP on the request object. Default: ``'axes_client_ip'`` * ``AXES_IP_BLACKLIST``: An iterable of IPs to be blacklisted. Takes precedence over whitelists. For example: ``AXES_IP_BLACKLIST = ['0.0.0.0']``. Default: ``None`` * ``AXES_IP_WHITELIST``: An iterable of IPs to be whitelisted. For example: ``AXES_IP_WHITELIST = ['0.0.0.0']``. Default: ``None`` * ``AXES_DISABLE_ACCESS_LOG``: If ``True``, disable all access logging, so the admin interface will be empty. Default: ``False``