diff --git a/axes/apps.py b/axes/apps.py index 92839ba..3d77be0 100644 --- a/axes/apps.py +++ b/axes/apps.py @@ -5,4 +5,5 @@ class AppConfig(apps.AppConfig): name = 'axes' def ready(self): - from axes import signals # pylint: disable=unused-import,unused-variable + from axes import signals + signals.ProxyHandler.initialize() diff --git a/axes/conf.py b/axes/conf.py index fc7c8a1..7e2e659 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -35,6 +35,8 @@ class MyAppConf(AppConf): DISABLE_SUCCESS_ACCESS_LOG = False + HANDLER = 'axes.handlers.AxesHandler' + LOGGER = 'axes.watch_login' LOCKOUT_TEMPLATE = None diff --git a/axes/handlers.py b/axes/handlers.py new file mode 100644 index 0000000..0845356 --- /dev/null +++ b/axes/handlers.py @@ -0,0 +1,196 @@ +import logging + +from django.utils.timezone import now + +from axes.conf import settings +from axes.attempts import get_cache_key +from axes.attempts import get_cache_timeout +from axes.attempts import get_user_attempts +from axes.attempts import is_user_lockable +from axes.attempts import ip_in_whitelist +from axes.attempts import reset_user_attempts +from axes.exceptions import AxesSignalPermissionDenied +from axes.models import AccessLog, AccessAttempt +from axes.signals import user_locked_out +from axes.utils import get_client_str +from axes.utils import query2str +from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials + + +log = logging.getLogger(settings.AXES_LOGGER) + + +class AxesHandler: # pylint: disable=too-many-locals + """ + Signal handler implementation that records user login attempts to database and locks users out if necessary. + """ + + def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=unused-argument + """ + When user login fails, save AccessAttempt record in database and lock user out if necessary. + + :raises AxesSignalPermissionDenied: if user should is locked out + """ + + if request is None: + log.warning('Attempt to authenticate with a custom backend failed.') + return + + ip_address = get_client_ip(request) + username = get_client_username(request, credentials) + user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] + path_info = request.META.get('PATH_INFO', '')[:255] + http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + + if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): + return + + failures = 0 + attempts = get_user_attempts(request, credentials) + cache_hash_key = get_cache_key(request, credentials) + cache_timeout = get_cache_timeout() + + failures_cached = get_axes_cache().get(cache_hash_key) + if failures_cached is not None: + failures = failures_cached + else: + for attempt in attempts: + failures = max(failures, attempt.failures_since_start) + + # add a failed attempt for this user + failures += 1 + get_axes_cache().set(cache_hash_key, failures, cache_timeout) + + # has already attempted, update the info + if attempts: + for attempt in attempts: + attempt.get_data = '%s\n---------\n%s' % ( + attempt.get_data, + query2str(request.GET), + ) + attempt.post_data = '%s\n---------\n%s' % ( + attempt.post_data, + query2str(request.POST) + ) + attempt.http_accept = http_accept + attempt.path_info = path_info + attempt.failures_since_start = failures + attempt.attempt_time = now() + attempt.save() + + log.info( + 'AXES: Repeated login failure by %s. Count = %d of %d', + get_client_str(username, ip_address, user_agent, path_info), + failures, + settings.AXES_FAILURE_LIMIT, + ) + else: + # Record failed attempt. Whether or not the IP address or user agent is + # used in counting failures is handled elsewhere, so we just record + # everything here. + AccessAttempt.objects.create( + user_agent=user_agent, + ip_address=ip_address, + username=username, + get_data=query2str(request.GET), + post_data=query2str(request.POST), + http_accept=http_accept, + path_info=path_info, + failures_since_start=failures, + ) + + log.info( + 'AXES: New login failure by %s. Creating access record.', + get_client_str(username, ip_address, user_agent, path_info), + ) + + # no matter what, we want to lock them out if they're past the number of + # attempts allowed, unless the user is set to notlockable + if ( + failures >= settings.AXES_FAILURE_LIMIT and + settings.AXES_LOCK_OUT_AT_FAILURE and + is_user_lockable(request, credentials) + ): + log.warning( + 'AXES: Locked out %s after repeated login failures.', + get_client_str(username, ip_address, user_agent, path_info), + ) + + # send signal when someone is locked out. + user_locked_out.send( + 'axes', + request=request, + username=username, + ip_address=ip_address, + ) + + raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') + + def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + """ + When user logs in, update the AccessLog related to the user. + """ + + username = user.get_username() + credentials = get_credentials(username) + ip_address = get_client_ip(request) + user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] + path_info = request.META.get('PATH_INFO', '')[:255] + http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + + log.info( + 'AXES: Successful login by %s.', + get_client_str(username, ip_address, user_agent, path_info), + ) + + if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG: + AccessLog.objects.create( + user_agent=user_agent, + ip_address=ip_address, + username=username, + http_accept=http_accept, + path_info=path_info, + trusted=True, + ) + + if settings.AXES_RESET_ON_SUCCESS: + count = reset_user_attempts(request, credentials) + log.info( + 'AXES: Deleted %d failed login attempts by %s.', + count, + get_client_str(username, ip_address, user_agent, path_info), + ) + + def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + """ + When user logs out, update the AccessLog related to the user. + """ + + log.info('AXES: Successful logout by %s.', user) + + if user and not settings.AXES_DISABLE_ACCESS_LOG: + AccessLog.objects.filter( + username=user.get_username(), + logout_time__isnull=True, + ).update( + logout_time=now(), + ) + + def post_save_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument + """ + Update cache after saving AccessAttempts. + """ + + cache_hash_key = get_cache_key(instance) + + if not get_axes_cache().get(cache_hash_key): + cache_timeout = get_cache_timeout() + get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout) + + def post_delete_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument + """ + Update cache after deleting AccessAttempts. + """ + + cache_hash_key = get_cache_key(instance) + get_axes_cache().delete(cache_hash_key) diff --git a/axes/signals.py b/axes/signals.py index 906b149..bd8ba60 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -6,21 +6,10 @@ from django.contrib.auth.signals import user_login_failed from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from django.dispatch import Signal -from django.utils import timezone +from django.utils.module_loading import import_string from axes.conf import settings -from axes.attempts import get_cache_key -from axes.attempts import get_cache_timeout -from axes.attempts import get_user_attempts -from axes.attempts import is_user_lockable -from axes.attempts import ip_in_whitelist -from axes.attempts import reset_user_attempts -from axes.exceptions import AxesSignalPermissionDenied -from axes.models import AccessLog, AccessAttempt -from axes.utils import get_client_str -from axes.utils import query2str -from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials - +from axes.models import AccessAttempt log = logging.getLogger(settings.AXES_LOGGER) @@ -28,164 +17,110 @@ log = logging.getLogger(settings.AXES_LOGGER) user_locked_out = Signal(providing_args=['request', 'username', 'ip_address']) +class ProxyHandler: + """ + Proxy interface for configurable Axes signal handler class. + + If you wish to implement a custom version of this handler, + you can override the settings.AXES_HANDLER configuration string + with a class that implements a compatible interface and methods. + + Defaults to using axes.handlers.AxesHandler if not overridden. + Refer to axes.handlers.AxesHandler for default implementation. + """ + + implementation = None # concrete handler that is bootstrapped by the Django application loader + + @classmethod + def initialize(cls): + """ + Fetch and initialize concrete handler implementation and memoize it to avoid reinitialization. + + This method is re-entrant and can be called multiple times. + """ + + if cls.implementation is None: + cls.implementation = import_string(settings.AXES_HANDLER)() + + @classmethod + def user_login_failed(cls, sender, credentials, request, **kwargs): + """ + Handle user login failure event. + + :param credentials: credentials used for authentication attempt + :param request: request used for failed authentication attempt + :return: None + """ + + cls.implementation.user_login_failed(sender, credentials, request, **kwargs) + + @classmethod + def user_logged_in(cls, sender, request, user, **kwargs): + """ + Handle user login event. + + :param credentials: credentials used for successful authentication + :param request: request used for successful authentication + :return: None + """ + + cls.implementation.user_logged_in(sender, request, user, **kwargs) + + @classmethod + def user_logged_out(cls, sender, request, user, **kwargs): + """ + Handle user logout event. + + :param request: request used for logout + :param user: user used for logout + :return: None + """ + + cls.implementation.user_logged_out(sender, request, user, **kwargs) + + @classmethod + def post_save_access_attempt(cls, instance, **kwargs): + """ + Handle AccessAttempt save event. + + :param instance: axes.models.AccessAttempt instance that will be saved + :return: None + """ + + cls.implementation.post_save_access_attempt(instance, **kwargs) + + @classmethod + def post_delete_access_attempt(cls, instance, **kwargs): + """ + Handle AccessAttempt delete event. + + :param instance: axes.models.AccessAttempt instance that was deleted + :return: None + """ + + cls.implementation.post_delete_access_attempt(instance, **kwargs) + + @receiver(user_login_failed) -def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: disable=unused-argument - """ - When user login fails, create an AccessAttempt record. - """ - - if request is None: - log.warning('Attempt to authenticate with a custom backend failed.') - return - - ip_address = get_client_ip(request) - username = get_client_username(request, credentials) - user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] - - if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): - return - - failures = 0 - attempts = get_user_attempts(request, credentials) - cache_hash_key = get_cache_key(request, credentials) - cache_timeout = get_cache_timeout() - - failures_cached = get_axes_cache().get(cache_hash_key) - if failures_cached is not None: - failures = failures_cached - else: - for attempt in attempts: - failures = max(failures, attempt.failures_since_start) - - # add a failed attempt for this user - failures += 1 - get_axes_cache().set(cache_hash_key, failures, cache_timeout) - - # has already attempted, update the info - if attempts: - for attempt in attempts: - attempt.get_data = '%s\n---------\n%s' % ( - attempt.get_data, - query2str(request.GET), - ) - attempt.post_data = '%s\n---------\n%s' % ( - attempt.post_data, - query2str(request.POST) - ) - attempt.http_accept = http_accept - attempt.path_info = path_info - attempt.failures_since_start = failures - attempt.attempt_time = timezone.now() - attempt.save() - - log.info( - 'AXES: Repeated login failure by %s. Count = %d of %d', - get_client_str(username, ip_address, user_agent, path_info), - failures, - settings.AXES_FAILURE_LIMIT - ) - else: - # Record failed attempt. Whether or not the IP address or user agent is - # used in counting failures is handled elsewhere, so we just record - # everything here. - AccessAttempt.objects.create( - user_agent=user_agent, - ip_address=ip_address, - username=username, - get_data=query2str(request.GET), - post_data=query2str(request.POST), - http_accept=http_accept, - path_info=path_info, - failures_since_start=failures, - ) - - log.info( - 'AXES: New login failure by %s. Creating access record.', - get_client_str(username, ip_address, user_agent, path_info) - ) - - # no matter what, we want to lock them out if they're past the number of - # attempts allowed, unless the user is set to notlockable - if ( - failures >= settings.AXES_FAILURE_LIMIT and - settings.AXES_LOCK_OUT_AT_FAILURE and - is_user_lockable(request, credentials) - ): - log.warning( - 'AXES: locked out %s after repeated login attempts.', - get_client_str(username, ip_address, user_agent, path_info) - ) - - # send signal when someone is locked out. - user_locked_out.send( - 'axes', request=request, username=username, ip_address=ip_address - ) - - raise AxesSignalPermissionDenied('User locked out due to failed login attempts') +def handle_user_login_failed(*args, **kwargs): + ProxyHandler.user_login_failed(*args, **kwargs) @receiver(user_logged_in) -def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unused-argument - """ - When user logs in, update the access log. - """ - - username = user.get_username() - credentials = get_credentials(username) - ip_address = get_client_ip(request) - user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] - log.info( - 'AXES: Successful login by %s.', - get_client_str(username, ip_address, user_agent, path_info) - ) - - if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG: - AccessLog.objects.create( - user_agent=user_agent, - ip_address=ip_address, - username=username, - http_accept=http_accept, - path_info=path_info, - trusted=True, - ) - - if settings.AXES_RESET_ON_SUCCESS: - count = reset_user_attempts(request, credentials) - log.info( - 'AXES: Deleted %d failed login attempts by %s.', - count, - get_client_str(username, ip_address, user_agent, path_info) - ) +def handle_user_logged_in(*args, **kwargs): + ProxyHandler.user_logged_in(*args, **kwargs) @receiver(user_logged_out) -def log_user_logged_out(sender, request, user, **kwargs): # pylint: disable=unused-argument - """ - When user logs out, update the access log. - """ - - log.info('AXES: Successful logout by %s.', user) - - if user and not settings.AXES_DISABLE_ACCESS_LOG: - AccessLog.objects.filter( - username=user.get_username(), - logout_time__isnull=True, - ).update(logout_time=timezone.now()) +def handle_user_logged_out(*args, **kwargs): + ProxyHandler.user_logged_out(*args, **kwargs) @receiver(post_save, sender=AccessAttempt) -def update_cache_after_save(instance, **kwargs): # pylint: disable=unused-argument - cache_hash_key = get_cache_key(instance) - if not get_axes_cache().get(cache_hash_key): - cache_timeout = get_cache_timeout() - get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout) +def handle_post_save_access_attempt(*args, **kwargs): + ProxyHandler.post_save_access_attempt(*args, **kwargs) @receiver(post_delete, sender=AccessAttempt) -def delete_cache_after_delete(instance, **kwargs): # pylint: disable=unused-argument - cache_hash_key = get_cache_key(instance) - get_axes_cache().delete(cache_hash_key) +def handle_post_delete_access_attempt(*args, **kwargs): + ProxyHandler.post_delete_access_attempt(*args, **kwargs) diff --git a/docs/configuration.rst b/docs/configuration.rst index 9050ceb..d7207f7 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -235,6 +235,8 @@ These should be defined in your ``settings.py`` file. old failed login attempts will be forgotten. Can be set to a python timedelta object or an integer. If an integer, will be interpreted as a number of hours. Default: ``None`` +* ``AXES_HANDLER``: If set, overrides the default signal handler backend. + Default: ``'axes.handlers.AxesHandler'`` * ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use. Default: ``'axes.watch_login'`` * ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a