diff --git a/CHANGES.rst b/CHANGES.rst index 36a7070..26b8b6a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -2,6 +2,15 @@ Changes ======= +5.0.5 (WIP) +----------- + +- Change the lockout response calculation to request flagging + instead of exception throwing in the signal handler and middleware. + Move request attribute calculation from middleware to handler layer. + Deprecate ``axes.request.AxesHttpRequest`` object type definition. + [aleksihakli] + 5.0.4 (2019-05-09) ------------------ diff --git a/axes/attempts.py b/axes/attempts.py index 0e73e16..45a1796 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -5,7 +5,6 @@ from django.db.models import QuerySet from django.utils.timezone import datetime, now from axes.conf import settings -from axes.request import AxesHttpRequest from axes.models import AccessAttempt from axes.helpers import ( get_client_username, @@ -30,7 +29,7 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime: return attempt_time - cool_off -def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet: +def filter_user_attempts(request, credentials: dict = None) -> QuerySet: """ Return a queryset of AccessAttempts that match the given request and credentials. """ @@ -42,7 +41,7 @@ def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> return AccessAttempt.objects.filter(**filter_kwargs) -def get_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet: +def get_user_attempts(request, credentials: dict = None) -> QuerySet: """ Get valid user attempts that match the given request and credentials. """ @@ -73,7 +72,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int: return count -def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> int: +def reset_user_attempts(request, credentials: dict = None) -> int: """ Reset all user attempts that match the given request and credentials. """ @@ -86,7 +85,7 @@ def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> i return count -def is_user_attempt_whitelisted(request: AxesHttpRequest, credentials: dict = None) -> bool: +def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool: """ Check if the given request or credentials refer to a whitelisted username. diff --git a/axes/backends.py b/axes/backends.py index 2e64cc9..f8376a1 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -3,7 +3,6 @@ from django.contrib.auth.backends import ModelBackend from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_credentials, get_lockout_message, toggleable -from axes.request import AxesHttpRequest class AxesBackend(ModelBackend): @@ -18,7 +17,7 @@ class AxesBackend(ModelBackend): """ @toggleable - def authenticate(self, request: AxesHttpRequest, username: str = None, password: str = None, **kwargs: dict): + def authenticate(self, request, username: str = None, password: str = None, **kwargs: dict): """ Checks user lockout status and raises an exception if user is not allowed to log in. diff --git a/axes/decorators.py b/axes/decorators.py index 2e2411a..0cda982 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -2,11 +2,10 @@ from functools import wraps from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_lockout_response -from axes.request import AxesHttpRequest def axes_dispatch(func): - def inner(request: AxesHttpRequest, *args, **kwargs): + def inner(request, *args, **kwargs): if AxesProxyHandler.is_allowed(request): return func(request, *args, **kwargs) diff --git a/axes/exceptions.py b/axes/exceptions.py index 75fb841..773f1c2 100644 --- a/axes/exceptions.py +++ b/axes/exceptions.py @@ -1,28 +1,7 @@ from django.core.exceptions import PermissionDenied -class AxesPermissionDenied(PermissionDenied): - """ - Base class for permission denied errors raised by axes specifically for easier debugging. - - Two different types of errors are used because of the behaviour Django has: - - - If an authentication backend raises a PermissionDenied error the authentication flow is aborted. - - If another component raises a PermissionDenied error a HTTP 403 Forbidden response is returned. - """ - - pass - - -class AxesSignalPermissionDenied(AxesPermissionDenied): - """ - Raised by signal handler on failed authentication attempts to send user a HTTP 403 Forbidden status code. - """ - - pass - - -class AxesBackendPermissionDenied(AxesPermissionDenied): +class AxesBackendPermissionDenied(PermissionDenied): """ Raised by authentication backend on locked out requests to stop the Django authentication flow. """ diff --git a/axes/handlers/base.py b/axes/handlers/base.py index 60312de..3e5aca9 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -4,7 +4,6 @@ from axes.helpers import ( is_client_ip_address_whitelisted, is_client_method_whitelisted, ) -from axes.request import AxesHttpRequest class AxesHandler: # pylint: disable=unused-argument @@ -19,7 +18,7 @@ class AxesHandler: # pylint: disable=unused-argument .. note:: This is a virtual class and **can not be used without specialization**. """ - def is_allowed(self, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: dict = None) -> bool: """ Checks if the user is allowed to access or use given functionality such as a login view or authentication. @@ -45,17 +44,17 @@ class AxesHandler: # pylint: disable=unused-argument return True - def user_login_failed(self, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs): + def user_login_failed(self, sender, credentials: dict, request = None, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_login_failed`` authentication signal. """ - def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_in(self, sender, request, user, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_logged_in`` authentication signal. """ - def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_out(self, sender, request, user, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_logged_out`` authentication signal. """ @@ -70,7 +69,7 @@ class AxesHandler: # pylint: disable=unused-argument Handles the ``axes.models.AccessAttempt`` object post delete signal. """ - def is_blacklisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_blacklisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument """ Checks if the request or given credentials are blacklisted from access. """ @@ -80,7 +79,7 @@ class AxesHandler: # pylint: disable=unused-argument return False - def is_whitelisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_whitelisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument """ Checks if the request or given credentials are whitelisted for access. """ @@ -93,7 +92,7 @@ class AxesHandler: # pylint: disable=unused-argument return False - def is_locked(self, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_locked(self, request, credentials: dict = None) -> bool: """ Checks if the request or given credentials are locked. """ @@ -103,7 +102,7 @@ class AxesHandler: # pylint: disable=unused-argument return False - def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + def get_failures(self, request, credentials: dict = None) -> int: """ Checks the number of failures associated to the given request and credentials. diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index 8909bf4..517ced2 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -1,8 +1,6 @@ from logging import getLogger from axes.conf import settings -from axes.exceptions import AxesSignalPermissionDenied -from axes.request import AxesHttpRequest from axes.handlers.base import AxesHandler from axes.signals import user_locked_out from axes.helpers import ( @@ -26,7 +24,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals self.cache = get_cache() self.cache_timeout = get_cache_timeout() - def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + def get_failures(self, request, credentials: dict = None) -> int: cache_key = get_client_cache_key(request, credentials) return self.cache.get(cache_key, default=0) @@ -34,7 +32,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals self, sender, credentials: dict, - request: AxesHttpRequest = None, + request = None, **kwargs ): # pylint: disable=too-many-locals """ @@ -47,10 +45,6 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals log.error('AXES: AxesCacheHandler.user_login_failed does not function without a request.') return - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesCacheHandler.user_login_failed needs a valid AxesHttpRequest object.') - return - username = get_client_username(request, credentials) client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) @@ -79,6 +73,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals if failures_since_start >= settings.AXES_FAILURE_LIMIT: log.warning('AXES: Locking out %s after repeated login failures.', client_str) + request.axes_locked_out = True user_locked_out.send( 'axes', request=request, @@ -86,17 +81,11 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals ip_address=request.axes_ip_address, ) - raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') - - def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesCacheHandler.user_logged_in needs a valid AxesHttpRequest object.') - return - username = user.get_username() credentials = get_credentials(username) client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) @@ -109,11 +98,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals self.cache.delete(cache_key) log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str) - def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesCacheHandler.user_logged_out needs a valid AxesHttpRequest object.') - return - + def user_logged_out(self, sender, request, user, **kwargs): username = user.get_username() if user else None client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) diff --git a/axes/handlers/database.py b/axes/handlers/database.py index 498b73d..18e6959 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -10,9 +10,7 @@ from axes.attempts import ( reset_user_attempts, ) from axes.conf import settings -from axes.exceptions import AxesSignalPermissionDenied from axes.handlers.base import AxesHandler -from axes.request import AxesHttpRequest from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out from axes.helpers import ( @@ -31,11 +29,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals Signal handler implementation that records user login attempts to database and locks users out if necessary. """ - def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int: + def get_failures(self, request, credentials: dict = None) -> int: attempts = get_user_attempts(request, credentials) return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0 - def is_locked(self, request: AxesHttpRequest, credentials: dict = None): + def is_locked(self, request, credentials: dict = None): if is_user_attempt_whitelisted(request, credentials): return False @@ -45,7 +43,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals self, sender, credentials: dict, - request: AxesHttpRequest = None, + request = None, **kwargs ): # pylint: disable=too-many-locals """ @@ -58,10 +56,6 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.') return - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesDatabaseHandler.user_login_failed needs a valid AxesHttpRequest object.') - return - # 1. database query: Clean up expired user attempts from the database before logging new attempts clean_expired_user_attempts(request.axes_attempt_time) @@ -127,6 +121,8 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals if failures_since_start >= settings.AXES_FAILURE_LIMIT: log.warning('AXES: Locking out %s after repeated login failures.', client_str) + request.axes_locked_out = True + user_locked_out.send( 'axes', request=request, @@ -134,17 +130,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals ip_address=request.axes_ip_address, ) - raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') - - def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesDatabaseHandler.user_logged_in needs a valid AxesHttpRequest object.') - return - # 1. database query: Clean up expired user attempts from the database clean_expired_user_attempts(request.axes_attempt_time) @@ -170,15 +160,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals count = reset_user_attempts(request, credentials) log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str) - def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument + def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument """ When user logs out, update the AccessLog related to the user. """ - if not hasattr(request, 'axes_attempt_time'): - log.error('AXES: AxesDatabaseHandler.user_logged_out needs a valid AxesHttpRequest object.') - return - # 1. database query: Clean up expired user attempts from the database clean_expired_user_attempts(request.axes_attempt_time) diff --git a/axes/handlers/dummy.py b/axes/handlers/dummy.py index ce9b005..0364845 100644 --- a/axes/handlers/dummy.py +++ b/axes/handlers/dummy.py @@ -1,5 +1,3 @@ -from django.http import HttpRequest - from axes.handlers.base import AxesHandler @@ -8,5 +6,5 @@ class AxesDummyHandler(AxesHandler): # pylint: disable=unused-argument Signal handler implementation that does nothing and can be used to disable signal processing. """ - def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: dict = None) -> bool: return True diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py index 5fb2575..2077508 100644 --- a/axes/handlers/proxy.py +++ b/axes/handlers/proxy.py @@ -1,11 +1,17 @@ from logging import getLogger from django.utils.module_loading import import_string +from django.utils.timezone import now from axes.conf import settings from axes.handlers.base import AxesHandler -from axes.helpers import toggleable -from axes.request import AxesHttpRequest +from axes.helpers import ( + get_client_ip_address, + get_client_user_agent, + get_client_path_info, + get_client_http_accept, + toggleable, +) log = getLogger(settings.AXES_LOGGER) @@ -36,27 +42,49 @@ class AxesProxyHandler(AxesHandler): cls.implementation = import_string(settings.AXES_HANDLER)() return cls.implementation + @staticmethod + def update_request(request): + """ + Update request attributes before passing them into the selected handler class. + """ + + if request is None: + log.error('AXES: AxesProxyHandler.update_request can not set request attributes to a None request') + return + + request.axes_locked_out = False + request.axes_attempt_time = now() + request.axes_ip_address = get_client_ip_address(request) + request.axes_user_agent = get_client_user_agent(request) + request.axes_path_info = get_client_path_info(request) + request.axes_http_accept = get_client_http_accept(request) + @classmethod - def is_locked(cls, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_locked(cls, request, credentials: dict = None) -> bool: + cls.update_request(request) return cls.get_implementation().is_locked(request, credentials) @classmethod - def is_allowed(cls, request: AxesHttpRequest, credentials: dict = None) -> bool: + def is_allowed(cls, request, credentials: dict = None) -> bool: + cls.update_request(request) return cls.get_implementation().is_allowed(request, credentials) @classmethod @toggleable - def user_login_failed(cls, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs): + def user_login_failed(cls, sender, credentials: dict, request=None, **kwargs): + cls.update_request(request) return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs) @classmethod @toggleable - def user_logged_in(cls, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_in(cls, sender, request, user, **kwargs): + cls.update_request(request) return cls.get_implementation().user_logged_in(sender, request, user, **kwargs) @classmethod @toggleable - def user_logged_out(cls, sender, request: AxesHttpRequest, user, **kwargs): + def user_logged_out(cls, sender, request, user, **kwargs): + cls.update_request(request) return cls.get_implementation().user_logged_out(sender, request, user, **kwargs) @classmethod diff --git a/axes/helpers.py b/axes/helpers.py index 79a360f..d02e5aa 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -11,7 +11,6 @@ from django.utils.module_loading import import_string import ipware.ip2 from axes.conf import settings -from axes.request import AxesHttpRequest log = getLogger(__name__) @@ -102,7 +101,7 @@ def get_credentials(username: str = None, **kwargs) -> dict: return credentials -def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> str: +def get_client_username(request, credentials: dict = None) -> str: """ Resolve client username from the given request or credentials if supplied. @@ -133,7 +132,7 @@ def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> s return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) -def get_client_ip_address(request: HttpRequest) -> str: +def get_client_ip_address(request) -> str: """ Get client IP address as configured by the user. @@ -152,15 +151,15 @@ def get_client_ip_address(request: HttpRequest) -> str: return client_ip_address -def get_client_user_agent(request: HttpRequest) -> str: +def get_client_user_agent(request) -> str: return request.META.get('HTTP_USER_AGENT', '')[:255] -def get_client_path_info(request: HttpRequest) -> str: +def get_client_path_info(request) -> str: return request.META.get('PATH_INFO', '')[:255] -def get_client_http_accept(request: HttpRequest) -> str: +def get_client_http_accept(request) -> str: return request.META.get('HTTP_ACCEPT', '')[:1025] @@ -259,7 +258,7 @@ def get_lockout_message() -> str: return settings.AXES_PERMALOCK_MESSAGE -def get_lockout_response(request: AxesHttpRequest, credentials: dict = None) -> HttpResponse: +def get_lockout_response(request, credentials: dict = None) -> HttpResponse: status = 403 context = { 'failure_limit': settings.AXES_FAILURE_LIMIT, @@ -311,7 +310,7 @@ def is_ip_address_in_blacklist(ip_address: str) -> bool: return ip_address in settings.AXES_IP_BLACKLIST -def is_client_ip_address_whitelisted(request: AxesHttpRequest): +def is_client_ip_address_whitelisted(request): """ Check if the given request refers to a whitelisted IP. """ @@ -325,7 +324,7 @@ def is_client_ip_address_whitelisted(request: AxesHttpRequest): return False -def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool: +def is_client_ip_address_blacklisted(request) -> bool: """ Check if the given request refers to a blacklisted IP. """ @@ -339,7 +338,7 @@ def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool: return False -def is_client_method_whitelisted(request: AxesHttpRequest) -> bool: +def is_client_method_whitelisted(request) -> bool: """ Check if the given request uses a whitelisted method. """ diff --git a/axes/middleware.py b/axes/middleware.py index e2aedb0..2b1087d 100644 --- a/axes/middleware.py +++ b/axes/middleware.py @@ -1,18 +1,6 @@ from typing import Callable -from django.http import HttpRequest -from django.utils.timezone import now - -from axes.exceptions import AxesSignalPermissionDenied -from axes.helpers import ( - get_client_ip_address, - get_client_user_agent, - get_client_path_info, - get_client_http_accept, - get_lockout_response, - toggleable, -) -from axes.request import AxesHttpRequest +from axes.helpers import get_lockout_response class AxesMiddleware: @@ -42,35 +30,10 @@ class AxesMiddleware: def __init__(self, get_response: Callable): self.get_response = get_response - def __call__(self, request: HttpRequest): - self.update_request(request) - return self.get_response(request) + def __call__(self, request): + response = self.get_response(request) - @toggleable - def update_request(self, request: HttpRequest): - """ - Construct an ``AxesHttpRequest`` from the given ``HttpRequest`` - by updating the request with necessary attempt tracking attributes. + if getattr(request, 'axes_locked_out', None): + response = get_lockout_response(request) # type: ignore - This method is called by the middleware class ``__call__`` method - when iterating over the middleware stack. - """ - - request.axes_attempt_time = now() - request.axes_ip_address = get_client_ip_address(request) - request.axes_user_agent = get_client_user_agent(request) - request.axes_path_info = get_client_path_info(request) - request.axes_http_accept = get_client_http_accept(request) - - @toggleable - def process_exception(self, request: AxesHttpRequest, exception): # pylint: disable=inconsistent-return-statements - """ - Handle exceptions raised by the Axes signal handler class when requests fail checks. - - Note that only ``AxesSignalPermissionDenied`` is handled by this middleware class. - - :return: Configured ``HttpResponse`` for failed authentication attempts and lockouts. - """ - - if isinstance(exception, AxesSignalPermissionDenied): - return get_lockout_response(request) + return response diff --git a/axes/request.py b/axes/request.py deleted file mode 100644 index 978c392..0000000 --- a/axes/request.py +++ /dev/null @@ -1,38 +0,0 @@ -from datetime import datetime - -from django.http import HttpRequest - - -class AxesHttpRequest(HttpRequest): - """ - Extended Django ``HttpRequest`` with custom Axes attributes. - - This request is constructed by the ``AxesMiddleware`` class - where the custom attributes are inserted into the request. - - .. note:: The ``str`` type variables have a maximum length of 255 - characters and they are calculated in the middleware layer. - If the HTTP request attributes can not be resolved - they are assigned default value of ````. - - :var axes_attempt_time: Timestamp of the request on the server side. - :vartype axes_attempt_time: datetime - - :var axes_ip_address: Request IP address as resolved by django-axes and django-ipware configurations. - :vartype axes_ip_address: str - - :var axes_user_agent: Request agent from ``request.META['HTTP_USER_AGENT']``. - :vartype axes_user_agent: str - - :var axes_path_info: Request path from ``request.META['PATH_INFO']``. - :vartype axes_path_info: str - - :var axes_http_accept: Request ``Accept`` header from ``request.META['HTTP_ACCEPT']``. - :vartype axes_http_accept: str - """ - - axes_attempt_time: datetime - axes_ip_address: str - axes_user_agent: str - axes_path_info: str - axes_http_accept: str diff --git a/axes/tests/base.py b/axes/tests/base.py index fe0b391..fd18c73 100644 --- a/axes/tests/base.py +++ b/axes/tests/base.py @@ -41,7 +41,7 @@ class AxesTestCase(TestCase): LOGOUT_MESSAGE = 'Logged out' LOGIN_FORM_KEY = '' - SUCCESS = 200 + STATUS_SUCCESS = 200 ALLOWED = 302 BLOCKED = 403 @@ -161,7 +161,7 @@ class AxesTestCase(TestCase): def check_logout(self): response = self.logout() - self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.SUCCESS) + self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS) def check_handler(self): """ diff --git a/axes/tests/test_middleware.py b/axes/tests/test_middleware.py index f803754..990c7af 100644 --- a/axes/tests/test_middleware.py +++ b/axes/tests/test_middleware.py @@ -1,28 +1,30 @@ from unittest.mock import patch, MagicMock -from django.http import HttpResponse +from django.http import HttpResponse, HttpRequest -from axes.exceptions import AxesSignalPermissionDenied from axes.middleware import AxesMiddleware from axes.tests.base import AxesTestCase class MiddlewareTestCase(AxesTestCase): - SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched') - LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out') + STATUS_SUCCESS = 200 + STATUS_LOCKOUT = 403 def setUp(self): - self.request = MagicMock() - self.get_response = MagicMock() + self.request = HttpRequest() - @patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE) - def test_process_exception_axes(self, _): - exception = AxesSignalPermissionDenied() - response = AxesMiddleware(self.get_response).process_exception(self.request, exception) - self.assertEqual(response, self.LOCKOUT_RESPONSE) + def test_success_response(self): + def get_response(request): + request.axes_locked_out = False + return HttpResponse() - @patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE) - def test_process_exception_other(self, _): - exception = Exception() - response = AxesMiddleware(self.get_response).process_exception(self.request, exception) - self.assertEqual(response, None) + response = AxesMiddleware(get_response)(self.request) + self.assertEqual(response.status_code, self.STATUS_SUCCESS) + + def test_lockout_response(self): + def get_response(request): + request.axes_locked_out = True + return HttpResponse() + + response = AxesMiddleware(get_response)(self.request) + self.assertEqual(response.status_code, self.STATUS_LOCKOUT) diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 51f26eb..df28cf8 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -2,7 +2,7 @@ from datetime import timedelta from hashlib import md5 from unittest.mock import patch -from django.http import JsonResponse, HttpResponseRedirect, HttpResponse +from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest from django.test import override_settings, RequestFactory from axes import get_version @@ -22,7 +22,6 @@ from axes.helpers import ( is_ip_address_in_whitelist, is_client_method_whitelisted, ) -from axes.request import AxesHttpRequest class VersionTestCase(AxesTestCase): @@ -367,7 +366,7 @@ class UsernameTestCase(AxesTestCase): def test_default_get_client_username(self): expected = 'test-username' - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = expected actual = get_client_username(request) @@ -379,7 +378,7 @@ class UsernameTestCase(AxesTestCase): expected = 'test-username' expected_in_credentials = 'test-credentials-username' - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = expected credentials = { 'username': expected_in_credentials @@ -399,7 +398,7 @@ class UsernameTestCase(AxesTestCase): expected = 'prefixed-' + provided provided_in_credentials = 'test-credentials-username' - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = provided credentials = {'username': provided_in_credentials} @@ -417,7 +416,7 @@ class UsernameTestCase(AxesTestCase): provided_in_credentials = 'test-credentials-username' expected_in_credentials = 'prefixed-' + provided_in_credentials - request = AxesHttpRequest() + request = HttpRequest() request.POST['username'] = provided credentials = {'username': provided_in_credentials} @@ -427,38 +426,38 @@ class UsernameTestCase(AxesTestCase): @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover def test_get_client_username(self): - self.assertEqual(get_client_username(AxesHttpRequest(), {}), 'example') + self.assertEqual(get_client_username(HttpRequest(), {}), 'example') @override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover def test_get_client_username_invalid_callable_too_few_arguments(self): with self.assertRaises(TypeError): - get_client_username(AxesHttpRequest(), {}) + get_client_username(HttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover def test_get_client_username_invalid_callable_too_many_arguments(self): with self.assertRaises(TypeError): - get_client_username(AxesHttpRequest(), {}) + get_client_username(HttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE=True) def test_get_client_username_not_callable(self): with self.assertRaises(TypeError): - get_client_username(AxesHttpRequest(), {}) + get_client_username(HttpRequest(), {}) @override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username') def test_get_client_username_str(self): self.assertEqual( - get_client_username(AxesHttpRequest(), {}), + get_client_username(HttpRequest(), {}), 'username', ) -def get_username(request: AxesHttpRequest, credentials: dict) -> str: +def get_username(request, credentials: dict) -> str: return 'username' class IPWhitelistTestCase(AxesTestCase): def setUp(self): - self.request = AxesHttpRequest() + self.request = HttpRequest() self.request.method = 'POST' self.request.META['REMOTE_ADDR'] = '127.0.0.1' self.request.axes_ip_address = '127.0.0.1' @@ -517,7 +516,7 @@ class IPWhitelistTestCase(AxesTestCase): class MethodWhitelistTestCase(AxesTestCase): def setUp(self): - self.request = AxesHttpRequest() + self.request = HttpRequest() self.request.method = 'GET' @override_settings(AXES_NEVER_LOCKOUT_GET=True) @@ -531,7 +530,7 @@ class MethodWhitelistTestCase(AxesTestCase): class LockoutResponseTestCase(AxesTestCase): def setUp(self): - self.request = AxesHttpRequest() + self.request = HttpRequest() @override_settings(AXES_COOLOFF_TIME=42) def test_get_lockout_response_cool_off(self): diff --git a/docs/8_reference.rst b/docs/8_reference.rst index 2efe381..5f68eb6 100644 --- a/docs/8_reference.rst +++ b/docs/8_reference.rst @@ -16,7 +16,3 @@ third party modules as long as they implement the following APIs. .. automodule:: axes.middleware :members: - -.. automodule:: axes.request - :members: - :show-inheritance: