Improve lockout and request handling

The old architecture used exceptions in the signal handler
which prevented transactions from running smoothly
and signal handlers from running after Axes handlers.

The new architecture changes the request approach to request flagging
and moves the exception handling into the middleware call method.

This allows users to more flexibly run their own signal handlers
and optionally use the Axes middleware if they want to do so.

Fixes #440
Fixes #442
This commit is contained in:
Aleksi Häkli 2019-05-19 15:54:27 +03:00 committed by Aleksi Häkli
parent 31c8c16f94
commit 3152b4d7e9
17 changed files with 121 additions and 219 deletions

View file

@ -2,6 +2,15 @@
Changes
=======
5.0.5 (WIP)
-----------
- Change the lockout response calculation to request flagging
instead of exception throwing in the signal handler and middleware.
Move request attribute calculation from middleware to handler layer.
Deprecate ``axes.request.AxesHttpRequest`` object type definition.
[aleksihakli]
5.0.4 (2019-05-09)
------------------

View file

@ -5,7 +5,6 @@ from django.db.models import QuerySet
from django.utils.timezone import datetime, now
from axes.conf import settings
from axes.request import AxesHttpRequest
from axes.models import AccessAttempt
from axes.helpers import (
get_client_username,
@ -30,7 +29,7 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime:
return attempt_time - cool_off
def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet:
def filter_user_attempts(request, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""
@ -42,7 +41,7 @@ def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) ->
return AccessAttempt.objects.filter(**filter_kwargs)
def get_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet:
def get_user_attempts(request, credentials: dict = None) -> QuerySet:
"""
Get valid user attempts that match the given request and credentials.
"""
@ -73,7 +72,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int:
return count
def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> int:
def reset_user_attempts(request, credentials: dict = None) -> int:
"""
Reset all user attempts that match the given request and credentials.
"""
@ -86,7 +85,7 @@ def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> i
return count
def is_user_attempt_whitelisted(request: AxesHttpRequest, credentials: dict = None) -> bool:
def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool:
"""
Check if the given request or credentials refer to a whitelisted username.

View file

@ -3,7 +3,6 @@ from django.contrib.auth.backends import ModelBackend
from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired
from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_credentials, get_lockout_message, toggleable
from axes.request import AxesHttpRequest
class AxesBackend(ModelBackend):
@ -18,7 +17,7 @@ class AxesBackend(ModelBackend):
"""
@toggleable
def authenticate(self, request: AxesHttpRequest, username: str = None, password: str = None, **kwargs: dict):
def authenticate(self, request, username: str = None, password: str = None, **kwargs: dict):
"""
Checks user lockout status and raises an exception if user is not allowed to log in.

View file

@ -2,11 +2,10 @@ from functools import wraps
from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_lockout_response
from axes.request import AxesHttpRequest
def axes_dispatch(func):
def inner(request: AxesHttpRequest, *args, **kwargs):
def inner(request, *args, **kwargs):
if AxesProxyHandler.is_allowed(request):
return func(request, *args, **kwargs)

View file

@ -1,28 +1,7 @@
from django.core.exceptions import PermissionDenied
class AxesPermissionDenied(PermissionDenied):
"""
Base class for permission denied errors raised by axes specifically for easier debugging.
Two different types of errors are used because of the behaviour Django has:
- If an authentication backend raises a PermissionDenied error the authentication flow is aborted.
- If another component raises a PermissionDenied error a HTTP 403 Forbidden response is returned.
"""
pass
class AxesSignalPermissionDenied(AxesPermissionDenied):
"""
Raised by signal handler on failed authentication attempts to send user a HTTP 403 Forbidden status code.
"""
pass
class AxesBackendPermissionDenied(AxesPermissionDenied):
class AxesBackendPermissionDenied(PermissionDenied):
"""
Raised by authentication backend on locked out requests to stop the Django authentication flow.
"""

View file

@ -4,7 +4,6 @@ from axes.helpers import (
is_client_ip_address_whitelisted,
is_client_method_whitelisted,
)
from axes.request import AxesHttpRequest
class AxesHandler: # pylint: disable=unused-argument
@ -19,7 +18,7 @@ class AxesHandler: # pylint: disable=unused-argument
.. note:: This is a virtual class and **can not be used without specialization**.
"""
def is_allowed(self, request: AxesHttpRequest, credentials: dict = None) -> bool:
def is_allowed(self, request, credentials: dict = None) -> bool:
"""
Checks if the user is allowed to access or use given functionality such as a login view or authentication.
@ -45,17 +44,17 @@ class AxesHandler: # pylint: disable=unused-argument
return True
def user_login_failed(self, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs):
def user_login_failed(self, sender, credentials: dict, request = None, **kwargs):
"""
Handles the Django ``django.contrib.auth.signals.user_login_failed`` authentication signal.
"""
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs):
def user_logged_in(self, sender, request, user, **kwargs):
"""
Handles the Django ``django.contrib.auth.signals.user_logged_in`` authentication signal.
"""
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs):
def user_logged_out(self, sender, request, user, **kwargs):
"""
Handles the Django ``django.contrib.auth.signals.user_logged_out`` authentication signal.
"""
@ -70,7 +69,7 @@ class AxesHandler: # pylint: disable=unused-argument
Handles the ``axes.models.AccessAttempt`` object post delete signal.
"""
def is_blacklisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_blacklisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument
"""
Checks if the request or given credentials are blacklisted from access.
"""
@ -80,7 +79,7 @@ class AxesHandler: # pylint: disable=unused-argument
return False
def is_whitelisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_whitelisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument
"""
Checks if the request or given credentials are whitelisted for access.
"""
@ -93,7 +92,7 @@ class AxesHandler: # pylint: disable=unused-argument
return False
def is_locked(self, request: AxesHttpRequest, credentials: dict = None) -> bool:
def is_locked(self, request, credentials: dict = None) -> bool:
"""
Checks if the request or given credentials are locked.
"""
@ -103,7 +102,7 @@ class AxesHandler: # pylint: disable=unused-argument
return False
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
def get_failures(self, request, credentials: dict = None) -> int:
"""
Checks the number of failures associated to the given request and credentials.

View file

@ -1,8 +1,6 @@
from logging import getLogger
from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.request import AxesHttpRequest
from axes.handlers.base import AxesHandler
from axes.signals import user_locked_out
from axes.helpers import (
@ -26,7 +24,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
self.cache = get_cache()
self.cache_timeout = get_cache_timeout()
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
def get_failures(self, request, credentials: dict = None) -> int:
cache_key = get_client_cache_key(request, credentials)
return self.cache.get(cache_key, default=0)
@ -34,7 +32,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
self,
sender,
credentials: dict,
request: AxesHttpRequest = None,
request = None,
**kwargs
): # pylint: disable=too-many-locals
"""
@ -47,10 +45,6 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
log.error('AXES: AxesCacheHandler.user_login_failed does not function without a request.')
return
if not hasattr(request, 'axes_attempt_time'):
log.error('AXES: AxesCacheHandler.user_login_failed needs a valid AxesHttpRequest object.')
return
username = get_client_username(request, credentials)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
@ -79,6 +73,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
if failures_since_start >= settings.AXES_FAILURE_LIMIT:
log.warning('AXES: Locking out %s after repeated login failures.', client_str)
request.axes_locked_out = True
user_locked_out.send(
'axes',
request=request,
@ -86,17 +81,11 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
ip_address=request.axes_ip_address,
)
raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
if not hasattr(request, 'axes_attempt_time'):
log.error('AXES: AxesCacheHandler.user_logged_in needs a valid AxesHttpRequest object.')
return
username = user.get_username()
credentials = get_credentials(username)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
@ -109,11 +98,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
self.cache.delete(cache_key)
log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str)
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs):
if not hasattr(request, 'axes_attempt_time'):
log.error('AXES: AxesCacheHandler.user_logged_out needs a valid AxesHttpRequest object.')
return
def user_logged_out(self, sender, request, user, **kwargs):
username = user.get_username() if user else None
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)

View file

@ -10,9 +10,7 @@ from axes.attempts import (
reset_user_attempts,
)
from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.handlers.base import AxesHandler
from axes.request import AxesHttpRequest
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.helpers import (
@ -31,11 +29,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
Signal handler implementation that records user login attempts to database and locks users out if necessary.
"""
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
def get_failures(self, request, credentials: dict = None) -> int:
attempts = get_user_attempts(request, credentials)
return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0
def is_locked(self, request: AxesHttpRequest, credentials: dict = None):
def is_locked(self, request, credentials: dict = None):
if is_user_attempt_whitelisted(request, credentials):
return False
@ -45,7 +43,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
self,
sender,
credentials: dict,
request: AxesHttpRequest = None,
request = None,
**kwargs
): # pylint: disable=too-many-locals
"""
@ -58,10 +56,6 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.')
return
if not hasattr(request, 'axes_attempt_time'):
log.error('AXES: AxesDatabaseHandler.user_login_failed needs a valid AxesHttpRequest object.')
return
# 1. database query: Clean up expired user attempts from the database before logging new attempts
clean_expired_user_attempts(request.axes_attempt_time)
@ -127,6 +121,8 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
if failures_since_start >= settings.AXES_FAILURE_LIMIT:
log.warning('AXES: Locking out %s after repeated login failures.', client_str)
request.axes_locked_out = True
user_locked_out.send(
'axes',
request=request,
@ -134,17 +130,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
ip_address=request.axes_ip_address,
)
raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
if not hasattr(request, 'axes_attempt_time'):
log.error('AXES: AxesDatabaseHandler.user_logged_in needs a valid AxesHttpRequest object.')
return
# 1. database query: Clean up expired user attempts from the database
clean_expired_user_attempts(request.axes_attempt_time)
@ -170,15 +160,11 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
count = reset_user_attempts(request, credentials)
log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str)
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs out, update the AccessLog related to the user.
"""
if not hasattr(request, 'axes_attempt_time'):
log.error('AXES: AxesDatabaseHandler.user_logged_out needs a valid AxesHttpRequest object.')
return
# 1. database query: Clean up expired user attempts from the database
clean_expired_user_attempts(request.axes_attempt_time)

View file

@ -1,5 +1,3 @@
from django.http import HttpRequest
from axes.handlers.base import AxesHandler
@ -8,5 +6,5 @@ class AxesDummyHandler(AxesHandler): # pylint: disable=unused-argument
Signal handler implementation that does nothing and can be used to disable signal processing.
"""
def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool:
def is_allowed(self, request, credentials: dict = None) -> bool:
return True

View file

@ -1,11 +1,17 @@
from logging import getLogger
from django.utils.module_loading import import_string
from django.utils.timezone import now
from axes.conf import settings
from axes.handlers.base import AxesHandler
from axes.helpers import toggleable
from axes.request import AxesHttpRequest
from axes.helpers import (
get_client_ip_address,
get_client_user_agent,
get_client_path_info,
get_client_http_accept,
toggleable,
)
log = getLogger(settings.AXES_LOGGER)
@ -36,27 +42,49 @@ class AxesProxyHandler(AxesHandler):
cls.implementation = import_string(settings.AXES_HANDLER)()
return cls.implementation
@staticmethod
def update_request(request):
"""
Update request attributes before passing them into the selected handler class.
"""
if request is None:
log.error('AXES: AxesProxyHandler.update_request can not set request attributes to a None request')
return
request.axes_locked_out = False
request.axes_attempt_time = now()
request.axes_ip_address = get_client_ip_address(request)
request.axes_user_agent = get_client_user_agent(request)
request.axes_path_info = get_client_path_info(request)
request.axes_http_accept = get_client_http_accept(request)
@classmethod
def is_locked(cls, request: AxesHttpRequest, credentials: dict = None) -> bool:
def is_locked(cls, request, credentials: dict = None) -> bool:
cls.update_request(request)
return cls.get_implementation().is_locked(request, credentials)
@classmethod
def is_allowed(cls, request: AxesHttpRequest, credentials: dict = None) -> bool:
def is_allowed(cls, request, credentials: dict = None) -> bool:
cls.update_request(request)
return cls.get_implementation().is_allowed(request, credentials)
@classmethod
@toggleable
def user_login_failed(cls, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs):
def user_login_failed(cls, sender, credentials: dict, request=None, **kwargs):
cls.update_request(request)
return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs)
@classmethod
@toggleable
def user_logged_in(cls, sender, request: AxesHttpRequest, user, **kwargs):
def user_logged_in(cls, sender, request, user, **kwargs):
cls.update_request(request)
return cls.get_implementation().user_logged_in(sender, request, user, **kwargs)
@classmethod
@toggleable
def user_logged_out(cls, sender, request: AxesHttpRequest, user, **kwargs):
def user_logged_out(cls, sender, request, user, **kwargs):
cls.update_request(request)
return cls.get_implementation().user_logged_out(sender, request, user, **kwargs)
@classmethod

View file

@ -11,7 +11,6 @@ from django.utils.module_loading import import_string
import ipware.ip2
from axes.conf import settings
from axes.request import AxesHttpRequest
log = getLogger(__name__)
@ -102,7 +101,7 @@ def get_credentials(username: str = None, **kwargs) -> dict:
return credentials
def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> str:
def get_client_username(request, credentials: dict = None) -> str:
"""
Resolve client username from the given request or credentials if supplied.
@ -133,7 +132,7 @@ def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> s
return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
def get_client_ip_address(request: HttpRequest) -> str:
def get_client_ip_address(request) -> str:
"""
Get client IP address as configured by the user.
@ -152,15 +151,15 @@ def get_client_ip_address(request: HttpRequest) -> str:
return client_ip_address
def get_client_user_agent(request: HttpRequest) -> str:
def get_client_user_agent(request) -> str:
return request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
def get_client_path_info(request: HttpRequest) -> str:
def get_client_path_info(request) -> str:
return request.META.get('PATH_INFO', '<unknown>')[:255]
def get_client_http_accept(request: HttpRequest) -> str:
def get_client_http_accept(request) -> str:
return request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
@ -259,7 +258,7 @@ def get_lockout_message() -> str:
return settings.AXES_PERMALOCK_MESSAGE
def get_lockout_response(request: AxesHttpRequest, credentials: dict = None) -> HttpResponse:
def get_lockout_response(request, credentials: dict = None) -> HttpResponse:
status = 403
context = {
'failure_limit': settings.AXES_FAILURE_LIMIT,
@ -311,7 +310,7 @@ def is_ip_address_in_blacklist(ip_address: str) -> bool:
return ip_address in settings.AXES_IP_BLACKLIST
def is_client_ip_address_whitelisted(request: AxesHttpRequest):
def is_client_ip_address_whitelisted(request):
"""
Check if the given request refers to a whitelisted IP.
"""
@ -325,7 +324,7 @@ def is_client_ip_address_whitelisted(request: AxesHttpRequest):
return False
def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool:
def is_client_ip_address_blacklisted(request) -> bool:
"""
Check if the given request refers to a blacklisted IP.
"""
@ -339,7 +338,7 @@ def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool:
return False
def is_client_method_whitelisted(request: AxesHttpRequest) -> bool:
def is_client_method_whitelisted(request) -> bool:
"""
Check if the given request uses a whitelisted method.
"""

View file

@ -1,18 +1,6 @@
from typing import Callable
from django.http import HttpRequest
from django.utils.timezone import now
from axes.exceptions import AxesSignalPermissionDenied
from axes.helpers import (
get_client_ip_address,
get_client_user_agent,
get_client_path_info,
get_client_http_accept,
get_lockout_response,
toggleable,
)
from axes.request import AxesHttpRequest
from axes.helpers import get_lockout_response
class AxesMiddleware:
@ -42,35 +30,10 @@ class AxesMiddleware:
def __init__(self, get_response: Callable):
self.get_response = get_response
def __call__(self, request: HttpRequest):
self.update_request(request)
return self.get_response(request)
def __call__(self, request):
response = self.get_response(request)
@toggleable
def update_request(self, request: HttpRequest):
"""
Construct an ``AxesHttpRequest`` from the given ``HttpRequest``
by updating the request with necessary attempt tracking attributes.
if getattr(request, 'axes_locked_out', None):
response = get_lockout_response(request) # type: ignore
This method is called by the middleware class ``__call__`` method
when iterating over the middleware stack.
"""
request.axes_attempt_time = now()
request.axes_ip_address = get_client_ip_address(request)
request.axes_user_agent = get_client_user_agent(request)
request.axes_path_info = get_client_path_info(request)
request.axes_http_accept = get_client_http_accept(request)
@toggleable
def process_exception(self, request: AxesHttpRequest, exception): # pylint: disable=inconsistent-return-statements
"""
Handle exceptions raised by the Axes signal handler class when requests fail checks.
Note that only ``AxesSignalPermissionDenied`` is handled by this middleware class.
:return: Configured ``HttpResponse`` for failed authentication attempts and lockouts.
"""
if isinstance(exception, AxesSignalPermissionDenied):
return get_lockout_response(request)
return response

View file

@ -1,38 +0,0 @@
from datetime import datetime
from django.http import HttpRequest
class AxesHttpRequest(HttpRequest):
"""
Extended Django ``HttpRequest`` with custom Axes attributes.
This request is constructed by the ``AxesMiddleware`` class
where the custom attributes are inserted into the request.
.. note:: The ``str`` type variables have a maximum length of 255
characters and they are calculated in the middleware layer.
If the HTTP request attributes can not be resolved
they are assigned default value of ``<unknown>``.
:var axes_attempt_time: Timestamp of the request on the server side.
:vartype axes_attempt_time: datetime
:var axes_ip_address: Request IP address as resolved by django-axes and django-ipware configurations.
:vartype axes_ip_address: str
:var axes_user_agent: Request agent from ``request.META['HTTP_USER_AGENT']``.
:vartype axes_user_agent: str
:var axes_path_info: Request path from ``request.META['PATH_INFO']``.
:vartype axes_path_info: str
:var axes_http_accept: Request ``Accept`` header from ``request.META['HTTP_ACCEPT']``.
:vartype axes_http_accept: str
"""
axes_attempt_time: datetime
axes_ip_address: str
axes_user_agent: str
axes_path_info: str
axes_http_accept: str

View file

@ -41,7 +41,7 @@ class AxesTestCase(TestCase):
LOGOUT_MESSAGE = 'Logged out'
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
SUCCESS = 200
STATUS_SUCCESS = 200
ALLOWED = 302
BLOCKED = 403
@ -161,7 +161,7 @@ class AxesTestCase(TestCase):
def check_logout(self):
response = self.logout()
self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.SUCCESS)
self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS)
def check_handler(self):
"""

View file

@ -1,28 +1,30 @@
from unittest.mock import patch, MagicMock
from django.http import HttpResponse
from django.http import HttpResponse, HttpRequest
from axes.exceptions import AxesSignalPermissionDenied
from axes.middleware import AxesMiddleware
from axes.tests.base import AxesTestCase
class MiddlewareTestCase(AxesTestCase):
SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched')
LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out')
STATUS_SUCCESS = 200
STATUS_LOCKOUT = 403
def setUp(self):
self.request = MagicMock()
self.get_response = MagicMock()
self.request = HttpRequest()
@patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_process_exception_axes(self, _):
exception = AxesSignalPermissionDenied()
response = AxesMiddleware(self.get_response).process_exception(self.request, exception)
self.assertEqual(response, self.LOCKOUT_RESPONSE)
def test_success_response(self):
def get_response(request):
request.axes_locked_out = False
return HttpResponse()
@patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_process_exception_other(self, _):
exception = Exception()
response = AxesMiddleware(self.get_response).process_exception(self.request, exception)
self.assertEqual(response, None)
response = AxesMiddleware(get_response)(self.request)
self.assertEqual(response.status_code, self.STATUS_SUCCESS)
def test_lockout_response(self):
def get_response(request):
request.axes_locked_out = True
return HttpResponse()
response = AxesMiddleware(get_response)(self.request)
self.assertEqual(response.status_code, self.STATUS_LOCKOUT)

View file

@ -2,7 +2,7 @@ from datetime import timedelta
from hashlib import md5
from unittest.mock import patch
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse, HttpRequest
from django.test import override_settings, RequestFactory
from axes import get_version
@ -22,7 +22,6 @@ from axes.helpers import (
is_ip_address_in_whitelist,
is_client_method_whitelisted,
)
from axes.request import AxesHttpRequest
class VersionTestCase(AxesTestCase):
@ -367,7 +366,7 @@ class UsernameTestCase(AxesTestCase):
def test_default_get_client_username(self):
expected = 'test-username'
request = AxesHttpRequest()
request = HttpRequest()
request.POST['username'] = expected
actual = get_client_username(request)
@ -379,7 +378,7 @@ class UsernameTestCase(AxesTestCase):
expected = 'test-username'
expected_in_credentials = 'test-credentials-username'
request = AxesHttpRequest()
request = HttpRequest()
request.POST['username'] = expected
credentials = {
'username': expected_in_credentials
@ -399,7 +398,7 @@ class UsernameTestCase(AxesTestCase):
expected = 'prefixed-' + provided
provided_in_credentials = 'test-credentials-username'
request = AxesHttpRequest()
request = HttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
@ -417,7 +416,7 @@ class UsernameTestCase(AxesTestCase):
provided_in_credentials = 'test-credentials-username'
expected_in_credentials = 'prefixed-' + provided_in_credentials
request = AxesHttpRequest()
request = HttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
@ -427,38 +426,38 @@ class UsernameTestCase(AxesTestCase):
@override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover
def test_get_client_username(self):
self.assertEqual(get_client_username(AxesHttpRequest(), {}), 'example')
self.assertEqual(get_client_username(HttpRequest(), {}), 'example')
@override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover
def test_get_client_username_invalid_callable_too_few_arguments(self):
with self.assertRaises(TypeError):
get_client_username(AxesHttpRequest(), {})
get_client_username(HttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover
def test_get_client_username_invalid_callable_too_many_arguments(self):
with self.assertRaises(TypeError):
get_client_username(AxesHttpRequest(), {})
get_client_username(HttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE=True)
def test_get_client_username_not_callable(self):
with self.assertRaises(TypeError):
get_client_username(AxesHttpRequest(), {})
get_client_username(HttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username')
def test_get_client_username_str(self):
self.assertEqual(
get_client_username(AxesHttpRequest(), {}),
get_client_username(HttpRequest(), {}),
'username',
)
def get_username(request: AxesHttpRequest, credentials: dict) -> str:
def get_username(request, credentials: dict) -> str:
return 'username'
class IPWhitelistTestCase(AxesTestCase):
def setUp(self):
self.request = AxesHttpRequest()
self.request = HttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
self.request.axes_ip_address = '127.0.0.1'
@ -517,7 +516,7 @@ class IPWhitelistTestCase(AxesTestCase):
class MethodWhitelistTestCase(AxesTestCase):
def setUp(self):
self.request = AxesHttpRequest()
self.request = HttpRequest()
self.request.method = 'GET'
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
@ -531,7 +530,7 @@ class MethodWhitelistTestCase(AxesTestCase):
class LockoutResponseTestCase(AxesTestCase):
def setUp(self):
self.request = AxesHttpRequest()
self.request = HttpRequest()
@override_settings(AXES_COOLOFF_TIME=42)
def test_get_lockout_response_cool_off(self):

View file

@ -16,7 +16,3 @@ third party modules as long as they implement the following APIs.
.. automodule:: axes.middleware
:members:
.. automodule:: axes.request
:members:
:show-inheritance: