mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
Refactor attempts and add NEVER_LOCKOUT_GET flag
- Move cache and cool off utility functions to the axes.utils module - Clean up axes.attempts duplicate code in attempt and cache calculations - Add stable implementation for AccessAttempt attribute calculation in the axes.attempts.get_filter_kwargs function Fixes #398 Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
parent
77103c42fe
commit
cd56631865
7 changed files with 390 additions and 111 deletions
|
|
@ -32,6 +32,10 @@ Changes
|
|||
- Improve tests and raise Codecov monitoring threshold to 90%.
|
||||
[aleksihakli]
|
||||
|
||||
- Add ``AXES_NEVER_LOCKOUT_GET`` flag that allows skipping checks on GET requests.
|
||||
[aleksihakli]
|
||||
|
||||
|
||||
4.5.4 (2019-01-15)
|
||||
------------------
|
||||
|
||||
|
|
|
|||
210
axes/attempts.py
210
axes/attempts.py
|
|
@ -1,119 +1,100 @@
|
|||
from datetime import timedelta
|
||||
from collections import OrderedDict
|
||||
from hashlib import md5
|
||||
from logging import getLogger
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
from django.db.models import QuerySet
|
||||
from django.http import HttpRequest
|
||||
from django.utils import timezone
|
||||
|
||||
from axes.conf import settings
|
||||
from axes.models import AccessAttempt
|
||||
from axes.utils import get_axes_cache, get_client_ip, get_client_username
|
||||
from axes.utils import (
|
||||
get_axes_cache,
|
||||
get_client_ip,
|
||||
get_client_username,
|
||||
get_client_user_agent,
|
||||
get_cache_timeout,
|
||||
get_cool_off,
|
||||
)
|
||||
|
||||
log = getLogger(settings.AXES_LOGGER)
|
||||
|
||||
|
||||
def _query_user_attempts(request, credentials=None):
|
||||
def get_filter_kwargs(username: str, ip_address: str, user_agent: str) -> OrderedDict:
|
||||
"""
|
||||
Return access attempt record if it exists. Otherwise return None.
|
||||
Get query parameters for filtering AccessAttempt queryset.
|
||||
|
||||
This method returns an OrderedDict that guarantees iteration order for keys and values,
|
||||
and can so be used in e.g. the generation of hash keys or other deterministic functions.
|
||||
"""
|
||||
|
||||
ip = get_client_ip(request)
|
||||
username = get_client_username(request, credentials)
|
||||
query = OrderedDict() # type: OrderedDict
|
||||
|
||||
if settings.AXES_ONLY_USER_FAILURES:
|
||||
attempts = AccessAttempt.objects.filter(username=username)
|
||||
elif settings.AXES_USE_USER_AGENT:
|
||||
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
|
||||
attempts = AccessAttempt.objects.filter(
|
||||
user_agent=ua, ip_address=ip, username=username
|
||||
)
|
||||
query['username'] = username
|
||||
else:
|
||||
attempts = AccessAttempt.objects.filter(
|
||||
ip_address=ip, username=username
|
||||
)
|
||||
|
||||
if not attempts:
|
||||
params = {}
|
||||
|
||||
if settings.AXES_ONLY_USER_FAILURES:
|
||||
params['username'] = username
|
||||
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
|
||||
params['username'] = username
|
||||
params['ip_address'] = ip
|
||||
if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
|
||||
query['username'] = username
|
||||
query['ip_address'] = ip_address
|
||||
else:
|
||||
params['ip_address'] = ip
|
||||
query['ip_address'] = ip_address
|
||||
|
||||
if settings.AXES_USE_USER_AGENT and not settings.AXES_ONLY_USER_FAILURES:
|
||||
params['user_agent'] = ua
|
||||
if settings.AXES_USE_USER_AGENT:
|
||||
query['user_agent'] = user_agent
|
||||
|
||||
attempts = AccessAttempt.objects.filter(**params)
|
||||
|
||||
return attempts
|
||||
return query
|
||||
|
||||
|
||||
def get_cache_key(request_or_obj, credentials=None) -> str:
|
||||
def query_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
|
||||
"""
|
||||
Return a queryset of AccessAttempts that match the given request and credentials.
|
||||
"""
|
||||
|
||||
username = get_client_username(request, credentials)
|
||||
ip_address = get_client_ip(request)
|
||||
user_agent = get_client_user_agent(request)
|
||||
|
||||
filter_kwargs = get_filter_kwargs(username, ip_address, user_agent)
|
||||
|
||||
return AccessAttempt.objects.filter(**filter_kwargs)
|
||||
|
||||
|
||||
def get_cache_key(request_or_attempt, credentials: dict = None) -> str:
|
||||
"""
|
||||
Build cache key name from request or AccessAttempt object.
|
||||
|
||||
:param request_or_obj: Request or AccessAttempt object
|
||||
:return cache-key: String, key to be used in cache system
|
||||
:param request_or_attempt: HttpRequest or AccessAttempt object
|
||||
:param credentials: credentials containing user information
|
||||
:return cache_key: Hash key that is usable for Django cache backends
|
||||
"""
|
||||
|
||||
if isinstance(request_or_obj, AccessAttempt):
|
||||
ip = request_or_obj.ip_address
|
||||
un = request_or_obj.username
|
||||
ua = request_or_obj.user_agent
|
||||
if isinstance(request_or_attempt, AccessAttempt):
|
||||
username = request_or_attempt.username
|
||||
ip_address = request_or_attempt.ip_address
|
||||
user_agent = request_or_attempt.user_agent
|
||||
else:
|
||||
ip = get_client_ip(request_or_obj)
|
||||
un = get_client_username(request_or_obj, credentials)
|
||||
ua = request_or_obj.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
|
||||
username = get_client_username(request_or_attempt, credentials)
|
||||
ip_address = get_client_ip(request_or_attempt)
|
||||
user_agent = get_client_user_agent(request_or_attempt)
|
||||
|
||||
ip = ip.encode('utf-8') if ip else ''.encode('utf-8')
|
||||
un = un.encode('utf-8') if un else ''.encode('utf-8')
|
||||
ua = ua.encode('utf-8') if ua else ''.encode('utf-8')
|
||||
filter_kwargs = get_filter_kwargs(username, ip_address, user_agent)
|
||||
|
||||
if settings.AXES_ONLY_USER_FAILURES:
|
||||
attributes = un
|
||||
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
|
||||
attributes = ip + un
|
||||
else:
|
||||
attributes = ip
|
||||
cache_key_components = ''.join(filter_kwargs.values())
|
||||
cache_key_digest = md5(cache_key_components.encode()).hexdigest()
|
||||
cache_key = 'axes-{}'.format(cache_key_digest)
|
||||
|
||||
if settings.AXES_USE_USER_AGENT and not settings.AXES_ONLY_USER_FAILURES:
|
||||
attributes += ua
|
||||
|
||||
cache_hash_key = 'axes-{}'.format(md5(attributes).hexdigest())
|
||||
|
||||
return cache_hash_key
|
||||
return cache_key
|
||||
|
||||
|
||||
def get_cache_timeout():
|
||||
"""
|
||||
Return timeout according to COOLOFF_TIME.
|
||||
"""
|
||||
|
||||
cache_timeout = None
|
||||
cool_off = settings.AXES_COOLOFF_TIME
|
||||
if cool_off:
|
||||
if isinstance(cool_off, (int, float)):
|
||||
cache_timeout = timedelta(hours=cool_off).total_seconds()
|
||||
else:
|
||||
cache_timeout = cool_off.total_seconds()
|
||||
|
||||
return cache_timeout
|
||||
|
||||
|
||||
def get_user_attempts(request, credentials=None):
|
||||
def get_user_attempts(request: HttpRequest, credentials: dict = None):
|
||||
force_reload = False
|
||||
attempts = _query_user_attempts(request, credentials)
|
||||
attempts = query_user_attempts(request, credentials)
|
||||
cache_hash_key = get_cache_key(request, credentials)
|
||||
cache_timeout = get_cache_timeout()
|
||||
cool_off = get_cool_off()
|
||||
|
||||
cool_off = settings.AXES_COOLOFF_TIME
|
||||
if cool_off:
|
||||
if isinstance(cool_off, (int, float)):
|
||||
cool_off = timedelta(hours=cool_off)
|
||||
|
||||
for attempt in attempts:
|
||||
if attempt.attempt_time + cool_off < timezone.now():
|
||||
attempt.delete()
|
||||
|
|
@ -127,36 +108,61 @@ def get_user_attempts(request, credentials=None):
|
|||
# If objects were deleted, we need to update the queryset to reflect this,
|
||||
# so force a reload.
|
||||
if force_reload:
|
||||
attempts = _query_user_attempts(request, credentials)
|
||||
attempts = query_user_attempts(request, credentials)
|
||||
|
||||
return attempts
|
||||
|
||||
|
||||
def reset_user_attempts(request, credentials=None) -> int:
|
||||
attempts = _query_user_attempts(request, credentials)
|
||||
def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
|
||||
attempts = query_user_attempts(request, credentials)
|
||||
count, _ = attempts.delete()
|
||||
|
||||
return count
|
||||
|
||||
|
||||
def ip_in_whitelist(ip) -> bool:
|
||||
def ip_in_whitelist(ip: str) -> bool:
|
||||
if not settings.AXES_IP_WHITELIST:
|
||||
return False
|
||||
|
||||
return ip in settings.AXES_IP_WHITELIST
|
||||
|
||||
|
||||
def ip_in_blacklist(ip) -> bool:
|
||||
def ip_in_blacklist(ip: str) -> bool:
|
||||
if not settings.AXES_IP_BLACKLIST:
|
||||
return False
|
||||
|
||||
return ip in settings.AXES_IP_BLACKLIST
|
||||
|
||||
|
||||
def is_user_lockable(request, credentials=None) -> bool:
|
||||
if request.method != 'POST':
|
||||
def is_ip_blacklisted(request: HttpRequest) -> bool:
|
||||
"""
|
||||
Check if the given request refers to a blacklisted IP.
|
||||
"""
|
||||
|
||||
ip = get_client_ip(request)
|
||||
|
||||
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
|
||||
return False
|
||||
|
||||
if settings.AXES_ONLY_WHITELIST and not ip_in_whitelist(ip):
|
||||
return True
|
||||
|
||||
if ip_in_blacklist(ip):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def is_user_lockable(request: HttpRequest, credentials: dict = None) -> bool:
|
||||
"""
|
||||
Check if the given request or credentials refer to a whitelisted user object
|
||||
|
||||
A whitelisted user has the magic ``nolockout`` property set.
|
||||
|
||||
If the property is unknown or False or the user can not be found,
|
||||
this implementation fails gracefully and returns True.
|
||||
"""
|
||||
|
||||
username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
|
||||
username_value = get_client_username(request, credentials)
|
||||
kwargs = {
|
||||
|
|
@ -165,33 +171,39 @@ def is_user_lockable(request, credentials=None) -> bool:
|
|||
|
||||
UserModel = get_user_model()
|
||||
|
||||
# Special users with nolockout attribute set should never be locked out
|
||||
try:
|
||||
user = UserModel.objects.get(**kwargs)
|
||||
return not user.nolockout
|
||||
except (UserModel.DoesNotExist, AttributeError):
|
||||
pass
|
||||
|
||||
# Default case is that users can be locked out
|
||||
return True
|
||||
|
||||
|
||||
def is_already_locked(request, credentials=None) -> bool:
|
||||
ip = get_client_ip(request)
|
||||
def is_already_locked(request: HttpRequest, credentials: dict = None) -> bool:
|
||||
"""
|
||||
Check if the request or given credentials are already locked by Axes.
|
||||
|
||||
if (
|
||||
settings.AXES_ONLY_USER_FAILURES or
|
||||
settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP
|
||||
) and request.method == 'GET':
|
||||
This function is called from
|
||||
|
||||
- function decorators defined in ``axes.decorators``,
|
||||
- authentication backends defined in ``axes.backends``, and
|
||||
- signal handlers defined in ``axes.handlers``.
|
||||
|
||||
This function checks the following facts for a given request:
|
||||
|
||||
1. Is the request HTTP method _whitelisted_? If it is, return ``False``.
|
||||
2. Is the request IP address _blacklisted_? If it is, return ``True``.
|
||||
3. Does the request or given credentials refer to a whitelisted user? If it does, return ``False``.
|
||||
4. Does the request exceed the configured maximum attempt limit? If it does, return ``True``.
|
||||
|
||||
Refer to the function source code for the exact implementation.
|
||||
"""
|
||||
|
||||
if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET':
|
||||
return False
|
||||
|
||||
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
|
||||
return False
|
||||
|
||||
if settings.AXES_ONLY_WHITELIST and not ip_in_whitelist(ip):
|
||||
return True
|
||||
|
||||
if ip_in_blacklist(ip):
|
||||
if is_ip_blacklisted(request):
|
||||
return True
|
||||
|
||||
if not is_user_lockable(request, credentials):
|
||||
|
|
|
|||
|
|
@ -51,6 +51,8 @@ class MyAppConf(AppConf):
|
|||
# TODO: convert the strings to IPv4 on startup to avoid type conversion during processing
|
||||
NEVER_LOCKOUT_WHITELIST = False
|
||||
|
||||
NEVER_LOCKOUT_GET = False
|
||||
|
||||
ONLY_WHITELIST = False
|
||||
|
||||
IP_WHITELIST = None
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@ import hashlib
|
|||
import random
|
||||
import string
|
||||
import time
|
||||
from unittest.mock import patch
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from django.contrib.auth import authenticate, get_user_model
|
||||
from django.contrib.auth.models import User
|
||||
|
|
@ -12,7 +12,16 @@ from django.test import TestCase, override_settings
|
|||
from django.test.client import RequestFactory
|
||||
from django.urls import reverse
|
||||
|
||||
from axes.attempts import get_cache_key, is_user_lockable
|
||||
from axes.attempts import (
|
||||
get_cache_key,
|
||||
get_cache_timeout,
|
||||
is_already_locked,
|
||||
ip_in_blacklist,
|
||||
ip_in_whitelist,
|
||||
is_user_lockable,
|
||||
get_filter_kwargs,
|
||||
get_user_attempts,
|
||||
)
|
||||
from axes.conf import settings
|
||||
from axes.models import AccessAttempt, AccessLog
|
||||
from axes.signals import user_locked_out
|
||||
|
|
@ -72,6 +81,10 @@ class AccessAttemptTest(TestCase):
|
|||
Create a valid user for login.
|
||||
"""
|
||||
|
||||
self.username = self.VALID_USERNAME
|
||||
self.ip_address = '127.0.0.1'
|
||||
self.user_agent = 'test-browser'
|
||||
|
||||
self.user = User.objects.create_superuser(
|
||||
username=self.VALID_USERNAME,
|
||||
email='test@example.com',
|
||||
|
|
@ -206,6 +219,100 @@ class AccessAttemptTest(TestCase):
|
|||
# Make a login attempt again
|
||||
self.test_valid_login()
|
||||
|
||||
@override_settings(
|
||||
AXES_ONLY_USER_FAILURES=True,
|
||||
)
|
||||
def test_get_filter_kwargs_user(self):
|
||||
self.assertEqual(
|
||||
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
|
||||
{'username': self.username},
|
||||
)
|
||||
|
||||
@override_settings(
|
||||
AXES_ONLY_USER_FAILURES=False,
|
||||
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
|
||||
AXES_USE_USER_AGENT=False,
|
||||
)
|
||||
def test_get_filter_kwargs_ip(self):
|
||||
self.assertEqual(
|
||||
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
|
||||
{'ip_address': self.ip_address},
|
||||
)
|
||||
|
||||
@override_settings(
|
||||
AXES_ONLY_USER_FAILURES=False,
|
||||
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
|
||||
AXES_USE_USER_AGENT=False,
|
||||
)
|
||||
def test_get_filter_kwargs_user_and_ip(self):
|
||||
self.assertEqual(
|
||||
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
|
||||
{'username': self.username, 'ip_address': self.ip_address},
|
||||
)
|
||||
|
||||
@override_settings(
|
||||
AXES_ONLY_USER_FAILURES=False,
|
||||
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=False,
|
||||
AXES_USE_USER_AGENT=True,
|
||||
)
|
||||
def test_get_filter_kwargs_ip_and_agent(self):
|
||||
self.assertEqual(
|
||||
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
|
||||
{'ip_address': self.ip_address, 'user_agent': self.user_agent},
|
||||
)
|
||||
|
||||
@override_settings(
|
||||
AXES_ONLY_USER_FAILURES=False,
|
||||
AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True,
|
||||
AXES_USE_USER_AGENT=True,
|
||||
)
|
||||
def test_get_filter_kwargs_user_ip_agent(self):
|
||||
self.assertEqual(
|
||||
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
|
||||
{'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent},
|
||||
)
|
||||
|
||||
@patch('axes.attempts.get_axes_cache')
|
||||
def test_get_user_attempts_updates_cache(self, get_cache):
|
||||
cache = MagicMock()
|
||||
cache.get.return_value = 1
|
||||
cache.set.return_value = None
|
||||
get_cache.return_value = cache
|
||||
|
||||
attempt = AccessAttempt.objects.create(
|
||||
username=self.username,
|
||||
ip_address=self.ip_address,
|
||||
user_agent=self.user_agent,
|
||||
failures_since_start=0,
|
||||
)
|
||||
|
||||
request = HttpRequest()
|
||||
request.META['REMOTE_ADDR'] = self.ip_address
|
||||
request.META['HTTP_USER_AGENT'] = self.user_agent
|
||||
credentials = {'username': self.username}
|
||||
|
||||
# Check that the function does nothing if cool off has not passed
|
||||
cache.get.assert_not_called()
|
||||
cache.set.assert_not_called()
|
||||
|
||||
self.assertEqual(
|
||||
list(get_user_attempts(request, credentials)),
|
||||
[attempt],
|
||||
)
|
||||
|
||||
cache.get.assert_not_called()
|
||||
cache.set.assert_not_called()
|
||||
|
||||
time.sleep(settings.AXES_COOLOFF_TIME.seconds)
|
||||
|
||||
self.assertEqual(
|
||||
list(get_user_attempts(request, credentials)),
|
||||
[],
|
||||
)
|
||||
|
||||
cache.get.assert_called_once()
|
||||
cache.set.assert_called_once()
|
||||
|
||||
@patch('axes.utils.get_client_ip', return_value='127.0.0.1')
|
||||
def test_get_cache_key(self, _):
|
||||
"""
|
||||
|
|
@ -444,7 +551,7 @@ class AccessAttemptTest(TestCase):
|
|||
"""
|
||||
|
||||
request = HttpRequest()
|
||||
request.user = self.user
|
||||
request.META['REMOTE_ADDR'] = '127.0.0.1'
|
||||
authenticate(request=request, foo='bar')
|
||||
self.assertEqual(AccessLog.objects.all().count(), 0)
|
||||
|
||||
|
|
@ -497,23 +604,93 @@ class AccessAttemptTest(TestCase):
|
|||
response = self._login()
|
||||
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
|
||||
|
||||
@patch('axes.attempts.log')
|
||||
def test_is_user_lockable(self, log):
|
||||
request = HttpRequest()
|
||||
request.method = 'POST'
|
||||
|
||||
class AttemptUtilsTestCase(TestCase):
|
||||
def setUp(self):
|
||||
self.request = HttpRequest()
|
||||
self.request.method = 'POST'
|
||||
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
|
||||
|
||||
@override_settings(AXES_IP_WHITELIST=None)
|
||||
def test_ip_in_whitelist_none(self):
|
||||
self.assertFalse(ip_in_whitelist('127.0.0.2'))
|
||||
|
||||
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
|
||||
def test_ip_in_whitelist(self):
|
||||
self.assertTrue(ip_in_whitelist('127.0.0.1'))
|
||||
self.assertFalse(ip_in_whitelist('127.0.0.2'))
|
||||
|
||||
@override_settings(AXES_IP_BLACKLIST=None)
|
||||
def test_ip_in_blacklist_none(self):
|
||||
self.assertFalse(ip_in_blacklist('127.0.0.2'))
|
||||
|
||||
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
|
||||
def test_ip_in_blacklist(self):
|
||||
self.assertTrue(ip_in_blacklist('127.0.0.1'))
|
||||
self.assertFalse(ip_in_blacklist('127.0.0.2'))
|
||||
|
||||
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
|
||||
def test_is_already_locked_ip_in_blacklist(self):
|
||||
self.assertTrue(is_already_locked(self.request))
|
||||
|
||||
@override_settings(AXES_IP_BLACKLIST=['127.0.0.2'])
|
||||
def test_is_already_locked_ip_not_in_blacklist(self):
|
||||
self.assertFalse(is_already_locked(self.request))
|
||||
|
||||
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
|
||||
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
|
||||
def test_is_already_locked_ip_in_whitelist(self):
|
||||
self.assertFalse(is_already_locked(self.request))
|
||||
|
||||
@override_settings(AXES_ONLY_WHITELIST=True)
|
||||
@override_settings(AXES_IP_WHITELIST=['127.0.0.2'])
|
||||
def test_is_already_locked_ip_not_in_whitelist(self):
|
||||
self.assertTrue(is_already_locked(self.request))
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=3) # hours
|
||||
def test_get_cache_timeout(self):
|
||||
timeout_seconds = float(60 * 60 * 3)
|
||||
self.assertEqual(get_cache_timeout(), timeout_seconds)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_AT_FAILURE=True)
|
||||
@override_settings(AXES_FAILURE_LIMIT=40)
|
||||
@patch('axes.attempts.get_axes_cache')
|
||||
def test_is_already_locked_cache(self, get_cache):
|
||||
cache = MagicMock()
|
||||
cache.get.return_value = 42
|
||||
get_cache.return_value = cache
|
||||
|
||||
self.assertTrue(is_already_locked(self.request, {}))
|
||||
cache.get.assert_called_once()
|
||||
|
||||
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
|
||||
def test_is_already_locked_never_lockout_get(self):
|
||||
request = HttpRequest()
|
||||
request.method = 'GET'
|
||||
self.assertFalse(is_already_locked(request))
|
||||
|
||||
def test_is_already_locked_nolockable(self):
|
||||
UserModel = get_user_model()
|
||||
UserModel.objects.create(username='jane.doe')
|
||||
user = UserModel.objects.create(username='jane.doe')
|
||||
|
||||
with self.subTest('User is marked as nolockout.'):
|
||||
with patch.object(UserModel, 'nolockout', True, create=True):
|
||||
lockable = is_user_lockable(request, {'username': 'jane.doe'})
|
||||
locked = is_already_locked(self.request, {UserModel.USERNAME_FIELD: user.username})
|
||||
self.assertFalse(locked)
|
||||
|
||||
def test_is_user_lockable(self):
|
||||
UserModel = get_user_model()
|
||||
user = UserModel.objects.create(username='jane.doe')
|
||||
|
||||
with self.subTest('User is marked as nolockout.'):
|
||||
with patch.object(UserModel, 'nolockout', True, create=True):
|
||||
lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: user.username})
|
||||
self.assertFalse(lockable)
|
||||
|
||||
with self.subTest('User exists but attemptee can be locked out.'):
|
||||
lockable = is_user_lockable(request, {'username': 'jane.doe'})
|
||||
lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: user.username})
|
||||
self.assertTrue(lockable)
|
||||
|
||||
with self.subTest('User does not exist and attemptee can be locked out.'):
|
||||
lockable = is_user_lockable(request, {'username': 'john.doe'})
|
||||
lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: 'not.' + user.username})
|
||||
self.assertTrue(lockable)
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ from django.http import HttpRequest
|
|||
from django.test import TestCase, override_settings
|
||||
|
||||
from axes.handlers import AxesHandler
|
||||
from axes.models import AccessAttempt
|
||||
from axes.signals import ProxyHandler
|
||||
|
||||
|
||||
|
|
@ -66,6 +67,13 @@ class AxesHandlerTestCase(TestCase):
|
|||
def setUp(self):
|
||||
self.handler = AxesHandler()
|
||||
|
||||
self.attempt = AccessAttempt.objects.create(
|
||||
username='jane.doe',
|
||||
ip_address='127.0.0.1',
|
||||
user_agent='test-browser',
|
||||
failures_since_start=42,
|
||||
)
|
||||
|
||||
@patch('axes.handlers.log')
|
||||
def test_user_login_failed_no_request(self, log):
|
||||
self.handler.user_login_failed(sender=None, credentials=None, request=None)
|
||||
|
|
@ -79,3 +87,35 @@ class AxesHandlerTestCase(TestCase):
|
|||
request = HttpRequest()
|
||||
self.handler.user_login_failed(sender=None, credentials=None, request=request)
|
||||
log.info.assert_called_with('Login failed from whitelisted IP %s.', '127.0.0.1')
|
||||
|
||||
@patch('axes.handlers.get_axes_cache')
|
||||
def test_post_save_access_attempt_updates_cache(self, get_cache):
|
||||
cache = MagicMock()
|
||||
cache.get.return_value = None
|
||||
cache.set.return_value = None
|
||||
get_cache.return_value = cache
|
||||
|
||||
cache.get.assert_not_called()
|
||||
cache.set.assert_not_called()
|
||||
|
||||
self.handler.post_save_access_attempt(self.attempt)
|
||||
|
||||
cache.get.assert_called_once()
|
||||
cache.set.assert_called_once()
|
||||
|
||||
@patch('axes.handlers.get_axes_cache')
|
||||
def test_user_login_failed_utilizes_cache(self, get_cache):
|
||||
cache = MagicMock()
|
||||
cache.get.return_value = 1
|
||||
get_cache.return_value = cache
|
||||
|
||||
sender = MagicMock()
|
||||
credentials = {'username': self.attempt.username}
|
||||
request = HttpRequest()
|
||||
request.META['REMOTE_ADDR'] = '127.0.0.1'
|
||||
|
||||
cache.get.assert_not_called()
|
||||
|
||||
self.handler.user_login_failed(sender, credentials, request)
|
||||
|
||||
self.assertGreater(cache.get.call_count, 0)
|
||||
|
|
|
|||
|
|
@ -1,3 +1,4 @@
|
|||
from typing import Optional
|
||||
from datetime import timedelta
|
||||
from logging import getLogger
|
||||
from socket import error, inet_pton, AF_INET6
|
||||
|
|
@ -69,7 +70,7 @@ def get_client_ip(request: HttpRequest) -> str:
|
|||
return getattr(request, client_ip_attribute)
|
||||
|
||||
|
||||
def get_client_username(request: HttpRequest, credentials: dict = None):
|
||||
def get_client_username(request: HttpRequest, credentials: dict = None) -> str:
|
||||
"""
|
||||
Resolve client username from the given request or credentials if supplied.
|
||||
|
||||
|
|
@ -95,12 +96,53 @@ def get_client_username(request: HttpRequest, credentials: dict = None):
|
|||
return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
|
||||
|
||||
|
||||
def get_client_user_agent(request: HttpRequest) -> str:
|
||||
return request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
|
||||
|
||||
|
||||
def get_credentials(username: str = None, **kwargs):
|
||||
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
|
||||
credentials.update(kwargs)
|
||||
return credentials
|
||||
|
||||
|
||||
def get_cool_off() -> Optional[timedelta]:
|
||||
"""
|
||||
Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME.
|
||||
|
||||
The return value is either None or timedelta.
|
||||
|
||||
Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours,
|
||||
and this function offers a unified _timedelta or None_ representation of that configuration
|
||||
for use with the Axes internal implementations.
|
||||
|
||||
:exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
|
||||
"""
|
||||
|
||||
cool_off = settings.AXES_COOLOFF_TIME
|
||||
|
||||
if isinstance(cool_off, int):
|
||||
return timedelta(hours=cool_off)
|
||||
return cool_off
|
||||
|
||||
|
||||
def get_cache_timeout() -> Optional[int]:
|
||||
"""
|
||||
Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME.
|
||||
|
||||
The cache timeout can be either None if not configured or integer of seconds if configured.
|
||||
|
||||
Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours,
|
||||
and this function offers a unified _integer or None_ representation of that configuration
|
||||
for use with the Django cache backends.
|
||||
"""
|
||||
|
||||
cool_off = get_cool_off()
|
||||
if cool_off is None:
|
||||
return None
|
||||
return int(cool_off.total_seconds())
|
||||
|
||||
|
||||
def is_ipv6(ip: str):
|
||||
try:
|
||||
inet_pton(AF_INET6, ip)
|
||||
|
|
|
|||
|
|
@ -264,6 +264,8 @@ These should be defined in your ``settings.py`` file.
|
|||
* ``AXES_ONLY_USER_FAILURES`` : If ``True`` only locks based on user id and never locks by IP
|
||||
if attempts limit exceed, otherwise utilize the existing IP and user locking logic
|
||||
Default: ``False``
|
||||
* ``AXES_NEVER_LOCKOUT_GET``: If ``True``, Axes will never lock out HTTP GET requests.
|
||||
Default: ``False``
|
||||
* ``AXES_NEVER_LOCKOUT_WHITELIST``: If ``True``, users can always login from whitelisted IP addresses.
|
||||
Default: ``False``
|
||||
* ``AXES_IP_WHITELIST``: An iterable of IPs to be whitelisted. For example: ``AXES_IP_WHITELIST = ['0.0.0.0']``. Default: ``None``
|
||||
|
|
|
|||
Loading…
Reference in a new issue