From e69d479f6ae88f08b4334ba798d505c65d88c793 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksi=20Ha=CC=88kli?= Date: Sun, 17 Feb 2019 23:56:48 +0200 Subject: [PATCH] Refactor handlers to a more pluggable format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Define a base handler API with method signatures - Move proxy handler to a separate path for importability - Implement a database handler with clean external dependencies - Change the authentication backend and decorators to use the authentication backend This enables clean pluggable authentication backend definitions that users can override and specialize with e.g. cached handlers in their own packages. Signed-off-by: Aleksi Häkli --- axes/apps.py | 6 +- axes/attempts.py | 110 --------------- axes/backends.py | 31 ++-- axes/conf.py | 2 +- axes/decorators.py | 15 +- axes/handlers/__init__.py | 0 axes/handlers/base.py | 48 +++++++ axes/{handlers.py => handlers/database.py} | 84 +++++++++-- axes/handlers/proxy.py | 64 +++++++++ axes/signals.py | 102 ++------------ ...test_access_attempt.py => test_attempt.py} | 109 +-------------- axes/tests/test_backends.py | 2 +- axes/tests/test_decorators.py | 8 +- axes/tests/test_handlers.py | 132 ++++++++++-------- ...access_attempt_config.py => test_login.py} | 12 +- axes/tests/test_utils.py | 112 ++++++++++++++- axes/utils.py | 91 +++++++++++- docs/configuration.rst | 4 +- 18 files changed, 513 insertions(+), 419 deletions(-) create mode 100644 axes/handlers/__init__.py create mode 100644 axes/handlers/base.py rename axes/{handlers.py => handlers/database.py} (69%) create mode 100644 axes/handlers/proxy.py rename axes/tests/{test_access_attempt.py => test_attempt.py} (82%) rename axes/tests/{test_access_attempt_config.py => test_login.py} (98%) diff --git a/axes/apps.py b/axes/apps.py index 6366fcf..603a0eb 100644 --- a/axes/apps.py +++ b/axes/apps.py @@ -4,6 +4,7 @@ from django import apps from axes import get_version from axes.conf import settings +from axes.handlers.proxy import AxesProxyHandler log = getLogger(settings.AXES_LOGGER) @@ -41,5 +42,6 @@ class AppConfig(apps.AppConfig): def ready(self): self.initialize() - from axes import signals - signals.ProxyHandler.initialize() + AxesProxyHandler.initialize() + + from axes import signals # noqa diff --git a/axes/attempts.py b/axes/attempts.py index a5a7993..6184f21 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -2,7 +2,6 @@ from hashlib import md5 from logging import getLogger from typing import Union -from django.contrib.auth import get_user_model from django.db.models import QuerySet from django.http import HttpRequest from django.utils import timezone @@ -103,112 +102,3 @@ def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int: count, _ = attempts.delete() return count - - -def ip_in_whitelist(ip: str) -> bool: - if not settings.AXES_IP_WHITELIST: - return False - - return ip in settings.AXES_IP_WHITELIST - - -def ip_in_blacklist(ip: str) -> bool: - if not settings.AXES_IP_BLACKLIST: - return False - - return ip in settings.AXES_IP_BLACKLIST - - -def is_ip_blacklisted(request: HttpRequest) -> bool: - """ - Check if the given request refers to a blacklisted IP. - """ - - ip = get_client_ip_address(request) - - if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip): - return False - - if settings.AXES_ONLY_WHITELIST and not ip_in_whitelist(ip): - return True - - if ip_in_blacklist(ip): - return True - - return False - - -def is_user_lockable(request: HttpRequest, credentials: dict = None) -> bool: - """ - Check if the given request or credentials refer to a whitelisted user object - - A whitelisted user has the magic ``nolockout`` property set. - - If the property is unknown or False or the user can not be found, - this implementation fails gracefully and returns True. - """ - - username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username') - username_value = get_client_username(request, credentials) - kwargs = { - username_field: username_value - } - - UserModel = get_user_model() - - try: - user = UserModel.objects.get(**kwargs) - return not user.nolockout - except (UserModel.DoesNotExist, AttributeError): - pass - - return True - - -def is_already_locked(request: HttpRequest, credentials: dict = None) -> bool: - """ - Check if the request or given credentials are already locked by Axes. - - This function is called from - - - function decorators defined in ``axes.decorators``, - - authentication backends defined in ``axes.backends``, and - - signal handlers defined in ``axes.handlers``. - - This function checks the following facts for a given request: - - 1. Is the request HTTP method _whitelisted_? If it is, return ``False``. - 2. Is the request IP address _blacklisted_? If it is, return ``True``. - 3. Is the request user _whitelisted_? If it is, return ``False``. - 4. Is the request failure count over the attempt limit? If it is, return ``True``. - - Refer to the function source code for the exact implementation. - """ - - if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET': - return False - - if is_ip_blacklisted(request): - return True - - if not is_user_lockable(request, credentials): - return False - - if not settings.AXES_LOCK_OUT_AT_FAILURE: - return False - - # Check failure statistics against cache - cache_hash_key = get_cache_key(request, credentials) - num_failures_cached = get_axes_cache().get(cache_hash_key) - - # Do not hit the database if we have an answer in the cache - if num_failures_cached is not None: - return num_failures_cached >= settings.AXES_FAILURE_LIMIT - - # Check failure statistics against database - attempts = get_user_attempts(request, credentials) - failures = attempts.filter( - failures_since_start__gte=settings.AXES_FAILURE_LIMIT, - ) - - return failures.exists() diff --git a/axes/backends.py b/axes/backends.py index e7ef014..1f45e4d 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -1,7 +1,7 @@ from django.contrib.auth.backends import ModelBackend -from axes.attempts import is_already_locked from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired +from axes.handlers.proxy import AxesProxyHandler from axes.utils import get_credentials, get_lockout_message @@ -36,20 +36,21 @@ class AxesBackend(ModelBackend): credentials = get_credentials(username=username, password=password, **kwargs) - if is_already_locked(request, credentials): - # locked out, don't try to authenticate, just update return_context and return - # Its a bit weird to pass a context and expect a response value but its nice to get a "why" back. - error_msg = get_lockout_message() - response_context = kwargs.get('response_context', {}) - response_context['error'] = error_msg + if AxesProxyHandler.is_allowed_to_authenticate(request, credentials): + return - # Raise an error that stops the authentication flows at django.contrib.auth.authenticate. - # This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors. - # After this error is caught by authenticate it emits a signal indicating user login failed, - # which is processed by axes.signals.log_user_login_failed which logs the attempt and raises - # a second exception which bubbles up the middleware stack and produces a HTTP 403 Forbidden reply - # in the axes.middleware.AxesMiddleware.process_exception middleware exception handler. + # Locked out, don't try to authenticate, just update response_context and return. + # Its a bit weird to pass a context and expect a response value but its nice to get a "why" back. - raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out') + error_msg = get_lockout_message() + response_context = kwargs.get('response_context', {}) + response_context['error'] = error_msg - # No-op + # Raise an error that stops the authentication flows at django.contrib.auth.authenticate. + # This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors. + # After this error is caught by authenticate it emits a signal indicating user login failed, + # which is processed by axes.signals.log_user_login_failed which logs the attempt and raises + # a second exception which bubbles up the middleware stack and produces a HTTP 403 Forbidden reply + # in the axes.middleware.AxesMiddleware.process_exception middleware exception handler. + + raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out') diff --git a/axes/conf.py b/axes/conf.py index 40ca8fe..4a66f29 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -35,7 +35,7 @@ class MyAppConf(AppConf): DISABLE_SUCCESS_ACCESS_LOG = False - HANDLER = 'axes.handlers.AxesHandler' + HANDLER = 'axes.handlers.database.AxesDatabaseHandler' LOGGER = 'axes.watch_login' diff --git a/axes/decorators.py b/axes/decorators.py index d279fbd..f2dcab7 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -1,15 +1,15 @@ from functools import wraps -from axes.attempts import is_already_locked +from axes.handlers.proxy import AxesProxyHandler from axes.utils import get_lockout_response def axes_dispatch(func): def inner(request, *args, **kwargs): - if is_already_locked(request): - return get_lockout_response(request) + if AxesProxyHandler.is_allowed_to_authenticate(request): + return func(request, *args, **kwargs) - return func(request, *args, **kwargs) + return get_lockout_response(request) return inner @@ -17,9 +17,10 @@ def axes_dispatch(func): def axes_form_invalid(func): @wraps(func) def inner(self, *args, **kwargs): - if is_already_locked(self.request): - return get_lockout_response(self.request) + if AxesProxyHandler.is_allowed_to_authenticate(self.request): + return func(self, *args, **kwargs) + + return get_lockout_response(self.request) - return func(self, *args, **kwargs) return inner diff --git a/axes/handlers/__init__.py b/axes/handlers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/axes/handlers/base.py b/axes/handlers/base.py new file mode 100644 index 0000000..2867384 --- /dev/null +++ b/axes/handlers/base.py @@ -0,0 +1,48 @@ +from typing import Any, Dict, Optional + +from django.http import HttpRequest + + +class AxesBaseHandler: # pylint: disable=unused-argument + """ + Handler API definition for subclassing handlers that can be used with the AxesProxyHandler. + + If you wish to implement your own handler class just override the methods you wish to specialize + and define the class to be used with ``settings.AXES_HANDLER = 'dotted.full.path.to.YourClass'``. + """ + + def is_allowed_to_authenticate( + self, + request: HttpRequest, + credentials: Optional[Dict[str, Any]] = None, + ) -> bool: + """ + Check if the user is allowed to authenticate into the site. + """ + + raise NotImplementedError('The Axes handler class needs a method definition for is_allowed_to_authenticate') + + def user_login_failed(self, sender, credentials: Dict[str, Any], request: HttpRequest, **kwargs): + """ + Handle the Django user_login_failed authentication signal. + """ + + def user_logged_in(self, sender, request: HttpRequest, user, **kwargs): + """ + Handle the Django user_logged_in authentication signal. + """ + + def user_logged_out(self, sender, request: HttpRequest, user, **kwargs): + """ + Handle the Django user_logged_out authentication signal. + """ + + def post_save_access_attempt(self, instance, **kwargs): + """ + Handle the Axes AccessAttempt object post save signal. + """ + + def post_delete_access_attempt(self, instance, **kwargs): + """ + Handle the Axes AccessAttempt object post delete signal. + """ diff --git a/axes/handlers.py b/axes/handlers/database.py similarity index 69% rename from axes/handlers.py rename to axes/handlers/database.py index 3a707f7..e5c1a03 100644 --- a/axes/handlers.py +++ b/axes/handlers/database.py @@ -1,19 +1,23 @@ from logging import getLogger +from typing import Any, Dict, Optional from django.db.models import Max +from django.http import HttpRequest from django.utils.timezone import now -from axes.attempts import get_cache_key, is_already_locked -from axes.attempts import get_cache_timeout -from axes.attempts import get_user_attempts -from axes.attempts import ip_in_whitelist -from axes.attempts import reset_user_attempts +from axes.attempts import ( + get_cache_key, + get_user_attempts, + reset_user_attempts, +) from axes.conf import settings from axes.exceptions import AxesSignalPermissionDenied from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out +from axes.handlers.base import AxesBaseHandler from axes.utils import ( get_axes_cache, + get_cache_timeout, get_client_ip_address, get_client_path_info, get_client_http_accept, @@ -22,18 +26,76 @@ from axes.utils import ( get_client_user_agent, get_credentials, get_query_str, + is_ip_address_in_whitelist, + is_client_ip_address_blacklisted, + is_client_ip_address_whitelisted, + is_client_method_whitelisted, + is_client_username_whitelisted, ) log = getLogger(settings.AXES_LOGGER) -class AxesHandler: # pylint: disable=too-many-locals +class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals """ Signal handler implementation that records user login attempts to database and locks users out if necessary. """ - def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=unused-argument + def is_allowed_to_authenticate(self, request: HttpRequest, credentials: Optional[Dict[str, Any]] = None) -> bool: + """ + Check if the request or given credentials are already locked by Axes. + + This function is called from + + - function decorators defined in ``axes.decorators``, + - authentication backends defined in ``axes.backends``, and + - signal handlers defined in ``axes.handlers``. + + This function checks the following facts for a given request: + + 1. Is the request IP address _blacklisted_? If it is, return ``False``. + 2. Is the request IP address _whitelisted_? If it is, return ``True``. + 4. Is the request HTTP method _whitelisted_? If it is, return ``True``. + 3. Is the request user _whitelisted_? If it is, return ``True``. + 5. Is failed authentication attempt always allowed to proceed? If it is, return ``True``. + 6. Is failed authentication attempt count over the attempt limit? If it is, return ``False``. + + Refer to the function source code for the exact implementation. + """ + + if is_client_ip_address_blacklisted(request): + return False + + if is_client_ip_address_whitelisted(request): + return True + + if is_client_method_whitelisted(request): + return True + + if is_client_username_whitelisted(request, credentials): + return True + + if not settings.AXES_LOCK_OUT_AT_FAILURE: + return True + + # Check failure statistics against cache + cache_hash_key = get_cache_key(request, credentials) + num_failures_cached = get_axes_cache().get(cache_hash_key) + + # Do not hit the database if we have an answer in the cache + if num_failures_cached is not None: + return num_failures_cached < settings.AXES_FAILURE_LIMIT + + # Check failure statistics against database + attempts = get_user_attempts(request, credentials) + failures = attempts.filter( + failures_since_start__gte=settings.AXES_FAILURE_LIMIT, + ) + + return not failures.exists() + + def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=too-many-locals """ When user login fails, save AccessAttempt record in database and lock user out if necessary. @@ -41,7 +103,7 @@ class AxesHandler: # pylint: disable=too-many-locals """ if request is None: - log.warning('AxesHandler.user_login_failed does not function without a request.') + log.warning('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.') return username = get_client_username(request, credentials) @@ -51,8 +113,8 @@ class AxesHandler: # pylint: disable=too-many-locals http_accept = get_client_http_accept(request) client_str = get_client_str(username, ip_address, user_agent, path_info) - if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): - log.info('Login failed from whitelisted IP %s.', ip_address) + if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address): + log.info('AXES: Login failed from whitelisted IP %s.', ip_address) return attempts = get_user_attempts(request, credentials) @@ -122,7 +184,7 @@ class AxesHandler: # pylint: disable=too-many-locals client_str, ) - if is_already_locked(request, credentials): + if not self.is_allowed_to_authenticate(request, credentials): log.warning( 'AXES: Locked out %s after repeated login failures.', client_str, diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py new file mode 100644 index 0000000..30dbb4f --- /dev/null +++ b/axes/handlers/proxy.py @@ -0,0 +1,64 @@ +from logging import getLogger +from typing import Any, Dict, Optional + +from django.http import HttpRequest +from django.utils.module_loading import import_string + +from axes.conf import settings +from axes.handlers.base import AxesBaseHandler + +log = getLogger(settings.AXES_LOGGER) + + +class AxesProxyHandler(AxesBaseHandler): + """ + Proxy interface for configurable Axes signal handler class. + + If you wish to implement a custom version of this handler, + you can override the settings.AXES_HANDLER configuration string + with a class that implements a compatible interface and methods. + + Defaults to using axes.handlers.proxy.AxesProxyHandler if not overridden. + Refer to axes.handlers.proxy.AxesProxyHandler for default implementation. + """ + + implementation = None # type: AxesBaseHandler + + @classmethod + def initialize(cls, force=False): + """ + Fetch and initialize configured handler implementation and memoize it to avoid reinitialization. + + This method is re-entrant and can be called multiple times from e.g. Django application loader. + """ + + if force or not cls.implementation: + cls.implementation = cls.get_implementation() + + @classmethod + def get_implementation(cls) -> AxesBaseHandler: + return import_string(settings.AXES_HANDLER)() + + @classmethod + def is_allowed_to_authenticate(cls, request: HttpRequest, credentials: Optional[Dict[str, Any]] = None) -> bool: + return cls.implementation.is_allowed_to_authenticate(request, credentials) + + @classmethod + def user_login_failed(cls, sender: Any, credentials: Dict[str, Any], request: HttpRequest, **kwargs): + return cls.implementation.user_login_failed(sender, credentials, request, **kwargs) + + @classmethod + def user_logged_in(cls, sender: Any, request: HttpRequest, user, **kwargs): + return cls.implementation.user_logged_in(sender, request, user, **kwargs) + + @classmethod + def user_logged_out(cls, sender: Any, request: HttpRequest, user, **kwargs): + return cls.implementation.user_logged_out(sender, request, user, **kwargs) + + @classmethod + def post_save_access_attempt(cls, instance, **kwargs): + return cls.implementation.post_save_access_attempt(instance, **kwargs) + + @classmethod + def post_delete_access_attempt(cls, instance, **kwargs): + return cls.implementation.post_delete_access_attempt(instance, **kwargs) diff --git a/axes/signals.py b/axes/signals.py index b41dc30..145d7a5 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -1,16 +1,14 @@ from logging import getLogger -from django.contrib.auth.signals import user_logged_in -from django.contrib.auth.signals import user_logged_out -from django.contrib.auth.signals import user_login_failed +from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed from django.core.signals import setting_changed from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from django.dispatch import Signal -from django.utils.module_loading import import_string from axes.conf import settings from axes.models import AccessAttempt +from axes.handlers.proxy import AxesProxyHandler log = getLogger(settings.AXES_LOGGER) @@ -18,113 +16,29 @@ log = getLogger(settings.AXES_LOGGER) user_locked_out = Signal(providing_args=['request', 'username', 'ip_address']) -class ProxyHandler: - """ - Proxy interface for configurable Axes signal handler class. - - If you wish to implement a custom version of this handler, - you can override the settings.AXES_HANDLER configuration string - with a class that implements a compatible interface and methods. - - Defaults to using axes.handlers.AxesHandler if not overridden. - Refer to axes.handlers.AxesHandler for default implementation. - """ - - implementation = None # concrete handler that is bootstrapped by the Django application loader - - @classmethod - def initialize(cls, force=False): - """ - Fetch and initialize concrete handler implementation and memoize it to avoid reinitialization. - - This method is re-entrant and can be called multiple times. - """ - - if force or cls.implementation is None: - cls.implementation = import_string(settings.AXES_HANDLER)() - - @classmethod - def user_login_failed(cls, sender, credentials, request, **kwargs): - """ - Handle user login failure event. - - :param credentials: credentials used for authentication attempt - :param request: request used for failed authentication attempt - :return: None - """ - - cls.implementation.user_login_failed(sender, credentials, request, **kwargs) - - @classmethod - def user_logged_in(cls, sender, request, user, **kwargs): - """ - Handle user login event. - - :param credentials: credentials used for successful authentication - :param request: request used for successful authentication - :return: None - """ - - cls.implementation.user_logged_in(sender, request, user, **kwargs) - - @classmethod - def user_logged_out(cls, sender, request, user, **kwargs): - """ - Handle user logout event. - - :param request: request used for logout - :param user: user used for logout - :return: None - """ - - cls.implementation.user_logged_out(sender, request, user, **kwargs) - - @classmethod - def post_save_access_attempt(cls, instance, **kwargs): - """ - Handle AccessAttempt save event. - - :param instance: axes.models.AccessAttempt instance that will be saved - :return: None - """ - - cls.implementation.post_save_access_attempt(instance, **kwargs) - - @classmethod - def post_delete_access_attempt(cls, instance, **kwargs): - """ - Handle AccessAttempt delete event. - - :param instance: axes.models.AccessAttempt instance that was deleted - :return: None - """ - - cls.implementation.post_delete_access_attempt(instance, **kwargs) - - @receiver(user_login_failed) def handle_user_login_failed(*args, **kwargs): - ProxyHandler.user_login_failed(*args, **kwargs) + AxesProxyHandler.user_login_failed(*args, **kwargs) @receiver(user_logged_in) def handle_user_logged_in(*args, **kwargs): - ProxyHandler.user_logged_in(*args, **kwargs) + AxesProxyHandler.user_logged_in(*args, **kwargs) @receiver(user_logged_out) def handle_user_logged_out(*args, **kwargs): - ProxyHandler.user_logged_out(*args, **kwargs) + AxesProxyHandler.user_logged_out(*args, **kwargs) @receiver(post_save, sender=AccessAttempt) def handle_post_save_access_attempt(*args, **kwargs): - ProxyHandler.post_save_access_attempt(*args, **kwargs) + AxesProxyHandler.post_save_access_attempt(*args, **kwargs) @receiver(post_delete, sender=AccessAttempt) def handle_post_delete_access_attempt(*args, **kwargs): - ProxyHandler.post_delete_access_attempt(*args, **kwargs) + AxesProxyHandler.post_delete_access_attempt(*args, **kwargs) @receiver(setting_changed) @@ -135,4 +49,4 @@ def handle_setting_changed(sender, setting, value, enter, **kwargs): # pylint: """ if enter and setting == 'AXES_HANDLER': - ProxyHandler.initialize(force=enter) + AxesProxyHandler.initialize(force=enter) diff --git a/axes/tests/test_access_attempt.py b/axes/tests/test_attempt.py similarity index 82% rename from axes/tests/test_access_attempt.py rename to axes/tests/test_attempt.py index 4222adf..2db0a77 100644 --- a/axes/tests/test_access_attempt.py +++ b/axes/tests/test_attempt.py @@ -5,7 +5,7 @@ import string import time from unittest.mock import patch, MagicMock -from django.contrib.auth import authenticate, get_user_model +from django.contrib.auth import authenticate from django.contrib.auth.models import User from django.http import HttpRequest from django.test import TestCase, override_settings @@ -14,11 +14,6 @@ from django.urls import reverse from axes.attempts import ( get_cache_key, - get_cache_timeout, - is_already_locked, - ip_in_blacklist, - ip_in_whitelist, - is_user_lockable, get_client_parameters, get_user_attempts, ) @@ -445,7 +440,7 @@ class AccessAttemptTest(TestCase): # reset the username only and make sure we can log in now even though # our IP has failed each time - AccessAttempt.objects.filter(username=AccessAttemptTest.VALID_USERNAME).delete() + AccessAttempt.objects.filter(username=self.VALID_USERNAME).delete() response = self._login( is_valid_username=True, is_valid_password=True, @@ -593,103 +588,3 @@ class AccessAttemptTest(TestCase): response = self._login() self.assertContains(response, self.LOCKED_MESSAGE, status_code=403) - -class AttemptUtilsTestCase(TestCase): - def setUp(self): - self.request = HttpRequest() - self.request.method = 'POST' - self.request.META['REMOTE_ADDR'] = '127.0.0.1' - - @override_settings(AXES_IP_WHITELIST=None) - def test_ip_in_whitelist_none(self): - self.assertFalse(ip_in_whitelist('127.0.0.2')) - - @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) - def test_ip_in_whitelist(self): - self.assertTrue(ip_in_whitelist('127.0.0.1')) - self.assertFalse(ip_in_whitelist('127.0.0.2')) - - @override_settings(AXES_IP_BLACKLIST=None) - def test_ip_in_blacklist_none(self): - self.assertFalse(ip_in_blacklist('127.0.0.2')) - - @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) - def test_ip_in_blacklist(self): - self.assertTrue(ip_in_blacklist('127.0.0.1')) - self.assertFalse(ip_in_blacklist('127.0.0.2')) - - @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) - def test_is_already_locked_ip_in_blacklist(self): - self.assertTrue(is_already_locked(self.request)) - - @override_settings(AXES_IP_BLACKLIST=['127.0.0.2']) - def test_is_already_locked_ip_not_in_blacklist(self): - self.assertFalse(is_already_locked(self.request)) - - @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) - def test_is_already_locked_ip_in_whitelist(self): - self.assertFalse(is_already_locked(self.request)) - - @override_settings(AXES_ONLY_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.2']) - def test_is_already_locked_ip_not_in_whitelist(self): - self.assertTrue(is_already_locked(self.request)) - - @override_settings(AXES_COOLOFF_TIME=3) # hours - def test_get_cache_timeout(self): - timeout_seconds = float(60 * 60 * 3) - self.assertEqual(get_cache_timeout(), timeout_seconds) - - @override_settings(AXES_LOCK_OUT_AT_FAILURE=True) - @override_settings(AXES_FAILURE_LIMIT=40) - @patch('axes.attempts.get_axes_cache') - def test_is_already_locked_cache(self, get_cache): - cache = MagicMock() - cache.get.return_value = 42 - get_cache.return_value = cache - - self.assertTrue(is_already_locked(self.request, {})) - self.assertTrue(cache.get.call_count) - - @override_settings(AXES_LOCK_OUT_AT_FAILURE=False) - @override_settings(AXES_FAILURE_LIMIT=40) - @patch('axes.attempts.get_axes_cache') - def test_is_already_locked_do_not_lock_out_at_failure(self, get_cache): - cache = MagicMock() - cache.get.return_value = 42 - get_cache.return_value = cache - - self.assertFalse(is_already_locked(self.request)) - - @override_settings(AXES_NEVER_LOCKOUT_GET=True) - def test_is_already_locked_never_lockout_get(self): - request = HttpRequest() - request.method = 'GET' - self.assertFalse(is_already_locked(request)) - - def test_is_already_locked_nolockable(self): - UserModel = get_user_model() - user = UserModel.objects.create(username='jane.doe') - - with self.subTest('User is marked as nolockout.'): - with patch.object(UserModel, 'nolockout', True, create=True): - locked = is_already_locked(self.request, {UserModel.USERNAME_FIELD: user.username}) - self.assertFalse(locked) - - def test_is_user_lockable(self): - UserModel = get_user_model() - user = UserModel.objects.create(username='jane.doe') - - with self.subTest('User is marked as nolockout.'): - with patch.object(UserModel, 'nolockout', True, create=True): - lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: user.username}) - self.assertFalse(lockable) - - with self.subTest('User exists but attemptee can be locked out.'): - lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: user.username}) - self.assertTrue(lockable) - - with self.subTest('User does not exist and attemptee can be locked out.'): - lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: 'not.' + user.username}) - self.assertTrue(lockable) diff --git a/axes/tests/test_backends.py b/axes/tests/test_backends.py index 22eec0f..f6808dc 100644 --- a/axes/tests/test_backends.py +++ b/axes/tests/test_backends.py @@ -13,7 +13,7 @@ class BackendTestCase(TestCase): with self.assertRaises(AxesBackendRequestParameterRequired): AxesBackend().authenticate(request) - @patch('axes.backends.is_already_locked', return_value=True) + @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=False) def test_authenticate_raises_on_locked_request(self, _): request = MagicMock() diff --git a/axes/tests/test_decorators.py b/axes/tests/test_decorators.py index 82c5f29..552f0e2 100644 --- a/axes/tests/test_decorators.py +++ b/axes/tests/test_decorators.py @@ -15,25 +15,25 @@ class DecoratorTestCase(TestCase): self.cls = MagicMock(return_value=self.request) self.func = MagicMock(return_value=self.SUCCESS_RESPONSE) - @patch('axes.decorators.is_already_locked', return_value=True) + @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=False) @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) def test_axes_dispatch_locks_out(self, _, __): response = axes_dispatch(self.func)(self.request) self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content) - @patch('axes.decorators.is_already_locked', return_value=False) + @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=True) @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) def test_axes_dispatch_dispatches(self, _, __): response = axes_dispatch(self.func)(self.request) self.assertEqual(response.content, self.SUCCESS_RESPONSE.content) - @patch('axes.decorators.is_already_locked', return_value=True) + @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=False) @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) def test_axes_form_invalid_locks_out(self, _, __): response = axes_form_invalid(self.func)(self.cls) self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content) - @patch('axes.decorators.is_already_locked', return_value=False) + @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=True) @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) def test_axes_form_invalid_dispatches(self, _, __): response = axes_form_invalid(self.func)(self.cls) diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index d89d78b..076b9ab 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -3,12 +3,22 @@ from unittest.mock import MagicMock, patch from django.http import HttpRequest from django.test import TestCase, override_settings -from axes.handlers import AxesHandler +from axes.handlers.base import AxesBaseHandler +from axes.handlers.proxy import AxesProxyHandler from axes.models import AccessAttempt -from axes.signals import ProxyHandler -class ProxyHandlerTestCase(TestCase): +class AxesBaseHandlerTestCase(TestCase): + def test_base_handler_raises_on_undefined_is_allowed_to_authenticate(self): + class FaultyHandler(AxesBaseHandler): + pass + + with self.assertRaises(NotImplementedError): + handler = FaultyHandler() + handler.is_allowed_to_authenticate(HttpRequest(), {}) + + +class AxesProxyHandlerTestCase(TestCase): def setUp(self): self.sender = MagicMock() self.credentials = MagicMock() @@ -16,65 +26,47 @@ class ProxyHandlerTestCase(TestCase): self.user = MagicMock() self.instance = MagicMock() - @patch('axes.signals.import_string', return_value=AxesHandler) - def test_setting_changed_signal_triggers_handler_reimport(self, importer): - self.assertEqual(0, importer.call_count) + @patch('axes.handlers.proxy.AxesProxyHandler.implementation', None) + def test_setting_changed_signal_triggers_handler_reimport(self): + self.assertIsNone(AxesProxyHandler().implementation) - with self.settings( - AXES_HANDLER='axes.handlers.AxesHandler' - ): - self.assertEqual(1, importer.call_count) + with self.settings(AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler'): + self.assertIsNotNone(AxesProxyHandler().implementation) - @patch('axes.signals.ProxyHandler.implementation', None) - @patch('axes.signals.import_string', return_value=AxesHandler) - def test_initialize(self, importer): - self.assertEqual(0, importer.call_count) - self.assertIsNone(ProxyHandler.implementation) - - ProxyHandler.initialize() - - self.assertEqual(1, importer.call_count) - self.assertIsInstance(ProxyHandler.implementation, AxesHandler) - - ProxyHandler.initialize() - - self.assertEqual(1, importer.call_count) - self.assertIsInstance(ProxyHandler.implementation, AxesHandler) - - @patch('axes.signals.ProxyHandler.implementation') + @patch('axes.handlers.proxy.AxesProxyHandler.implementation') def test_user_login_failed(self, handler): self.assertFalse(handler.user_login_failed.called) - ProxyHandler().user_login_failed(self.sender, self.credentials, self.request) + AxesProxyHandler().user_login_failed(self.sender, self.credentials, self.request) self.assertTrue(handler.user_login_failed.called) - @patch('axes.signals.ProxyHandler.implementation') + @patch('axes.handlers.proxy.AxesProxyHandler.implementation') def test_user_logged_in(self, handler): self.assertFalse(handler.user_logged_in.called) - ProxyHandler().user_logged_in(self.sender, self.request, self.user) + AxesProxyHandler().user_logged_in(self.sender, self.request, self.user) self.assertTrue(handler.user_logged_in.called) - @patch('axes.signals.ProxyHandler.implementation') + @patch('axes.handlers.proxy.AxesProxyHandler.implementation') def test_user_logged_out(self, handler): self.assertFalse(handler.user_logged_out.called) - ProxyHandler().user_logged_out(self.sender, self.request, self.user) + AxesProxyHandler().user_logged_out(self.sender, self.request, self.user) self.assertTrue(handler.user_logged_out.called) - @patch('axes.signals.ProxyHandler.implementation') + @patch('axes.handlers.proxy.AxesProxyHandler.implementation') def test_post_save_access_attempt(self, handler): self.assertFalse(handler.post_save_access_attempt.called) - ProxyHandler().post_save_access_attempt(self.instance) + AxesProxyHandler().post_save_access_attempt(self.instance) self.assertTrue(handler.post_save_access_attempt.called) - @patch('axes.signals.ProxyHandler.implementation') + @patch('axes.handlers.proxy.AxesProxyHandler.implementation') def test_post_delete_access_attempt(self, handler): self.assertFalse(handler.post_delete_access_attempt.called) - ProxyHandler().post_delete_access_attempt(self.instance) + AxesProxyHandler().post_delete_access_attempt(self.instance) self.assertTrue(handler.post_delete_access_attempt.called) -class AxesHandlerTestCase(TestCase): +class AxesDatabaseHandlerTestCase(TestCase): def setUp(self): - self.handler = AxesHandler() + self.handler = AxesProxyHandler() self.attempt = AccessAttempt.objects.create( username='jane.doe', @@ -83,36 +75,42 @@ class AxesHandlerTestCase(TestCase): failures_since_start=42, ) - @patch('axes.handlers.log') + self.request = HttpRequest() + self.request.method = 'POST' + self.request.META['REMOTE_ADDR'] = '127.0.0.1' + + @patch('axes.handlers.database.log') def test_user_login_failed_no_request(self, log): - self.handler.user_login_failed(sender=None, credentials=None, request=None) - log.warning.assert_called_with('AxesHandler.user_login_failed does not function without a request.') + self.handler.user_login_failed(sender=None, credentials={}, request=None) + log.warning.assert_called_with( + 'AXES: AxesDatabaseHandler.user_login_failed does not function without a request.' + ) @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=['127.0.0.1']) - @patch('axes.handlers.get_client_ip_address', return_value='127.0.0.1') - @patch('axes.handlers.ip_in_whitelist', return_value=True) - @patch('axes.handlers.log') + @patch('axes.handlers.database.get_client_ip_address', return_value='127.0.0.1') + @patch('axes.handlers.database.is_ip_address_in_whitelist', return_value=True) + @patch('axes.handlers.database.log') def test_user_login_failed_whitelist(self, log, _, __): - request = HttpRequest() - self.handler.user_login_failed(sender=None, credentials=None, request=request) - log.info.assert_called_with('Login failed from whitelisted IP %s.', '127.0.0.1') + self.handler.user_login_failed(sender=None, credentials={}, request=self.request) + log.info.assert_called_with('AXES: Login failed from whitelisted IP %s.', '127.0.0.1') - @patch('axes.handlers.get_axes_cache') + @patch('axes.handlers.database.get_axes_cache') def test_post_save_access_attempt_updates_cache(self, get_cache): cache = MagicMock() cache.get.return_value = None cache.set.return_value = None + get_cache.return_value = cache - cache.get.assert_not_called() - cache.set.assert_not_called() + self.assertFalse(cache.get.call_count) + self.assertFalse(cache.set.call_count) self.handler.post_save_access_attempt(self.attempt) self.assertTrue(cache.get.call_count) self.assertTrue(cache.set.call_count) - @patch('axes.handlers.get_axes_cache') + @patch('axes.handlers.database.get_axes_cache') def test_user_login_failed_utilizes_cache(self, get_cache): cache = MagicMock() cache.get.return_value = 1 @@ -120,11 +118,35 @@ class AxesHandlerTestCase(TestCase): sender = MagicMock() credentials = {'username': self.attempt.username} - request = HttpRequest() - request.META['REMOTE_ADDR'] = '127.0.0.1' - cache.get.assert_not_called() + self.assertFalse(cache.get.call_count) - self.handler.user_login_failed(sender, credentials, request) + self.handler.user_login_failed(sender, credentials, self.request) self.assertTrue(cache.get.call_count) + + @override_settings(AXES_LOCK_OUT_AT_FAILURE=True) + @override_settings(AXES_FAILURE_LIMIT=40) + @patch('axes.handlers.database.get_axes_cache') + def test_is_already_locked_cache(self, get_cache): + cache = MagicMock() + cache.get.return_value = 42 + get_cache.return_value = cache + + self.assertFalse(AxesProxyHandler.is_allowed_to_authenticate(self.request, {})) + + @override_settings(AXES_LOCK_OUT_AT_FAILURE=False) + @override_settings(AXES_FAILURE_LIMIT=40) + @patch('axes.handlers.database.get_axes_cache') + def test_is_already_locked_do_not_lock_out_at_failure(self, get_cache): + cache = MagicMock() + cache.get.return_value = 42 + get_cache.return_value = cache + + self.assertTrue(AxesProxyHandler.is_allowed_to_authenticate(self.request, {})) + + @override_settings(AXES_NEVER_LOCKOUT_GET=True) + def test_is_already_locked_never_lockout_get(self): + self.request.method = 'GET' + + self.assertTrue(AxesProxyHandler.is_allowed_to_authenticate(self.request, {})) diff --git a/axes/tests/test_access_attempt_config.py b/axes/tests/test_login.py similarity index 98% rename from axes/tests/test_access_attempt_config.py rename to axes/tests/test_login.py index bcd3d83..6e3a5c1 100644 --- a/axes/tests/test_access_attempt_config.py +++ b/axes/tests/test_login.py @@ -1,11 +1,15 @@ +""" +Test access from purely the +""" + from django.test import TestCase, override_settings from django.urls import reverse -from django.contrib.auth.models import User +from django.contrib.auth import get_user_model from axes.conf import settings -class AccessAttemptConfigTest(TestCase): +class LoginTestCase(TestCase): """ Test for lockouts under different configurations and circumstances to prevent false positives and false negatives. @@ -66,12 +70,12 @@ class AccessAttemptConfigTest(TestCase): Create two valid users for authentication. """ - self.user = User.objects.create_superuser( + self.user = get_user_model().objects.create_superuser( username=self.USER_1, email='test_1@example.com', password=self.VALID_PASSWORD, ) - self.user = User.objects.create_superuser( + self.user = get_user_model().objects.create_superuser( username=self.USER_2, email='test_2@example.com', password=self.VALID_PASSWORD, diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 9f661ec..9a1b415 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -1,11 +1,22 @@ from datetime import timedelta from unittest.mock import patch +from django.contrib.auth import get_user_model from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpResponse from django.test import TestCase, override_settings from axes import get_version -from axes.utils import get_cool_off_iso8601, get_client_str, get_client_username, get_lockout_response +from axes.utils import ( + get_cool_off_iso8601, + get_client_str, + get_client_username, + get_lockout_response, + is_client_ip_address_blacklisted, + is_ip_address_in_blacklist, + is_ip_address_in_whitelist, + get_cache_timeout, + is_client_username_whitelisted, + is_client_ip_address_whitelisted) def get_username(request: HttpRequest, credentials: dict) -> str: @@ -17,13 +28,46 @@ def get_expected_client_str(*args, **kwargs): return client_str_template.format(*args, **kwargs) -class AxesTestCase(TestCase): +class VersionTestCase(TestCase): @patch('axes.__version__', 'test') def test_get_version(self): self.assertEqual(get_version(), 'test') -class UtilsTestCase(TestCase): +class CacheTestCase(TestCase): + @override_settings(AXES_COOLOFF_TIME=3) # hours + def test_get_cache_timeout(self): + timeout_seconds = float(60 * 60 * 3) + self.assertEqual(get_cache_timeout(), timeout_seconds) + + +class UserTestCase(TestCase): + def setUp(self): + self.user_model = get_user_model() + self.user = self.user_model.objects.create(username='jane.doe') + self.request = HttpRequest() + + def test_is_client_username_whitelisted(self): + with patch.object(self.user_model, 'nolockout', True, create=True): + self.assertTrue(is_client_username_whitelisted( + self.request, + {self.user_model.USERNAME_FIELD: self.user.username}, + )) + + def test_is_client_username_whitelisted_not(self): + self.assertFalse(is_client_username_whitelisted( + self.request, + {self.user_model.USERNAME_FIELD: self.user.username}, + )) + + def test_is_client_username_whitelisted_does_not_exist(self): + self.assertFalse(is_client_username_whitelisted( + self.request, + {self.user_model.USERNAME_FIELD: 'not.' + self.user.username}, + )) + + +class TimestampTestCase(TestCase): def test_iso8601(self): """ Test get_cool_off_iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration. @@ -52,6 +96,8 @@ class UtilsTestCase(TestCase): with self.subTest(iso_duration): self.assertEqual(get_cool_off_iso8601(delta), iso_duration) + +class ClientStringTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details(self): username = 'test@example.com' @@ -166,6 +212,8 @@ class UtilsTestCase(TestCase): self.assertEqual(expected, actual) + +class UsernameTestCase(TestCase): @override_settings(AXES_USERNAME_FORM_FIELD='username') def test_default_get_client_username(self): expected = 'test-username' @@ -257,6 +305,64 @@ class UtilsTestCase(TestCase): ) +class WhitelistTestCase(TestCase): + def setUp(self): + self.request = HttpRequest() + self.request.method = 'POST' + self.request.META['REMOTE_ADDR'] = '127.0.0.1' + + @override_settings(AXES_IP_WHITELIST=None) + def test_ip_in_whitelist_none(self): + self.assertFalse(is_ip_address_in_whitelist('127.0.0.2')) + + @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + def test_ip_in_whitelist(self): + self.assertTrue(is_ip_address_in_whitelist('127.0.0.1')) + self.assertFalse(is_ip_address_in_whitelist('127.0.0.2')) + + @override_settings(AXES_IP_BLACKLIST=None) + def test_ip_in_blacklist_none(self): + self.assertFalse(is_ip_address_in_blacklist('127.0.0.2')) + + @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) + def test_ip_in_blacklist(self): + self.assertTrue(is_ip_address_in_blacklist('127.0.0.1')) + self.assertFalse(is_ip_address_in_blacklist('127.0.0.2')) + + @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) + def test_is_client_ip_address_blacklisted_ip_in_blacklist(self): + self.assertTrue(is_client_ip_address_blacklisted(self.request)) + + @override_settings(AXES_IP_BLACKLIST=['127.0.0.2']) + def test_is_is_client_ip_address_blacklisted_ip_not_in_blacklist(self): + self.assertFalse(is_client_ip_address_blacklisted(self.request)) + + @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True) + @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + def test_is_client_ip_address_blacklisted_ip_in_whitelist(self): + self.assertFalse(is_client_ip_address_blacklisted(self.request)) + + @override_settings(AXES_ONLY_WHITELIST=True) + @override_settings(AXES_IP_WHITELIST=['127.0.0.2']) + def test_is_already_locked_ip_not_in_whitelist(self): + self.assertTrue(is_client_ip_address_blacklisted(self.request)) + + @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True) + @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + def test_is_client_ip_address_whitelisted_never_lockout(self): + self.assertTrue(is_client_ip_address_whitelisted(self.request)) + + @override_settings(AXES_ONLY_WHITELIST=True) + @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + def test_is_client_ip_address_whitelisted_only_allow(self): + self.assertTrue(is_client_ip_address_whitelisted(self.request)) + + @override_settings(AXES_ONLY_WHITELIST=True) + @override_settings(AXES_IP_WHITELIST=['127.0.0.2']) + def test_is_client_ip_address_whitelisted_not(self): + self.assertFalse(is_client_ip_address_whitelisted(self.request)) + + class LockoutResponseTestCase(TestCase): def setUp(self): self.request = HttpRequest() diff --git a/axes/utils.py b/axes/utils.py index 28d9600..fe044bc 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -1,10 +1,11 @@ from collections import OrderedDict from datetime import timedelta from logging import getLogger -from typing import Optional +from typing import Optional, Type +from django.contrib.auth import get_user_model from django.core.cache import caches, BaseCache -from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse +from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse, QueryDict from django.shortcuts import render from django.utils.module_loading import import_string @@ -259,7 +260,7 @@ def get_client_str(username: str, ip_address: str, user_agent: str, path_info: s return template.format(client_dict) -def get_query_str(query: dict, max_length: int = 1024) -> str: +def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str: """ Turns a query dictionary into an easy-to-read list of key-value pairs. @@ -323,3 +324,87 @@ def get_lockout_response(request: HttpRequest, credentials: dict = None) -> Http get_lockout_message(), status=status, ) + + +def is_ip_address_in_whitelist(ip_address: str) -> bool: + if not settings.AXES_IP_WHITELIST: + return False + + return ip_address in settings.AXES_IP_WHITELIST + + +def is_ip_address_in_blacklist(ip_address: str) -> bool: + if not settings.AXES_IP_BLACKLIST: + return False + + return ip_address in settings.AXES_IP_BLACKLIST + + +def is_client_ip_address_whitelisted(request: HttpRequest): + """ + Check if the given request refers to a whitelisted IP. + """ + + ip_address = get_client_ip_address(request) + + if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address): + return True + + if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(ip_address): + return True + + return False + + +def is_client_ip_address_blacklisted(request: HttpRequest) -> bool: + """ + Check if the given request refers to a blacklisted IP. + """ + + ip_address = get_client_ip_address(request) + + if is_ip_address_in_blacklist(ip_address): + return True + + if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(ip_address): + return True + + return False + + +def is_client_method_whitelisted(request: HttpRequest) -> bool: + """ + Check if the given request uses a whitelisted method. + """ + + if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET': + return True + + return False + + +def is_client_username_whitelisted(request: HttpRequest, credentials: dict = None) -> bool: + """ + Check if the given request or credentials refer to a whitelisted username. + + A whitelisted user has the magic ``nolockout`` property set. + + If the property is unknown or False or the user can not be found, + this implementation fails gracefully and returns True. + """ + + username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username') + username_value = get_client_username(request, credentials) + kwargs = { + username_field: username_value + } + + user_model = get_user_model() + + try: + user = user_model.objects.get(**kwargs) + return user.nolockout + except (user_model.DoesNotExist, AttributeError): + pass + + return False diff --git a/docs/configuration.rst b/docs/configuration.rst index 6b24b23..f896664 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -236,7 +236,7 @@ These should be defined in your ``settings.py`` file. timedelta object or an integer. If an integer, will be interpreted as a number of hours. Default: ``None`` * ``AXES_HANDLER``: If set, overrides the default signal handler backend. - Default: ``'axes.handlers.AxesHandler'`` + Default: ``'axes.handlers.database.AxesDatabaseHandler'`` * ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use. Default: ``'axes.watch_login'`` * ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a @@ -271,7 +271,7 @@ These should be defined in your ``settings.py`` file. * ``AXES_NEVER_LOCKOUT_WHITELIST``: If ``True``, users can always login from whitelisted IP addresses. Default: ``False`` * ``AXES_CLIENT_IP_ATTRIBUTE``: A string that is used to lookup and set client IP on the request object. Default: ``'axes_client_ip'`` -* ``AXES_IP_BLACKLIST``: An iterable of IPs to be blacklisted. For example: ``AXES_IP_BLACKLIST = ['0.0.0.0']``. Default: ``None`` +* ``AXES_IP_BLACKLIST``: An iterable of IPs to be blacklisted. Takes precedence over whitelists. For example: ``AXES_IP_BLACKLIST = ['0.0.0.0']``. Default: ``None`` * ``AXES_IP_WHITELIST``: An iterable of IPs to be whitelisted. For example: ``AXES_IP_WHITELIST = ['0.0.0.0']``. Default: ``None`` * ``AXES_DISABLE_ACCESS_LOG``: If ``True``, disable all access logging, so the admin interface will be empty. Default: ``False`` * ``AXES_DISABLE_SUCCESS_ACCESS_LOG``: If ``True``, successful logins will not be logged, so the access log shown in the admin interface will only list unsuccessful login attempts. Default: ``False``