diff --git a/.coveragerc b/.coveragerc index b271c81..0df8e4b 100644 --- a/.coveragerc +++ b/.coveragerc @@ -1,2 +1,3 @@ [run] -source=axes +branch = True +source = axes diff --git a/.travis.yml b/.travis.yml index d99a75a..f5e6795 100644 --- a/.travis.yml +++ b/.travis.yml @@ -2,8 +2,6 @@ dist: xenial language: python cache: pip python: - - 2.7 - - 3.4 - 3.5 - 3.6 - 3.7 @@ -14,15 +12,7 @@ env: - DJANGO=master matrix: exclude: - - python: 2.7 - env: DJANGO=2.0 - - python: 2.7 - env: DJANGO=2.1 - - python: 2.7 - env: DJANGO=master - - python: 3.4 - env: DJANGO=2.1 - - python: 3.4 + - python: 3.5 env: DJANGO=master install: pip install tox-travis codecov script: tox diff --git a/CHANGES.txt b/CHANGES.txt index 2c55b17..776cd55 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,6 +1,37 @@ Changes ======= +5.0.0 (WIP) +----------- + +- Improve managment commands and separate commands for resetting + all access attempts, attempts by IP and attempts by username. + Add tests for the management commands for better coverage. + [aleksihakli] + +- Add a Django native authentication stack that utilizes + ``AUTHENTICATION_BACKENDS``, ``MIDDLEWARE``, and signal handlers + for tracking login attempts and implementing user lockouts. + This results in configuration changes, refer to the documentation. + [aleksihakli] + +- Remove automatic decoration of Django login views and forms. + Leave decorations available for application users who wish to + decorate their own login or other views as before. + [aleksihakli] + +- Clean up code, tests, and documentation. + [aleksihakli] + +- Drop support for Python 2.7 and Python 3.4. + [aleksihakli] + +- Drop old single-argument signature format for ``AXES_USERNAME_CALLABLE``. + [aleksihakli] + +- Improve tests and raise Codecov monitoring threshold to 90%. + [aleksihakli] + 4.5.4 (2019-01-15) ------------------ diff --git a/axes/__init__.py b/axes/__init__.py index 4b1c174..bbc0eda 100644 --- a/axes/__init__.py +++ b/axes/__init__.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - __version__ = '4.5.4' default_app_config = 'axes.apps.AppConfig' diff --git a/axes/admin.py b/axes/admin.py index af416b3..a6c8eb5 100644 --- a/axes/admin.py +++ b/axes/admin.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - from django.contrib import admin from django.utils.translation import gettext_lazy as _ @@ -7,6 +5,7 @@ from axes.models import AccessLog from axes.models import AccessAttempt +@admin.register(AccessAttempt) class AccessAttemptAdmin(admin.ModelAdmin): list_display = ( 'attempt_time', @@ -59,9 +58,7 @@ class AccessAttemptAdmin(admin.ModelAdmin): return False -admin.site.register(AccessAttempt, AccessAttemptAdmin) - - +@admin.register(AccessLog) class AccessLogAdmin(admin.ModelAdmin): list_display = ( 'attempt_time', @@ -109,6 +106,3 @@ class AccessLogAdmin(admin.ModelAdmin): def has_add_permission(self, request): return False - - -admin.site.register(AccessLog, AccessLogAdmin) diff --git a/axes/apps.py b/axes/apps.py index f4b641b..3d77be0 100644 --- a/axes/apps.py +++ b/axes/apps.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - from django import apps @@ -7,23 +5,5 @@ class AppConfig(apps.AppConfig): name = 'axes' def ready(self): - from django.conf import settings - from django.core.exceptions import ImproperlyConfigured - - if settings.CACHES[getattr(settings, 'AXES_CACHE', 'default')]['BACKEND'] == \ - 'django.core.cache.backends.locmem.LocMemCache': - raise ImproperlyConfigured( - 'django-axes does not work properly with LocMemCache as the default cache backend' - ' please add e.g. a DummyCache backend for axes and configure it with AXES_CACHE' - ) - - from django.contrib.auth.views import LoginView - from django.utils.decorators import method_decorator - - from axes import signals # pylint: disable=unused-import,unused-variable - - from axes.decorators import axes_dispatch - from axes.decorators import axes_form_invalid - - LoginView.dispatch = method_decorator(axes_dispatch)(LoginView.dispatch) - LoginView.form_invalid = method_decorator(axes_form_invalid)(LoginView.form_invalid) + from axes import signals + signals.ProxyHandler.initialize() diff --git a/axes/attempts.py b/axes/attempts.py index 64ba0df..e116165 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - from datetime import timedelta from hashlib import md5 @@ -12,9 +10,10 @@ from axes.utils import get_axes_cache, get_client_ip, get_client_username def _query_user_attempts(request, credentials=None): - """Returns access attempt record if it exists. - Otherwise return None. """ + Return access attempt record if it exists. Otherwise return None. + """ + ip = get_client_ip(request) username = get_client_username(request, credentials) @@ -52,9 +51,11 @@ def _query_user_attempts(request, credentials=None): def get_cache_key(request_or_obj, credentials=None): """ Build cache key name from request or AccessAttempt object. + :param request_or_obj: Request or AccessAttempt object :return cache-key: String, key to be used in cache system """ + if isinstance(request_or_obj, AccessAttempt): ip = request_or_obj.ip_address un = request_or_obj.username @@ -84,7 +85,10 @@ def get_cache_key(request_or_obj, credentials=None): def get_cache_timeout(): - """Returns timeout according to COOLOFF_TIME.""" + """ + Return timeout according to COOLOFF_TIME. + """ + cache_timeout = None cool_off = settings.AXES_COOLOFF_TIME if cool_off: @@ -147,10 +151,12 @@ def ip_in_blacklist(ip): def is_user_lockable(request, credentials=None): - """Check if the user has a profile with nolockout - If so, then return the value to see if this user is special - and doesn't get their account locked out """ + Check if the user has a profile with nolockout attribute set. + + If so, then return the value to see if this user is special and does not get their account locked out. + """ + if request.method != 'POST': return True diff --git a/axes/backends.py b/axes/backends.py index d153fa3..e7ef014 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -1,23 +1,18 @@ -from __future__ import unicode_literals - from django.contrib.auth.backends import ModelBackend -from django.core.exceptions import PermissionDenied from axes.attempts import is_already_locked +from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired from axes.utils import get_credentials, get_lockout_message -class AxesModelBackend(ModelBackend): - - class RequestParameterRequired(Exception): - msg = 'AxesModelBackend requires calls to authenticate to pass `request` as an argument.' - - def __init__(self): - super(AxesModelBackend.RequestParameterRequired, self).__init__( - AxesModelBackend.RequestParameterRequired.msg) +class AxesBackend(ModelBackend): + """ + Authentication backend that forbids login attempts for locked out users. + """ def authenticate(self, request, username=None, password=None, **kwargs): - """Checks user lock out status and raises PermissionDenied if user is not allowed to log in. + """ + Check user lock out status and raises PermissionDenied if user is not allowed to log in. Inserts errors directly to `return_context` that is supplied as a keyword argument. @@ -26,15 +21,18 @@ class AxesModelBackend(ModelBackend): Note that this method does not log your user in and delegates login to other backends. - :param request: see ModelBackend.authenticate - :param kwargs: see ModelBackend.authenticate + :param request: see django.contrib.auth.backends.ModelBackend.authenticate + :param username: see django.contrib.auth.backends.ModelBackend.authenticate + :param password: see django.contrib.auth.backends.ModelBackend.authenticate + :param kwargs: see django.contrib.auth.backends.ModelBackend.authenticate :keyword response_context: context dict that will be updated with error information - :raises PermissionDenied: if user is already locked out + :raises AxesBackendRequestParameterRequired: if request parameter is not given correctly + :raises AxesBackendPermissionDenied: if user is already locked out :return: None """ if request is None: - raise AxesModelBackend.RequestParameterRequired() + raise AxesBackendRequestParameterRequired('AxesBackend requires a request as an argument to authenticate') credentials = get_credentials(username=username, password=password, **kwargs) @@ -44,6 +42,14 @@ class AxesModelBackend(ModelBackend): error_msg = get_lockout_message() response_context = kwargs.get('response_context', {}) response_context['error'] = error_msg - raise PermissionDenied(error_msg) + + # Raise an error that stops the authentication flows at django.contrib.auth.authenticate. + # This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors. + # After this error is caught by authenticate it emits a signal indicating user login failed, + # which is processed by axes.signals.log_user_login_failed which logs the attempt and raises + # a second exception which bubbles up the middleware stack and produces a HTTP 403 Forbidden reply + # in the axes.middleware.AxesMiddleware.process_exception middleware exception handler. + + raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out') # No-op diff --git a/axes/checks.py b/axes/checks.py new file mode 100644 index 0000000..2e92af1 --- /dev/null +++ b/axes/checks.py @@ -0,0 +1,54 @@ +from django.core.checks import Error, Tags, register + +from django.conf import settings + + +class Messages: + CACHE_MISSING = 'missing cache configuration for AXES_CACHE' + CACHE_INVALID = 'invalid cache configuration for settings.AXES_CACHE' + + +class Hints: + CACHE_MISSING = ( + 'django-axes needs to have a cache configured with settings.AXES_CACHE' + ) + CACHE_INVALID = ( + 'django-axes does not work properly with LocMemCache as the cache backend' + ' please add e.g. a DummyCache backend and configure it with settings.AXES_CACHE' + ) + + +class Codes: + CACHE_MISSING = 'axes.E001' + CACHE_INVALID = 'axes.E002' + + +@register(Tags.caches) +def axes_cache_backend_check(app_configs, **kwargs): # pylint: disable=unused-argument + errors = [] + + axes_cache_key = getattr(settings, 'AXES_CACHE', 'default') + axes_cache_config = settings.CACHES.get(axes_cache_key, {}) + axes_cache_backend = axes_cache_config.get('BACKEND', '') + + axes_cache_incompatible_backends = [ + 'django.core.cache.backends.locmem.LocMemCache', + ] + + if not axes_cache_config: + errors.append(Error( + msg=Messages.CACHE_MISSING, + hint=Hints.CACHE_MISSING, + obj=settings.CACHES, + id=Codes.CACHE_MISSING, + )) + + if axes_cache_backend in axes_cache_incompatible_backends: + errors.append(Error( + msg=Messages.CACHE_INVALID, + hint=Hints.CACHE_INVALID, + obj=settings.CACHES, + id=Codes.CACHE_INVALID, + )) + + return errors diff --git a/axes/conf.py b/axes/conf.py index 4c8d0b0..7e2e659 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - from django.conf import settings from django.utils.translation import gettext_lazy as _ @@ -37,6 +35,8 @@ class MyAppConf(AppConf): DISABLE_SUCCESS_ACCESS_LOG = False + HANDLER = 'axes.handlers.AxesHandler' + LOGGER = 'axes.watch_login' LOCKOUT_TEMPLATE = None diff --git a/axes/decorators.py b/axes/decorators.py index 7a50857..918edd6 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -1,18 +1,10 @@ -from __future__ import unicode_literals - -from datetime import timedelta from functools import wraps -import json import logging -from django.http import HttpResponse -from django.http import HttpResponseRedirect -from django.shortcuts import render - from axes import get_version from axes.conf import settings from axes.attempts import is_already_locked -from axes.utils import iso8601, get_client_username, get_lockout_message +from axes.utils import get_lockout_response log = logging.getLogger(settings.AXES_LOGGER) if settings.AXES_VERBOSE: @@ -29,7 +21,7 @@ if settings.AXES_VERBOSE: def axes_dispatch(func): def inner(request, *args, **kwargs): if is_already_locked(request): - return lockout_response(request) + return get_lockout_response(request) return func(request, *args, **kwargs) @@ -40,41 +32,8 @@ def axes_form_invalid(func): @wraps(func) def inner(self, *args, **kwargs): if is_already_locked(self.request): - return lockout_response(self.request) + return get_lockout_response(self.request) return func(self, *args, **kwargs) return inner - - -def lockout_response(request): - context = { - 'failure_limit': settings.AXES_FAILURE_LIMIT, - 'username': get_client_username(request) or '' - } - - cool_off = settings.AXES_COOLOFF_TIME - if cool_off: - if isinstance(cool_off, (int, float)): - cool_off = timedelta(hours=cool_off) - - context.update({ - 'cooloff_time': iso8601(cool_off) - }) - - if request.is_ajax(): - return HttpResponse( - json.dumps(context), - content_type='application/json', - status=403, - ) - - if settings.AXES_LOCKOUT_TEMPLATE: - return render( - request, settings.AXES_LOCKOUT_TEMPLATE, context, status=403 - ) - - if settings.AXES_LOCKOUT_URL: - return HttpResponseRedirect(settings.AXES_LOCKOUT_URL) - - return HttpResponse(get_lockout_message(), status=403) diff --git a/axes/exceptions.py b/axes/exceptions.py new file mode 100644 index 0000000..75fb841 --- /dev/null +++ b/axes/exceptions.py @@ -0,0 +1,38 @@ +from django.core.exceptions import PermissionDenied + + +class AxesPermissionDenied(PermissionDenied): + """ + Base class for permission denied errors raised by axes specifically for easier debugging. + + Two different types of errors are used because of the behaviour Django has: + + - If an authentication backend raises a PermissionDenied error the authentication flow is aborted. + - If another component raises a PermissionDenied error a HTTP 403 Forbidden response is returned. + """ + + pass + + +class AxesSignalPermissionDenied(AxesPermissionDenied): + """ + Raised by signal handler on failed authentication attempts to send user a HTTP 403 Forbidden status code. + """ + + pass + + +class AxesBackendPermissionDenied(AxesPermissionDenied): + """ + Raised by authentication backend on locked out requests to stop the Django authentication flow. + """ + + pass + + +class AxesBackendRequestParameterRequired(ValueError): + """ + Raised by authentication backend on invalid or missing request parameter value. + """ + + pass diff --git a/axes/handlers.py b/axes/handlers.py new file mode 100644 index 0000000..0845356 --- /dev/null +++ b/axes/handlers.py @@ -0,0 +1,196 @@ +import logging + +from django.utils.timezone import now + +from axes.conf import settings +from axes.attempts import get_cache_key +from axes.attempts import get_cache_timeout +from axes.attempts import get_user_attempts +from axes.attempts import is_user_lockable +from axes.attempts import ip_in_whitelist +from axes.attempts import reset_user_attempts +from axes.exceptions import AxesSignalPermissionDenied +from axes.models import AccessLog, AccessAttempt +from axes.signals import user_locked_out +from axes.utils import get_client_str +from axes.utils import query2str +from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials + + +log = logging.getLogger(settings.AXES_LOGGER) + + +class AxesHandler: # pylint: disable=too-many-locals + """ + Signal handler implementation that records user login attempts to database and locks users out if necessary. + """ + + def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=unused-argument + """ + When user login fails, save AccessAttempt record in database and lock user out if necessary. + + :raises AxesSignalPermissionDenied: if user should is locked out + """ + + if request is None: + log.warning('Attempt to authenticate with a custom backend failed.') + return + + ip_address = get_client_ip(request) + username = get_client_username(request, credentials) + user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] + path_info = request.META.get('PATH_INFO', '')[:255] + http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + + if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): + return + + failures = 0 + attempts = get_user_attempts(request, credentials) + cache_hash_key = get_cache_key(request, credentials) + cache_timeout = get_cache_timeout() + + failures_cached = get_axes_cache().get(cache_hash_key) + if failures_cached is not None: + failures = failures_cached + else: + for attempt in attempts: + failures = max(failures, attempt.failures_since_start) + + # add a failed attempt for this user + failures += 1 + get_axes_cache().set(cache_hash_key, failures, cache_timeout) + + # has already attempted, update the info + if attempts: + for attempt in attempts: + attempt.get_data = '%s\n---------\n%s' % ( + attempt.get_data, + query2str(request.GET), + ) + attempt.post_data = '%s\n---------\n%s' % ( + attempt.post_data, + query2str(request.POST) + ) + attempt.http_accept = http_accept + attempt.path_info = path_info + attempt.failures_since_start = failures + attempt.attempt_time = now() + attempt.save() + + log.info( + 'AXES: Repeated login failure by %s. Count = %d of %d', + get_client_str(username, ip_address, user_agent, path_info), + failures, + settings.AXES_FAILURE_LIMIT, + ) + else: + # Record failed attempt. Whether or not the IP address or user agent is + # used in counting failures is handled elsewhere, so we just record + # everything here. + AccessAttempt.objects.create( + user_agent=user_agent, + ip_address=ip_address, + username=username, + get_data=query2str(request.GET), + post_data=query2str(request.POST), + http_accept=http_accept, + path_info=path_info, + failures_since_start=failures, + ) + + log.info( + 'AXES: New login failure by %s. Creating access record.', + get_client_str(username, ip_address, user_agent, path_info), + ) + + # no matter what, we want to lock them out if they're past the number of + # attempts allowed, unless the user is set to notlockable + if ( + failures >= settings.AXES_FAILURE_LIMIT and + settings.AXES_LOCK_OUT_AT_FAILURE and + is_user_lockable(request, credentials) + ): + log.warning( + 'AXES: Locked out %s after repeated login failures.', + get_client_str(username, ip_address, user_agent, path_info), + ) + + # send signal when someone is locked out. + user_locked_out.send( + 'axes', + request=request, + username=username, + ip_address=ip_address, + ) + + raise AxesSignalPermissionDenied('Locked out due to repeated login failures.') + + def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + """ + When user logs in, update the AccessLog related to the user. + """ + + username = user.get_username() + credentials = get_credentials(username) + ip_address = get_client_ip(request) + user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] + path_info = request.META.get('PATH_INFO', '')[:255] + http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + + log.info( + 'AXES: Successful login by %s.', + get_client_str(username, ip_address, user_agent, path_info), + ) + + if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG: + AccessLog.objects.create( + user_agent=user_agent, + ip_address=ip_address, + username=username, + http_accept=http_accept, + path_info=path_info, + trusted=True, + ) + + if settings.AXES_RESET_ON_SUCCESS: + count = reset_user_attempts(request, credentials) + log.info( + 'AXES: Deleted %d failed login attempts by %s.', + count, + get_client_str(username, ip_address, user_agent, path_info), + ) + + def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + """ + When user logs out, update the AccessLog related to the user. + """ + + log.info('AXES: Successful logout by %s.', user) + + if user and not settings.AXES_DISABLE_ACCESS_LOG: + AccessLog.objects.filter( + username=user.get_username(), + logout_time__isnull=True, + ).update( + logout_time=now(), + ) + + def post_save_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument + """ + Update cache after saving AccessAttempts. + """ + + cache_hash_key = get_cache_key(instance) + + if not get_axes_cache().get(cache_hash_key): + cache_timeout = get_cache_timeout() + get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout) + + def post_delete_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument + """ + Update cache after deleting AccessAttempts. + """ + + cache_hash_key = get_cache_key(instance) + get_axes_cache().delete(cache_hash_key) diff --git a/axes/management/__init__.py b/axes/management/__init__.py index baffc48..e69de29 100644 --- a/axes/management/__init__.py +++ b/axes/management/__init__.py @@ -1 +0,0 @@ -from __future__ import unicode_literals diff --git a/axes/management/commands/__init__.py b/axes/management/commands/__init__.py index baffc48..e69de29 100644 --- a/axes/management/commands/__init__.py +++ b/axes/management/commands/__init__.py @@ -1 +0,0 @@ -from __future__ import unicode_literals diff --git a/axes/management/commands/axes_list_attempts.py b/axes/management/commands/axes_list_attempts.py index f182c35..b9bd6df 100644 --- a/axes/management/commands/axes_list_attempts.py +++ b/axes/management/commands/axes_list_attempts.py @@ -1,15 +1,12 @@ -from __future__ import unicode_literals - from django.core.management.base import BaseCommand from axes.models import AccessAttempt class Command(BaseCommand): - args = '' - help = ('List registered login attempts') + help = 'List access attempts' - def handle(self, *args, **kwargs): # pylint: disable=unused-argument + def handle(self, *args, **options): # pylint: disable=unused-argument for obj in AccessAttempt.objects.all(): self.stdout.write('{ip}\t{username}\t{failures}'.format( ip=obj.ip_address, diff --git a/axes/management/commands/axes_reset.py b/axes/management/commands/axes_reset.py index cb1a548..f28c9f4 100644 --- a/axes/management/commands/axes_reset.py +++ b/axes/management/commands/axes_reset.py @@ -1,27 +1,15 @@ -from __future__ import unicode_literals - from django.core.management.base import BaseCommand from axes.utils import reset class Command(BaseCommand): - help = ("resets any lockouts or failed login records. If called with an " - "IP, resets only for that IP") + help = 'Reset all access attempts and lockouts' - def add_arguments(self, parser): - parser.add_argument('ip', nargs='*') + def handle(self, *args, **options): # pylint: disable=unused-argument + count = reset() - def handle(self, *args, **kwargs): - count = 0 - if kwargs and kwargs.get('ip'): - for ip in kwargs['ip'][1:]: - count += reset(ip=ip) + if count: + self.stdout.write('{0} attempts removed.'.format(count)) else: - count = reset() - - if kwargs['verbosity']: - if count: - self.stdout.write('{0} attempts removed.'.format(count)) - else: - self.stdout.write('No attempts found.') + self.stdout.write('No attempts found.') diff --git a/axes/management/commands/axes_reset_ip.py b/axes/management/commands/axes_reset_ip.py new file mode 100644 index 0000000..657a1ac --- /dev/null +++ b/axes/management/commands/axes_reset_ip.py @@ -0,0 +1,21 @@ +from django.core.management.base import BaseCommand + +from axes.utils import reset + + +class Command(BaseCommand): + help = 'Reset all access attempts and lockouts for given IP addresses' + + def add_arguments(self, parser): + parser.add_argument('ip', nargs='+', type=str) + + def handle(self, *args, **options): + count = 0 + + for ip in options['ip']: + count += reset(ip=ip) + + if count: + self.stdout.write('{0} attempts removed.'.format(count)) + else: + self.stdout.write('No attempts found.') diff --git a/axes/management/commands/axes_reset_user.py b/axes/management/commands/axes_reset_user.py deleted file mode 100644 index 10562de..0000000 --- a/axes/management/commands/axes_reset_user.py +++ /dev/null @@ -1,22 +0,0 @@ -from __future__ import unicode_literals - -from django.core.management.base import BaseCommand - -from axes.utils import reset - - -class Command(BaseCommand): - help = ("Resets any lockouts or failed login records. If called with an " - "User name, resets only for that User name") - - def add_arguments(self, parser): - parser.add_argument('username') - - def handle(self, *args, **kwargs): - count = 0 - count += reset(username=kwargs['username']) - if kwargs['verbosity']: - if count: - self.stdout.write('{0} attempts removed.'.format(count)) - else: - self.stdout.write('No attempts found.') diff --git a/axes/management/commands/axes_reset_username.py b/axes/management/commands/axes_reset_username.py new file mode 100644 index 0000000..2e6ec7c --- /dev/null +++ b/axes/management/commands/axes_reset_username.py @@ -0,0 +1,21 @@ +from django.core.management.base import BaseCommand + +from axes.utils import reset + + +class Command(BaseCommand): + help = 'Reset all access attempts and lockouts for given usernames' + + def add_arguments(self, parser): + parser.add_argument('username', nargs='+', type=str) + + def handle(self, *args, **options): + count = 0 + + for username in options['username']: + count += reset(username=username) + + if count: + self.stdout.write('{0} attempts removed.'.format(count)) + else: + self.stdout.write('No attempts found.') diff --git a/axes/middleware.py b/axes/middleware.py new file mode 100644 index 0000000..ca3d822 --- /dev/null +++ b/axes/middleware.py @@ -0,0 +1,39 @@ +from axes.exceptions import AxesSignalPermissionDenied +from axes.utils import get_lockout_response + + +class AxesMiddleware: + """ + Middleware that maps lockout signals into readable HTTP 403 Forbidden responses. + + Without this middleware the backend returns HTTP 403 errors with the + django.views.defaults.permission_denied view that renders the 403.html + template from the root template directory if found. + + Refer to the Django documentation for further information: + + https://docs.djangoproject.com/en/dev/ref/views/#the-403-http-forbidden-view + + To customize the error rendering, you can for example inherit this middleware + and change the process_exception handler to your own liking. + """ + + def __init__(self, get_response): + self.get_response = get_response + + def __call__(self, request): + return self.get_response(request) + + def process_exception(self, request, exception): # pylint: disable=inconsistent-return-statements + """ + Exception handler that processes exceptions raised by the axes signal handler when request fails with login. + + Refer to axes.signals.log_user_login_failed for the error code. + + :param request: HTTPRequest that will be locked out. + :param exception: Exception raised by Django views or signals. Only AxesSignalPermissionDenied will be handled. + :return: HTTPResponse that indicates the lockout or None. + """ + + if isinstance(exception, AxesSignalPermissionDenied): + return get_lockout_response(request) diff --git a/axes/migrations/0001_initial.py b/axes/migrations/0001_initial.py index 24aa82c..5c5d510 100644 --- a/axes/migrations/0001_initial.py +++ b/axes/migrations/0001_initial.py @@ -1,6 +1,3 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - from django.db import migrations, models diff --git a/axes/migrations/0002_auto_20151217_2044.py b/axes/migrations/0002_auto_20151217_2044.py index a6cbc12..0d880d0 100644 --- a/axes/migrations/0002_auto_20151217_2044.py +++ b/axes/migrations/0002_auto_20151217_2044.py @@ -1,6 +1,3 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - from django.db import migrations, models diff --git a/axes/migrations/0003_auto_20160322_0929.py b/axes/migrations/0003_auto_20160322_0929.py index b47d693..e337e54 100644 --- a/axes/migrations/0003_auto_20160322_0929.py +++ b/axes/migrations/0003_auto_20160322_0929.py @@ -1,6 +1,3 @@ -# -*- coding: utf-8 -*- -from __future__ import unicode_literals - from django.db import models, migrations diff --git a/axes/migrations/0004_auto_20181024_1538.py b/axes/migrations/0004_auto_20181024_1538.py index 266f5d0..215b18d 100644 --- a/axes/migrations/0004_auto_20181024_1538.py +++ b/axes/migrations/0004_auto_20181024_1538.py @@ -1,7 +1,3 @@ -# -*- coding: utf-8 -*- -# Generated by Django 1.11.16 on 2018-10-24 22:38 -from __future__ import unicode_literals - from django.db import migrations, models diff --git a/axes/migrations/0005_remove_accessattempt_trusted.py b/axes/migrations/0005_remove_accessattempt_trusted.py index 10db717..9e004a0 100644 --- a/axes/migrations/0005_remove_accessattempt_trusted.py +++ b/axes/migrations/0005_remove_accessattempt_trusted.py @@ -1,5 +1,3 @@ -# Generated by Django 2.1.4 on 2018-12-23 09:03 - from django.db import migrations diff --git a/axes/models.py b/axes/models.py index 2a6c1de..cceff60 100644 --- a/axes/models.py +++ b/axes/models.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - from django.db import models from django.utils.translation import gettext_lazy as _ diff --git a/axes/signals.py b/axes/signals.py index be2bbea..bd8ba60 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - import logging from django.contrib.auth.signals import user_logged_in @@ -8,20 +6,10 @@ from django.contrib.auth.signals import user_login_failed from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from django.dispatch import Signal -from django.utils import timezone +from django.utils.module_loading import import_string from axes.conf import settings -from axes.attempts import get_cache_key -from axes.attempts import get_cache_timeout -from axes.attempts import get_user_attempts -from axes.attempts import is_user_lockable -from axes.attempts import ip_in_whitelist -from axes.attempts import reset_user_attempts -from axes.models import AccessLog, AccessAttempt -from axes.utils import get_client_str -from axes.utils import query2str -from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials - +from axes.models import AccessAttempt log = logging.getLogger(settings.AXES_LOGGER) @@ -29,156 +17,110 @@ log = logging.getLogger(settings.AXES_LOGGER) user_locked_out = Signal(providing_args=['request', 'username', 'ip_address']) -@receiver(user_login_failed) -def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: disable=unused-argument - """ Create an AccessAttempt record if the login wasn't successful +class ProxyHandler: """ - if request is None: - log.warning('Attempt to authenticate with a custom backend failed.') - return + Proxy interface for configurable Axes signal handler class. - ip_address = get_client_ip(request) - username = get_client_username(request, credentials) - user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + If you wish to implement a custom version of this handler, + you can override the settings.AXES_HANDLER configuration string + with a class that implements a compatible interface and methods. - if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): - return + Defaults to using axes.handlers.AxesHandler if not overridden. + Refer to axes.handlers.AxesHandler for default implementation. + """ - failures = 0 - attempts = get_user_attempts(request, credentials) - cache_hash_key = get_cache_key(request, credentials) - cache_timeout = get_cache_timeout() + implementation = None # concrete handler that is bootstrapped by the Django application loader - failures_cached = get_axes_cache().get(cache_hash_key) - if failures_cached is not None: - failures = failures_cached - else: - for attempt in attempts: - failures = max(failures, attempt.failures_since_start) + @classmethod + def initialize(cls): + """ + Fetch and initialize concrete handler implementation and memoize it to avoid reinitialization. - # add a failed attempt for this user - failures += 1 - get_axes_cache().set(cache_hash_key, failures, cache_timeout) + This method is re-entrant and can be called multiple times. + """ - # has already attempted, update the info - if attempts: - for attempt in attempts: - attempt.get_data = '%s\n---------\n%s' % ( - attempt.get_data, - query2str(request.GET), - ) - attempt.post_data = '%s\n---------\n%s' % ( - attempt.post_data, - query2str(request.POST) - ) - attempt.http_accept = http_accept - attempt.path_info = path_info - attempt.failures_since_start = failures - attempt.attempt_time = timezone.now() - attempt.save() + if cls.implementation is None: + cls.implementation = import_string(settings.AXES_HANDLER)() - log.info( - 'AXES: Repeated login failure by %s. Count = %d of %d', - get_client_str(username, ip_address, user_agent, path_info), - failures, - settings.AXES_FAILURE_LIMIT - ) - else: - # Record failed attempt. Whether or not the IP address or user agent is - # used in counting failures is handled elsewhere, so we just record - # everything here. - AccessAttempt.objects.create( - user_agent=user_agent, - ip_address=ip_address, - username=username, - get_data=query2str(request.GET), - post_data=query2str(request.POST), - http_accept=http_accept, - path_info=path_info, - failures_since_start=failures, - ) + @classmethod + def user_login_failed(cls, sender, credentials, request, **kwargs): + """ + Handle user login failure event. - log.info( - 'AXES: New login failure by %s. Creating access record.', - get_client_str(username, ip_address, user_agent, path_info) - ) + :param credentials: credentials used for authentication attempt + :param request: request used for failed authentication attempt + :return: None + """ - # no matter what, we want to lock them out if they're past the number of - # attempts allowed, unless the user is set to notlockable - if ( - failures >= settings.AXES_FAILURE_LIMIT and - settings.AXES_LOCK_OUT_AT_FAILURE and - is_user_lockable(request, credentials) - ): - log.warning( - 'AXES: locked out %s after repeated login attempts.', - get_client_str(username, ip_address, user_agent, path_info) - ) + cls.implementation.user_login_failed(sender, credentials, request, **kwargs) - # send signal when someone is locked out. - user_locked_out.send( - 'axes', request=request, username=username, ip_address=ip_address - ) + @classmethod + def user_logged_in(cls, sender, request, user, **kwargs): + """ + Handle user login event. + + :param credentials: credentials used for successful authentication + :param request: request used for successful authentication + :return: None + """ + + cls.implementation.user_logged_in(sender, request, user, **kwargs) + + @classmethod + def user_logged_out(cls, sender, request, user, **kwargs): + """ + Handle user logout event. + + :param request: request used for logout + :param user: user used for logout + :return: None + """ + + cls.implementation.user_logged_out(sender, request, user, **kwargs) + + @classmethod + def post_save_access_attempt(cls, instance, **kwargs): + """ + Handle AccessAttempt save event. + + :param instance: axes.models.AccessAttempt instance that will be saved + :return: None + """ + + cls.implementation.post_save_access_attempt(instance, **kwargs) + + @classmethod + def post_delete_access_attempt(cls, instance, **kwargs): + """ + Handle AccessAttempt delete event. + + :param instance: axes.models.AccessAttempt instance that was deleted + :return: None + """ + + cls.implementation.post_delete_access_attempt(instance, **kwargs) + + +@receiver(user_login_failed) +def handle_user_login_failed(*args, **kwargs): + ProxyHandler.user_login_failed(*args, **kwargs) @receiver(user_logged_in) -def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unused-argument - """ When a user logs in, update the access log - """ - username = user.get_username() - credentials = get_credentials(username) - ip_address = get_client_ip(request) - user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] - log.info( - 'AXES: Successful login by %s.', - get_client_str(username, ip_address, user_agent, path_info) - ) - - if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG: - AccessLog.objects.create( - user_agent=user_agent, - ip_address=ip_address, - username=username, - http_accept=http_accept, - path_info=path_info, - trusted=True, - ) - - if settings.AXES_RESET_ON_SUCCESS: - count = reset_user_attempts(request, credentials) - log.info( - 'AXES: Deleted %d failed login attempts by %s.', - count, - get_client_str(username, ip_address, user_agent, path_info) - ) +def handle_user_logged_in(*args, **kwargs): + ProxyHandler.user_logged_in(*args, **kwargs) @receiver(user_logged_out) -def log_user_logged_out(sender, request, user, **kwargs): # pylint: disable=unused-argument - """ When a user logs out, update the access log - """ - log.info('AXES: Successful logout by %s.', user) - - if user and not settings.AXES_DISABLE_ACCESS_LOG: - AccessLog.objects.filter( - username=user.get_username(), - logout_time__isnull=True, - ).update(logout_time=timezone.now()) +def handle_user_logged_out(*args, **kwargs): + ProxyHandler.user_logged_out(*args, **kwargs) @receiver(post_save, sender=AccessAttempt) -def update_cache_after_save(instance, **kwargs): # pylint: disable=unused-argument - cache_hash_key = get_cache_key(instance) - if not get_axes_cache().get(cache_hash_key): - cache_timeout = get_cache_timeout() - get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout) +def handle_post_save_access_attempt(*args, **kwargs): + ProxyHandler.post_save_access_attempt(*args, **kwargs) @receiver(post_delete, sender=AccessAttempt) -def delete_cache_after_delete(instance, **kwargs): # pylint: disable=unused-argument - cache_hash_key = get_cache_key(instance) - get_axes_cache().delete(cache_hash_key) +def handle_post_delete_access_attempt(*args, **kwargs): + ProxyHandler.post_delete_access_attempt(*args, **kwargs) diff --git a/axes/test_settings.py b/axes/test_settings.py index f552dd0..770832a 100644 --- a/axes/test_settings.py +++ b/axes/test_settings.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - DATABASES = { 'default': { 'ENGINE': 'django.db.backends.sqlite3', @@ -15,29 +13,37 @@ CACHES = { SITE_ID = 1 -MIDDLEWARE = ( +MIDDLEWARE = [ 'django.middleware.common.CommonMiddleware', 'django.contrib.sessions.middleware.SessionMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', -) -AUTHENTICATION_BACKENDS = ( - 'axes.backends.AxesModelBackend', + 'axes.middleware.AxesMiddleware', +] + +AUTHENTICATION_BACKENDS = [ + 'axes.backends.AxesBackend', + 'django.contrib.auth.backends.ModelBackend', -) +] + +PASSWORD_HASHERS = [ + 'django.contrib.auth.hashers.MD5PasswordHasher', +] ROOT_URLCONF = 'axes.test_urls' -INSTALLED_APPS = ( +INSTALLED_APPS = [ 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', 'django.contrib.sites', 'django.contrib.messages', 'django.contrib.admin', + 'axes', -) +] TEMPLATES = [ { diff --git a/axes/test_settings_cache.py b/axes/test_settings_cache.py deleted file mode 100644 index 88bb0b6..0000000 --- a/axes/test_settings_cache.py +++ /dev/null @@ -1,11 +0,0 @@ -from __future__ import unicode_literals - -from .test_settings import * # pylint: disable=unused-wildcard-import - -AXES_CACHE = 'axes' - -CACHES = { - 'axes': { - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache' - } -} diff --git a/axes/test_urls.py b/axes/test_urls.py index e417cfa..d52422e 100644 --- a/axes/test_urls.py +++ b/axes/test_urls.py @@ -1,8 +1,7 @@ -from __future__ import unicode_literals - from django.conf.urls import url from django.contrib import admin + urlpatterns = [ url(r'^admin/', admin.site.urls), ] diff --git a/axes/tests/__init__.py b/axes/tests/__init__.py index baffc48..e69de29 100644 --- a/axes/tests/__init__.py +++ b/axes/tests/__init__.py @@ -1 +0,0 @@ -from __future__ import unicode_literals diff --git a/axes/tests/compatibility.py b/axes/tests/compatibility.py deleted file mode 100644 index 2e5c08b..0000000 --- a/axes/tests/compatibility.py +++ /dev/null @@ -1,6 +0,0 @@ -from __future__ import unicode_literals - -try: - from unittest.mock import patch # pylint: disable=unused-import -except ImportError: - from mock import patch # pylint: disable=unused-import diff --git a/axes/tests/test_access_attempt.py b/axes/tests/test_access_attempt.py index 5cdcc44..bcca28f 100644 --- a/axes/tests/test_access_attempt.py +++ b/axes/tests/test_access_attempt.py @@ -1,11 +1,9 @@ -from __future__ import unicode_literals - import datetime import hashlib -import json import random import string import time +from unittest.mock import patch from django.http import HttpRequest from django.test import TestCase, override_settings @@ -18,24 +16,28 @@ from axes.conf import settings from axes.attempts import get_cache_key from axes.models import AccessAttempt, AccessLog from axes.signals import user_locked_out -from axes.tests.compatibility import patch from axes.utils import reset @override_settings(AXES_COOLOFF_TIME=datetime.timedelta(seconds=2)) class AccessAttemptTest(TestCase): - """Test case using custom settings for testing """ + Test case using custom settings for testing. + """ + VALID_USERNAME = 'valid-username' VALID_PASSWORD = 'valid-password' LOCKED_MESSAGE = 'Account locked: too many login attempts.' LOGIN_FORM_KEY = '' - def _login(self, is_valid_username=False, is_valid_password=False, - is_json=False, **kwargs): - """Login a user. A valid credential is used when is_valid_username is True, + def _login(self, is_valid_username=False, is_valid_password=False, **kwargs): + """ + Login a user. + + A valid credential is used when is_valid_username is True, otherwise it will use a random string to make a failed login. """ + if is_valid_username: # Use a valid username username = self.VALID_USERNAME @@ -59,22 +61,17 @@ class AccessAttemptTest(TestCase): } post_data.update(kwargs) - if is_json: - headers.update({ - 'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest', - 'content_type': 'application/json', - }) - post_data = json.dumps(post_data) - - response = self.client.post( - reverse('admin:login'), post_data, **headers + return self.client.post( + reverse('admin:login'), + post_data, + **headers ) - return response - def setUp(self): - """Create a valid user for login """ + Create a valid user for login. + """ + self.user = User.objects.create_superuser( username=self.VALID_USERNAME, email='test@example.com', @@ -82,9 +79,10 @@ class AccessAttemptTest(TestCase): ) def test_failure_limit_once(self): - """Tests the login lock trying to login one more time - than failure limit """ + Test the login lock trying to login one more time than failure limit. + """ + # test until one try before the limit for _ in range(1, settings.AXES_FAILURE_LIMIT): response = self._login() @@ -97,9 +95,10 @@ class AccessAttemptTest(TestCase): self.assertContains(response, self.LOCKED_MESSAGE, status_code=403) def test_failure_limit_many(self): - """Tests the login lock trying to login a lot of times more - than failure limit """ + Test the login lock trying to login a lot of times more than failure limit. + """ + for _ in range(1, settings.AXES_FAILURE_LIMIT): response = self._login() # Check if we are in the same login page @@ -112,14 +111,18 @@ class AccessAttemptTest(TestCase): self.assertContains(response, self.LOCKED_MESSAGE, status_code=403) def test_valid_login(self): - """Tests a valid login for a real username """ + Test a valid login for a real username. + """ + response = self._login(is_valid_username=True, is_valid_password=True) self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True) def test_valid_logout(self): - """Tests a valid logout and make sure the logout_time is updated """ + Test a valid logout and make sure the logout_time is updated. + """ + response = self._login(is_valid_username=True, is_valid_password=True) self.assertEqual(AccessLog.objects.latest('id').logout_time, None) @@ -128,8 +131,10 @@ class AccessAttemptTest(TestCase): self.assertContains(response, 'Logged out') def test_cooling_off(self): - """Tests if the cooling time allows a user to login """ + Test if the cooling time allows a user to login. + """ + self.test_failure_limit_once() # Wait for the cooling off period @@ -139,8 +144,10 @@ class AccessAttemptTest(TestCase): self.test_valid_login() def test_cooling_off_for_trusted_user(self): - """Test the cooling time for a trusted user """ + Test the cooling time for a trusted user. + """ + # Test successful login-logout, this makes the user trusted. self.test_valid_logout() @@ -148,8 +155,10 @@ class AccessAttemptTest(TestCase): self.test_cooling_off() def test_long_user_agent_valid(self): - """Tests if can handle a long user agent """ + Test if can handle a long user agent. + """ + long_user_agent = 'ie6' * 1024 response = self._login( is_valid_username=True, @@ -159,8 +168,10 @@ class AccessAttemptTest(TestCase): self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True) def test_long_user_agent_not_valid(self): - """Tests if can handle a long user agent with failure """ + Test if can handle a long user agent with failure. + """ + long_user_agent = 'ie6' * 1024 for _ in range(settings.AXES_FAILURE_LIMIT + 1): response = self._login(user_agent=long_user_agent) @@ -168,8 +179,10 @@ class AccessAttemptTest(TestCase): self.assertContains(response, self.LOCKED_MESSAGE, status_code=403) def test_reset_ip(self): - """Tests if can reset an ip address """ + Test resetting all attempts for an IP address. + """ + # Make a lockout self.test_failure_limit_once() @@ -180,8 +193,10 @@ class AccessAttemptTest(TestCase): self.test_valid_login() def test_reset_all(self): - """Tests if can reset all attempts """ + Test resetting all attempts. + """ + # Make a lockout self.test_failure_limit_once() @@ -193,7 +208,10 @@ class AccessAttemptTest(TestCase): @patch('axes.utils.get_client_ip', return_value='127.0.0.1') def test_get_cache_key(self, _): - """ Test the cache key format""" + """ + Test the cache key format. + """ + # Getting cache key from request ip_address = '127.0.0.1' cache_hash_key = 'axes-{}'.format( @@ -201,11 +219,13 @@ class AccessAttemptTest(TestCase): ) request_factory = RequestFactory() - request = request_factory.post('/admin/login/', - data={ - 'username': self.VALID_USERNAME, - 'password': 'test' - }) + request = request_factory.post( + '/admin/login/', + data={ + 'username': self.VALID_USERNAME, + 'password': 'test', + }, + ) self.assertEqual(cache_hash_key, get_cache_key(request)) @@ -220,12 +240,15 @@ class AccessAttemptTest(TestCase): path_info=request.META.get('PATH_INFO', ''), failures_since_start=0, ) - self.assertEqual(cache_hash_key, get_cache_key(attempt)) + self.assertEqual(cache_hash_key, get_cache_key(attempt)) @patch('axes.utils.get_client_ip', return_value='127.0.0.1') def test_get_cache_key_credentials(self, _): - """ Test the cache key format""" + """ + Test the cache key format. + """ + # Getting cache key from request ip_address = '127.0.0.1' cache_hash_key = 'axes-{}'.format( @@ -258,8 +281,10 @@ class AccessAttemptTest(TestCase): self.assertEqual(cache_hash_key, get_cache_key(attempt)) def test_send_lockout_signal(self): - """Test if the lockout signal is emitted """ + Test if the lockout signal is emitted. + """ + # this "hack" is needed so we don't have to use global variables or python3 features class Scope(object): pass scope = Scope() @@ -284,9 +309,10 @@ class AccessAttemptTest(TestCase): @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) def test_lockout_by_combination_user_and_ip(self): - """Tests the login lock with a valid username and invalid password - when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True """ + Test login failure when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True. + """ + # test until one try before the limit for _ in range(1, settings.AXES_FAILURE_LIMIT): response = self._login( @@ -303,9 +329,10 @@ class AccessAttemptTest(TestCase): @override_settings(AXES_ONLY_USER_FAILURES=True) def test_lockout_by_user_only(self): - """Tests the login lock with a valid username and invalid password - when AXES_ONLY_USER_FAILURES is True """ + Test login failure when AXES_ONLY_USER_FAILURES is True. + """ + # test until one try before the limit for _ in range(1, settings.AXES_FAILURE_LIMIT): response = self._login( @@ -342,9 +369,10 @@ class AccessAttemptTest(TestCase): self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True) def test_log_data_truncated(self): - """Tests that query2str properly truncates data to the - max_length (default 1024) """ + Test that query2str properly truncates data to the max_length (default 1024). + """ + # An impossibly large post dict extra_data = {string.ascii_letters * x: x for x in range(0, 1000)} self._login(**extra_data) @@ -352,14 +380,6 @@ class AccessAttemptTest(TestCase): len(AccessAttempt.objects.latest('id').post_data), 1024 ) - def test_json_response(self): - """Tests response content type and status code for the ajax request - """ - self.test_failure_limit_once() - response = self._login(is_json=True) - self.assertEqual(response.status_code, 403) - self.assertEqual(response.get('Content-Type'), 'application/json') - @override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True) def test_valid_logout_without_success_log(self): AccessLog.objects.all().delete() @@ -373,9 +393,9 @@ class AccessAttemptTest(TestCase): @override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True) def test_valid_login_without_success_log(self): """ - A valid login doesn't generate an AccessLog when - `DISABLE_SUCCESS_ACCESS_LOG=True`. + Test that a valid login does not generate an AccessLog when DISABLE_SUCCESS_ACCESS_LOG is True. """ + AccessLog.objects.all().delete() response = self._login(is_valid_username=True, is_valid_password=True) @@ -396,8 +416,7 @@ class AccessAttemptTest(TestCase): @override_settings(AXES_DISABLE_ACCESS_LOG=True) def test_non_valid_login_without_log(self): """ - A non-valid login does generate an AccessLog when - `DISABLE_ACCESS_LOG=True`. + Test that a non-valid login does generate an AccessLog when DISABLE_ACCESS_LOG is True. """ AccessLog.objects.all().delete() @@ -420,10 +439,9 @@ class AccessAttemptTest(TestCase): self.assertEqual(response.status_code, 200) def test_custom_authentication_backend(self): - ''' - ``log_user_login_failed`` should shortcircuit if an attempt to authenticate - with a custom authentication backend fails. - ''' + """ + Test that log_user_login_failed skips if an attempt to authenticate with a custom authentication backend fails. + """ request = HttpRequest() request.user = self.user @@ -431,8 +449,10 @@ class AccessAttemptTest(TestCase): self.assertEqual(AccessLog.objects.all().count(), 0) def _assert_resets_on_success(self): - """Sets up for testing the AXES_RESET_ON_SUCCESS setting. """ + Sets the AXES_RESET_ON_SUCCESS up for testing. + """ + # test until one try before the limit for _ in range(settings.AXES_FAILURE_LIMIT - 1): response = self._login() @@ -447,9 +467,10 @@ class AccessAttemptTest(TestCase): # by default, AXES_RESET_ON_SUCCESS = False def test_reset_on_success_default(self): - """Tests that the failure attempts does not reset after one successful - attempt by default. """ + Test that the failure attempts does not reset after one successful attempt by default. + """ + response = self._assert_resets_on_success() # So, we shouldn't have found a lock-out yet. @@ -458,9 +479,10 @@ class AccessAttemptTest(TestCase): @override_settings(AXES_RESET_ON_SUCCESS=True) def test_reset_on_success(self): - """Tests that the failure attempts resets after one successful - attempt when using the corresponding setting. """ + Test that the failure attempts resets after one successful attempt when using the corresponding setting. + """ + response = self._assert_resets_on_success() # So, we shouldn't have found a lock-out yet. diff --git a/axes/tests/test_access_attempt_config.py b/axes/tests/test_access_attempt_config.py index b3ac2c6..faf8d35 100644 --- a/axes/tests/test_access_attempt_config.py +++ b/axes/tests/test_access_attempt_config.py @@ -1,7 +1,3 @@ -from __future__ import unicode_literals - -import json - from django.test import TestCase, override_settings from django.urls import reverse from django.contrib.auth.models import User @@ -10,11 +6,13 @@ from axes.conf import settings class AccessAttemptConfigTest(TestCase): - """ This set of tests checks for lockouts under different configurations - and circumstances to prevent false positives and false negatives. + """ + Test for lockouts under different configurations and circumstances to prevent false positives and false negatives. + Always block attempted logins for the same user from the same IP. Always allow attempted logins for a different user from a different IP. """ + IP_1 = '10.1.1.1' IP_2 = '10.2.2.2' USER_1 = 'valid-user-1' @@ -26,11 +24,13 @@ class AccessAttemptConfigTest(TestCase): ALLOWED = 302 BLOCKED = 403 - def _login(self, username, password, ip_addr='127.0.0.1', - is_json=False, **kwargs): - """Login a user and get the response. + def _login(self, username, password, ip_addr='127.0.0.1', **kwargs): + """ + Login a user and get the response. + IP address can be configured to test IP blocking functionality. """ + headers = { 'user_agent': 'test-browser' } @@ -41,17 +41,12 @@ class AccessAttemptConfigTest(TestCase): } post_data.update(kwargs) - if is_json: - headers.update({ - 'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest', - 'content_type': 'application/json', - }) - post_data = json.dumps(post_data) - - response = self.client.post( - reverse('admin:login'), post_data, REMOTE_ADDR=ip_addr, **headers + return self.client.post( + reverse('admin:login'), + post_data, + REMOTE_ADDR=ip_addr, + **headers ) - return response def _lockout_user_from_ip(self, username, ip_addr): for _ in range(settings.AXES_FAILURE_LIMIT): @@ -69,8 +64,10 @@ class AccessAttemptConfigTest(TestCase): ) def setUp(self): - """Create two valid users for authentication. """ + Create two valid users for authentication. + """ + self.user = User.objects.create_superuser( username=self.USER_1, email='test_1@example.com', diff --git a/axes/tests/test_backends.py b/axes/tests/test_backends.py new file mode 100644 index 0000000..22eec0f --- /dev/null +++ b/axes/tests/test_backends.py @@ -0,0 +1,21 @@ +from unittest.mock import patch, MagicMock + +from django.test import TestCase + +from axes.backends import AxesBackend +from axes.exceptions import AxesBackendRequestParameterRequired, AxesBackendPermissionDenied + + +class BackendTestCase(TestCase): + def test_authenticate_raises_on_missing_request(self): + request = None + + with self.assertRaises(AxesBackendRequestParameterRequired): + AxesBackend().authenticate(request) + + @patch('axes.backends.is_already_locked', return_value=True) + def test_authenticate_raises_on_locked_request(self, _): + request = MagicMock() + + with self.assertRaises(AxesBackendPermissionDenied): + AxesBackend().authenticate(request) diff --git a/axes/tests/test_checks.py b/axes/tests/test_checks.py new file mode 100644 index 0000000..9a954d5 --- /dev/null +++ b/axes/tests/test_checks.py @@ -0,0 +1,40 @@ +from django.core.checks import run_checks, Error +from django.conf import settings +from django.test import TestCase, override_settings + +from axes.checks import Messages, Hints, Codes + + +class CacheCheckTestCase(TestCase): + @override_settings( + AXES_CACHE='nonexistent', + ) + def test_cache_missing_produces_check_error(self): + errors = run_checks() + error = Error( + msg=Messages.CACHE_MISSING, + hint=Hints.CACHE_MISSING, + obj=settings.CACHES, + id=Codes.CACHE_MISSING, + ) + + self.assertIn(error, errors) + + @override_settings( + AXES_CACHE='axes', + CACHES={ + 'axes': { + 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + }, + }, + ) + def test_cache_misconfiguration_produces_check_error(self): + errors = run_checks() + error = Error( + msg=Messages.CACHE_INVALID, + hint=Hints.CACHE_INVALID, + obj=settings.CACHES, + id=Codes.CACHE_INVALID, + ) + + self.assertIn(error, errors) diff --git a/axes/tests/test_decorators.py b/axes/tests/test_decorators.py new file mode 100644 index 0000000..82c5f29 --- /dev/null +++ b/axes/tests/test_decorators.py @@ -0,0 +1,40 @@ +from unittest.mock import MagicMock, patch + +from django.http import HttpResponse +from django.test import TestCase + +from axes.decorators import axes_dispatch, axes_form_invalid + + +class DecoratorTestCase(TestCase): + SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched') + LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out') + + def setUp(self): + self.request = MagicMock() + self.cls = MagicMock(return_value=self.request) + self.func = MagicMock(return_value=self.SUCCESS_RESPONSE) + + @patch('axes.decorators.is_already_locked', return_value=True) + @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + def test_axes_dispatch_locks_out(self, _, __): + response = axes_dispatch(self.func)(self.request) + self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content) + + @patch('axes.decorators.is_already_locked', return_value=False) + @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + def test_axes_dispatch_dispatches(self, _, __): + response = axes_dispatch(self.func)(self.request) + self.assertEqual(response.content, self.SUCCESS_RESPONSE.content) + + @patch('axes.decorators.is_already_locked', return_value=True) + @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + def test_axes_form_invalid_locks_out(self, _, __): + response = axes_form_invalid(self.func)(self.cls) + self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content) + + @patch('axes.decorators.is_already_locked', return_value=False) + @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + def test_axes_form_invalid_dispatches(self, _, __): + response = axes_form_invalid(self.func)(self.cls) + self.assertEqual(response.content, self.SUCCESS_RESPONSE.content) diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py new file mode 100644 index 0000000..838c78a --- /dev/null +++ b/axes/tests/test_handlers.py @@ -0,0 +1,51 @@ +from unittest.mock import MagicMock, patch + +from django.test import TestCase + +from axes.handlers import AxesHandler +from axes.signals import ProxyHandler + + +class ProxyHandlerTestCase(TestCase): + def setUp(self): + self.sender = MagicMock() + self.credentials = MagicMock() + self.request = MagicMock() + self.user = MagicMock() + self.instance = MagicMock() + + @patch('axes.signals.ProxyHandler.implementation', None) + def test_initialize(self): + self.assertIsNone(ProxyHandler.implementation) + ProxyHandler.initialize() + self.assertIsInstance(ProxyHandler.implementation, AxesHandler) + + @patch('axes.signals.ProxyHandler.implementation') + def test_user_login_failed(self, handler): + self.assertFalse(handler.user_login_failed.called) + ProxyHandler().user_login_failed(self.sender, self.credentials, self.request) + self.assertTrue(handler.user_login_failed.called) + + @patch('axes.signals.ProxyHandler.implementation') + def test_user_logged_in(self, handler): + self.assertFalse(handler.user_logged_in.called) + ProxyHandler().user_logged_in(self.sender, self.request, self.user) + self.assertTrue(handler.user_logged_in.called) + + @patch('axes.signals.ProxyHandler.implementation') + def test_user_logged_out(self, handler): + self.assertFalse(handler.user_logged_out.called) + ProxyHandler().user_logged_out(self.sender, self.request, self.user) + self.assertTrue(handler.user_logged_out.called) + + @patch('axes.signals.ProxyHandler.implementation') + def test_post_save_access_attempt(self, handler): + self.assertFalse(handler.post_save_access_attempt.called) + ProxyHandler().post_save_access_attempt(self.instance) + self.assertTrue(handler.post_save_access_attempt.called) + + @patch('axes.signals.ProxyHandler.implementation') + def test_post_delete_access_attempt(self, handler): + self.assertFalse(handler.post_delete_access_attempt.called) + ProxyHandler().post_delete_access_attempt(self.instance) + self.assertTrue(handler.post_delete_access_attempt.called) diff --git a/axes/tests/test_management.py b/axes/tests/test_management.py new file mode 100644 index 0000000..f7f6aa5 --- /dev/null +++ b/axes/tests/test_management.py @@ -0,0 +1,63 @@ +from io import StringIO + +from django.core.management import call_command +from django.test import TestCase + +from axes.models import AccessAttempt + + +class ManagementCommandTestCase(TestCase): + def setUp(self): + AccessAttempt.objects.create( + username='jane.doe', + ip_address='10.0.0.1', + failures_since_start='4', + ) + + AccessAttempt.objects.create( + username='john.doe', + ip_address='10.0.0.2', + failures_since_start='15', + ) + + def test_axes_list_attempts(self): + out = StringIO() + call_command('axes_list_attempts', stdout=out) + + expected = '10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n' + self.assertEqual(expected, out.getvalue()) + + def test_axes_reset(self): + out = StringIO() + call_command('axes_reset', stdout=out) + + expected = '2 attempts removed.\n' + self.assertEqual(expected, out.getvalue()) + + def test_axes_reset_ip(self): + out = StringIO() + call_command('axes_reset_ip', '10.0.0.1', stdout=out) + + expected = '1 attempts removed.\n' + self.assertEqual(expected, out.getvalue()) + + def test_axes_reset_ip_not_found(self): + out = StringIO() + call_command('axes_reset_ip', '10.0.0.3', stdout=out) + + expected = 'No attempts found.\n' + self.assertEqual(expected, out.getvalue()) + + def test_axes_reset_username(self): + out = StringIO() + call_command('axes_reset_username', 'john.doe', stdout=out) + + expected = '1 attempts removed.\n' + self.assertEqual(expected, out.getvalue()) + + def test_axes_reset_username_not_found(self): + out = StringIO() + call_command('axes_reset_username', 'ivan.renko', stdout=out) + + expected = 'No attempts found.\n' + self.assertEqual(expected, out.getvalue()) diff --git a/axes/tests/test_middleware.py b/axes/tests/test_middleware.py new file mode 100644 index 0000000..cf47d3b --- /dev/null +++ b/axes/tests/test_middleware.py @@ -0,0 +1,28 @@ +from unittest.mock import patch, MagicMock + +from django.http import HttpResponse +from django.test import TestCase + +from axes.exceptions import AxesSignalPermissionDenied +from axes.middleware import AxesMiddleware + + +class MiddlewareTestCase(TestCase): + SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched') + LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out') + + def setUp(self): + self.request = MagicMock() + self.get_response = MagicMock() + + @patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE) + def test_process_exception_axes(self, _): + exception = AxesSignalPermissionDenied() + response = AxesMiddleware(self.get_response).process_exception(self.request, exception) + self.assertEqual(response, self.LOCKOUT_RESPONSE) + + @patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE) + def test_process_exception_other(self, _): + exception = Exception() + response = AxesMiddleware(self.get_response).process_exception(self.request, exception) + self.assertEqual(response, None) diff --git a/axes/tests/test_models.py b/axes/tests/test_models.py index 7d6df4f..4c311b1 100644 --- a/axes/tests/test_models.py +++ b/axes/tests/test_models.py @@ -1,27 +1,28 @@ +from django.apps.registry import apps +from django.db import connection +from django.db.migrations.autodetector import MigrationAutodetector +from django.db.migrations.executor import MigrationExecutor +from django.db.migrations.state import ProjectState from django.test import TestCase +from django.utils import translation class MigrationsCheck(TestCase): def setUp(self): - from django.utils import translation self.saved_locale = translation.get_language() translation.deactivate_all() def tearDown(self): if self.saved_locale is not None: - from django.utils import translation translation.activate(self.saved_locale) def test_missing_migrations(self): - from django.db import connection - from django.apps.registry import apps - from django.db.migrations.executor import MigrationExecutor executor = MigrationExecutor(connection) - from django.db.migrations.autodetector import MigrationAutodetector - from django.db.migrations.state import ProjectState autodetector = MigrationAutodetector( executor.loader.project_state(), ProjectState.from_apps(apps), ) + changes = autodetector.changes(graph=executor.loader.graph) + self.assertEqual({}, changes) diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index b880cbe..03732c7 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -1,5 +1,3 @@ -from __future__ import unicode_literals - import datetime from django.http import HttpRequest @@ -9,10 +7,17 @@ from django.utils import six from axes.utils import iso8601, is_ipv6, get_client_str, get_client_username +def get_expected_client_str(*args, **kwargs): + client_str_template = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}' + return client_str_template.format(*args, **kwargs) + + class UtilsTest(TestCase): def test_iso8601(self): - """Tests iso8601 correctly translates datetime.timdelta to ISO 8601 - formatted duration.""" + """ + Test iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration. + """ + EXPECTED = { datetime.timedelta(days=1, hours=25, minutes=42, seconds=8): 'P2DT1H42M8S', @@ -46,8 +51,7 @@ class UtilsTest(TestCase): user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" - expected = details.format(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip, user_agent, path_info) actual = get_client_str(username, ip, user_agent, path_info) self.assertEqual(expected, actual) @@ -72,8 +76,7 @@ class UtilsTest(TestCase): user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" - expected = details.format(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip, user_agent, path_info) actual = get_client_str(username, ip, user_agent, path_info) self.assertEqual(expected, actual) @@ -99,8 +102,7 @@ class UtilsTest(TestCase): user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" - expected = details.format(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip, user_agent, path_info) actual = get_client_str(username, ip, user_agent, path_info) self.assertEqual(expected, actual) @@ -126,8 +128,7 @@ class UtilsTest(TestCase): user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" - expected = details.format(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip, user_agent, path_info) actual = get_client_str(username, ip, user_agent, path_info) self.assertEqual(expected, actual) @@ -171,22 +172,9 @@ class UtilsTest(TestCase): self.assertEqual(expected_in_credentials, actual) - def sample_customize_username(request): + def sample_customize_username(request, credentials): return 'prefixed-' + request.POST.get('username') - @override_settings(AXES_USERNAME_FORM_FIELD='username') - @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username) - def test_custom_get_client_username(self): - provided = 'test-username' - expected = 'prefixed-' + provided - - request = HttpRequest() - request.POST['username'] = provided - - actual = get_client_username(request) - - self.assertEqual(expected, actual) - @override_settings(AXES_USERNAME_FORM_FIELD='username') @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username) def test_custom_get_client_username_from_request(self): @@ -222,18 +210,30 @@ class UtilsTest(TestCase): self.assertEqual(expected_in_credentials, actual) - def sample_get_client_username_too_few_arguments(): + def sample_get_client_username(request, credentials): + return 'example' + + @override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username) + def test_get_client_username(self): + self.assertEqual('example', get_client_username(HttpRequest(), {})) + + @override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username) + def test_get_client_username_too_many_arguments(self): + with self.assertRaises(TypeError): + actual = get_client_username(HttpRequest(), {}, None) + + def sample_get_client_username_too_few_arguments(request): pass @override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username_too_few_arguments) - def test_get_client_username_too_few_arguments_invalid_callable(self): + def test_get_client_username_invalid_callable_too_few_arguments(self): with self.assertRaises(TypeError): actual = get_client_username(HttpRequest(), {}) - def sample_get_client_username_too_many_arguments(one, two, three): + def sample_get_client_username_too_many_arguments(request, credentials, extra_argument): pass @override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username_too_many_arguments) - def test_get_client_username_too_many_arguments_invalid_callable(self): + def test_get_client_username_invalid_callable_too_many_arguments(self): with self.assertRaises(TypeError): actual = get_client_username(HttpRequest(), {}) diff --git a/axes/utils.py b/axes/utils.py index f98c024..b4df189 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -1,15 +1,10 @@ -from __future__ import unicode_literals - -try: - import win_inet_pton # pylint: disable=unused-import -except ImportError: - pass - -from inspect import getargspec +from datetime import timedelta from logging import getLogger from socket import error, inet_pton, AF_INET6 from django.core.cache import caches +from django.http import HttpResponse, HttpResponseRedirect, JsonResponse +from django.shortcuts import render from django.utils import six import ipware.ip2 @@ -19,18 +14,20 @@ from axes.models import AccessAttempt logger = getLogger(__name__) + def get_axes_cache(): return caches[getattr(settings, 'AXES_CACHE', 'default')] def query2str(items, max_length=1024): - """Turns a dictionary into an easy-to-read list of key-value pairs. - - If there's a field called "password" it will be excluded from the output. - - The length of the output is limited to max_length to avoid a DoS attack - via excessively large payloads. """ + Turns a dictionary into an easy-to-read list of key-value pairs. + + If there is a field called password it will be excluded from the output. + + The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. + """ + return '\n'.join([ '%s=%s' % (k, v) for k, v in six.iteritems(items) if k != settings.AXES_PASSWORD_FORM_FIELD @@ -41,7 +38,7 @@ def get_client_str(username, ip_address, user_agent=None, path_info=None): if settings.AXES_VERBOSE: if isinstance(path_info, tuple): path_info = path_info[0] - details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" + details = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}' return details.format(username, ip_address, user_agent, path_info) if settings.AXES_ONLY_USER_FAILURES: @@ -73,34 +70,22 @@ def get_client_ip(request): def get_client_username(request, credentials=None): - """Resolve client username from the given request or credentials if supplied + """ + Resolve client username from the given request or credentials if supplied. The order of preference for fetching the username is as follows: - 1. If configured, use `AXES_USERNAME_CALLABLE`, and supply either `request` or `request, credentials` as arguments - depending on the function argument count (multiple signatures are supported for backwards compatibility) - 2. If given, use `credentials` and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`) - 3. Use request.POST and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`) + 1. If configured, use ``AXES_USERNAME_CALLABLE``, and supply ``request, credentials`` as arguments + 2. If given, use ``credentials`` and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``) + 3. Use request.POST and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``) - :param request: incoming Django `HttpRequest` or similar object from authentication backend or other source - :param credentials: incoming credentials `dict` or similar object from authentication backend or other source + :param request: incoming Django ``HttpRequest`` or similar object from authentication backend or other source + :param credentials: incoming credentials ``dict`` or similar object from authentication backend or other source """ if settings.AXES_USERNAME_CALLABLE: - num_args = len( - getargspec(settings.AXES_USERNAME_CALLABLE).args # pylint: disable=deprecated-method - ) - - if num_args == 2: - logger.debug('Using AXES_USERNAME_CALLABLE for username with two arguments: request, credentials') - return settings.AXES_USERNAME_CALLABLE(request, credentials) - - if num_args == 1: - logger.debug('Using AXES_USERNAME_CALLABLE for username with one argument: request') - return settings.AXES_USERNAME_CALLABLE(request) - - logger.error('Using AXES_USERNAME_CALLABLE for username failed: wrong number of arguments %s', num_args) - raise TypeError('Wrong number of arguments in function call to AXES_USERNAME_CALLABLE', num_args) + logger.debug('Using AXES_USERNAME_CALLABLE to get username') + return settings.AXES_USERNAME_CALLABLE(request, credentials) if credentials: logger.debug('Using `credentials` to get username with key AXES_USERNAME_FORM_FIELD') @@ -125,8 +110,8 @@ def is_ipv6(ip): def reset(ip=None, username=None): - """Reset records that match ip or username, and - return the count of removed attempts. + """ + Reset records that match IP or username, and return the count of removed attempts. """ attempts = AccessAttempt.objects.all() @@ -141,8 +126,10 @@ def reset(ip=None, username=None): def iso8601(timestamp): - """Returns datetime.timedelta translated to ISO 8601 formatted duration. """ + Return datetime.timedelta translated to ISO 8601 formatted duration. + """ + seconds = timestamp.total_seconds() minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) @@ -165,3 +152,40 @@ def get_lockout_message(): if settings.AXES_COOLOFF_TIME: return settings.AXES_COOLOFF_MESSAGE return settings.AXES_PERMALOCK_MESSAGE + + +def get_lockout_response(request): + context = { + 'failure_limit': settings.AXES_FAILURE_LIMIT, + 'username': get_client_username(request) or '' + } + + cool_off = settings.AXES_COOLOFF_TIME + if cool_off: + if isinstance(cool_off, (int, float)): + cool_off = timedelta(hours=cool_off) + + context.update({ + 'cooloff_time': iso8601(cool_off) + }) + + status = 403 + + if request.is_ajax(): + return JsonResponse( + context, + status=status, + ) + + if settings.AXES_LOCKOUT_TEMPLATE: + return render( + request, + settings.AXES_LOCKOUT_TEMPLATE, + context, + status=status, + ) + + if settings.AXES_LOCKOUT_URL: + return HttpResponseRedirect(settings.AXES_LOCKOUT_URL) + + return HttpResponse(get_lockout_message(), status=status) diff --git a/codecov.yml b/codecov.yml new file mode 100644 index 0000000..2d3df9b --- /dev/null +++ b/codecov.yml @@ -0,0 +1,9 @@ +coverage: + status: + patch: off + project: + default: + # Minimum test coverage required for pass + target: 90% + # Maximum test coverage change allowed for pass + threshold: 20% diff --git a/docs/conf.py b/docs/conf.py index 6deb8e7..961c344 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -1,5 +1,3 @@ -# -*- coding: utf-8 -*- -# # Django Axes documentation build configuration file, created by # sphinx-quickstart on Sat Jul 30 16:37:41 2016. # diff --git a/docs/configuration.rst b/docs/configuration.rst index 8404142..d7207f7 100644 --- a/docs/configuration.rst +++ b/docs/configuration.rst @@ -5,28 +5,151 @@ Configuration Add ``axes`` to your ``INSTALLED_APPS``:: - INSTALLED_APPS = ( + INSTALLED_APPS = [ 'django.contrib.admin', 'django.contrib.auth', 'django.contrib.contenttypes', 'django.contrib.sessions', - 'django.contrib.sites', - # ... - 'axes', - # ... - ) + 'django.contrib.messages', + 'django.contrib.staticfiles', -Add ``axes.backends.AxesModelBackend`` to the top of ``AUTHENTICATION_BACKENDS``:: + # ... other applications per your preference. + + 'axes', + ] + +Add ``axes.backends.AxesBackend`` to the top of ``AUTHENTICATION_BACKENDS``:: AUTHENTICATION_BACKENDS = [ - 'axes.backends.AxesModelBackend', - # ... + # AxesBackend should be the first backend in the list. + # It stops the authentication flow when a user is locked out. + 'axes.backends.AxesBackend', + + # ... other authentication backends per your preference. + + # Django ModelBackend is the default authentication backend. 'django.contrib.auth.backends.ModelBackend', - # ... + ] + +Add ``axes.middleware.AxesMiddleware`` to your list of ``MIDDLEWARE``:: + + MIDDLEWARE = [ + # The following is the list of default middleware in new Django projects. + 'django.middleware.security.SecurityMiddleware', + 'django.contrib.sessions.middleware.SessionMiddleware', + 'django.middleware.common.CommonMiddleware', + 'django.middleware.csrf.CsrfViewMiddleware', + 'django.contrib.auth.middleware.AuthenticationMiddleware', + 'django.contrib.messages.middleware.MessageMiddleware', + 'django.middleware.clickjacking.XFrameOptionsMiddleware', + + # ... other middleware per your preference. + + # AxesMiddleware should be the last middleware in the list. + # It pretty formats authentication errors into readable responses. + 'axes.middleware.AxesMiddleware', ] Run ``python manage.py migrate`` to sync the database. +How does Axes function? +----------------------- + +When a user tries to log in in Django, the login is usually performed +by running a number of authentication backends that check user login +information by calling the ``django.contrib.auth.authenticate`` function. + +If an authentication backend does not approve of a user login, +it can raise a ``django.core.exceptions.PermissionDenied`` exception. + +If a login fails, Django then fires a +``from django.contrib.auth.signals.user_login_failed`` signal. + +If this signal raises an exception, it is propagated through the +Django middleware stack where it can be caught, or alternatively +where it can bubble up to the default Django exception handlers. + +A normal login flow for Django runs as follows:: + + 1. Django or plugin login view is called by + for example user sending form data with browser. + + 2. django.contrib.auth.authenticate is called by + the view code to check the authentication request + for user and return a user object matching it. + + 3. AUTHENTICATION_BACKENDS are iterated over + and their authenticate methods called one-by-one. + + 4. An authentication backend either returns + a user object which results in that user + being logged in or returns None. + If a PermissionDenied error is raised + by any of the authentication backends + the whole request authentication flow + is aborted and signal handlers triggered. + +Axes monitors logins with the ``user_login_failed`` signal handler +and after login attempts exceed the given maximum, starts blocking them. + +The blocking is done by ``AxesBackend`` which checks every request +coming through the Django authentication flow and verifies they +are not blocked, and allows the requests to go through if the check passes. + +If any of the checks fails, an exception is raised which interrupts +the login process and triggers the Django login failed signal handlers. + +Another exception is raised by a Axes signal handler, which is +then caught by ``AxesMiddleware`` and converted into a readable +error because the user is currently locked out of the system. + +Axes implements the lockout flow as follows:: + + 1. Django or plugin login view is called. + + 2. django.contrib.auth.authenticate is called. + + 3. AUTHENTICATION_BACKENDS are iterated over + where axes.backends.AxesBackend is the first. + + 4. AxesBackend checks authentication request + for lockouts rules and either aborts the + authentication flow or lets the authentication + process proceed to the next + configured authentication backend. + + [The lockout happens at this stage if appropriate] + + 5. User is locked out and signal handlers + are notified of the failed login attempt. + + 6. axes.signals.log_user_login_failed runs + and raises a AxesSignalPermissionDenied + exception that bubbles up the middleware stack. + + 7. AxesMiddleware processes the exception + and returns a readable error to the user. + +This plugin assumes that the login views either call +the django.contrib.auth.authenticate method to log in users +or otherwise take care of notifying Axes of authentication +attempts or login failures the same way Django does. + +The login flows can be customized and the Axes +authentication backend or middleware can be easily swapped. + +Running checks +-------------- + +Use the ``python manage.py check`` command to verify the correct configuration in both +development and production environments. It is probably best to use this step as part +of your regular CI workflows to verify that your project is not misconfigured. + +Axes uses the checks to verify your cache configuration to see that your caches +should be functional with the configuration of Axes. Many people have different configurations +for their development and production environments. + + Known configuration problems ---------------------------- @@ -62,7 +185,7 @@ add an extra cache to ``CACHES`` with a name of your choice:: } } -The next step is to tell axes to use this cache through adding ``AXES_CACHE`` +The next step is to tell Axes to use this cache through adding ``AXES_CACHE`` to your ``settings.py`` file:: AXES_CACHE = 'axes_cache' @@ -73,7 +196,7 @@ There are no known problems in other cache backends such as Authentication backend problems ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ -If you get ``AxesModelBackend.RequestParameterRequired`` exceptions, +If you get ``AxesBackendRequestParameterRequired`` exceptions, make sure any auth libraries and middleware you use pass the request object to authenticate. Notably in older versions of Django Rest Framework (DRF) (before 3.7.0), ``BasicAuthentication`` does not pass request. `Here is an example workaround for DRF `_. @@ -98,7 +221,7 @@ Customizing Axes You have a couple options available to you to customize ``django-axes`` a bit. These should be defined in your ``settings.py`` file. -* ``AXES_CACHE``: The name of the cache for axes to use. +* ``AXES_CACHE``: The name of the cache for Axes to use. Default: ``'default'`` * ``AXES_FAILURE_LIMIT``: The number of login attempts allowed before a record is created for the failed logins. Default: ``3`` @@ -112,7 +235,9 @@ These should be defined in your ``settings.py`` file. old failed login attempts will be forgotten. Can be set to a python timedelta object or an integer. If an integer, will be interpreted as a number of hours. Default: ``None`` -* ``AXES_LOGGER``: If set, specifies a logging mechanism for axes to use. +* ``AXES_HANDLER``: If set, overrides the default signal handler backend. + Default: ``'axes.handlers.AxesHandler'`` +* ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use. Default: ``'axes.watch_login'`` * ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a user is locked out. Template receives cooloff_time and failure_limit as @@ -124,11 +249,11 @@ These should be defined in your ``settings.py`` file. Default: ``True`` * ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your users usernames. Default: ``username`` -* ``AXES_USERNAME_CALLABLE``: A callable function that takes either one or two arguments: - ``AXES_USERNAME_CALLABLE(request)`` or ``AXES_USERNAME_CALLABLE(request, credentials)``. +* ``AXES_USERNAME_CALLABLE``: A callable function that takes two arguments: + ``AXES_USERNAME_CALLABLE(request, credentials)``. The ``request`` is a HttpRequest like object and the ``credentials`` is a dictionary like object. ``credentials`` are the ones that were passed to Django ``authenticate()`` in the login flow. - If no function is supplied, axes fetches the username from the ``credentials`` or ``request.POST`` + If no function is supplied, Axes fetches the username from the ``credentials`` or ``request.POST`` dictionaries based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None`` * ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your users password. Default: ``password`` diff --git a/docs/index.rst b/docs/index.rst index af423c0..4682cb1 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -16,6 +16,7 @@ Contents installation configuration + migration usage requirements development diff --git a/docs/migration.rst b/docs/migration.rst new file mode 100644 index 0000000..1f292f0 --- /dev/null +++ b/docs/migration.rst @@ -0,0 +1,37 @@ +.. _migration: + +Migration +========= + +This page contains migration instructions between different django-axes +versions so that users might more confidently upgrade their installations. + +From django-axes version 4 to version 5 +--------------------------------------- + +Application version 5 has a few differences compared to django-axes 4. + +You might need to search your own codebase and check if you need to change +API endpoints or names for compatibility reasons. + +- Login and logout view monkey-patching was removed. + Login monitoring is now implemented with signals + and locking users out is implemented with a combination + of a custom authentication backend, middlware, and signals. +- ``AxesModelBackend`` was renamed to ``AxesBackend`` + for better naming and preventing the risk of users accidentally + upgrading without noticing that the APIs have changed. + Documentation was improved. Exceptions were renamed. +- ``axes.backends.AxesModelBackend.RequestParameterRequired`` + exception was renamed, retyped to ``ValueError`` from ``Exception``, and + moved to ``axes.exception.AxesBackendRequestParameterRequired``. +- ``AxesBackend`` now raises a + ``axes.exceptions.AxesBackendPermissionDenied`` + exception when user is locked out which triggers signal handler + to run on failed logins, checking user lockout statuses. +- Axes lockout signal handler now raises exception + ``axes.exceptions.AxesSignalPermissionDenied`` on lockouts. +- ``AxesMiddleware`` was added to return lockout responses. + The middleware handles ``axes.exception.AxesSignalPermissionDenied``. +- ``AXES_USERNAME_CALLABLE`` is now always called with two arguments, + ``request`` and ``credentials`` instead of ``request``. diff --git a/docs/usage.rst b/docs/usage.rst index 1e1187b..11b24ac 100644 --- a/docs/usage.rst +++ b/docs/usage.rst @@ -2,6 +2,7 @@ Usage ===== + ``django-axes`` listens to signals from ``django.contrib.auth.signals`` to log access attempts: @@ -16,27 +17,29 @@ log the access attempts. Quickstart ---------- -Once ``axes`` is in your ``INSTALLED_APPS`` in your project settings file, -you can login and logout of your application via the ``django.contrib.auth`` -views. The access attempts will be logged and visible in the "Access Attempts" -secion of the admin app. +Once ``axes`` is in your ``INSTALLED_APPS`` in your project settings file, you can +login and logout of your application via the ``django.contrib.auth`` views. +The attempts will be logged and visible in the "Access Attempts" section in admin. -By default, django-axes will lock out repeated attempts from the same IP -address. You can allow this IP to attempt again by deleting the relevant -``AccessAttempt`` records in the admin. +By default, Axes will lock out repeated access attempts from the same IP address. +You can allow this IP to attempt again by deleting relevant ``AccessAttempt`` records. -You can also use the ``axes_reset`` and ``axes_reset_user`` management commands -using Django's ``manage.py``. +Records can be deleted, for example, by using the Django admin application. + +You can also use the ``axes_reset``, ``axes_reset_ip``, and ``axes_reset_user`` +management commands with the Django ``manage.py`` command helpers: * ``manage.py axes_reset`` will reset all lockouts and access records. -* ``manage.py axes_reset ip`` will clear lockout/records for ip -* ``manage.py axes_reset_user username`` will clear lockout/records for an username +* ``manage.py axes_reset_ip ip [ip ...]`` + will clear lockouts and records for the given IP addresses. +* ``manage.py axes_reset_user username [username ...]`` + will clear lockouts and records for the given usernames. -In your code, you can use ``from axes.utils import reset``. +In your code, you can use the ``axes.utils.reset`` function. * ``reset()`` will reset all lockouts and access records. -* ``reset(ip=ip)`` will clear lockout/records for ip -* ``reset(username=username)`` will clear lockout/records for a username +* ``reset(ip=ip)`` will clear lockouts and records for the given IP address. +* ``reset(username=username)`` will clear lockouts and records for the given username. Example usage ------------- @@ -58,63 +61,70 @@ them as per the example. *views.py:* :: - from django.views.decorators.csrf import csrf_exempt - from django.utils.decorators import method_decorator from django.http import JsonResponse, HttpResponse - from django.contrib.auth.signals import user_logged_in,\ - user_logged_out,\ - user_login_failed - import json + from django.utils.decorators import method_decorator + from django.contrib.auth import signals + from django.views import View + from django.views.decorators.csrf import csrf_exempt + + from axes.decorators import axes_dispatch + from myapp.forms import LoginForm from myapp.auth import custom_authenticate, custom_login - from axes.decorators import axes_dispatch @method_decorator(axes_dispatch, name='dispatch') @method_decorator(csrf_exempt, name='dispatch') class Login(View): - ''' Custom login view that takes JSON credentials ''' + """ + Custom login view that takes JSON credentials + """ - http_method_names = ['post',] + http_method_names = ['post'] def post(self, request): - # decode post json to dict & validate - post_data = json.loads(request.body.decode('utf-8')) - form = LoginForm(post_data) + form = LoginForm(request.POST) if not form.is_valid(): - # inform axes of failed login - user_login_failed.send( - sender = User, - request = request, - credentials = { - 'username': form.cleaned_data.get('username') - } + # inform django-axes of failed login + signals.user_login_failed.send( + sender=User, + request=request, + credentials={ + 'username': form.cleaned_data.get('username'), + }, ) return HttpResponse(status=400) + user = custom_authenticate( - request = request, - username = form.cleaned_data.get('username'), - password = form.cleaned_data.get('password'), + request=request, + username=form.cleaned_data.get('username'), + password=form.cleaned_data.get('password'), ) if user is not None: custom_login(request, user) - user_logged_in.send( - sender = User, - request = request, - user = user, + + signals.user_logged_in.send( + sender=User, + request=request, + user=user, ) - return JsonResponse({'message':'success!'}, status=200) - else: - user_login_failed.send( - sender = User, - request = request, - credentials = { - 'username':form.cleaned_data.get('username') - }, - ) - return HttpResponse(status=403) + + return JsonResponse({ + 'message':'success' + }, status=200) + + # inform django-axes of failed login + signals.user_login_failed.send( + sender=User, + request=request, + credentials={ + 'username': form.cleaned_data.get('username'), + }, + ) + + return HttpResponse(status=403) *urls.py:* :: diff --git a/manage.py b/manage.py index 154db24..9ce901e 100644 --- a/manage.py +++ b/manage.py @@ -1,9 +1,10 @@ #!/usr/bin/env python + import os import sys -if __name__ == "__main__": - os.environ.setdefault("DJANGO_SETTINGS_MODULE", "axes.test_settings") +if __name__ == '__main__': + os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'axes.test_settings') from django.core.management import execute_from_command_line diff --git a/requirements.txt b/requirements.txt index 677d3a1..e2c7ce0 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ -e . coverage==4.5.2 -mock==2.0.0 ; python_version <= "2.7" prospector==1.1.6.2 sphinx_rtd_theme==0.4.2 tox==3.7.0 diff --git a/runtests.py b/runtests.py deleted file mode 100755 index 89479c8..0000000 --- a/runtests.py +++ /dev/null @@ -1,43 +0,0 @@ -#!/usr/bin/env python - -from __future__ import unicode_literals - -import os -import sys - -import django -from django.conf import settings -from django.core.exceptions import ImproperlyConfigured -from django.test.utils import get_runner - - -def run_tests(): - os.environ['DJANGO_SETTINGS_MODULE'] = 'axes.test_settings' - django.setup() - TestRunner = get_runner(settings) - test_runner = TestRunner() - failures = test_runner.run_tests(['axes.tests']) - sys.exit(bool(failures)) - - -def run_tests_cache(): - """Check that using a wrong cache backend (LocMemCache) throws correctly - - This is due to LocMemCache not working with AccessAttempt caching, - please see issue https://github.com/jazzband/django-axes/issues/288 - """ - - try: - os.environ['DJANGO_SETTINGS_MODULE'] = 'axes.test_settings_cache' - django.setup() - print('Using LocMemCache as a cache backend does not throw') - sys.exit(1) - except ImproperlyConfigured: - print('Using LocMemCache as a cache backend throws correctly') - sys.exit(0) - - -if __name__ == '__main__': - if 'cache' in sys.argv: - run_tests_cache() - run_tests() diff --git a/setup.py b/setup.py index 9afda4f..a693b96 100644 --- a/setup.py +++ b/setup.py @@ -1,7 +1,4 @@ #!/usr/bin/env python -# -*- coding: utf-8 -*- - -from __future__ import unicode_literals import codecs from setuptools import setup, find_packages @@ -28,7 +25,6 @@ setup( 'django', 'django-appconf', 'django-ipware>=2.0.2', - 'win_inet_pton ; python_version < "3.4" and sys_platform == "win32"', ], include_package_data=True, packages=find_packages(), @@ -44,10 +40,7 @@ setup( 'License :: OSI Approved :: MIT License', 'Operating System :: OS Independent', 'Programming Language :: Python', - 'Programming Language :: Python :: 2', - 'Programming Language :: Python :: 2.7', 'Programming Language :: Python :: 3', - 'Programming Language :: Python :: 3.4', 'Programming Language :: Python :: 3.5', 'Programming Language :: Python :: 3.6', 'Programming Language :: Python :: 3.7', diff --git a/tox.ini b/tox.ini index f489c99..9e35f82 100644 --- a/tox.ini +++ b/tox.ini @@ -1,14 +1,10 @@ [tox] envlist = - py{27,34,35,36,37}-django111 - py{34,35,36,37}-django20 - py{35,36,37}-django21 - py{35,36,37}-djangomaster + py{35,36,37}-django{111,20,21} + py{36,37}-djangomaster [travis] python = - 2.7: py27 - 3.4: py34 3.5: py35 3.6: py36 3.7: py37 @@ -31,8 +27,7 @@ usedevelop = True ignore_outcome = djangomaster: True commands = - coverage run -a runtests.py -v2 - coverage run -a runtests.py -v2 cache + coverage run -a manage.py test -v2 coverage report prospector setenv =