Merge pull request #377 from mastacheata/bugfix/358-credentials-in-favor-of-request

Bugfix #358 credentials in favor of request
This commit is contained in:
Aleksi Häkli 2018-12-22 19:30:17 +01:00 committed by GitHub
commit 34f9322367
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 86 additions and 35 deletions

View file

@ -11,12 +11,12 @@ from axes.models import AccessAttempt
from axes.utils import get_axes_cache, get_client_ip, get_client_username
def _query_user_attempts(request):
def _query_user_attempts(request, credentials):
"""Returns access attempt record if it exists.
Otherwise return None.
"""
ip = get_client_ip(request)
username = get_client_username(request)
username = get_client_username(request, credentials)
if settings.AXES_ONLY_USER_FAILURES:
attempts = AccessAttempt.objects.filter(username=username)
@ -49,10 +49,11 @@ def _query_user_attempts(request):
return attempts
def get_cache_key(request_or_obj):
def get_cache_key(request_or_obj, credentials=None):
"""
Build cache key name from request or AccessAttempt object.
:param request_or_obj: Request or AccessAttempt object
:param credentials: Dictionary with access credentials - Only supplied when request_or_obj is not an AccessAttempt
:return cache-key: String, key to be used in cache system
"""
if isinstance(request_or_obj, AccessAttempt):
@ -61,8 +62,8 @@ def get_cache_key(request_or_obj):
ua = request_or_obj.user_agent
else:
ip = get_client_ip(request_or_obj)
un = request_or_obj.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
ua = request_or_obj.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
un = get_client_username(request_or_obj, credentials)
ip = ip.encode('utf-8') if ip else ''.encode('utf-8')
un = un.encode('utf-8') if un else ''.encode('utf-8')
@ -96,10 +97,10 @@ def get_cache_timeout():
return cache_timeout
def get_user_attempts(request):
def get_user_attempts(request, credentials):
force_reload = False
attempts = _query_user_attempts(request)
cache_hash_key = get_cache_key(request)
attempts = _query_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
cool_off = settings.AXES_COOLOFF_TIME
@ -125,13 +126,13 @@ def get_user_attempts(request):
# If objects were deleted, we need to update the queryset to reflect this,
# so force a reload.
if force_reload:
attempts = _query_user_attempts(request)
attempts = _query_user_attempts(request, credentials)
return attempts
def reset_user_attempts(request):
attempts = _query_user_attempts(request)
def reset_user_attempts(request, credentials):
attempts = _query_user_attempts(request, credentials)
count, _ = attempts.delete()
return count
@ -151,7 +152,7 @@ def ip_in_blacklist(ip):
return ip in settings.AXES_IP_BLACKLIST
def is_user_lockable(request):
def is_user_lockable(request, credentials):
"""Check if the user has a profile with nolockout
If so, then return the value to see if this user is special
and doesn't get their account locked out
@ -165,7 +166,7 @@ def is_user_lockable(request):
try:
field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
kwargs = {
field: get_client_username(request)
field: get_client_username(request, credentials)
}
user = get_user_model().objects.get(**kwargs)
@ -182,7 +183,7 @@ def is_user_lockable(request):
return True
def is_already_locked(request):
def is_already_locked(request, credentials=None):
ip = get_client_ip(request)
if (
@ -200,10 +201,10 @@ def is_already_locked(request):
if ip_in_blacklist(ip):
return True
if not is_user_lockable(request):
if not is_user_lockable(request, credentials):
return False
cache_hash_key = get_cache_key(request)
cache_hash_key = get_cache_key(request, credentials)
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
return (
@ -211,7 +212,7 @@ def is_already_locked(request):
settings.AXES_LOCK_OUT_AT_FAILURE
)
for attempt in get_user_attempts(request):
for attempt in get_user_attempts(request, credentials):
if (
attempt.failures_since_start >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE

View file

@ -4,6 +4,7 @@ from django.contrib.auth.backends import ModelBackend
from django.core.exceptions import PermissionDenied
from axes.attempts import is_already_locked
from axes.conf import settings
from axes.utils import get_lockout_message
@ -30,10 +31,13 @@ class AxesModelBackend(ModelBackend):
:return: Nothing, but will update return_context with lockout message if user is locked out.
"""
# Create credentials dictionary from username field
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
if request is None:
raise AxesModelBackend.RequestParameterRequired()
if is_already_locked(request):
if is_already_locked(request, credentials):
# locked out, don't try to authenticate, just update return_context and return
# Its a bit weird to pass a context and expect a response value but its nice to get a "why" back.
error_msg = get_lockout_message()

View file

@ -38,7 +38,7 @@ def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: di
return
ip_address = get_client_ip(request)
username = get_client_username(request)
username = get_client_username(request, credentials)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
@ -47,8 +47,8 @@ def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: di
return
failures = 0
attempts = get_user_attempts(request)
cache_hash_key = get_cache_key(request)
attempts = get_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
failures_cached = get_axes_cache().get(cache_hash_key)
@ -110,7 +110,7 @@ def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: di
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request)
is_user_lockable(request, credentials)
):
log.warning(
'AXES: locked out %s after repeated login attempts.',
@ -148,7 +148,9 @@ def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unus
)
if settings.AXES_RESET_ON_SUCCESS:
count = reset_user_attempts(request)
# Create credentials dictionary from the username field
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
count = reset_user_attempts(request, credentials)
log.info(
'AXES: Deleted %d failed login attempts by %s.',
count,

View file

@ -206,8 +206,11 @@ class AccessAttemptTest(TestCase):
'username': self.VALID_USERNAME,
'password': 'test'
})
credentials = {
'username': self.VALID_USERNAME
}
self.assertEqual(cache_hash_key, get_cache_key(request))
self.assertEqual(cache_hash_key, get_cache_key(request, credentials))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(

View file

@ -146,7 +146,7 @@ class UtilsTest(TestCase):
self.assertEqual(expected, actual)
@override_settings(AXES_USERNAME_FORM_FIELD='username')
def test_default_get_client_username(self):
def test_default_get_client_username_from_request(self):
expected = 'test-username'
request = HttpRequest()
@ -156,12 +156,27 @@ class UtilsTest(TestCase):
self.assertEqual(expected, actual)
def sample_customize_username(request):
@override_settings(AXES_USERNAME_FORM_FIELD='username')
def test_default_get_client_username_from_credentials(self):
expected = 'test-username'
expected_in_credentials = 'test-credentials-username'
request = HttpRequest()
request.POST['username'] = expected
credentials = {
'username': expected_in_credentials
}
actual = get_client_username(request, credentials)
self.assertEqual(expected_in_credentials, actual)
def sample_customize_username_from_request(request, credentials):
return 'prefixed-' + request.POST.get('username')
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
def test_custom_get_client_username(self):
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_request)
def test_custom_get_client_username_from_request(self):
provided = 'test-username'
expected = 'prefixed-' + provided
@ -171,3 +186,22 @@ class UtilsTest(TestCase):
actual = get_client_username(request)
self.assertEqual(expected, actual)
def sample_customize_username_from_credentials(request, credentials):
return 'prefixed-' + credentials.get('username')
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_from_credentials)
def test_custom_get_client_username_from_credentials(self):
provided = 'test-username'
expected = 'prefixed-' + provided
provided_in_credentials = 'test-username'
expected_in_credentials = 'prefixed-' + provided_in_credentials
request = HttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
actual = get_client_username(request, credentials)
self.assertEqual(expected_in_credentials, actual)

View file

@ -69,10 +69,12 @@ def get_client_ip(request):
return getattr(request, client_ip_attribute)
def get_client_username(request):
def get_client_username(request, credentials=None):
if settings.AXES_USERNAME_CALLABLE:
return settings.AXES_USERNAME_CALLABLE(request)
return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
return settings.AXES_USERNAME_CALLABLE(request, credentials)
if credentials is None:
return request.POST.get('username', None)
return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)
def is_ipv6(ip):

View file

@ -124,10 +124,14 @@ These should be defined in your ``settings.py`` file.
Default: ``True``
* ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your
users usernames. Default: ``username``
* ``AXES_USERNAME_CALLABLE``: A callable function that takes a request object
and returns the username. If empty, axes just fetches it from the POST body
* ``AXES_USERNAME_CALLABLE``: A callable function that takes two arguments:
The request object and A dictionary of keyword arguments containing the user credentials
that were passed to authenticate() or your own custom authentication backend.
Credentials matching a set of sensitive patterns, (including password) are not contained.
The function must return the username.
If no function is supplied, axes just fetches the username from the credentials or requst.POST fields
based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None``
* ``AXES_PASSWORD_FORM_FIELD``: the name of the form field that contains your
* ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your
users password. Default: ``password``
* ``AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP``: If ``True`` prevents the login
from IP under a particular user if the attempt limit has been exceeded,

View file

@ -187,7 +187,8 @@ removing client-set prefixes. In these cases, ``axes`` needs to know how to make
these changes so that it can correctly identify the user without any form
cleaning or validation. This is where the ``AXES_USERNAME_CALLABLE`` setting
comes in. You can define how to make these modifications in a callable that
takes a request object, and provide that callable to ``axes`` via this setting.
takes a request object and a credentials dictionary,
and provide that callable to ``axes`` via this setting.
For example, a function like this could take a post body with something like
``username='prefixed-username'`` and ``namespace=my_namespace`` and turn it
@ -195,7 +196,7 @@ into ``my_namespace-username``:
*settings.py:* ::
def sample_username_modifier(request):
def sample_username_modifier(request, credentials):
provided_username = request.POST.get('username')
some_namespace = request.POST.get('namespace')
return '-'.join([some_namespace, provided_username[9:]])