diff --git a/axes/attempts.py b/axes/attempts.py index 2f331a0..1414e76 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -1,6 +1,5 @@ from logging import getLogger -from django.contrib.auth import get_user_model from django.db.models import QuerySet from django.utils.timezone import datetime, now @@ -91,32 +90,3 @@ def reset_user_attempts(request, credentials: dict = None) -> int: log.info("AXES: Reset %s access attempts from database.", count) return count - - -def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool: - """ - Check if the given request or credentials refer to a whitelisted username. - - A whitelisted user has the magic ``nolockout`` property set. - - If the property is unknown or False or the user can not be found, - this implementation fails gracefully and returns False. - - This is a legacy method forom an older release - that should be converted to a configurable callable - for determining whitelisting criteria per developer specification. - """ - - username_field = getattr(get_user_model(), "USERNAME_FIELD", "username") - username_value = get_client_username(request, credentials) - kwargs = {username_field: username_value} - - user_model = get_user_model() - - try: - user = user_model.objects.get(**kwargs) - return user.nolockout - except (user_model.DoesNotExist, AttributeError): - pass - - return False diff --git a/axes/conf.py b/axes/conf.py index a4aaae6..edd4a41 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -38,6 +38,9 @@ class AxesAppConf(AppConf): # use a provided callable to transform the POSTed username into the one used in credentials USERNAME_CALLABLE = None + # determine if given user should be always allowed to attempt authentication + WHITELIST_CALLABLE = None + # reset the number of failed attempts after one successful attempt RESET_ON_SUCCESS = False diff --git a/axes/handlers/base.py b/axes/handlers/base.py index 98d9fbc..8ee8dfe 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -8,6 +8,7 @@ from axes.helpers import ( is_client_ip_address_blacklisted, is_client_ip_address_whitelisted, is_client_method_whitelisted, + is_user_attempt_whitelisted, ) @@ -118,6 +119,9 @@ class AxesHandler: # pylint: disable=unused-argument Checks if the request or given credentials are whitelisted for access. """ + if is_user_attempt_whitelisted(request, credentials): + return True + if is_client_ip_address_whitelisted(request): return True diff --git a/axes/handlers/database.py b/axes/handlers/database.py index 9af490a..76b362a 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -7,7 +7,6 @@ from django.utils import timezone from axes.attempts import ( clean_expired_user_attempts, get_user_attempts, - is_user_attempt_whitelisted, reset_user_attempts, ) from axes.conf import settings @@ -69,12 +68,6 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals or 0 ) - def is_locked(self, request, credentials: dict = None): - if is_user_attempt_whitelisted(request, credentials): - return False - - return super().is_locked(request, credentials) - def user_login_failed( self, sender, credentials: dict, request=None, **kwargs ): # pylint: disable=too-many-locals diff --git a/axes/helpers.py b/axes/helpers.py index 778e669..ac39cd5 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -208,13 +208,43 @@ def get_client_parameters(username: str, ip_address: str, user_agent: str) -> di return filter_kwargs +def get_client_cache_key( + request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None +) -> str: + """ + Build cache key name from request or AccessAttempt object. + + :param request_or_attempt: HttpRequest or AccessAttempt object + :param credentials: credentials containing user information + :return cache_key: Hash key that is usable for Django cache backends + """ + + if isinstance(request_or_attempt, AccessBase): + username = request_or_attempt.username + ip_address = request_or_attempt.ip_address + user_agent = request_or_attempt.user_agent + else: + username = get_client_username(request_or_attempt, credentials) + ip_address = get_client_ip_address(request_or_attempt) + user_agent = get_client_user_agent(request_or_attempt) + + filter_kwargs = get_client_parameters(username, ip_address, user_agent) + + cache_key_components = "".join(value for value in filter_kwargs.values() if value) + cache_key_digest = md5(cache_key_components.encode()).hexdigest() + cache_key = f"axes-{cache_key_digest}" + + return cache_key + + def get_client_str( username: str, ip_address: str, user_agent: str, path_info: str ) -> str: """ Get a readable string that can be used in e.g. logging to distinguish client requests. - Example log format would be ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}`` + Example log format would be + ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}`` """ client_dict = dict() @@ -366,33 +396,42 @@ def is_client_method_whitelisted(request) -> bool: return False -def get_client_cache_key( - request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None -) -> str: +def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool: """ - Build cache key name from request or AccessAttempt object. + Check if the given request or credentials refer to a whitelisted username. - :param request_or_attempt: HttpRequest or AccessAttempt object - :param credentials: credentials containing user information - :return cache_key: Hash key that is usable for Django cache backends + This method invokes the ``settings.AXES_WHITELIST`` callable + with ``request`` and ``credentials`` arguments. + + This function could use the following implementation for checking + the lockout flags from a specific property in the user object: + + .. code-block: python + + username_value = get_client_username(request, credentials) + username_field = getattr( + get_user_model(), + "USERNAME_FIELD", + "username" + ) + kwargs = {username_field: username_value} + + user_model = get_user_model() + user = user_model.objects.get(**kwargs) + return user.nolockout """ - if isinstance(request_or_attempt, AccessBase): - username = request_or_attempt.username - ip_address = request_or_attempt.ip_address - user_agent = request_or_attempt.user_agent - else: - username = get_client_username(request_or_attempt, credentials) - ip_address = get_client_ip_address(request_or_attempt) - user_agent = get_client_user_agent(request_or_attempt) + whitelist_callable = settings.AXES_WHITELIST_CALLABLE + if whitelist_callable is None: + return False + if callable(whitelist_callable): + return whitelist_callable(request, credentials) + if isinstance(whitelist_callable, str): + return import_string(whitelist_callable)(request, credentials) - filter_kwargs = get_client_parameters(username, ip_address, user_agent) - - cache_key_components = "".join(value for value in filter_kwargs.values() if value) - cache_key_digest = md5(cache_key_components.encode()).hexdigest() - cache_key = f"axes-{cache_key_digest}" - - return cache_key + raise TypeError( + "settings.AXES_WHITELIST_CALLABLE needs to be a string, callable, or None." + ) def toggleable(func) -> Callable: diff --git a/axes/tests/test_attempts.py b/axes/tests/test_attempts.py index b474528..2e72c3e 100644 --- a/axes/tests/test_attempts.py +++ b/axes/tests/test_attempts.py @@ -1,11 +1,9 @@ from unittest.mock import patch -from django.contrib.auth import get_user_model -from django.http import HttpRequest from django.test import override_settings from django.utils.timezone import now -from axes.attempts import is_user_attempt_whitelisted, get_cool_off_threshold +from axes.attempts import get_cool_off_threshold from axes.models import AccessAttempt from axes.tests.base import AxesTestCase from axes.utils import reset @@ -46,33 +44,3 @@ class ResetTestCase(AxesTestCase): self.create_attempt(username=self.username) reset(username=self.username) self.assertFalse(AccessAttempt.objects.count()) - - -class UserWhitelistTestCase(AxesTestCase): - def setUp(self): - self.user_model = get_user_model() - self.user = self.user_model.objects.create(username="jane.doe") - self.request = HttpRequest() - - def test_is_client_username_whitelisted(self): - with patch.object(self.user_model, "nolockout", True, create=True): - self.assertTrue( - is_user_attempt_whitelisted( - self.request, {self.user_model.USERNAME_FIELD: self.user.username} - ) - ) - - def test_is_client_username_whitelisted_not(self): - self.assertFalse( - is_user_attempt_whitelisted( - self.request, {self.user_model.USERNAME_FIELD: self.user.username} - ) - ) - - def test_is_client_username_whitelisted_does_not_exist(self): - self.assertFalse( - is_user_attempt_whitelisted( - self.request, - {self.user_model.USERNAME_FIELD: "not." + self.user.username}, - ) - ) diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index 2a5f810..e0ea960 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -194,11 +194,6 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): def test_whitelist(self, log): self.check_whitelist(log) - @patch("axes.handlers.database.is_user_attempt_whitelisted", return_value=True) - def test_user_whitelisted(self, is_whitelisted): - self.assertFalse(AxesProxyHandler().is_locked(self.request, self.credentials)) - self.assertEqual(1, is_whitelisted.call_count) - def test_user_login_failed_multiple_username(self): configurations = ( (1, 2, {}, ["admin", "admin1"]), diff --git a/axes/tests/test_helpers.py b/axes/tests/test_helpers.py index d3a6d50..992812b 100644 --- a/axes/tests/test_helpers.py +++ b/axes/tests/test_helpers.py @@ -1,8 +1,10 @@ from datetime import timedelta +from django.contrib.auth import get_user_model +from django.http import HttpRequest from django.test import override_settings -from axes.helpers import get_cool_off +from axes.helpers import get_cool_off, is_user_attempt_whitelisted from axes.tests.base import AxesTestCase @@ -10,6 +12,10 @@ def get_cool_off_str(): return timedelta(seconds=30) +def is_whitelisted(request, credentials): + return True + + class AxesHelpersTestCase(AxesTestCase): @override_settings(AXES_COOLOFF_TIME=None) def test_get_cool_off_none(self): @@ -26,3 +32,27 @@ class AxesHelpersTestCase(AxesTestCase): @override_settings(AXES_COOLOFF_TIME="axes.tests.test_helpers.get_cool_off_str") def test_get_cool_off_str(self): self.assertEqual(get_cool_off(), timedelta(seconds=30)) + + +class UserWhitelistTestCase(AxesTestCase): + def setUp(self): + self.user_model = get_user_model() + self.user = self.user_model.objects.create(username="jane.doe") + self.request = HttpRequest() + self.credentials = dict() + + def test_is_whitelisted(self): + self.assertFalse(is_user_attempt_whitelisted(self.request, self.credentials)) + + @override_settings(AXES_WHITELIST_CALLABLE=is_whitelisted) + def test_is_whitelisted_override(self): + self.assertTrue(is_user_attempt_whitelisted(self.request, self.credentials)) + + @override_settings(AXES_WHITELIST_CALLABLE="axes.tests.test_helpers.is_whitelisted") + def test_is_whitelisted_override_path(self): + self.assertTrue(is_user_attempt_whitelisted(self.request, self.credentials)) + + @override_settings(AXES_WHITELIST_CALLABLE=42) + def test_is_whitelisted_override_invalid(self): + with self.assertRaises(TypeError): + is_user_attempt_whitelisted(self.request, self.credentials) diff --git a/docs/4_configuration.rst b/docs/4_configuration.rst index 89c0321..a9f5d76 100644 --- a/docs/4_configuration.rst +++ b/docs/4_configuration.rst @@ -76,6 +76,11 @@ The following ``settings.py`` options are available for customizing Axes behavio ``credentials`` are the ones that were passed to Django ``authenticate()`` in the login flow. If no function is supplied, Axes fetches the username from the ``credentials`` or ``request.POST`` dictionaries based on ``AXES_USERNAME_FORM_FIELD``. + * ``AXES_WHITELIST_CALLABLE``: A callable or a string path to callable that takes + two arguments for whitelisting determination and returns True, + if user should be whitelisted: + ``def is_whilisted(request: HttpRequest, credentials: dict) -> bool: ...``. + This can be any callable similarly to ``AXES_USERNAME_CALLABLE`` Default: ``None`` * ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your users password. Default: ``password`` @@ -121,15 +126,15 @@ following settings to suit your set up to correctly resolve client IP addresses: .. note:: For reverse proxies or e.g. Heroku, you might also want to fetch IP addresses from a HTTP header such as ``X-Forwarded-For``. To configure this, you can fetch IPs through the ``HTTP_X_FORWARDED_FOR`` key from the ``request.META`` property which contains all the HTTP headers in Django: - + .. code-block:: python - + # refer to the Django request and response objects documentation AXES_META_PRECEDENCE_ORDER = [ 'HTTP_X_FORWARDED_FOR', 'REMOTE_ADDR', ] - + Please note that proxies have different behaviours with the HTTP headers. Make sure that your proxy either strips the incoming value or otherwise makes sure of the validity of the header that is used because **any header values used in application configuration must be secure and trusted**. Otherwise the client can spoof IP addresses by just setting the header in their request and circumvent the IP address monitoring. Normal proxy server behaviours include overriding and appending the header value depending on the platform. Different platforms and gateway services utilize different headers, please refer to your deployment target documentation for up-to-date information on correct configuration. @@ -227,4 +232,3 @@ with third party applications and plugins such as - Django REST Framework - Django Allauth - Django Simple Captcha -