From 3152b4d7e9255d542382886e77f1c92c414b52bd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksi=20Ha=CC=88kli?= Date: Sun, 19 May 2019 15:54:27 +0300 Subject: [PATCH] Improve lockout and request handling The old architecture used exceptions in the signal handler which prevented transactions from running smoothly and signal handlers from running after Axes handlers. The new architecture changes the request approach to request flagging and moves the exception handling into the middleware call method. This allows users to more flexibly run their own signal handlers and optionally use the Axes middleware if they want to do so. Fixes #440 Fixes #442 --- CHANGES.rst | 9 +++++++ axes/attempts.py | 9 +++---- axes/backends.py | 3 +-- axes/decorators.py | 3 +-- axes/exceptions.py | 23 +--------------- axes/handlers/base.py | 17 ++++++------ axes/handlers/cache.py | 25 ++++-------------- axes/handlers/database.py | 28 +++++--------------- axes/handlers/dummy.py | 4 +-- axes/handlers/proxy.py | 42 +++++++++++++++++++++++++----- axes/helpers.py | 19 +++++++------- axes/middleware.py | 49 +++++------------------------------ axes/request.py | 38 --------------------------- axes/tests/base.py | 4 +-- axes/tests/test_middleware.py | 34 ++++++++++++------------ axes/tests/test_utils.py | 29 ++++++++++----------- docs/8_reference.rst | 4 --- 17 files changed, 121 insertions(+), 219 deletions(-) delete mode 100644 axes/request.py diff --git a/CHANGES.rst b/CHANGES.rst index 36a7070..26b8b6a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,15 @@ Changes ======= +5.0.5 (WIP) +----------- + +- Change the lockout response calculation to request flagging + instead of exception throwing in the signal handler and middleware. + Move request attribute calculation from middleware to handler layer. + Deprecate ``axes.request.AxesHttpRequest`` object type definition. + [aleksihakli] + 5.0.4 (2019-05-09) ------------------ diff --git a/axes/attempts.py b/axes/attempts.py index 0e73e16..45a1796 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -5,7 +5,6 @@ from django.db.models import QuerySet from django.utils.timezone import datetime, now from axes.conf import settings -from axes.request import AxesHttpRequest from axes.models import AccessAttempt from axes.helpers import ( get_client_username, @@ -30,7 +29,7 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime: return attempt_time - cool_off -def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet: +def filter_user_attempts(request, credentials: dict = None) -> QuerySet: """ Return a queryset of AccessAttempts that match the given request and credentials. """ @@ -42,7 +41,7 @@ def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> return AccessAttempt.objects.filter(**filter_kwargs) -def get_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet: +def get_user_attempts(request, credentials: dict = None) -> QuerySet: """ Get valid user attempts that match the given request and credentials. """ @@ -73,7 +72,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int: return count -def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> int: +def reset_user_attempts(request, credentials: dict = None) -> int: """ Reset all user attempts that match the given request and credentials. """ @@ -86,7 +85,7 @@ def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> i return count -def is_user_attempt_whitelisted(request: AxesHttpRequest, credentials: dict = None) -> bool: +def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool: """ Check if the given request or credentials refer to a whitelisted username. diff --git a/axes/backends.py b/axes/backends.py index 2e64cc9..f8376a1 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -3,7 +3,6 @@ from django.contrib.auth.backends import ModelBackend from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_credentials, get_lockout_message, toggleable -from axes.request import AxesHttpRequest class AxesBackend(ModelBackend): @@ -18,7 +17,7 @@ class AxesBackend(ModelBackend): """ @toggleable - def authenticate(self, request: AxesHttpRequest, username: str = None, password: str = None, **kwargs: dict): + def authenticate(self, request, username: str = None, password: str = None, **kwargs: dict): """ Checks user lockout status and raises an exception if user is not allowed to log in. diff --git a/axes/decorators.py b/axes/decorators.py index 2e2411a..0cda982 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -2,11 +2,10 @@ from functools import wraps from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_lockout_response -from axes.request import AxesHttpRequest def axes_dispatch(func): - def inner(request: AxesHttpRequest, *args, **kwargs): + def inner(request, *args, **kwargs): if AxesProxyHandler.is_allowed(request): return func(request, *args, **kwargs) diff --git a/axes/exceptions.py b/axes/exceptions.py index 75fb841..773f1c2 100644 --- a/axes/exceptions.py +++ b/axes/exceptions.py @@ -1,28 +1,7 @@ from django.core.exceptions import PermissionDenied -class AxesPermissionDenied(PermissionDenied): - """ - Base class for permission denied errors raised by axes specifically for easier debugging. - - Two different types of errors are used because of the behaviour Django has: - - - If an authentication backend raises a PermissionDenied error the authentication flow is aborted. - - If another component raises a PermissionDenied error a HTTP 403 Forbidden response is returned. - """ - - pass - - -class AxesSignalPermissionDenied(AxesPermissionDenied): - """ - Raised by signal handler on failed authentication attempts to send user a HTTP 403 Forbidden status code. - """ - - pass - - -class AxesBackendPermissionDenied(AxesPermissionDenied): +class AxesBackendPermissionDenied(PermissionDenied): """ Raised by authentication backend on locked out requests to stop the Django authentication flow. """ diff --git a/axes/handlers/base.py b/axes/handlers/base.py index 60312de..3e5aca9 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -4,7 +4,6 @@ from axes.helpers import ( is_client_ip_address_whitelisted, is_client_method_whitelisted, ) -from axes.request import AxesHttpRequest class AxesHandler: # pylint: disable=unused-argument @@ -19,7 +18,7 @@ class AxesHandler: # pylint: disable=unused-argument .. note:: This is a virtual class and **can not be used without specialization**. """ - def is_allowed(self, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: dict = None) -> bool: """ Checks if the user is allowed to access or use given functionality such as a login view or authentication. @@ -45,17 +44,17 @@ class AxesHandler: # pylint: disable=unused-argument return True - def user_login_failed(self, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs): + def user_login_failed(self, sender, credentials: dict, request = None, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_login_failed`` authentication signal. """ - def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_in(self, sender, request, user, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_logged_in`` authentication signal. """ - def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_out(self, sender, request, user, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_logged_out`` authentication signal. """ @@ -70,7 +69,7 @@ class AxesHandler: # pylint: disable=unused-argument Handles the ``axes.models.AccessAttempt`` object post delete signal. """ - def is_blacklisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_blacklisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument """ Checks if the request or given credentials are blacklisted from access. """ @@ -80,7 +79,7 @@ class AxesHandler: # pylint: disable=unused-argument return False - def is_whitelisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_whitelisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument """ Checks if the request or given credentials are whitelisted for access. """ @@ -93,7 +92,7 @@ class AxesHandler: # pylint: disable=unused-argument return False - def is_locked(self, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_locked(self, request, credentials: dict = None) -> bool: """ Checks if the request or given credentials are locked. """ @@ -103,7 +102,7 @@ class AxesHandler: # pylint: disable=unused-argument return False - def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + def get_failures(self, request, credentials: dict = None) -> int: """ Checks the number of failures associated to the given request and credentials. diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index 8909bf4..517ced2 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -1,8 +1,6 @@ from logging import getLogger from axes.conf import settings -from axes.exceptions import AxesSignalPermissionDenied -from axes.request import AxesHttpRequest from axes.handlers.base import AxesHandler from axes.signals import user_locked_out from axes.helpers import ( @@ -26,7 +24,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals self.cache = get_cache() self.cache_timeout = get_cache_timeout() - def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + def get_failures(self, request, credentials: dict = None) -> int: cache_key = get_client_cache_key(request, credentials) return self.cache.get(cache_key, default=0) @@ -34,7 +32,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals self, sender, credentials: dict, - request: AxesHttpRequest = None, + request = None, **kwargs ): # pylint: disable=too-many-locals """ @@ -47,10 +45,6 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals log.error('AXES: AxesCacheHandler.user_login_failed does not function without a request.') return - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesCacheHandler.user_login_failed needs a valid AxesHttpRequest object.') - return - username = get_client_username(request, credentials) client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) @@ -79,6 +73,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals if failures_since_start >= settings.AXES_FAILURE_LIMIT: log.warning('AXES: Locking out %s after repeated login failures.', client_str) + request.axes_locked_out = True user_locked_out.send( 'axes', request=request, @@ -86,17 +81,11 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals ip_address=request.axes_ip_address, ) - raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') - - def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesCacheHandler.user_logged_in needs a valid AxesHttpRequest object.') - return - username = user.get_username() credentials = get_credentials(username) client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) @@ -109,11 +98,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals self.cache.delete(cache_key) log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str) - def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesCacheHandler.user_logged_out needs a valid AxesHttpRequest object.') - return - + def user_logged_out(self, sender, request, user, **kwargs): username = user.get_username() if user else None client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) diff --git a/axes/handlers/database.py b/axes/handlers/database.py index 498b73d..18e6959 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -10,9 +10,7 @@ from axes.attempts import ( reset_user_attempts, ) from axes.conf import settings -from axes.exceptions import AxesSignalPermissionDenied from axes.handlers.base import AxesHandler -from axes.request import AxesHttpRequest from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out from axes.helpers import ( @@ -31,11 +29,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals Signal handler implementation that records user login attempts to database and locks users out if necessary. """ - def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + def get_failures(self, request, credentials: dict = None) -> int: attempts = get_user_attempts(request, credentials) return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0 - def is_locked(self, request: AxesHttpRequest, credentials: dict = None): + def is_locked(self, request, credentials: dict = None): if is_user_attempt_whitelisted(request, credentials): return False @@ -45,7 +43,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals self, sender, credentials: dict, - request: AxesHttpRequest = None, + request = None, **kwargs ): # pylint: disable=too-many-locals """ @@ -58,10 +56,6 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.') return - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesDatabaseHandler.user_login_failed needs a valid AxesHttpRequest object.') - return - # 1. database query: Clean up expired user attempts from the database before logging new attempts clean_expired_user_attempts(request.axes_attempt_time) @@ -127,6 +121,8 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals if failures_since_start >= settings.AXES_FAILURE_LIMIT: log.warning('AXES: Locking out %s after repeated login failures.', client_str) + request.axes_locked_out = True + user_locked_out.send( 'axes', request=request, @@ -134,17 +130,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals ip_address=request.axes_ip_address, ) - raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') - - def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesDatabaseHandler.user_logged_in needs a valid AxesHttpRequest object.') - return - # 1. database query: Clean up expired user attempts from the database clean_expired_user_attempts(request.axes_attempt_time) @@ -170,15 +160,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals count = reset_user_attempts(request, credentials) log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str) - def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument + def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument """ When user logs out, update the AccessLog related to the user. """ - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesDatabaseHandler.user_logged_out needs a valid AxesHttpRequest object.') - return - # 1. database query: Clean up expired user attempts from the database clean_expired_user_attempts(request.axes_attempt_time) diff --git a/axes/handlers/dummy.py b/axes/handlers/dummy.py index ce9b005..0364845 100644 --- a/axes/handlers/dummy.py +++ b/axes/handlers/dummy.py @@ -1,5 +1,3 @@ -from django.http import HttpRequest - from axes.handlers.base import AxesHandler @@ -8,5 +6,5 @@ class AxesDummyHandler(AxesHandler): # pylint: disable=unused-argument Signal handler implementation that does nothing and can be used to disable signal processing. """ - def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: dict = None) -> bool: return True diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py index 5fb2575..2077508 100644 --- a/axes/handlers/proxy.py +++ b/axes/handlers/proxy.py @@ -1,11 +1,17 @@ from logging import getLogger from django.utils.module_loading import import_string +from django.utils.timezone import now from axes.conf import settings from axes.handlers.base import AxesHandler -from axes.helpers import toggleable -from axes.request import AxesHttpRequest +from axes.helpers import ( + get_client_ip_address, + get_client_user_agent, + get_client_path_info, + get_client_http_accept, + toggleable, +) log = getLogger(settings.AXES_LOGGER) @@ -36,27 +42,49 @@ class AxesProxyHandler(AxesHandler): cls.implementation = import_string(settings.AXES_HANDLER)() return cls.implementation + @staticmethod + def update_request(request): + """ + Update request attributes before passing them into the selected handler class. + """ + + if request is None: + log.error('AXES: AxesProxyHandler.update_request can not set request attributes to a None request') + return + + request.axes_locked_out = False + request.axes_attempt_time = now() + request.axes_ip_address = get_client_ip_address(request) + request.axes_user_agent = get_client_user_agent(request) + request.axes_path_info = get_client_path_info(request) + request.axes_http_accept = get_client_http_accept(request) + @classmethod - def is_locked(cls, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_locked(cls, request, credentials: dict = None) -> bool: + cls.update_request(request) return cls.get_implementation().is_locked(request, credentials) @classmethod - def is_allowed(cls, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_allowed(cls, request, credentials: dict = None) -> bool: + cls.update_request(request) return cls.get_implementation().is_allowed(request, credentials) @classmethod @toggleable - def user_login_failed(cls, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs): + def user_login_failed(cls, sender, credentials: dict, request=None, **kwargs): + cls.update_request(request) return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs) @classmethod @toggleable - def user_logged_in(cls, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_in(cls, sender, request, user, **kwargs): + cls.update_request(request) return cls.get_implementation().user_logged_in(sender, request, user, **kwargs) @classmethod @toggleable - def user_logged_out(cls, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_out(cls, sender, request, user, **kwargs): + cls.update_request(request) return cls.get_implementation().user_logged_out(sender, request, user, **kwargs) @classmethod diff --git a/axes/helpers.py b/axes/helpers.py index 79a360f..d02e5aa 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -11,7 +11,6 @@ from django.utils.module_loading import import_string import ipware.ip2 from axes.conf import settings -from axes.request import AxesHttpRequest log = getLogger(__name__) @@ -102,7 +101,7 @@ def get_credentials(username: str = None, **kwargs) -> dict: return credentials -def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> str: +def get_client_username(request, credentials: dict = None) -> str: """ Resolve client username from the given request or credentials if supplied. @@ -133,7 +132,7 @@ def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> s return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) -def get_client_ip_address(request: HttpRequest) -> str: +def get_client_ip_address(request) -> str: """ Get client IP address as configured by the user. @@ -152,15 +151,15 @@ def get_client_ip_address(request: HttpRequest) -> str: return client_ip_address -def get_client_user_agent(request: HttpRequest) -> str: +def get_client_user_agent(request) -> str: return request.META.get('HTTP_USER_AGENT', '')[:255] -def get_client_path_info(request: HttpRequest) -> str: +def get_client_path_info(request) -> str: return request.META.get('PATH_INFO', '')[:255] -def get_client_http_accept(request: HttpRequest) -> str: +def get_client_http_accept(request) -> str: return request.META.get('HTTP_ACCEPT', '')[:1025] @@ -259,7 +258,7 @@ def get_lockout_message() -> str: return settings.AXES_PERMALOCK_MESSAGE -def get_lockout_response(request: AxesHttpRequest, credentials: dict = None) -> HttpResponse: +def get_lockout_response(request, credentials: dict = None) -> HttpResponse: status = 403 context = { 'failure_limit': settings.AXES_FAILURE_LIMIT, @@ -311,7 +310,7 @@ def is_ip_address_in_blacklist(ip_address: str) -> bool: return ip_address in settings.AXES_IP_BLACKLIST -def is_client_ip_address_whitelisted(request: AxesHttpRequest): +def is_client_ip_address_whitelisted(request): """ Check if the given request refers to a whitelisted IP. """ @@ -325,7 +324,7 @@ def is_client_ip_address_whitelisted(request: AxesHttpRequest): return False -def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool: +def is_client_ip_address_blacklisted(request) -> bool: """ Check if the given request refers to a blacklisted IP. """ @@ -339,7 +338,7 @@ def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool: return False -def is_client_method_whitelisted(request: AxesHttpRequest) -> bool: +def is_client_method_whitelisted(request) -> bool: """ Check if the given request uses a whitelisted method. """ diff --git a/axes/middleware.py b/axes/middleware.py index e2aedb0..2b1087d 100644 --- a/axes/middleware.py +++ b/axes/middleware.py @@ -1,18 +1,6 @@ from typing import Callable -from django.http import HttpRequest -from django.utils.timezone import now - -from axes.exceptions import AxesSignalPermissionDenied -from axes.helpers import ( - get_client_ip_address, - get_client_user_agent, - get_client_path_info, - get_client_http_accept, - get_lockout_response, - toggleable, -) -from axes.request import AxesHttpRequest +from axes.helpers import get_lockout_response class AxesMiddleware: @@ -42,35 +30,10 @@ class AxesMiddleware: def __init__(self, get_response: Callable): self.get_response = get_response - def __call__(self, request: HttpRequest): - self.update_request(request) - return self.get_response(request) + def __call__(self, request): + response = self.get_response(request) - @toggleable - def update_request(self, request: HttpRequest): - """ - Construct an ``AxesHttpRequest`` from the given ``HttpRequest`` - by updating the request with necessary attempt tracking attributes. + if getattr(request, 'axes_locked_out', None): + response = get_lockout_response(request) # type: ignore - This method is called by the middleware class ``__call__`` method - when iterating over the middleware stack. - """ - - request.axes_attempt_time = now() - request.axes_ip_address = get_client_ip_address(request) - request.axes_user_agent = get_client_user_agent(request) - request.axes_path_info = get_client_path_info(request) - request.axes_http_accept = get_client_http_accept(request) - - @toggleable - def process_exception(self, request: AxesHttpRequest, exception): # pylint: disable=inconsistent-return-statements - """ - Handle exceptions raised by the Axes signal handler class when requests fail checks. - - Note that only ``AxesSignalPermissionDenied`` is handled by this middleware class. - - :return: Configured ``HttpResponse`` for failed authentication attempts and lockouts. - """ - - if isinstance(exception, AxesSignalPermissionDenied): - return get_lockout_response(request) + return response diff --git a/axes/request.py b/axes/request.py deleted file mode 100644 index 978c392..0000000 --- a/axes/request.py +++ /dev/null @@ -1,38 +0,0 @@ -from datetime import datetime - -from django.http import HttpRequest - - -class AxesHttpRequest(HttpRequest): - """ - Extended Django ``HttpRequest`` with custom Axes attributes. - - This request is constructed by the ``AxesMiddleware`` class - where the custom attributes are inserted into the request. - - .. note:: The ``str`` type variables have a maximum length of 255 - characters and they are calculated in the middleware layer. - If the HTTP request attributes can not be resolved - they are assigned default value of ````. - - :var axes_attempt_time: Timestamp of the request on the server side. - :vartype axes_attempt_time: datetime - - :var axes_ip_address: Request IP address as resolved by django-axes and django-ipware configurations. - :vartype axes_ip_address: str - - :var axes_user_agent: Request agent from ``request.META['HTTP_USER_AGENT']``. - :vartype axes_user_agent: str - - :var axes_path_info: Request path from ``request.META['PATH_INFO']``. - :vartype axes_path_info: str - - :var axes_http_accept: Request ``Accept`` header from ``request.META['HTTP_ACCEPT']``. - :vartype axes_http_accept: str - """ - - axes_attempt_time: datetime - axes_ip_address: str - axes_user_agent: str - axes_path_info: str - axes_http_accept: str diff --git a/axes/tests/base.py b/axes/tests/base.py index fe0b391..fd18c73 100644 --- a/axes/tests/base.py +++ b/axes/tests/base.py @@ -41,7 +41,7 @@ class AxesTestCase(TestCase): LOGOUT_MESSAGE = 'Logged out' LOGIN_FORM_KEY = '' - SUCCESS = 200 + STATUS_SUCCESS = 200 ALLOWED = 302 BLOCKED = 403 @@ -161,7 +161,7 @@ class AxesTestCase(TestCase): def check_logout(self): response = self.logout() - self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.SUCCESS) + self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS) def check_handler(self): """ diff --git a/axes/tests/test_middleware.py b/axes/tests/test_middleware.py index f803754..990c7af 100644 --- a/axes/tests/test_middleware.py +++ b/axes/tests/test_middleware.py @@ -1,28 +1,30 @@ from unittest.mock import patch, MagicMock -from django.http import HttpResponse +from django.http import HttpResponse, HttpRequest -from axes.exceptions import AxesSignalPermissionDenied from axes.middleware import AxesMiddleware from axes.tests.base import AxesTestCase class MiddlewareTestCase(AxesTestCase): - SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched') - LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out') + STATUS_SUCCESS = 200 + STATUS_LOCKOUT = 403 def setUp(self): - self.request = MagicMock() - self.get_response = MagicMock() + self.request = HttpRequest() - @patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE) - def test_process_exception_axes(self, _): - exception = AxesSignalPermissionDenied() - response = AxesMiddleware(self.get_response).process_exception(self.request, exception) - self.assertEqual(response, self.LOCKOUT_RESPONSE) + def test_success_response(self): + def get_response(request): + request.axes_locked_out = False + return HttpResponse() - @patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE) - def test_process_exception_other(self, _): - exception = Exception() - response = AxesMiddleware(self.get_response).process_exception(self.request, exception) - self.assertEqual(response, None) + response = AxesMiddleware(get_response)(self.request) + self.assertEqual(response.status_code, self.STATUS_SUCCESS) + + def test_lockout_response(self): + def get_response(request): + request.axes_locked_out = True + return HttpResponse() + + response = AxesMiddleware(get_response)(self.request) + self.assertEqual(response.status_code, self.STATUS_LOCKOUT) diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 51f26eb..df28cf8 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -2,7 +2,7 @@ from datetime import timedelta from hashlib import md5 from unittest.mock import patch -from django.http import JsonResponse, HttpResponseRedirect, HttpResponse +from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest from django.test import override_settings, RequestFactory from axes import get_version @@ -22,7 +22,6 @@ from axes.helpers import ( is_ip_address_in_whitelist, is_client_method_whitelisted, ) -from axes.request import AxesHttpRequest class VersionTestCase(AxesTestCase): @@ -367,7 +366,7 @@ class UsernameTestCase(AxesTestCase): def test_default_get_client_username(self): expected = 'test-username' - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = expected actual = get_client_username(request) @@ -379,7 +378,7 @@ class UsernameTestCase(AxesTestCase): expected = 'test-username' expected_in_credentials = 'test-credentials-username' - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = expected credentials = { 'username': expected_in_credentials @@ -399,7 +398,7 @@ class UsernameTestCase(AxesTestCase): expected = 'prefixed-' + provided provided_in_credentials = 'test-credentials-username' - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = provided credentials = {'username': provided_in_credentials} @@ -417,7 +416,7 @@ class UsernameTestCase(AxesTestCase): provided_in_credentials = 'test-credentials-username' expected_in_credentials = 'prefixed-' + provided_in_credentials - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = provided credentials = {'username': provided_in_credentials} @@ -427,38 +426,38 @@ class UsernameTestCase(AxesTestCase): @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover def test_get_client_username(self): - self.assertEqual(get_client_username(AxesHttpRequest(), {}), 'example') + self.assertEqual(get_client_username(HttpRequest(), {}), 'example') @override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover def test_get_client_username_invalid_callable_too_few_arguments(self): with self.assertRaises(TypeError): - get_client_username(AxesHttpRequest(), {}) + get_client_username(HttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover def test_get_client_username_invalid_callable_too_many_arguments(self): with self.assertRaises(TypeError): - get_client_username(AxesHttpRequest(), {}) + get_client_username(HttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE=True) def test_get_client_username_not_callable(self): with self.assertRaises(TypeError): - get_client_username(AxesHttpRequest(), {}) + get_client_username(HttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username') def test_get_client_username_str(self): self.assertEqual( - get_client_username(AxesHttpRequest(), {}), + get_client_username(HttpRequest(), {}), 'username', ) -def get_username(request: AxesHttpRequest, credentials: dict) -> str: +def get_username(request, credentials: dict) -> str: return 'username' class IPWhitelistTestCase(AxesTestCase): def setUp(self): - self.request = AxesHttpRequest() + self.request = HttpRequest() self.request.method = 'POST' self.request.META['REMOTE_ADDR'] = '127.0.0.1' self.request.axes_ip_address = '127.0.0.1' @@ -517,7 +516,7 @@ class IPWhitelistTestCase(AxesTestCase): class MethodWhitelistTestCase(AxesTestCase): def setUp(self): - self.request = AxesHttpRequest() + self.request = HttpRequest() self.request.method = 'GET' @override_settings(AXES_NEVER_LOCKOUT_GET=True) @@ -531,7 +530,7 @@ class MethodWhitelistTestCase(AxesTestCase): class LockoutResponseTestCase(AxesTestCase): def setUp(self): - self.request = AxesHttpRequest() + self.request = HttpRequest() @override_settings(AXES_COOLOFF_TIME=42) def test_get_lockout_response_cool_off(self): diff --git a/docs/8_reference.rst b/docs/8_reference.rst index 2efe381..5f68eb6 100644 --- a/docs/8_reference.rst +++ b/docs/8_reference.rst @@ -16,7 +16,3 @@ third party modules as long as they implement the following APIs. .. automodule:: axes.middleware :members: - -.. automodule:: axes.request - :members: - :show-inheritance: