Backwards compatibility fixes for credentials

Revert some of the PR changes to tests to make sure that all of the
old format function invocations work with the old and new tests.

Implement small enchancements to documentation and credentials resolving
for usability and flexibility with custom authentication backends.

Update documentation to indicate that backwards compatibility
is supported as well as the new format credentials invocations.

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
Aleksi Häkli 2018-12-22 19:44:44 +01:00
parent 34f9322367
commit 7c3e21166e
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
9 changed files with 129 additions and 48 deletions

1
.gitignore vendored
View file

@ -1,5 +1,6 @@
*.egg-info
*.pyc
*.swp
.coverage
.DS_Store
.project

View file

@ -11,7 +11,7 @@ from axes.models import AccessAttempt
from axes.utils import get_axes_cache, get_client_ip, get_client_username
def _query_user_attempts(request, credentials):
def _query_user_attempts(request, credentials=None):
"""Returns access attempt record if it exists.
Otherwise return None.
"""
@ -53,7 +53,6 @@ def get_cache_key(request_or_obj, credentials=None):
"""
Build cache key name from request or AccessAttempt object.
:param request_or_obj: Request or AccessAttempt object
:param credentials: Dictionary with access credentials - Only supplied when request_or_obj is not an AccessAttempt
:return cache-key: String, key to be used in cache system
"""
if isinstance(request_or_obj, AccessAttempt):
@ -62,8 +61,8 @@ def get_cache_key(request_or_obj, credentials=None):
ua = request_or_obj.user_agent
else:
ip = get_client_ip(request_or_obj)
ua = request_or_obj.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
un = get_client_username(request_or_obj, credentials)
ua = request_or_obj.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip = ip.encode('utf-8') if ip else ''.encode('utf-8')
un = un.encode('utf-8') if un else ''.encode('utf-8')
@ -97,7 +96,7 @@ def get_cache_timeout():
return cache_timeout
def get_user_attempts(request, credentials):
def get_user_attempts(request, credentials=None):
force_reload = False
attempts = _query_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
@ -131,7 +130,7 @@ def get_user_attempts(request, credentials):
return attempts
def reset_user_attempts(request, credentials):
def reset_user_attempts(request, credentials=None):
attempts = _query_user_attempts(request, credentials)
count, _ = attempts.delete()
@ -152,7 +151,7 @@ def ip_in_blacklist(ip):
return ip in settings.AXES_IP_BLACKLIST
def is_user_lockable(request, credentials):
def is_user_lockable(request, credentials=None):
"""Check if the user has a profile with nolockout
If so, then return the value to see if this user is special
and doesn't get their account locked out

View file

@ -4,8 +4,7 @@ from django.contrib.auth.backends import ModelBackend
from django.core.exceptions import PermissionDenied
from axes.attempts import is_already_locked
from axes.conf import settings
from axes.utils import get_lockout_message
from axes.utils import get_credentials, get_lockout_message
class AxesModelBackend(ModelBackend):
@ -17,26 +16,28 @@ class AxesModelBackend(ModelBackend):
super(AxesModelBackend.RequestParameterRequired, self).__init__(
AxesModelBackend.RequestParameterRequired.msg)
def authenticate(self, request, username=None, password=None, **kwargs):
"""
Add django-axes handling and add allow adding errors directly to a passed return_context.
Will never actually authenticate a user, just blocks locked out uses so don't use this as your only back end.
:param request: see ModelBackend.authenticate
:param username: see ModelBackend.authenticate
:param password: see ModelBackend.authenticate
:keyword response_context: context dict that will be returned/used in the response template.
NOTE: will overwrite 'error' field in dict
:param kwargs: see ModelBackend.authenticate
:raises PermissionDenied: if user is already locked out.
:return: Nothing, but will update return_context with lockout message if user is locked out.
"""
def authenticate(self, request, **kwargs):
"""Checks user lock out status and raises PermissionDenied if user is not allowed to log in.
# Create credentials dictionary from username field
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
Inserts errors directly to `return_context` that is supplied as a keyword argument.
Use this on top of your AUTHENTICATION_BACKENDS list to prevent locked out users
from being authenticated in the standard Django authentication flow.
Note that this method does not log your user in and delegates login to other backends.
:param request: see ModelBackend.authenticate
:param kwargs: see ModelBackend.authenticate
:keyword response_context: context dict that will be updated with error information
:raises PermissionDenied: if user is already locked out
:return: None
"""
if request is None:
raise AxesModelBackend.RequestParameterRequired()
credentials = get_credentials(**kwargs)
if is_already_locked(request, credentials):
# locked out, don't try to authenticate, just update return_context and return
# Its a bit weird to pass a context and expect a response value but its nice to get a "why" back.

View file

@ -20,7 +20,7 @@ from axes.attempts import reset_user_attempts
from axes.models import AccessLog, AccessAttempt
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
log = logging.getLogger(settings.AXES_LOGGER)
@ -128,6 +128,7 @@ def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unus
""" When a user logs in, update the access log
"""
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
@ -148,8 +149,6 @@ def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unus
)
if settings.AXES_RESET_ON_SUCCESS:
# Create credentials dictionary from the username field
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
count = reset_user_attempts(request, credentials)
log.info(
'AXES: Deleted %d failed login attempts by %s.',

View file

@ -206,9 +206,41 @@ class AccessAttemptTest(TestCase):
'username': self.VALID_USERNAME,
'password': 'test'
})
credentials = {
'username': self.VALID_USERNAME
}
self.assertEqual(cache_hash_key, get_cache_key(request))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
ip_address=ip_address,
username=self.VALID_USERNAME,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
)
self.assertEqual(cache_hash_key, get_cache_key(attempt))
@patch('axes.utils.get_client_ip', return_value='127.0.0.1')
def test_get_cache_key_credentials(self, _):
""" Test the cache key format"""
# Getting cache key from request
ip_address = '127.0.0.1'
cache_hash_key = 'axes-{}'.format(
hashlib.md5(ip_address.encode()).hexdigest()
)
request_factory = RequestFactory()
request = request_factory.post('/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test'
})
# Difference between the upper test: new call signature with credentials
credentials = {'username': self.VALID_USERNAME}
self.assertEqual(cache_hash_key, get_cache_key(request, credentials))

View file

@ -146,7 +146,7 @@ class UtilsTest(TestCase):
self.assertEqual(expected, actual)
@override_settings(AXES_USERNAME_FORM_FIELD='username')
def test_default_get_client_username_from_request(self):
def test_default_get_client_username(self):
expected = 'test-username'
request = HttpRequest()
@ -157,7 +157,7 @@ class UtilsTest(TestCase):
self.assertEqual(expected, actual)
@override_settings(AXES_USERNAME_FORM_FIELD='username')
def test_default_get_client_username_from_credentials(self):
def test_default_get_client_username_credentials(self):
expected = 'test-username'
expected_in_credentials = 'test-credentials-username'
@ -171,12 +171,12 @@ class UtilsTest(TestCase):
self.assertEqual(expected_in_credentials, actual)
def sample_customize_username_from_request(request, credentials):
def sample_customize_username(request):
return 'prefixed-' + request.POST.get('username')
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_request)
def test_custom_get_client_username_from_request(self):
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
def test_custom_get_client_username(self):
provided = 'test-username'
expected = 'prefixed-' + provided
@ -187,11 +187,11 @@ class UtilsTest(TestCase):
self.assertEqual(expected, actual)
def sample_customize_username_from_credentials(request, credentials):
def sample_customize_username_credentials(request, credentials):
return 'prefixed-' + credentials.get('username')
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_credentials)
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_credentials)
def test_custom_get_client_username_from_credentials(self):
provided = 'test-username'
expected = 'prefixed-' + provided

View file

@ -5,6 +5,8 @@ try:
except ImportError:
pass
from inspect import getargspec
from logging import getLogger
from socket import error, inet_pton, AF_INET6
from django.core.cache import caches
@ -15,6 +17,7 @@ import ipware.ip2
from axes.conf import settings
from axes.models import AccessAttempt
logger = getLogger(__name__)
def get_axes_cache():
return caches[getattr(settings, 'AXES_CACHE', 'default')]
@ -70,11 +73,47 @@ def get_client_ip(request):
def get_client_username(request, credentials=None):
"""Resolve client username from the given request or credentials if supplied
The order of preference for fetching the username is as follows:
1. If configured, use `AXES_USERNAME_CALLABLE`, and supply either `request` or `request, credentials` as arguments
depending on the function argument count (multiple signatures are supported for backwards compatibility)
2. If given, use `credentials` and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`)
3. Use request.POST and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`)
:param request: incoming Django `HttpRequest` or similar object from authentication backend or other source
:param credentials: incoming credentials `dict` or similar object from authentication backend or other source
"""
if settings.AXES_USERNAME_CALLABLE:
return settings.AXES_USERNAME_CALLABLE(request, credentials)
if credentials is None:
return request.POST.get('username', None)
return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)
num_args = len(
getargspec(settings.AXES_USERNAME_CALLABLE).args # pylint: disable=deprecated-method
)
if num_args == 2:
logger.debug('Using AXES_USERNAME_CALLABLE for username with two arguments: request, credentials')
return settings.AXES_USERNAME_CALLABLE(request, credentials)
if num_args == 1:
logger.debug('Using AXES_USERNAME_CALLABLE for username with one argument: request')
return settings.AXES_USERNAME_CALLABLE(request)
logger.error('Using AXES_USERNAME_CALLABLE for username failed: wrong number of arguments %s', num_args)
raise TypeError('Wrong number of arguments in function call to AXES_USERNAME_CALLABLE', num_args)
if credentials:
logger.debug('Using `credentials` to get username with key AXES_USERNAME_FORM_FIELD')
return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)
logger.debug('Using `request.POST` to get username with key AXES_USERNAME_FORM_FIELD')
return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
def get_credentials(username=None, **kwargs):
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
credentials.update(kwargs)
return credentials
def is_ipv6(ip):

View file

@ -124,13 +124,12 @@ These should be defined in your ``settings.py`` file.
Default: ``True``
* ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your
users usernames. Default: ``username``
* ``AXES_USERNAME_CALLABLE``: A callable function that takes two arguments:
The request object and A dictionary of keyword arguments containing the user credentials
that were passed to authenticate() or your own custom authentication backend.
Credentials matching a set of sensitive patterns, (including password) are not contained.
The function must return the username.
If no function is supplied, axes just fetches the username from the credentials or requst.POST fields
based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None``
* ``AXES_USERNAME_CALLABLE``: A callable function that takes either one or two arguments:
``AXES_USERNAME_CALLABLE(request)`` or ``AXES_USERNAME_CALLABLE(request, credentials)``.
The ``request`` is a HttpRequest like object and the ``credentials`` is a dictionary like object.
``credentials`` are the ones that were passed to Django ``authenticate()`` in the login flow.
If no function is supplied, axes fetches the username from the ``credentials`` or ``request.POST``
dictionaries based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None``
* ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your
users password. Default: ``password``
* ``AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP``: If ``True`` prevents the login

View file

@ -196,13 +196,24 @@ into ``my_namespace-username``:
*settings.py:* ::
def sample_username_modifier(request, credentials):
def sample_username_modifier(request):
provided_username = request.POST.get('username')
some_namespace = request.POST.get('namespace')
return '-'.join([some_namespace, provided_username[9:]])
AXES_USERNAME_CALLABLE = sample_username_modifier
# New format that can also be used
# the credentials argument is provided if the
# function signature has two arguments instead of one
def sample_username_modifier_credentials(request, credentials):
provided_username = credentials.get('username')
some_namespace = credentials.get('namespace')
return '-'.join([some_namespace, provided_username[9:]])
AXES_USERNAME_CALLABLE = sample_username_modifier_new
NOTE: You still have to make these modifications yourself before calling
authenticate. If you want to re-use the same function for consistency, that's
fine, but ``axes`` doesn't inject these changes into the authentication flow