Refactor handlers to a more pluggable format

- Define a base handler API with method signatures
- Move proxy handler to a separate path for importability
- Implement a database handler with clean external dependencies
- Change the authentication backend and decorators to use the authentication backend

This enables clean pluggable authentication backend definitions that users
can override and specialize with e.g. cached handlers in their own packages.

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
Aleksi Häkli 2019-02-17 23:56:48 +02:00 committed by Aleksi Häkli
parent 139a2b38fb
commit e69d479f6a
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
18 changed files with 513 additions and 419 deletions

View file

@ -4,6 +4,7 @@ from django import apps
from axes import get_version
from axes.conf import settings
from axes.handlers.proxy import AxesProxyHandler
log = getLogger(settings.AXES_LOGGER)
@ -41,5 +42,6 @@ class AppConfig(apps.AppConfig):
def ready(self):
self.initialize()
from axes import signals
signals.ProxyHandler.initialize()
AxesProxyHandler.initialize()
from axes import signals # noqa

View file

@ -2,7 +2,6 @@ from hashlib import md5
from logging import getLogger
from typing import Union
from django.contrib.auth import get_user_model
from django.db.models import QuerySet
from django.http import HttpRequest
from django.utils import timezone
@ -103,112 +102,3 @@ def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
count, _ = attempts.delete()
return count
def ip_in_whitelist(ip: str) -> bool:
if not settings.AXES_IP_WHITELIST:
return False
return ip in settings.AXES_IP_WHITELIST
def ip_in_blacklist(ip: str) -> bool:
if not settings.AXES_IP_BLACKLIST:
return False
return ip in settings.AXES_IP_BLACKLIST
def is_ip_blacklisted(request: HttpRequest) -> bool:
"""
Check if the given request refers to a blacklisted IP.
"""
ip = get_client_ip_address(request)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
return False
if settings.AXES_ONLY_WHITELIST and not ip_in_whitelist(ip):
return True
if ip_in_blacklist(ip):
return True
return False
def is_user_lockable(request: HttpRequest, credentials: dict = None) -> bool:
"""
Check if the given request or credentials refer to a whitelisted user object
A whitelisted user has the magic ``nolockout`` property set.
If the property is unknown or False or the user can not be found,
this implementation fails gracefully and returns True.
"""
username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
username_value = get_client_username(request, credentials)
kwargs = {
username_field: username_value
}
UserModel = get_user_model()
try:
user = UserModel.objects.get(**kwargs)
return not user.nolockout
except (UserModel.DoesNotExist, AttributeError):
pass
return True
def is_already_locked(request: HttpRequest, credentials: dict = None) -> bool:
"""
Check if the request or given credentials are already locked by Axes.
This function is called from
- function decorators defined in ``axes.decorators``,
- authentication backends defined in ``axes.backends``, and
- signal handlers defined in ``axes.handlers``.
This function checks the following facts for a given request:
1. Is the request HTTP method _whitelisted_? If it is, return ``False``.
2. Is the request IP address _blacklisted_? If it is, return ``True``.
3. Is the request user _whitelisted_? If it is, return ``False``.
4. Is the request failure count over the attempt limit? If it is, return ``True``.
Refer to the function source code for the exact implementation.
"""
if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET':
return False
if is_ip_blacklisted(request):
return True
if not is_user_lockable(request, credentials):
return False
if not settings.AXES_LOCK_OUT_AT_FAILURE:
return False
# Check failure statistics against cache
cache_hash_key = get_cache_key(request, credentials)
num_failures_cached = get_axes_cache().get(cache_hash_key)
# Do not hit the database if we have an answer in the cache
if num_failures_cached is not None:
return num_failures_cached >= settings.AXES_FAILURE_LIMIT
# Check failure statistics against database
attempts = get_user_attempts(request, credentials)
failures = attempts.filter(
failures_since_start__gte=settings.AXES_FAILURE_LIMIT,
)
return failures.exists()

View file

@ -1,7 +1,7 @@
from django.contrib.auth.backends import ModelBackend
from axes.attempts import is_already_locked
from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired
from axes.handlers.proxy import AxesProxyHandler
from axes.utils import get_credentials, get_lockout_message
@ -36,20 +36,21 @@ class AxesBackend(ModelBackend):
credentials = get_credentials(username=username, password=password, **kwargs)
if is_already_locked(request, credentials):
# locked out, don't try to authenticate, just update return_context and return
# Its a bit weird to pass a context and expect a response value but its nice to get a "why" back.
error_msg = get_lockout_message()
response_context = kwargs.get('response_context', {})
response_context['error'] = error_msg
if AxesProxyHandler.is_allowed_to_authenticate(request, credentials):
return
# Raise an error that stops the authentication flows at django.contrib.auth.authenticate.
# This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors.
# After this error is caught by authenticate it emits a signal indicating user login failed,
# which is processed by axes.signals.log_user_login_failed which logs the attempt and raises
# a second exception which bubbles up the middleware stack and produces a HTTP 403 Forbidden reply
# in the axes.middleware.AxesMiddleware.process_exception middleware exception handler.
# Locked out, don't try to authenticate, just update response_context and return.
# Its a bit weird to pass a context and expect a response value but its nice to get a "why" back.
raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out')
error_msg = get_lockout_message()
response_context = kwargs.get('response_context', {})
response_context['error'] = error_msg
# No-op
# Raise an error that stops the authentication flows at django.contrib.auth.authenticate.
# This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors.
# After this error is caught by authenticate it emits a signal indicating user login failed,
# which is processed by axes.signals.log_user_login_failed which logs the attempt and raises
# a second exception which bubbles up the middleware stack and produces a HTTP 403 Forbidden reply
# in the axes.middleware.AxesMiddleware.process_exception middleware exception handler.
raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out')

View file

@ -35,7 +35,7 @@ class MyAppConf(AppConf):
DISABLE_SUCCESS_ACCESS_LOG = False
HANDLER = 'axes.handlers.AxesHandler'
HANDLER = 'axes.handlers.database.AxesDatabaseHandler'
LOGGER = 'axes.watch_login'

View file

@ -1,15 +1,15 @@
from functools import wraps
from axes.attempts import is_already_locked
from axes.handlers.proxy import AxesProxyHandler
from axes.utils import get_lockout_response
def axes_dispatch(func):
def inner(request, *args, **kwargs):
if is_already_locked(request):
return get_lockout_response(request)
if AxesProxyHandler.is_allowed_to_authenticate(request):
return func(request, *args, **kwargs)
return func(request, *args, **kwargs)
return get_lockout_response(request)
return inner
@ -17,9 +17,10 @@ def axes_dispatch(func):
def axes_form_invalid(func):
@wraps(func)
def inner(self, *args, **kwargs):
if is_already_locked(self.request):
return get_lockout_response(self.request)
if AxesProxyHandler.is_allowed_to_authenticate(self.request):
return func(self, *args, **kwargs)
return get_lockout_response(self.request)
return func(self, *args, **kwargs)
return inner

View file

48
axes/handlers/base.py Normal file
View file

@ -0,0 +1,48 @@
from typing import Any, Dict, Optional
from django.http import HttpRequest
class AxesBaseHandler: # pylint: disable=unused-argument
"""
Handler API definition for subclassing handlers that can be used with the AxesProxyHandler.
If you wish to implement your own handler class just override the methods you wish to specialize
and define the class to be used with ``settings.AXES_HANDLER = 'dotted.full.path.to.YourClass'``.
"""
def is_allowed_to_authenticate(
self,
request: HttpRequest,
credentials: Optional[Dict[str, Any]] = None,
) -> bool:
"""
Check if the user is allowed to authenticate into the site.
"""
raise NotImplementedError('The Axes handler class needs a method definition for is_allowed_to_authenticate')
def user_login_failed(self, sender, credentials: Dict[str, Any], request: HttpRequest, **kwargs):
"""
Handle the Django user_login_failed authentication signal.
"""
def user_logged_in(self, sender, request: HttpRequest, user, **kwargs):
"""
Handle the Django user_logged_in authentication signal.
"""
def user_logged_out(self, sender, request: HttpRequest, user, **kwargs):
"""
Handle the Django user_logged_out authentication signal.
"""
def post_save_access_attempt(self, instance, **kwargs):
"""
Handle the Axes AccessAttempt object post save signal.
"""
def post_delete_access_attempt(self, instance, **kwargs):
"""
Handle the Axes AccessAttempt object post delete signal.
"""

View file

@ -1,19 +1,23 @@
from logging import getLogger
from typing import Any, Dict, Optional
from django.db.models import Max
from django.http import HttpRequest
from django.utils.timezone import now
from axes.attempts import get_cache_key, is_already_locked
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import ip_in_whitelist
from axes.attempts import reset_user_attempts
from axes.attempts import (
get_cache_key,
get_user_attempts,
reset_user_attempts,
)
from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.handlers.base import AxesBaseHandler
from axes.utils import (
get_axes_cache,
get_cache_timeout,
get_client_ip_address,
get_client_path_info,
get_client_http_accept,
@ -22,18 +26,76 @@ from axes.utils import (
get_client_user_agent,
get_credentials,
get_query_str,
is_ip_address_in_whitelist,
is_client_ip_address_blacklisted,
is_client_ip_address_whitelisted,
is_client_method_whitelisted,
is_client_username_whitelisted,
)
log = getLogger(settings.AXES_LOGGER)
class AxesHandler: # pylint: disable=too-many-locals
class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
"""
Signal handler implementation that records user login attempts to database and locks users out if necessary.
"""
def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=unused-argument
def is_allowed_to_authenticate(self, request: HttpRequest, credentials: Optional[Dict[str, Any]] = None) -> bool:
"""
Check if the request or given credentials are already locked by Axes.
This function is called from
- function decorators defined in ``axes.decorators``,
- authentication backends defined in ``axes.backends``, and
- signal handlers defined in ``axes.handlers``.
This function checks the following facts for a given request:
1. Is the request IP address _blacklisted_? If it is, return ``False``.
2. Is the request IP address _whitelisted_? If it is, return ``True``.
4. Is the request HTTP method _whitelisted_? If it is, return ``True``.
3. Is the request user _whitelisted_? If it is, return ``True``.
5. Is failed authentication attempt always allowed to proceed? If it is, return ``True``.
6. Is failed authentication attempt count over the attempt limit? If it is, return ``False``.
Refer to the function source code for the exact implementation.
"""
if is_client_ip_address_blacklisted(request):
return False
if is_client_ip_address_whitelisted(request):
return True
if is_client_method_whitelisted(request):
return True
if is_client_username_whitelisted(request, credentials):
return True
if not settings.AXES_LOCK_OUT_AT_FAILURE:
return True
# Check failure statistics against cache
cache_hash_key = get_cache_key(request, credentials)
num_failures_cached = get_axes_cache().get(cache_hash_key)
# Do not hit the database if we have an answer in the cache
if num_failures_cached is not None:
return num_failures_cached < settings.AXES_FAILURE_LIMIT
# Check failure statistics against database
attempts = get_user_attempts(request, credentials)
failures = attempts.filter(
failures_since_start__gte=settings.AXES_FAILURE_LIMIT,
)
return not failures.exists()
def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=too-many-locals
"""
When user login fails, save AccessAttempt record in database and lock user out if necessary.
@ -41,7 +103,7 @@ class AxesHandler: # pylint: disable=too-many-locals
"""
if request is None:
log.warning('AxesHandler.user_login_failed does not function without a request.')
log.warning('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.')
return
username = get_client_username(request, credentials)
@ -51,8 +113,8 @@ class AxesHandler: # pylint: disable=too-many-locals
http_accept = get_client_http_accept(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
log.info('Login failed from whitelisted IP %s.', ip_address)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address):
log.info('AXES: Login failed from whitelisted IP %s.', ip_address)
return
attempts = get_user_attempts(request, credentials)
@ -122,7 +184,7 @@ class AxesHandler: # pylint: disable=too-many-locals
client_str,
)
if is_already_locked(request, credentials):
if not self.is_allowed_to_authenticate(request, credentials):
log.warning(
'AXES: Locked out %s after repeated login failures.',
client_str,

64
axes/handlers/proxy.py Normal file
View file

@ -0,0 +1,64 @@
from logging import getLogger
from typing import Any, Dict, Optional
from django.http import HttpRequest
from django.utils.module_loading import import_string
from axes.conf import settings
from axes.handlers.base import AxesBaseHandler
log = getLogger(settings.AXES_LOGGER)
class AxesProxyHandler(AxesBaseHandler):
"""
Proxy interface for configurable Axes signal handler class.
If you wish to implement a custom version of this handler,
you can override the settings.AXES_HANDLER configuration string
with a class that implements a compatible interface and methods.
Defaults to using axes.handlers.proxy.AxesProxyHandler if not overridden.
Refer to axes.handlers.proxy.AxesProxyHandler for default implementation.
"""
implementation = None # type: AxesBaseHandler
@classmethod
def initialize(cls, force=False):
"""
Fetch and initialize configured handler implementation and memoize it to avoid reinitialization.
This method is re-entrant and can be called multiple times from e.g. Django application loader.
"""
if force or not cls.implementation:
cls.implementation = cls.get_implementation()
@classmethod
def get_implementation(cls) -> AxesBaseHandler:
return import_string(settings.AXES_HANDLER)()
@classmethod
def is_allowed_to_authenticate(cls, request: HttpRequest, credentials: Optional[Dict[str, Any]] = None) -> bool:
return cls.implementation.is_allowed_to_authenticate(request, credentials)
@classmethod
def user_login_failed(cls, sender: Any, credentials: Dict[str, Any], request: HttpRequest, **kwargs):
return cls.implementation.user_login_failed(sender, credentials, request, **kwargs)
@classmethod
def user_logged_in(cls, sender: Any, request: HttpRequest, user, **kwargs):
return cls.implementation.user_logged_in(sender, request, user, **kwargs)
@classmethod
def user_logged_out(cls, sender: Any, request: HttpRequest, user, **kwargs):
return cls.implementation.user_logged_out(sender, request, user, **kwargs)
@classmethod
def post_save_access_attempt(cls, instance, **kwargs):
return cls.implementation.post_save_access_attempt(instance, **kwargs)
@classmethod
def post_delete_access_attempt(cls, instance, **kwargs):
return cls.implementation.post_delete_access_attempt(instance, **kwargs)

View file

@ -1,16 +1,14 @@
from logging import getLogger
from django.contrib.auth.signals import user_logged_in
from django.contrib.auth.signals import user_logged_out
from django.contrib.auth.signals import user_login_failed
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.core.signals import setting_changed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.dispatch import Signal
from django.utils.module_loading import import_string
from axes.conf import settings
from axes.models import AccessAttempt
from axes.handlers.proxy import AxesProxyHandler
log = getLogger(settings.AXES_LOGGER)
@ -18,113 +16,29 @@ log = getLogger(settings.AXES_LOGGER)
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
class ProxyHandler:
"""
Proxy interface for configurable Axes signal handler class.
If you wish to implement a custom version of this handler,
you can override the settings.AXES_HANDLER configuration string
with a class that implements a compatible interface and methods.
Defaults to using axes.handlers.AxesHandler if not overridden.
Refer to axes.handlers.AxesHandler for default implementation.
"""
implementation = None # concrete handler that is bootstrapped by the Django application loader
@classmethod
def initialize(cls, force=False):
"""
Fetch and initialize concrete handler implementation and memoize it to avoid reinitialization.
This method is re-entrant and can be called multiple times.
"""
if force or cls.implementation is None:
cls.implementation = import_string(settings.AXES_HANDLER)()
@classmethod
def user_login_failed(cls, sender, credentials, request, **kwargs):
"""
Handle user login failure event.
:param credentials: credentials used for authentication attempt
:param request: request used for failed authentication attempt
:return: None
"""
cls.implementation.user_login_failed(sender, credentials, request, **kwargs)
@classmethod
def user_logged_in(cls, sender, request, user, **kwargs):
"""
Handle user login event.
:param credentials: credentials used for successful authentication
:param request: request used for successful authentication
:return: None
"""
cls.implementation.user_logged_in(sender, request, user, **kwargs)
@classmethod
def user_logged_out(cls, sender, request, user, **kwargs):
"""
Handle user logout event.
:param request: request used for logout
:param user: user used for logout
:return: None
"""
cls.implementation.user_logged_out(sender, request, user, **kwargs)
@classmethod
def post_save_access_attempt(cls, instance, **kwargs):
"""
Handle AccessAttempt save event.
:param instance: axes.models.AccessAttempt instance that will be saved
:return: None
"""
cls.implementation.post_save_access_attempt(instance, **kwargs)
@classmethod
def post_delete_access_attempt(cls, instance, **kwargs):
"""
Handle AccessAttempt delete event.
:param instance: axes.models.AccessAttempt instance that was deleted
:return: None
"""
cls.implementation.post_delete_access_attempt(instance, **kwargs)
@receiver(user_login_failed)
def handle_user_login_failed(*args, **kwargs):
ProxyHandler.user_login_failed(*args, **kwargs)
AxesProxyHandler.user_login_failed(*args, **kwargs)
@receiver(user_logged_in)
def handle_user_logged_in(*args, **kwargs):
ProxyHandler.user_logged_in(*args, **kwargs)
AxesProxyHandler.user_logged_in(*args, **kwargs)
@receiver(user_logged_out)
def handle_user_logged_out(*args, **kwargs):
ProxyHandler.user_logged_out(*args, **kwargs)
AxesProxyHandler.user_logged_out(*args, **kwargs)
@receiver(post_save, sender=AccessAttempt)
def handle_post_save_access_attempt(*args, **kwargs):
ProxyHandler.post_save_access_attempt(*args, **kwargs)
AxesProxyHandler.post_save_access_attempt(*args, **kwargs)
@receiver(post_delete, sender=AccessAttempt)
def handle_post_delete_access_attempt(*args, **kwargs):
ProxyHandler.post_delete_access_attempt(*args, **kwargs)
AxesProxyHandler.post_delete_access_attempt(*args, **kwargs)
@receiver(setting_changed)
@ -135,4 +49,4 @@ def handle_setting_changed(sender, setting, value, enter, **kwargs): # pylint:
"""
if enter and setting == 'AXES_HANDLER':
ProxyHandler.initialize(force=enter)
AxesProxyHandler.initialize(force=enter)

View file

@ -5,7 +5,7 @@ import string
import time
from unittest.mock import patch, MagicMock
from django.contrib.auth import authenticate, get_user_model
from django.contrib.auth import authenticate
from django.contrib.auth.models import User
from django.http import HttpRequest
from django.test import TestCase, override_settings
@ -14,11 +14,6 @@ from django.urls import reverse
from axes.attempts import (
get_cache_key,
get_cache_timeout,
is_already_locked,
ip_in_blacklist,
ip_in_whitelist,
is_user_lockable,
get_client_parameters,
get_user_attempts,
)
@ -445,7 +440,7 @@ class AccessAttemptTest(TestCase):
# reset the username only and make sure we can log in now even though
# our IP has failed each time
AccessAttempt.objects.filter(username=AccessAttemptTest.VALID_USERNAME).delete()
AccessAttempt.objects.filter(username=self.VALID_USERNAME).delete()
response = self._login(
is_valid_username=True,
is_valid_password=True,
@ -593,103 +588,3 @@ class AccessAttemptTest(TestCase):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
class AttemptUtilsTestCase(TestCase):
def setUp(self):
self.request = HttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
@override_settings(AXES_IP_WHITELIST=None)
def test_ip_in_whitelist_none(self):
self.assertFalse(ip_in_whitelist('127.0.0.2'))
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
def test_ip_in_whitelist(self):
self.assertTrue(ip_in_whitelist('127.0.0.1'))
self.assertFalse(ip_in_whitelist('127.0.0.2'))
@override_settings(AXES_IP_BLACKLIST=None)
def test_ip_in_blacklist_none(self):
self.assertFalse(ip_in_blacklist('127.0.0.2'))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
def test_ip_in_blacklist(self):
self.assertTrue(ip_in_blacklist('127.0.0.1'))
self.assertFalse(ip_in_blacklist('127.0.0.2'))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
def test_is_already_locked_ip_in_blacklist(self):
self.assertTrue(is_already_locked(self.request))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.2'])
def test_is_already_locked_ip_not_in_blacklist(self):
self.assertFalse(is_already_locked(self.request))
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
def test_is_already_locked_ip_in_whitelist(self):
self.assertFalse(is_already_locked(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.2'])
def test_is_already_locked_ip_not_in_whitelist(self):
self.assertTrue(is_already_locked(self.request))
@override_settings(AXES_COOLOFF_TIME=3) # hours
def test_get_cache_timeout(self):
timeout_seconds = float(60 * 60 * 3)
self.assertEqual(get_cache_timeout(), timeout_seconds)
@override_settings(AXES_LOCK_OUT_AT_FAILURE=True)
@override_settings(AXES_FAILURE_LIMIT=40)
@patch('axes.attempts.get_axes_cache')
def test_is_already_locked_cache(self, get_cache):
cache = MagicMock()
cache.get.return_value = 42
get_cache.return_value = cache
self.assertTrue(is_already_locked(self.request, {}))
self.assertTrue(cache.get.call_count)
@override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
@override_settings(AXES_FAILURE_LIMIT=40)
@patch('axes.attempts.get_axes_cache')
def test_is_already_locked_do_not_lock_out_at_failure(self, get_cache):
cache = MagicMock()
cache.get.return_value = 42
get_cache.return_value = cache
self.assertFalse(is_already_locked(self.request))
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
def test_is_already_locked_never_lockout_get(self):
request = HttpRequest()
request.method = 'GET'
self.assertFalse(is_already_locked(request))
def test_is_already_locked_nolockable(self):
UserModel = get_user_model()
user = UserModel.objects.create(username='jane.doe')
with self.subTest('User is marked as nolockout.'):
with patch.object(UserModel, 'nolockout', True, create=True):
locked = is_already_locked(self.request, {UserModel.USERNAME_FIELD: user.username})
self.assertFalse(locked)
def test_is_user_lockable(self):
UserModel = get_user_model()
user = UserModel.objects.create(username='jane.doe')
with self.subTest('User is marked as nolockout.'):
with patch.object(UserModel, 'nolockout', True, create=True):
lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: user.username})
self.assertFalse(lockable)
with self.subTest('User exists but attemptee can be locked out.'):
lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: user.username})
self.assertTrue(lockable)
with self.subTest('User does not exist and attemptee can be locked out.'):
lockable = is_user_lockable(self.request, {UserModel.USERNAME_FIELD: 'not.' + user.username})
self.assertTrue(lockable)

View file

@ -13,7 +13,7 @@ class BackendTestCase(TestCase):
with self.assertRaises(AxesBackendRequestParameterRequired):
AxesBackend().authenticate(request)
@patch('axes.backends.is_already_locked', return_value=True)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=False)
def test_authenticate_raises_on_locked_request(self, _):
request = MagicMock()

View file

@ -15,25 +15,25 @@ class DecoratorTestCase(TestCase):
self.cls = MagicMock(return_value=self.request)
self.func = MagicMock(return_value=self.SUCCESS_RESPONSE)
@patch('axes.decorators.is_already_locked', return_value=True)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=False)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_dispatch_locks_out(self, _, __):
response = axes_dispatch(self.func)(self.request)
self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
@patch('axes.decorators.is_already_locked', return_value=False)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=True)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_dispatch_dispatches(self, _, __):
response = axes_dispatch(self.func)(self.request)
self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
@patch('axes.decorators.is_already_locked', return_value=True)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=False)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_form_invalid_locks_out(self, _, __):
response = axes_form_invalid(self.func)(self.cls)
self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
@patch('axes.decorators.is_already_locked', return_value=False)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed_to_authenticate', return_value=True)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_form_invalid_dispatches(self, _, __):
response = axes_form_invalid(self.func)(self.cls)

View file

@ -3,12 +3,22 @@ from unittest.mock import MagicMock, patch
from django.http import HttpRequest
from django.test import TestCase, override_settings
from axes.handlers import AxesHandler
from axes.handlers.base import AxesBaseHandler
from axes.handlers.proxy import AxesProxyHandler
from axes.models import AccessAttempt
from axes.signals import ProxyHandler
class ProxyHandlerTestCase(TestCase):
class AxesBaseHandlerTestCase(TestCase):
def test_base_handler_raises_on_undefined_is_allowed_to_authenticate(self):
class FaultyHandler(AxesBaseHandler):
pass
with self.assertRaises(NotImplementedError):
handler = FaultyHandler()
handler.is_allowed_to_authenticate(HttpRequest(), {})
class AxesProxyHandlerTestCase(TestCase):
def setUp(self):
self.sender = MagicMock()
self.credentials = MagicMock()
@ -16,65 +26,47 @@ class ProxyHandlerTestCase(TestCase):
self.user = MagicMock()
self.instance = MagicMock()
@patch('axes.signals.import_string', return_value=AxesHandler)
def test_setting_changed_signal_triggers_handler_reimport(self, importer):
self.assertEqual(0, importer.call_count)
@patch('axes.handlers.proxy.AxesProxyHandler.implementation', None)
def test_setting_changed_signal_triggers_handler_reimport(self):
self.assertIsNone(AxesProxyHandler().implementation)
with self.settings(
AXES_HANDLER='axes.handlers.AxesHandler'
):
self.assertEqual(1, importer.call_count)
with self.settings(AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler'):
self.assertIsNotNone(AxesProxyHandler().implementation)
@patch('axes.signals.ProxyHandler.implementation', None)
@patch('axes.signals.import_string', return_value=AxesHandler)
def test_initialize(self, importer):
self.assertEqual(0, importer.call_count)
self.assertIsNone(ProxyHandler.implementation)
ProxyHandler.initialize()
self.assertEqual(1, importer.call_count)
self.assertIsInstance(ProxyHandler.implementation, AxesHandler)
ProxyHandler.initialize()
self.assertEqual(1, importer.call_count)
self.assertIsInstance(ProxyHandler.implementation, AxesHandler)
@patch('axes.signals.ProxyHandler.implementation')
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
def test_user_login_failed(self, handler):
self.assertFalse(handler.user_login_failed.called)
ProxyHandler().user_login_failed(self.sender, self.credentials, self.request)
AxesProxyHandler().user_login_failed(self.sender, self.credentials, self.request)
self.assertTrue(handler.user_login_failed.called)
@patch('axes.signals.ProxyHandler.implementation')
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
def test_user_logged_in(self, handler):
self.assertFalse(handler.user_logged_in.called)
ProxyHandler().user_logged_in(self.sender, self.request, self.user)
AxesProxyHandler().user_logged_in(self.sender, self.request, self.user)
self.assertTrue(handler.user_logged_in.called)
@patch('axes.signals.ProxyHandler.implementation')
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
def test_user_logged_out(self, handler):
self.assertFalse(handler.user_logged_out.called)
ProxyHandler().user_logged_out(self.sender, self.request, self.user)
AxesProxyHandler().user_logged_out(self.sender, self.request, self.user)
self.assertTrue(handler.user_logged_out.called)
@patch('axes.signals.ProxyHandler.implementation')
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
def test_post_save_access_attempt(self, handler):
self.assertFalse(handler.post_save_access_attempt.called)
ProxyHandler().post_save_access_attempt(self.instance)
AxesProxyHandler().post_save_access_attempt(self.instance)
self.assertTrue(handler.post_save_access_attempt.called)
@patch('axes.signals.ProxyHandler.implementation')
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
def test_post_delete_access_attempt(self, handler):
self.assertFalse(handler.post_delete_access_attempt.called)
ProxyHandler().post_delete_access_attempt(self.instance)
AxesProxyHandler().post_delete_access_attempt(self.instance)
self.assertTrue(handler.post_delete_access_attempt.called)
class AxesHandlerTestCase(TestCase):
class AxesDatabaseHandlerTestCase(TestCase):
def setUp(self):
self.handler = AxesHandler()
self.handler = AxesProxyHandler()
self.attempt = AccessAttempt.objects.create(
username='jane.doe',
@ -83,36 +75,42 @@ class AxesHandlerTestCase(TestCase):
failures_since_start=42,
)
@patch('axes.handlers.log')
self.request = HttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
@patch('axes.handlers.database.log')
def test_user_login_failed_no_request(self, log):
self.handler.user_login_failed(sender=None, credentials=None, request=None)
log.warning.assert_called_with('AxesHandler.user_login_failed does not function without a request.')
self.handler.user_login_failed(sender=None, credentials={}, request=None)
log.warning.assert_called_with(
'AXES: AxesDatabaseHandler.user_login_failed does not function without a request.'
)
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=['127.0.0.1'])
@patch('axes.handlers.get_client_ip_address', return_value='127.0.0.1')
@patch('axes.handlers.ip_in_whitelist', return_value=True)
@patch('axes.handlers.log')
@patch('axes.handlers.database.get_client_ip_address', return_value='127.0.0.1')
@patch('axes.handlers.database.is_ip_address_in_whitelist', return_value=True)
@patch('axes.handlers.database.log')
def test_user_login_failed_whitelist(self, log, _, __):
request = HttpRequest()
self.handler.user_login_failed(sender=None, credentials=None, request=request)
log.info.assert_called_with('Login failed from whitelisted IP %s.', '127.0.0.1')
self.handler.user_login_failed(sender=None, credentials={}, request=self.request)
log.info.assert_called_with('AXES: Login failed from whitelisted IP %s.', '127.0.0.1')
@patch('axes.handlers.get_axes_cache')
@patch('axes.handlers.database.get_axes_cache')
def test_post_save_access_attempt_updates_cache(self, get_cache):
cache = MagicMock()
cache.get.return_value = None
cache.set.return_value = None
get_cache.return_value = cache
cache.get.assert_not_called()
cache.set.assert_not_called()
self.assertFalse(cache.get.call_count)
self.assertFalse(cache.set.call_count)
self.handler.post_save_access_attempt(self.attempt)
self.assertTrue(cache.get.call_count)
self.assertTrue(cache.set.call_count)
@patch('axes.handlers.get_axes_cache')
@patch('axes.handlers.database.get_axes_cache')
def test_user_login_failed_utilizes_cache(self, get_cache):
cache = MagicMock()
cache.get.return_value = 1
@ -120,11 +118,35 @@ class AxesHandlerTestCase(TestCase):
sender = MagicMock()
credentials = {'username': self.attempt.username}
request = HttpRequest()
request.META['REMOTE_ADDR'] = '127.0.0.1'
cache.get.assert_not_called()
self.assertFalse(cache.get.call_count)
self.handler.user_login_failed(sender, credentials, request)
self.handler.user_login_failed(sender, credentials, self.request)
self.assertTrue(cache.get.call_count)
@override_settings(AXES_LOCK_OUT_AT_FAILURE=True)
@override_settings(AXES_FAILURE_LIMIT=40)
@patch('axes.handlers.database.get_axes_cache')
def test_is_already_locked_cache(self, get_cache):
cache = MagicMock()
cache.get.return_value = 42
get_cache.return_value = cache
self.assertFalse(AxesProxyHandler.is_allowed_to_authenticate(self.request, {}))
@override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
@override_settings(AXES_FAILURE_LIMIT=40)
@patch('axes.handlers.database.get_axes_cache')
def test_is_already_locked_do_not_lock_out_at_failure(self, get_cache):
cache = MagicMock()
cache.get.return_value = 42
get_cache.return_value = cache
self.assertTrue(AxesProxyHandler.is_allowed_to_authenticate(self.request, {}))
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
def test_is_already_locked_never_lockout_get(self):
self.request.method = 'GET'
self.assertTrue(AxesProxyHandler.is_allowed_to_authenticate(self.request, {}))

View file

@ -1,11 +1,15 @@
"""
Test access from purely the
"""
from django.test import TestCase, override_settings
from django.urls import reverse
from django.contrib.auth.models import User
from django.contrib.auth import get_user_model
from axes.conf import settings
class AccessAttemptConfigTest(TestCase):
class LoginTestCase(TestCase):
"""
Test for lockouts under different configurations and circumstances to prevent false positives and false negatives.
@ -66,12 +70,12 @@ class AccessAttemptConfigTest(TestCase):
Create two valid users for authentication.
"""
self.user = User.objects.create_superuser(
self.user = get_user_model().objects.create_superuser(
username=self.USER_1,
email='test_1@example.com',
password=self.VALID_PASSWORD,
)
self.user = User.objects.create_superuser(
self.user = get_user_model().objects.create_superuser(
username=self.USER_2,
email='test_2@example.com',
password=self.VALID_PASSWORD,

View file

@ -1,11 +1,22 @@
from datetime import timedelta
from unittest.mock import patch
from django.contrib.auth import get_user_model
from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpResponse
from django.test import TestCase, override_settings
from axes import get_version
from axes.utils import get_cool_off_iso8601, get_client_str, get_client_username, get_lockout_response
from axes.utils import (
get_cool_off_iso8601,
get_client_str,
get_client_username,
get_lockout_response,
is_client_ip_address_blacklisted,
is_ip_address_in_blacklist,
is_ip_address_in_whitelist,
get_cache_timeout,
is_client_username_whitelisted,
is_client_ip_address_whitelisted)
def get_username(request: HttpRequest, credentials: dict) -> str:
@ -17,13 +28,46 @@ def get_expected_client_str(*args, **kwargs):
return client_str_template.format(*args, **kwargs)
class AxesTestCase(TestCase):
class VersionTestCase(TestCase):
@patch('axes.__version__', 'test')
def test_get_version(self):
self.assertEqual(get_version(), 'test')
class UtilsTestCase(TestCase):
class CacheTestCase(TestCase):
@override_settings(AXES_COOLOFF_TIME=3) # hours
def test_get_cache_timeout(self):
timeout_seconds = float(60 * 60 * 3)
self.assertEqual(get_cache_timeout(), timeout_seconds)
class UserTestCase(TestCase):
def setUp(self):
self.user_model = get_user_model()
self.user = self.user_model.objects.create(username='jane.doe')
self.request = HttpRequest()
def test_is_client_username_whitelisted(self):
with patch.object(self.user_model, 'nolockout', True, create=True):
self.assertTrue(is_client_username_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: self.user.username},
))
def test_is_client_username_whitelisted_not(self):
self.assertFalse(is_client_username_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: self.user.username},
))
def test_is_client_username_whitelisted_does_not_exist(self):
self.assertFalse(is_client_username_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: 'not.' + self.user.username},
))
class TimestampTestCase(TestCase):
def test_iso8601(self):
"""
Test get_cool_off_iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration.
@ -52,6 +96,8 @@ class UtilsTestCase(TestCase):
with self.subTest(iso_duration):
self.assertEqual(get_cool_off_iso8601(delta), iso_duration)
class ClientStringTestCase(TestCase):
@override_settings(AXES_VERBOSE=True)
def test_verbose_ip_only_client_details(self):
username = 'test@example.com'
@ -166,6 +212,8 @@ class UtilsTestCase(TestCase):
self.assertEqual(expected, actual)
class UsernameTestCase(TestCase):
@override_settings(AXES_USERNAME_FORM_FIELD='username')
def test_default_get_client_username(self):
expected = 'test-username'
@ -257,6 +305,64 @@ class UtilsTestCase(TestCase):
)
class WhitelistTestCase(TestCase):
def setUp(self):
self.request = HttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
@override_settings(AXES_IP_WHITELIST=None)
def test_ip_in_whitelist_none(self):
self.assertFalse(is_ip_address_in_whitelist('127.0.0.2'))
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
def test_ip_in_whitelist(self):
self.assertTrue(is_ip_address_in_whitelist('127.0.0.1'))
self.assertFalse(is_ip_address_in_whitelist('127.0.0.2'))
@override_settings(AXES_IP_BLACKLIST=None)
def test_ip_in_blacklist_none(self):
self.assertFalse(is_ip_address_in_blacklist('127.0.0.2'))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
def test_ip_in_blacklist(self):
self.assertTrue(is_ip_address_in_blacklist('127.0.0.1'))
self.assertFalse(is_ip_address_in_blacklist('127.0.0.2'))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
def test_is_client_ip_address_blacklisted_ip_in_blacklist(self):
self.assertTrue(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.2'])
def test_is_is_client_ip_address_blacklisted_ip_not_in_blacklist(self):
self.assertFalse(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
def test_is_client_ip_address_blacklisted_ip_in_whitelist(self):
self.assertFalse(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.2'])
def test_is_already_locked_ip_not_in_whitelist(self):
self.assertTrue(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
def test_is_client_ip_address_whitelisted_never_lockout(self):
self.assertTrue(is_client_ip_address_whitelisted(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
def test_is_client_ip_address_whitelisted_only_allow(self):
self.assertTrue(is_client_ip_address_whitelisted(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.2'])
def test_is_client_ip_address_whitelisted_not(self):
self.assertFalse(is_client_ip_address_whitelisted(self.request))
class LockoutResponseTestCase(TestCase):
def setUp(self):
self.request = HttpRequest()

View file

@ -1,10 +1,11 @@
from collections import OrderedDict
from datetime import timedelta
from logging import getLogger
from typing import Optional
from typing import Optional, Type
from django.contrib.auth import get_user_model
from django.core.cache import caches, BaseCache
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse, QueryDict
from django.shortcuts import render
from django.utils.module_loading import import_string
@ -259,7 +260,7 @@ def get_client_str(username: str, ip_address: str, user_agent: str, path_info: s
return template.format(client_dict)
def get_query_str(query: dict, max_length: int = 1024) -> str:
def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str:
"""
Turns a query dictionary into an easy-to-read list of key-value pairs.
@ -323,3 +324,87 @@ def get_lockout_response(request: HttpRequest, credentials: dict = None) -> Http
get_lockout_message(),
status=status,
)
def is_ip_address_in_whitelist(ip_address: str) -> bool:
if not settings.AXES_IP_WHITELIST:
return False
return ip_address in settings.AXES_IP_WHITELIST
def is_ip_address_in_blacklist(ip_address: str) -> bool:
if not settings.AXES_IP_BLACKLIST:
return False
return ip_address in settings.AXES_IP_BLACKLIST
def is_client_ip_address_whitelisted(request: HttpRequest):
"""
Check if the given request refers to a whitelisted IP.
"""
ip_address = get_client_ip_address(request)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address):
return True
if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(ip_address):
return True
return False
def is_client_ip_address_blacklisted(request: HttpRequest) -> bool:
"""
Check if the given request refers to a blacklisted IP.
"""
ip_address = get_client_ip_address(request)
if is_ip_address_in_blacklist(ip_address):
return True
if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(ip_address):
return True
return False
def is_client_method_whitelisted(request: HttpRequest) -> bool:
"""
Check if the given request uses a whitelisted method.
"""
if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET':
return True
return False
def is_client_username_whitelisted(request: HttpRequest, credentials: dict = None) -> bool:
"""
Check if the given request or credentials refer to a whitelisted username.
A whitelisted user has the magic ``nolockout`` property set.
If the property is unknown or False or the user can not be found,
this implementation fails gracefully and returns True.
"""
username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
username_value = get_client_username(request, credentials)
kwargs = {
username_field: username_value
}
user_model = get_user_model()
try:
user = user_model.objects.get(**kwargs)
return user.nolockout
except (user_model.DoesNotExist, AttributeError):
pass
return False

View file

@ -236,7 +236,7 @@ These should be defined in your ``settings.py`` file.
timedelta object or an integer. If an integer, will be interpreted as a
number of hours. Default: ``None``
* ``AXES_HANDLER``: If set, overrides the default signal handler backend.
Default: ``'axes.handlers.AxesHandler'``
Default: ``'axes.handlers.database.AxesDatabaseHandler'``
* ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use.
Default: ``'axes.watch_login'``
* ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a
@ -271,7 +271,7 @@ These should be defined in your ``settings.py`` file.
* ``AXES_NEVER_LOCKOUT_WHITELIST``: If ``True``, users can always login from whitelisted IP addresses.
Default: ``False``
* ``AXES_CLIENT_IP_ATTRIBUTE``: A string that is used to lookup and set client IP on the request object. Default: ``'axes_client_ip'``
* ``AXES_IP_BLACKLIST``: An iterable of IPs to be blacklisted. For example: ``AXES_IP_BLACKLIST = ['0.0.0.0']``. Default: ``None``
* ``AXES_IP_BLACKLIST``: An iterable of IPs to be blacklisted. Takes precedence over whitelists. For example: ``AXES_IP_BLACKLIST = ['0.0.0.0']``. Default: ``None``
* ``AXES_IP_WHITELIST``: An iterable of IPs to be whitelisted. For example: ``AXES_IP_WHITELIST = ['0.0.0.0']``. Default: ``None``
* ``AXES_DISABLE_ACCESS_LOG``: If ``True``, disable all access logging, so the admin interface will be empty. Default: ``False``
* ``AXES_DISABLE_SUCCESS_ACCESS_LOG``: If ``True``, successful logins will not be logged, so the access log shown in the admin interface will only list unsuccessful login attempts. Default: ``False``