diff --git a/.gitignore b/.gitignore index f35d8c8..4efa8a6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ *.egg-info *.pyc +*.swp .coverage .DS_Store .project diff --git a/axes/attempts.py b/axes/attempts.py index 43c8f4d..4fe7725 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -11,7 +11,7 @@ from axes.models import AccessAttempt from axes.utils import get_axes_cache, get_client_ip, get_client_username -def _query_user_attempts(request, credentials): +def _query_user_attempts(request, credentials=None): """Returns access attempt record if it exists. Otherwise return None. """ @@ -53,7 +53,6 @@ def get_cache_key(request_or_obj, credentials=None): """ Build cache key name from request or AccessAttempt object. :param request_or_obj: Request or AccessAttempt object - :param credentials: Dictionary with access credentials - Only supplied when request_or_obj is not an AccessAttempt :return cache-key: String, key to be used in cache system """ if isinstance(request_or_obj, AccessAttempt): @@ -62,8 +61,8 @@ def get_cache_key(request_or_obj, credentials=None): ua = request_or_obj.user_agent else: ip = get_client_ip(request_or_obj) - ua = request_or_obj.META.get('HTTP_USER_AGENT', '')[:255] un = get_client_username(request_or_obj, credentials) + ua = request_or_obj.META.get('HTTP_USER_AGENT', '')[:255] ip = ip.encode('utf-8') if ip else ''.encode('utf-8') un = un.encode('utf-8') if un else ''.encode('utf-8') @@ -97,7 +96,7 @@ def get_cache_timeout(): return cache_timeout -def get_user_attempts(request, credentials): +def get_user_attempts(request, credentials=None): force_reload = False attempts = _query_user_attempts(request, credentials) cache_hash_key = get_cache_key(request, credentials) @@ -131,7 +130,7 @@ def get_user_attempts(request, credentials): return attempts -def reset_user_attempts(request, credentials): +def reset_user_attempts(request, credentials=None): attempts = _query_user_attempts(request, credentials) count, _ = attempts.delete() @@ -152,7 +151,7 @@ def ip_in_blacklist(ip): return ip in settings.AXES_IP_BLACKLIST -def is_user_lockable(request, credentials): +def is_user_lockable(request, credentials=None): """Check if the user has a profile with nolockout If so, then return the value to see if this user is special and doesn't get their account locked out diff --git a/axes/backends.py b/axes/backends.py index cbd2d5b..beed926 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -4,8 +4,7 @@ from django.contrib.auth.backends import ModelBackend from django.core.exceptions import PermissionDenied from axes.attempts import is_already_locked -from axes.conf import settings -from axes.utils import get_lockout_message +from axes.utils import get_credentials, get_lockout_message class AxesModelBackend(ModelBackend): @@ -17,26 +16,28 @@ class AxesModelBackend(ModelBackend): super(AxesModelBackend.RequestParameterRequired, self).__init__( AxesModelBackend.RequestParameterRequired.msg) - def authenticate(self, request, username=None, password=None, **kwargs): - """ - Add django-axes handling and add allow adding errors directly to a passed return_context. - Will never actually authenticate a user, just blocks locked out uses so don't use this as your only back end. - :param request: see ModelBackend.authenticate - :param username: see ModelBackend.authenticate - :param password: see ModelBackend.authenticate - :keyword response_context: context dict that will be returned/used in the response template. - NOTE: will overwrite 'error' field in dict - :param kwargs: see ModelBackend.authenticate - :raises PermissionDenied: if user is already locked out. - :return: Nothing, but will update return_context with lockout message if user is locked out. - """ + def authenticate(self, request, **kwargs): + """Checks user lock out status and raises PermissionDenied if user is not allowed to log in. - # Create credentials dictionary from username field - credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + Inserts errors directly to `return_context` that is supplied as a keyword argument. + + Use this on top of your AUTHENTICATION_BACKENDS list to prevent locked out users + from being authenticated in the standard Django authentication flow. + + Note that this method does not log your user in and delegates login to other backends. + + :param request: see ModelBackend.authenticate + :param kwargs: see ModelBackend.authenticate + :keyword response_context: context dict that will be updated with error information + :raises PermissionDenied: if user is already locked out + :return: None + """ if request is None: raise AxesModelBackend.RequestParameterRequired() + credentials = get_credentials(**kwargs) + if is_already_locked(request, credentials): # locked out, don't try to authenticate, just update return_context and return # Its a bit weird to pass a context and expect a response value but its nice to get a "why" back. diff --git a/axes/signals.py b/axes/signals.py index c6e2afc..be2bbea 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -20,7 +20,7 @@ from axes.attempts import reset_user_attempts from axes.models import AccessLog, AccessAttempt from axes.utils import get_client_str from axes.utils import query2str -from axes.utils import get_axes_cache, get_client_ip, get_client_username +from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials log = logging.getLogger(settings.AXES_LOGGER) @@ -128,6 +128,7 @@ def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unus """ When a user logs in, update the access log """ username = user.get_username() + credentials = get_credentials(username) ip_address = get_client_ip(request) user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] path_info = request.META.get('PATH_INFO', '')[:255] @@ -148,8 +149,6 @@ def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unus ) if settings.AXES_RESET_ON_SUCCESS: - # Create credentials dictionary from the username field - credentials = {settings.AXES_USERNAME_FORM_FIELD: username} count = reset_user_attempts(request, credentials) log.info( 'AXES: Deleted %d failed login attempts by %s.', diff --git a/axes/tests/test_access_attempt.py b/axes/tests/test_access_attempt.py index c02f0da..84a0173 100644 --- a/axes/tests/test_access_attempt.py +++ b/axes/tests/test_access_attempt.py @@ -206,9 +206,41 @@ class AccessAttemptTest(TestCase): 'username': self.VALID_USERNAME, 'password': 'test' }) - credentials = { - 'username': self.VALID_USERNAME - } + + self.assertEqual(cache_hash_key, get_cache_key(request)) + + # Getting cache key from AccessAttempt Object + attempt = AccessAttempt( + user_agent='', + ip_address=ip_address, + username=self.VALID_USERNAME, + get_data='', + post_data='', + http_accept=request.META.get('HTTP_ACCEPT', ''), + path_info=request.META.get('PATH_INFO', ''), + failures_since_start=0, + ) + self.assertEqual(cache_hash_key, get_cache_key(attempt)) + + + @patch('axes.utils.get_client_ip', return_value='127.0.0.1') + def test_get_cache_key_credentials(self, _): + """ Test the cache key format""" + # Getting cache key from request + ip_address = '127.0.0.1' + cache_hash_key = 'axes-{}'.format( + hashlib.md5(ip_address.encode()).hexdigest() + ) + + request_factory = RequestFactory() + request = request_factory.post('/admin/login/', + data={ + 'username': self.VALID_USERNAME, + 'password': 'test' + }) + + # Difference between the upper test: new call signature with credentials + credentials = {'username': self.VALID_USERNAME} self.assertEqual(cache_hash_key, get_cache_key(request, credentials)) diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 378379d..53a5a23 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -146,7 +146,7 @@ class UtilsTest(TestCase): self.assertEqual(expected, actual) @override_settings(AXES_USERNAME_FORM_FIELD='username') - def test_default_get_client_username_from_request(self): + def test_default_get_client_username(self): expected = 'test-username' request = HttpRequest() @@ -157,7 +157,7 @@ class UtilsTest(TestCase): self.assertEqual(expected, actual) @override_settings(AXES_USERNAME_FORM_FIELD='username') - def test_default_get_client_username_from_credentials(self): + def test_default_get_client_username_credentials(self): expected = 'test-username' expected_in_credentials = 'test-credentials-username' @@ -171,12 +171,12 @@ class UtilsTest(TestCase): self.assertEqual(expected_in_credentials, actual) - def sample_customize_username_from_request(request, credentials): + def sample_customize_username(request): return 'prefixed-' + request.POST.get('username') @override_settings(AXES_USERNAME_FORM_FIELD='username') - @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_request) - def test_custom_get_client_username_from_request(self): + @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username) + def test_custom_get_client_username(self): provided = 'test-username' expected = 'prefixed-' + provided @@ -187,11 +187,11 @@ class UtilsTest(TestCase): self.assertEqual(expected, actual) - def sample_customize_username_from_credentials(request, credentials): + def sample_customize_username_credentials(request, credentials): return 'prefixed-' + credentials.get('username') @override_settings(AXES_USERNAME_FORM_FIELD='username') - @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_credentials) + @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_credentials) def test_custom_get_client_username_from_credentials(self): provided = 'test-username' expected = 'prefixed-' + provided diff --git a/axes/utils.py b/axes/utils.py index 36e055c..f98c024 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -5,6 +5,8 @@ try: except ImportError: pass +from inspect import getargspec +from logging import getLogger from socket import error, inet_pton, AF_INET6 from django.core.cache import caches @@ -15,6 +17,7 @@ import ipware.ip2 from axes.conf import settings from axes.models import AccessAttempt +logger = getLogger(__name__) def get_axes_cache(): return caches[getattr(settings, 'AXES_CACHE', 'default')] @@ -70,11 +73,47 @@ def get_client_ip(request): def get_client_username(request, credentials=None): + """Resolve client username from the given request or credentials if supplied + + The order of preference for fetching the username is as follows: + + 1. If configured, use `AXES_USERNAME_CALLABLE`, and supply either `request` or `request, credentials` as arguments + depending on the function argument count (multiple signatures are supported for backwards compatibility) + 2. If given, use `credentials` and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`) + 3. Use request.POST and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`) + + :param request: incoming Django `HttpRequest` or similar object from authentication backend or other source + :param credentials: incoming credentials `dict` or similar object from authentication backend or other source + """ + if settings.AXES_USERNAME_CALLABLE: - return settings.AXES_USERNAME_CALLABLE(request, credentials) - if credentials is None: - return request.POST.get('username', None) - return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) + num_args = len( + getargspec(settings.AXES_USERNAME_CALLABLE).args # pylint: disable=deprecated-method + ) + + if num_args == 2: + logger.debug('Using AXES_USERNAME_CALLABLE for username with two arguments: request, credentials') + return settings.AXES_USERNAME_CALLABLE(request, credentials) + + if num_args == 1: + logger.debug('Using AXES_USERNAME_CALLABLE for username with one argument: request') + return settings.AXES_USERNAME_CALLABLE(request) + + logger.error('Using AXES_USERNAME_CALLABLE for username failed: wrong number of arguments %s', num_args) + raise TypeError('Wrong number of arguments in function call to AXES_USERNAME_CALLABLE', num_args) + + if credentials: + logger.debug('Using `credentials` to get username with key AXES_USERNAME_FORM_FIELD') + return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) + + logger.debug('Using `request.POST` to get username with key AXES_USERNAME_FORM_FIELD') + return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) + + +def get_credentials(username=None, **kwargs): + credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + credentials.update(kwargs) + return credentials def is_ipv6(ip): diff --git a/docs/configuration.rst b/docs/configuration.rst index 6386b38..8404142 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -124,13 +124,12 @@ These should be defined in your ``settings.py`` file. Default: ``True`` * ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your users usernames. Default: ``username`` -* ``AXES_USERNAME_CALLABLE``: A callable function that takes two arguments: - The request object and A dictionary of keyword arguments containing the user credentials - that were passed to authenticate() or your own custom authentication backend. - Credentials matching a set of ‘sensitive’ patterns, (including password) are not contained. - The function must return the username. - If no function is supplied, axes just fetches the username from the credentials or requst.POST fields - based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None`` +* ``AXES_USERNAME_CALLABLE``: A callable function that takes either one or two arguments: + ``AXES_USERNAME_CALLABLE(request)`` or ``AXES_USERNAME_CALLABLE(request, credentials)``. + The ``request`` is a HttpRequest like object and the ``credentials`` is a dictionary like object. + ``credentials`` are the ones that were passed to Django ``authenticate()`` in the login flow. + If no function is supplied, axes fetches the username from the ``credentials`` or ``request.POST`` + dictionaries based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None`` * ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your users password. Default: ``password`` * ``AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP``: If ``True`` prevents the login diff --git a/docs/usage.rst b/docs/usage.rst index 200d326..1e1187b 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -196,13 +196,24 @@ into ``my_namespace-username``: *settings.py:* :: - def sample_username_modifier(request, credentials): + def sample_username_modifier(request): provided_username = request.POST.get('username') some_namespace = request.POST.get('namespace') return '-'.join([some_namespace, provided_username[9:]]) AXES_USERNAME_CALLABLE = sample_username_modifier + # New format that can also be used + # the credentials argument is provided if the + # function signature has two arguments instead of one + + def sample_username_modifier_credentials(request, credentials): + provided_username = credentials.get('username') + some_namespace = credentials.get('namespace') + return '-'.join([some_namespace, provided_username[9:]]) + + AXES_USERNAME_CALLABLE = sample_username_modifier_new + NOTE: You still have to make these modifications yourself before calling authenticate. If you want to re-use the same function for consistency, that's fine, but ``axes`` doesn't inject these changes into the authentication flow