From 306dd3c596dcfbb9968f0e9a140f4e34010ad5f3 Mon Sep 17 00:00:00 2001 From: Benedikt Bauer Date: Wed, 19 Dec 2018 14:27:58 +0100 Subject: [PATCH 1/6] Use credentials whenever possible in favor of request.POST --- axes/attempts.py | 36 ++++++++++++++------------ axes/backends.py | 6 ++++- axes/signals.py | 12 +++++---- axes/tests/test_access_attempt.py | 5 +++- axes/tests/test_utils.py | 42 ++++++++++++++++++++++++++++--- axes/utils.py | 8 +++--- 6 files changed, 79 insertions(+), 30 deletions(-) diff --git a/axes/attempts.py b/axes/attempts.py index d78b224..d15e9e1 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -11,12 +11,12 @@ from axes.models import AccessAttempt from axes.utils import get_axes_cache, get_client_ip, get_client_username -def _query_user_attempts(request): +def _query_user_attempts(request, credentials): """Returns access attempt record if it exists. Otherwise return None. """ ip = get_client_ip(request) - username = get_client_username(request) + username = get_client_username(request, credentials) if settings.AXES_ONLY_USER_FAILURES: attempts = AccessAttempt.objects.filter(username=username) @@ -49,10 +49,11 @@ def _query_user_attempts(request): return attempts -def get_cache_key(request_or_obj): +def get_cache_key(request_or_obj, credentials=None): """ Build cache key name from request or AccessAttempt object. :param request_or_obj: Request or AccessAttempt object + :param credentials: Dictionary with access credentials - Only supplied when request_or_obj is not an AccessAttempt :return cache-key: String, key to be used in cache system """ if isinstance(request_or_obj, AccessAttempt): @@ -61,8 +62,11 @@ def get_cache_key(request_or_obj): ua = request_or_obj.user_agent else: ip = get_client_ip(request_or_obj) - un = request_or_obj.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) ua = request_or_obj.META.get('HTTP_USER_AGENT', '')[:255] + if credentials is None: + un = request_or_obj.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) + else: + un = credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) ip = ip.encode('utf-8') if ip else ''.encode('utf-8') un = un.encode('utf-8') if un else ''.encode('utf-8') @@ -96,10 +100,10 @@ def get_cache_timeout(): return cache_timeout -def get_user_attempts(request): +def get_user_attempts(request, credentials): force_reload = False - attempts = _query_user_attempts(request) - cache_hash_key = get_cache_key(request) + attempts = _query_user_attempts(request, credentials) + cache_hash_key = get_cache_key(request, credentials) cache_timeout = get_cache_timeout() cool_off = settings.AXES_COOLOFF_TIME @@ -125,13 +129,13 @@ def get_user_attempts(request): # If objects were deleted, we need to update the queryset to reflect this, # so force a reload. if force_reload: - attempts = _query_user_attempts(request) + attempts = _query_user_attempts(request, credentials) return attempts -def reset_user_attempts(request): - attempts = _query_user_attempts(request) +def reset_user_attempts(request, credentials): + attempts = _query_user_attempts(request, credentials) count, _ = attempts.delete() return count @@ -151,7 +155,7 @@ def ip_in_blacklist(ip): return ip in settings.AXES_IP_BLACKLIST -def is_user_lockable(request): +def is_user_lockable(request, credentials): """Check if the user has a profile with nolockout If so, then return the value to see if this user is special and doesn't get their account locked out @@ -165,7 +169,7 @@ def is_user_lockable(request): try: field = getattr(get_user_model(), 'USERNAME_FIELD', 'username') kwargs = { - field: get_client_username(request) + field: get_client_username(request, credentials) } user = get_user_model().objects.get(**kwargs) @@ -182,7 +186,7 @@ def is_user_lockable(request): return True -def is_already_locked(request): +def is_already_locked(request, credentials=None): ip = get_client_ip(request) if ( @@ -200,10 +204,10 @@ def is_already_locked(request): if ip_in_blacklist(ip): return True - if not is_user_lockable(request): + if not is_user_lockable(request, credentials): return False - cache_hash_key = get_cache_key(request) + cache_hash_key = get_cache_key(request, credentials) failures_cached = get_axes_cache().get(cache_hash_key) if failures_cached is not None: return ( @@ -211,7 +215,7 @@ def is_already_locked(request): settings.AXES_LOCK_OUT_AT_FAILURE ) - for attempt in get_user_attempts(request): + for attempt in get_user_attempts(request, credentials): if ( attempt.failures_since_start >= settings.AXES_FAILURE_LIMIT and settings.AXES_LOCK_OUT_AT_FAILURE diff --git a/axes/backends.py b/axes/backends.py index 8e1b2fd..cbd2d5b 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -4,6 +4,7 @@ from django.contrib.auth.backends import ModelBackend from django.core.exceptions import PermissionDenied from axes.attempts import is_already_locked +from axes.conf import settings from axes.utils import get_lockout_message @@ -30,10 +31,13 @@ class AxesModelBackend(ModelBackend): :return: Nothing, but will update return_context with lockout message if user is locked out. """ + # Create credentials dictionary from username field + credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + if request is None: raise AxesModelBackend.RequestParameterRequired() - if is_already_locked(request): + if is_already_locked(request, credentials): # locked out, don't try to authenticate, just update return_context and return # Its a bit weird to pass a context and expect a response value but its nice to get a "why" back. error_msg = get_lockout_message() diff --git a/axes/signals.py b/axes/signals.py index a3f6b47..c6e2afc 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -38,7 +38,7 @@ def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: di return ip_address = get_client_ip(request) - username = get_client_username(request) + username = get_client_username(request, credentials) user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] path_info = request.META.get('PATH_INFO', '')[:255] http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] @@ -47,8 +47,8 @@ def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: di return failures = 0 - attempts = get_user_attempts(request) - cache_hash_key = get_cache_key(request) + attempts = get_user_attempts(request, credentials) + cache_hash_key = get_cache_key(request, credentials) cache_timeout = get_cache_timeout() failures_cached = get_axes_cache().get(cache_hash_key) @@ -110,7 +110,7 @@ def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: di if ( failures >= settings.AXES_FAILURE_LIMIT and settings.AXES_LOCK_OUT_AT_FAILURE and - is_user_lockable(request) + is_user_lockable(request, credentials) ): log.warning( 'AXES: locked out %s after repeated login attempts.', @@ -148,7 +148,9 @@ def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unus ) if settings.AXES_RESET_ON_SUCCESS: - count = reset_user_attempts(request) + # Create credentials dictionary from the username field + credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + count = reset_user_attempts(request, credentials) log.info( 'AXES: Deleted %d failed login attempts by %s.', count, diff --git a/axes/tests/test_access_attempt.py b/axes/tests/test_access_attempt.py index 5f7c185..c02f0da 100644 --- a/axes/tests/test_access_attempt.py +++ b/axes/tests/test_access_attempt.py @@ -206,8 +206,11 @@ class AccessAttemptTest(TestCase): 'username': self.VALID_USERNAME, 'password': 'test' }) + credentials = { + 'username': self.VALID_USERNAME + } - self.assertEqual(cache_hash_key, get_cache_key(request)) + self.assertEqual(cache_hash_key, get_cache_key(request, credentials)) # Getting cache key from AccessAttempt Object attempt = AccessAttempt( diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index bf3b40a..378379d 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -146,7 +146,7 @@ class UtilsTest(TestCase): self.assertEqual(expected, actual) @override_settings(AXES_USERNAME_FORM_FIELD='username') - def test_default_get_client_username(self): + def test_default_get_client_username_from_request(self): expected = 'test-username' request = HttpRequest() @@ -156,12 +156,27 @@ class UtilsTest(TestCase): self.assertEqual(expected, actual) - def sample_customize_username(request): + @override_settings(AXES_USERNAME_FORM_FIELD='username') + def test_default_get_client_username_from_credentials(self): + expected = 'test-username' + expected_in_credentials = 'test-credentials-username' + + request = HttpRequest() + request.POST['username'] = expected + credentials = { + 'username': expected_in_credentials + } + + actual = get_client_username(request, credentials) + + self.assertEqual(expected_in_credentials, actual) + + def sample_customize_username_from_request(request, credentials): return 'prefixed-' + request.POST.get('username') @override_settings(AXES_USERNAME_FORM_FIELD='username') - @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username) - def test_custom_get_client_username(self): + @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_request) + def test_custom_get_client_username_from_request(self): provided = 'test-username' expected = 'prefixed-' + provided @@ -171,3 +186,22 @@ class UtilsTest(TestCase): actual = get_client_username(request) self.assertEqual(expected, actual) + + def sample_customize_username_from_credentials(request, credentials): + return 'prefixed-' + credentials.get('username') + + @override_settings(AXES_USERNAME_FORM_FIELD='username') + @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_credentials) + def test_custom_get_client_username_from_credentials(self): + provided = 'test-username' + expected = 'prefixed-' + provided + provided_in_credentials = 'test-username' + expected_in_credentials = 'prefixed-' + provided_in_credentials + + request = HttpRequest() + request.POST['username'] = provided + credentials = {'username': provided_in_credentials} + + actual = get_client_username(request, credentials) + + self.assertEqual(expected_in_credentials, actual) diff --git a/axes/utils.py b/axes/utils.py index 0596f7a..a77beda 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -69,10 +69,12 @@ def get_client_ip(request): return getattr(request, client_ip_attribute) -def get_client_username(request): +def get_client_username(request, credentials=None): if settings.AXES_USERNAME_CALLABLE: - return settings.AXES_USERNAME_CALLABLE(request) - return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) + return settings.AXES_USERNAME_CALLABLE(request, credentials) + if credentials is None: + return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) + return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) def is_ipv6(ip): From b6abd9c08e2ac62117bbc1be7b67259a7fc53a69 Mon Sep 17 00:00:00 2001 From: Benedikt Bauer Date: Wed, 19 Dec 2018 15:27:20 +0100 Subject: [PATCH 2/6] Documentation update to reflect changes to AXES_USERNAME_CALLABLE AXES_USERNAME_CALLABLE now gets two parameters and AXES_PASSWORD_FORM_FIELD applies to the credentials dictionary first and only uses the POST fields as a fallback anymore --- docs/configuration.rst | 6 +++--- docs/usage.rst | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/docs/configuration.rst b/docs/configuration.rst index 85ed2cb..9c9025f 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -124,10 +124,10 @@ These should be defined in your ``settings.py`` file. Default: ``True`` * ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your users usernames. Default: ``username`` -* ``AXES_USERNAME_CALLABLE``: A callable function that takes a request object - and returns the username. If empty, axes just fetches it from the POST body +* ``AXES_USERNAME_CALLABLE``: A callable function that takes a request object and a credentials dictionary + and returns the username. If empty, axes just fetches it from the credentials or requst.POST fields based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None`` -* ``AXES_PASSWORD_FORM_FIELD``: the name of the form field that contains your +* ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your users password. Default: ``password`` * ``AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP``: If ``True`` prevents the login from IP under a particular user if the attempt limit has been exceeded, diff --git a/docs/usage.rst b/docs/usage.rst index 5120fc0..200d326 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -96,7 +96,7 @@ them as per the example. request = request, username = form.cleaned_data.get('username'), password = form.cleaned_data.get('password'), - ) + ) if user is not None: custom_login(request, user) @@ -129,22 +129,22 @@ Integration with django-allauth ------------------------------- ``axes`` relies on having login information stored under ``AXES_USERNAME_FORM_FIELD`` key -both in ``request.POST`` and in ``credentials`` dict passed to -``user_login_failed`` signal. This is not the case with ``allauth``. +both in ``request.POST`` and in ``credentials`` dict passed to +``user_login_failed`` signal. This is not the case with ``allauth``. ``allauth`` always uses ``login`` key in post POST data but it becomes ``username`` key in ``credentials`` dict in signal handler. To overcome this you need to use custom login form that duplicates the value -of ``username`` key under a ``login`` key in that dict +of ``username`` key under a ``login`` key in that dict (and set ``AXES_USERNAME_FORM_FIELD = 'login'``). -You also need to decorate ``dispatch()`` and ``form_invalid()`` methods -of the ``allauth`` login view. By default ``axes`` is patching only the +You also need to decorate ``dispatch()`` and ``form_invalid()`` methods +of the ``allauth`` login view. By default ``axes`` is patching only the ``LoginView`` from ``django.contrib.auth`` app and with ``allauth`` you have to do the patching of views yourself. *settings.py:* :: - + AXES_USERNAME_FORM_FIELD = 'login' *forms.py:* :: @@ -187,7 +187,8 @@ removing client-set prefixes. In these cases, ``axes`` needs to know how to make these changes so that it can correctly identify the user without any form cleaning or validation. This is where the ``AXES_USERNAME_CALLABLE`` setting comes in. You can define how to make these modifications in a callable that -takes a request object, and provide that callable to ``axes`` via this setting. +takes a request object and a credentials dictionary, +and provide that callable to ``axes`` via this setting. For example, a function like this could take a post body with something like ``username='prefixed-username'`` and ``namespace=my_namespace`` and turn it @@ -195,7 +196,7 @@ into ``my_namespace-username``: *settings.py:* :: - def sample_username_modifier(request): + def sample_username_modifier(request, credentials): provided_username = request.POST.get('username') some_namespace = request.POST.get('namespace') return '-'.join([some_namespace, provided_username[9:]]) From 2e9754914bab813d830c43962e26a2723d8e17e8 Mon Sep 17 00:00:00 2001 From: Benedikt Bauer Date: Wed, 19 Dec 2018 16:33:32 +0100 Subject: [PATCH 3/6] Use get_client_username instead of manual username lookup for cache key --- axes/attempts.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/axes/attempts.py b/axes/attempts.py index d15e9e1..43c8f4d 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -63,10 +63,7 @@ def get_cache_key(request_or_obj, credentials=None): else: ip = get_client_ip(request_or_obj) ua = request_or_obj.META.get('HTTP_USER_AGENT', '')[:255] - if credentials is None: - un = request_or_obj.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) - else: - un = credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) + un = get_client_username(request_or_obj, credentials) ip = ip.encode('utf-8') if ip else ''.encode('utf-8') un = un.encode('utf-8') if un else ''.encode('utf-8') From a17c7f4706d5cb2f313207c9eb7831d66c69151b Mon Sep 17 00:00:00 2001 From: Benedikt Bauer Date: Wed, 19 Dec 2018 18:01:56 +0100 Subject: [PATCH 4/6] Strip Whitespace changes from PR --- docs/usage.rst | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/usage.rst b/docs/usage.rst index 200d326..ebc7303 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -96,7 +96,7 @@ them as per the example. request = request, username = form.cleaned_data.get('username'), password = form.cleaned_data.get('password'), - ) + ) if user is not None: custom_login(request, user) @@ -129,22 +129,22 @@ Integration with django-allauth ------------------------------- ``axes`` relies on having login information stored under ``AXES_USERNAME_FORM_FIELD`` key -both in ``request.POST`` and in ``credentials`` dict passed to -``user_login_failed`` signal. This is not the case with ``allauth``. +both in ``request.POST`` and in ``credentials`` dict passed to +``user_login_failed`` signal. This is not the case with ``allauth``. ``allauth`` always uses ``login`` key in post POST data but it becomes ``username`` key in ``credentials`` dict in signal handler. To overcome this you need to use custom login form that duplicates the value -of ``username`` key under a ``login`` key in that dict +of ``username`` key under a ``login`` key in that dict (and set ``AXES_USERNAME_FORM_FIELD = 'login'``). -You also need to decorate ``dispatch()`` and ``form_invalid()`` methods -of the ``allauth`` login view. By default ``axes`` is patching only the +You also need to decorate ``dispatch()`` and ``form_invalid()`` methods +of the ``allauth`` login view. By default ``axes`` is patching only the ``LoginView`` from ``django.contrib.auth`` app and with ``allauth`` you have to do the patching of views yourself. *settings.py:* :: - + AXES_USERNAME_FORM_FIELD = 'login' *forms.py:* :: From efc02327f3811a4a40aa580fddc5b41dc6748656 Mon Sep 17 00:00:00 2001 From: Benedikt Bauer Date: Thu, 20 Dec 2018 16:47:29 +0100 Subject: [PATCH 5/6] Enhance configuration docs with detailed credentials description --- docs/configuration.rst | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/configuration.rst b/docs/configuration.rst index 9c9025f..6386b38 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -124,8 +124,12 @@ These should be defined in your ``settings.py`` file. Default: ``True`` * ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your users usernames. Default: ``username`` -* ``AXES_USERNAME_CALLABLE``: A callable function that takes a request object and a credentials dictionary - and returns the username. If empty, axes just fetches it from the credentials or requst.POST fields +* ``AXES_USERNAME_CALLABLE``: A callable function that takes two arguments: + The request object and A dictionary of keyword arguments containing the user credentials + that were passed to authenticate() or your own custom authentication backend. + Credentials matching a set of ‘sensitive’ patterns, (including password) are not contained. + The function must return the username. + If no function is supplied, axes just fetches the username from the credentials or requst.POST fields based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None`` * ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your users password. Default: ``password`` From 41b532c882446d855b48d730fdd7f804668eab7e Mon Sep 17 00:00:00 2001 From: Benedikt Bauer Date: Thu, 20 Dec 2018 16:48:10 +0100 Subject: [PATCH 6/6] Fix problem with Django Admin -> It always uses username as the username field --- axes/utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/axes/utils.py b/axes/utils.py index a77beda..36e055c 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -73,7 +73,7 @@ def get_client_username(request, credentials=None): if settings.AXES_USERNAME_CALLABLE: return settings.AXES_USERNAME_CALLABLE(request, credentials) if credentials is None: - return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) + return request.POST.get('username', None) return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)