django-axes/axes/tests/test_attempt.py
Aleksi Häkli e69d479f6a
Refactor handlers to a more pluggable format
- Define a base handler API with method signatures
- Move proxy handler to a separate path for importability
- Implement a database handler with clean external dependencies
- Change the authentication backend and decorators to use the authentication backend

This enables clean pluggable authentication backend definitions that users
can override and specialize with e.g. cached handlers in their own packages.

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
2019-02-22 19:55:57 +02:00

590 lines
20 KiB
Python

import datetime
import hashlib
import random
import string
import time
from unittest.mock import patch, MagicMock
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from django.http import HttpRequest
from django.test import TestCase, override_settings
from django.test.client import RequestFactory
from django.urls import reverse
from axes.attempts import (
get_cache_key,
get_client_parameters,
get_user_attempts,
)
from axes.conf import settings
from axes.models import AccessAttempt, AccessLog
from axes.signals import user_locked_out
class AccessAttemptTest(TestCase):
"""
Test case using custom settings for testing.
"""
VALID_USERNAME = 'valid-username'
VALID_PASSWORD = 'valid-password'
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
def _login(self, is_valid_username=False, is_valid_password=False, **kwargs):
"""
Login a user.
A valid credential is used when is_valid_username is True,
otherwise it will use a random string to make a failed login.
"""
if is_valid_username:
# Use a valid username
username = self.VALID_USERNAME
else:
# Generate a wrong random username
chars = string.ascii_uppercase + string.digits
username = ''.join(random.choice(chars) for _ in range(10))
if is_valid_password:
password = self.VALID_PASSWORD
else:
password = 'invalid-password'
post_data = {
'username': username,
'password': password,
'this_is_the_login_form': 1,
}
post_data.update(kwargs)
return self.client.post(
reverse('admin:login'),
post_data,
HTTP_USER_AGENT='test-browser',
)
def setUp(self):
"""
Create a valid user for login.
"""
self.username = self.VALID_USERNAME
self.ip_address = '127.0.0.1'
self.user_agent = 'test-browser'
self.user = User.objects.create_superuser(
username=self.VALID_USERNAME,
email='test@example.com',
password=self.VALID_PASSWORD,
)
def test_failure_limit_once(self):
"""
Test the login lock trying to login one more time than failure limit.
"""
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_failure_limit_many(self):
"""
Test the login lock trying to login a lot of times more than failure limit.
"""
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
# So, we shouldn't have gotten a lock-out yet.
# We should get a locked message each time we try again
for _ in range(random.randrange(1, settings.AXES_FAILURE_LIMIT)):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_valid_login(self):
"""
Test a valid login for a real username.
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_valid_logout(self):
"""
Test a valid logout and make sure the logout_time is updated.
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEqual(AccessLog.objects.latest('id').logout_time, None)
response = self.client.get(reverse('admin:logout'))
self.assertNotEqual(AccessLog.objects.latest('id').logout_time, None)
self.assertContains(response, 'Logged out')
@override_settings(AXES_COOLOFF_TIME=datetime.timedelta(milliseconds=420))
def test_cool_off_on_login(self):
"""
Test if the cooling time allows a user to login.
"""
self.test_failure_limit_once()
# Wait for the cooling off period
time.sleep(settings.AXES_COOLOFF_TIME.total_seconds())
# It should be possible to login again, make sure it is.
self.test_valid_login()
@override_settings(AXES_COOLOFF_TIME=datetime.timedelta(milliseconds=420))
@patch('axes.attempts.get_axes_cache')
def test_cooling_off_on_get_user_attempts_updates_cache(self, get_cache):
cache = MagicMock()
cache.get.return_value = 1
cache.set.return_value = None
get_cache.return_value = cache
attempt = AccessAttempt.objects.create(
username=self.username,
ip_address=self.ip_address,
user_agent=self.user_agent,
failures_since_start=0,
)
request = HttpRequest()
request.META['REMOTE_ADDR'] = self.ip_address
request.META['HTTP_USER_AGENT'] = self.user_agent
credentials = {'username': self.username}
# Check that the function does nothing if cool off has not passed
cache.get.assert_not_called()
cache.set.assert_not_called()
self.assertEqual(
list(get_user_attempts(request, credentials)),
[attempt],
)
cache.get.assert_not_called()
cache.set.assert_not_called()
time.sleep(settings.AXES_COOLOFF_TIME.total_seconds())
self.assertEqual(
list(get_user_attempts(request, credentials)),
[],
)
self.assertTrue(cache.get.call_count)
self.assertTrue(cache.set.call_count)
def test_long_user_agent_valid(self):
"""
Test if can handle a long user agent.
"""
long_user_agent = 'ie6' * 1024
response = self._login(
is_valid_username=True,
is_valid_password=True,
user_agent=long_user_agent,
)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_long_user_agent_not_valid(self):
"""
Test if can handle a long user agent with failure.
"""
long_user_agent = 'ie6' * 1024
for _ in range(settings.AXES_FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_reset_ip(self):
"""
Test resetting all attempts for an IP address.
"""
# Make a lockout
self.test_failure_limit_once()
# Reset the ip so we can try again
AccessAttempt.objects.filter(ip_address='127.0.0.1').delete()
# Make a login attempt again
self.test_valid_login()
def test_reset_all(self):
"""
Test resetting all attempts.
"""
# Make a lockout
self.test_failure_limit_once()
# Reset all attempts so we can try again
AccessAttempt.objects.all().delete()
# Make a login attempt again
self.test_valid_login()
@override_settings(
AXES_ONLY_USER_FAILURES=True,
)
def test_get_filter_kwargs_user(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username},
)
@override_settings(
AXES_ONLY_USER_FAILURES=False,
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
AXES_USE_USER_AGENT=False,
)
def test_get_filter_kwargs_ip(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'ip_address': self.ip_address},
)
@override_settings(
AXES_ONLY_USER_FAILURES=False,
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
AXES_USE_USER_AGENT=False,
)
def test_get_filter_kwargs_user_and_ip(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username, 'ip_address': self.ip_address},
)
@override_settings(
AXES_ONLY_USER_FAILURES=False,
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
AXES_USE_USER_AGENT=True,
)
def test_get_filter_kwargs_ip_and_agent(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'ip_address': self.ip_address, 'user_agent': self.user_agent},
)
@override_settings(
AXES_ONLY_USER_FAILURES=False,
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
AXES_USE_USER_AGENT=True,
)
def test_get_filter_kwargs_user_ip_agent(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent},
)
@patch('axes.utils.get_client_ip_address', return_value='127.0.0.1')
def test_get_cache_key(self, _):
"""
Test the cache key format.
"""
# Getting cache key from request
ip_address = '127.0.0.1'
cache_hash_key = 'axes-{}'.format(
hashlib.md5(ip_address.encode()).hexdigest()
)
request_factory = RequestFactory()
request = request_factory.post(
'/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test',
},
)
self.assertEqual(cache_hash_key, get_cache_key(request))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
ip_address=ip_address,
username=self.VALID_USERNAME,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
)
self.assertEqual(cache_hash_key, get_cache_key(attempt))
@patch('axes.utils.get_client_ip_address', return_value='127.0.0.1')
def test_get_cache_key_credentials(self, _):
"""
Test the cache key format.
"""
# Getting cache key from request
ip_address = '127.0.0.1'
cache_hash_key = 'axes-{}'.format(
hashlib.md5(ip_address.encode()).hexdigest()
)
request_factory = RequestFactory()
request = request_factory.post(
'/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test'
}
)
# Difference between the upper test: new call signature with credentials
credentials = {'username': self.VALID_USERNAME}
self.assertEqual(cache_hash_key, get_cache_key(request, credentials))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
ip_address=ip_address,
username=self.VALID_USERNAME,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
)
self.assertEqual(cache_hash_key, get_cache_key(attempt))
def test_send_lockout_signal(self):
"""
Test if the lockout signal is emitted.
"""
# this "hack" is needed so we don't have to use global variables or python3 features
class Scope(object): pass
scope = Scope()
scope.signal_received = 0
def signal_handler(request, username, ip_address, *args, **kwargs): # pylint: disable=unused-argument
scope.signal_received += 1
self.assertIsNotNone(request)
# Connect signal handler
user_locked_out.connect(signal_handler)
# Make a lockout
self.test_failure_limit_once()
self.assertEqual(scope.signal_received, 1)
AccessAttempt.objects.all().delete()
# Make another lockout
self.test_failure_limit_once()
self.assertEqual(scope.signal_received, 2)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_combination_user_and_ip(self):
"""
Test login failure when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True.
"""
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login(
is_valid_username=True,
is_valid_password=False,
)
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(is_valid_username=True, is_valid_password=False)
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_only(self):
"""
Test login failure when AXES_ONLY_USER_FAILURES is True.
"""
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login(
is_valid_username=True,
is_valid_password=False,
)
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(is_valid_username=True, is_valid_password=False)
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
# reset the username only and make sure we can log in now even though
# our IP has failed each time
AccessAttempt.objects.filter(username=self.VALID_USERNAME).delete()
response = self._login(
is_valid_username=True,
is_valid_password=True,
)
# Check if we are still in the login page
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
# now create failure_limit + 1 failed logins and then we should still
# be able to login with valid_username
for _ in range(settings.AXES_FAILURE_LIMIT):
response = self._login(
is_valid_username=False,
is_valid_password=False,
)
# Check if we can still log in with valid user
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_log_data_truncated(self):
"""
Test that get_query_str properly truncates data to the max_length (default 1024).
"""
# An impossibly large post dict
extra_data = {string.ascii_letters * x: x for x in range(0, 1000)}
self._login(**extra_data)
self.assertEqual(
len(AccessAttempt.objects.latest('id').post_data), 1024
)
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_valid_logout_without_success_log(self):
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
response = self.client.get(reverse('admin:logout'))
self.assertEqual(AccessLog.objects.all().count(), 0)
self.assertContains(response, 'Logged out', html=True)
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_valid_login_without_success_log(self):
"""
Test that a valid login does not generate an AccessLog when DISABLE_SUCCESS_ACCESS_LOG is True.
"""
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEqual(response.status_code, 302)
self.assertEqual(AccessLog.objects.all().count(), 0)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_valid_logout_without_log(self):
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
response = self.client.get(reverse('admin:logout'))
self.assertEqual(AccessLog.objects.first().logout_time, None)
self.assertContains(response, 'Logged out', html=True)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_non_valid_login_without_log(self):
"""
Test that a non-valid login does generate an AccessLog when DISABLE_ACCESS_LOG is True.
"""
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=False)
self.assertEqual(response.status_code, 200)
self.assertEqual(AccessLog.objects.all().count(), 0)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_check_is_not_made_on_GET(self):
AccessLog.objects.all().delete()
response = self.client.get(reverse('admin:login'))
self.assertEqual(response.status_code, 200)
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEqual(response.status_code, 302)
response = self.client.get(reverse('admin:index'))
self.assertEqual(response.status_code, 200)
def test_custom_authentication_backend(self):
"""
Test that log_user_login_failed skips if an attempt to authenticate with a custom authentication backend fails.
"""
request = HttpRequest()
request.META['REMOTE_ADDR'] = '127.0.0.1'
authenticate(request=request, foo='bar')
self.assertEqual(AccessLog.objects.all().count(), 0)
def _assert_resets_on_success(self):
"""
Sets the AXES_RESET_ON_SUCCESS up for testing.
"""
# test until one try before the limit
for _ in range(settings.AXES_FAILURE_LIMIT - 1):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
# Perform a valid login
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
return self._login()
# by default, AXES_RESET_ON_SUCCESS = False
def test_reset_on_success_default(self):
"""
Test that the failure attempts does not reset after one successful attempt by default.
"""
response = self._assert_resets_on_success()
# So, we shouldn't have found a lock-out yet.
# But we should find one now
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
@override_settings(AXES_RESET_ON_SUCCESS=True)
def test_reset_on_success(self):
"""
Test that the failure attempts resets after one successful attempt when using the corresponding setting.
"""
response = self._assert_resets_on_success()
# So, we shouldn't have found a lock-out yet.
# And we shouldn't find one now
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
for _ in range(settings.AXES_FAILURE_LIMIT - 2):
response = self._login()
# Check if we are on the same login page.
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
# But we should find one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)