Merge pull request #397 from jazzband/development

Version 5.0.0 draft
This commit is contained in:
Aleksi Häkli 2019-02-10 19:53:41 +02:00 committed by GitHub
commit b8f417d485
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
55 changed files with 1265 additions and 646 deletions

View file

@ -1,2 +1,3 @@
[run]
source=axes
branch = True
source = axes

View file

@ -2,8 +2,6 @@ dist: xenial
language: python
cache: pip
python:
- 2.7
- 3.4
- 3.5
- 3.6
- 3.7
@ -14,15 +12,7 @@ env:
- DJANGO=master
matrix:
exclude:
- python: 2.7
env: DJANGO=2.0
- python: 2.7
env: DJANGO=2.1
- python: 2.7
env: DJANGO=master
- python: 3.4
env: DJANGO=2.1
- python: 3.4
- python: 3.5
env: DJANGO=master
install: pip install tox-travis codecov
script: tox

View file

@ -1,6 +1,37 @@
Changes
=======
5.0.0 (WIP)
-----------
- Improve managment commands and separate commands for resetting
all access attempts, attempts by IP and attempts by username.
Add tests for the management commands for better coverage.
[aleksihakli]
- Add a Django native authentication stack that utilizes
``AUTHENTICATION_BACKENDS``, ``MIDDLEWARE``, and signal handlers
for tracking login attempts and implementing user lockouts.
This results in configuration changes, refer to the documentation.
[aleksihakli]
- Remove automatic decoration of Django login views and forms.
Leave decorations available for application users who wish to
decorate their own login or other views as before.
[aleksihakli]
- Clean up code, tests, and documentation.
[aleksihakli]
- Drop support for Python 2.7 and Python 3.4.
[aleksihakli]
- Drop old single-argument signature format for ``AXES_USERNAME_CALLABLE``.
[aleksihakli]
- Improve tests and raise Codecov monitoring threshold to 90%.
[aleksihakli]
4.5.4 (2019-01-15)
------------------

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
__version__ = '4.5.4'
default_app_config = 'axes.apps.AppConfig'

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
from django.contrib import admin
from django.utils.translation import gettext_lazy as _
@ -7,6 +5,7 @@ from axes.models import AccessLog
from axes.models import AccessAttempt
@admin.register(AccessAttempt)
class AccessAttemptAdmin(admin.ModelAdmin):
list_display = (
'attempt_time',
@ -59,9 +58,7 @@ class AccessAttemptAdmin(admin.ModelAdmin):
return False
admin.site.register(AccessAttempt, AccessAttemptAdmin)
@admin.register(AccessLog)
class AccessLogAdmin(admin.ModelAdmin):
list_display = (
'attempt_time',
@ -109,6 +106,3 @@ class AccessLogAdmin(admin.ModelAdmin):
def has_add_permission(self, request):
return False
admin.site.register(AccessLog, AccessLogAdmin)

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
from django import apps
@ -7,23 +5,5 @@ class AppConfig(apps.AppConfig):
name = 'axes'
def ready(self):
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
if settings.CACHES[getattr(settings, 'AXES_CACHE', 'default')]['BACKEND'] == \
'django.core.cache.backends.locmem.LocMemCache':
raise ImproperlyConfigured(
'django-axes does not work properly with LocMemCache as the default cache backend'
' please add e.g. a DummyCache backend for axes and configure it with AXES_CACHE'
)
from django.contrib.auth.views import LoginView
from django.utils.decorators import method_decorator
from axes import signals # pylint: disable=unused-import,unused-variable
from axes.decorators import axes_dispatch
from axes.decorators import axes_form_invalid
LoginView.dispatch = method_decorator(axes_dispatch)(LoginView.dispatch)
LoginView.form_invalid = method_decorator(axes_form_invalid)(LoginView.form_invalid)
from axes import signals
signals.ProxyHandler.initialize()

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
from datetime import timedelta
from hashlib import md5
@ -12,9 +10,10 @@ from axes.utils import get_axes_cache, get_client_ip, get_client_username
def _query_user_attempts(request, credentials=None):
"""Returns access attempt record if it exists.
Otherwise return None.
"""
Return access attempt record if it exists. Otherwise return None.
"""
ip = get_client_ip(request)
username = get_client_username(request, credentials)
@ -52,9 +51,11 @@ def _query_user_attempts(request, credentials=None):
def get_cache_key(request_or_obj, credentials=None):
"""
Build cache key name from request or AccessAttempt object.
:param request_or_obj: Request or AccessAttempt object
:return cache-key: String, key to be used in cache system
"""
if isinstance(request_or_obj, AccessAttempt):
ip = request_or_obj.ip_address
un = request_or_obj.username
@ -84,7 +85,10 @@ def get_cache_key(request_or_obj, credentials=None):
def get_cache_timeout():
"""Returns timeout according to COOLOFF_TIME."""
"""
Return timeout according to COOLOFF_TIME.
"""
cache_timeout = None
cool_off = settings.AXES_COOLOFF_TIME
if cool_off:
@ -147,10 +151,12 @@ def ip_in_blacklist(ip):
def is_user_lockable(request, credentials=None):
"""Check if the user has a profile with nolockout
If so, then return the value to see if this user is special
and doesn't get their account locked out
"""
Check if the user has a profile with nolockout attribute set.
If so, then return the value to see if this user is special and does not get their account locked out.
"""
if request.method != 'POST':
return True

View file

@ -1,23 +1,18 @@
from __future__ import unicode_literals
from django.contrib.auth.backends import ModelBackend
from django.core.exceptions import PermissionDenied
from axes.attempts import is_already_locked
from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired
from axes.utils import get_credentials, get_lockout_message
class AxesModelBackend(ModelBackend):
class RequestParameterRequired(Exception):
msg = 'AxesModelBackend requires calls to authenticate to pass `request` as an argument.'
def __init__(self):
super(AxesModelBackend.RequestParameterRequired, self).__init__(
AxesModelBackend.RequestParameterRequired.msg)
class AxesBackend(ModelBackend):
"""
Authentication backend that forbids login attempts for locked out users.
"""
def authenticate(self, request, username=None, password=None, **kwargs):
"""Checks user lock out status and raises PermissionDenied if user is not allowed to log in.
"""
Check user lock out status and raises PermissionDenied if user is not allowed to log in.
Inserts errors directly to `return_context` that is supplied as a keyword argument.
@ -26,15 +21,18 @@ class AxesModelBackend(ModelBackend):
Note that this method does not log your user in and delegates login to other backends.
:param request: see ModelBackend.authenticate
:param kwargs: see ModelBackend.authenticate
:param request: see django.contrib.auth.backends.ModelBackend.authenticate
:param username: see django.contrib.auth.backends.ModelBackend.authenticate
:param password: see django.contrib.auth.backends.ModelBackend.authenticate
:param kwargs: see django.contrib.auth.backends.ModelBackend.authenticate
:keyword response_context: context dict that will be updated with error information
:raises PermissionDenied: if user is already locked out
:raises AxesBackendRequestParameterRequired: if request parameter is not given correctly
:raises AxesBackendPermissionDenied: if user is already locked out
:return: None
"""
if request is None:
raise AxesModelBackend.RequestParameterRequired()
raise AxesBackendRequestParameterRequired('AxesBackend requires a request as an argument to authenticate')
credentials = get_credentials(username=username, password=password, **kwargs)
@ -44,6 +42,14 @@ class AxesModelBackend(ModelBackend):
error_msg = get_lockout_message()
response_context = kwargs.get('response_context', {})
response_context['error'] = error_msg
raise PermissionDenied(error_msg)
# Raise an error that stops the authentication flows at django.contrib.auth.authenticate.
# This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors.
# After this error is caught by authenticate it emits a signal indicating user login failed,
# which is processed by axes.signals.log_user_login_failed which logs the attempt and raises
# a second exception which bubbles up the middleware stack and produces a HTTP 403 Forbidden reply
# in the axes.middleware.AxesMiddleware.process_exception middleware exception handler.
raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out')
# No-op

54
axes/checks.py Normal file
View file

@ -0,0 +1,54 @@
from django.core.checks import Error, Tags, register
from django.conf import settings
class Messages:
CACHE_MISSING = 'missing cache configuration for AXES_CACHE'
CACHE_INVALID = 'invalid cache configuration for settings.AXES_CACHE'
class Hints:
CACHE_MISSING = (
'django-axes needs to have a cache configured with settings.AXES_CACHE'
)
CACHE_INVALID = (
'django-axes does not work properly with LocMemCache as the cache backend'
' please add e.g. a DummyCache backend and configure it with settings.AXES_CACHE'
)
class Codes:
CACHE_MISSING = 'axes.E001'
CACHE_INVALID = 'axes.E002'
@register(Tags.caches)
def axes_cache_backend_check(app_configs, **kwargs): # pylint: disable=unused-argument
errors = []
axes_cache_key = getattr(settings, 'AXES_CACHE', 'default')
axes_cache_config = settings.CACHES.get(axes_cache_key, {})
axes_cache_backend = axes_cache_config.get('BACKEND', '')
axes_cache_incompatible_backends = [
'django.core.cache.backends.locmem.LocMemCache',
]
if not axes_cache_config:
errors.append(Error(
msg=Messages.CACHE_MISSING,
hint=Hints.CACHE_MISSING,
obj=settings.CACHES,
id=Codes.CACHE_MISSING,
))
if axes_cache_backend in axes_cache_incompatible_backends:
errors.append(Error(
msg=Messages.CACHE_INVALID,
hint=Hints.CACHE_INVALID,
obj=settings.CACHES,
id=Codes.CACHE_INVALID,
))
return errors

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
from django.conf import settings
from django.utils.translation import gettext_lazy as _
@ -37,6 +35,8 @@ class MyAppConf(AppConf):
DISABLE_SUCCESS_ACCESS_LOG = False
HANDLER = 'axes.handlers.AxesHandler'
LOGGER = 'axes.watch_login'
LOCKOUT_TEMPLATE = None

View file

@ -1,18 +1,10 @@
from __future__ import unicode_literals
from datetime import timedelta
from functools import wraps
import json
import logging
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render
from axes import get_version
from axes.conf import settings
from axes.attempts import is_already_locked
from axes.utils import iso8601, get_client_username, get_lockout_message
from axes.utils import get_lockout_response
log = logging.getLogger(settings.AXES_LOGGER)
if settings.AXES_VERBOSE:
@ -29,7 +21,7 @@ if settings.AXES_VERBOSE:
def axes_dispatch(func):
def inner(request, *args, **kwargs):
if is_already_locked(request):
return lockout_response(request)
return get_lockout_response(request)
return func(request, *args, **kwargs)
@ -40,41 +32,8 @@ def axes_form_invalid(func):
@wraps(func)
def inner(self, *args, **kwargs):
if is_already_locked(self.request):
return lockout_response(self.request)
return get_lockout_response(self.request)
return func(self, *args, **kwargs)
return inner
def lockout_response(request):
context = {
'failure_limit': settings.AXES_FAILURE_LIMIT,
'username': get_client_username(request) or ''
}
cool_off = settings.AXES_COOLOFF_TIME
if cool_off:
if isinstance(cool_off, (int, float)):
cool_off = timedelta(hours=cool_off)
context.update({
'cooloff_time': iso8601(cool_off)
})
if request.is_ajax():
return HttpResponse(
json.dumps(context),
content_type='application/json',
status=403,
)
if settings.AXES_LOCKOUT_TEMPLATE:
return render(
request, settings.AXES_LOCKOUT_TEMPLATE, context, status=403
)
if settings.AXES_LOCKOUT_URL:
return HttpResponseRedirect(settings.AXES_LOCKOUT_URL)
return HttpResponse(get_lockout_message(), status=403)

38
axes/exceptions.py Normal file
View file

@ -0,0 +1,38 @@
from django.core.exceptions import PermissionDenied
class AxesPermissionDenied(PermissionDenied):
"""
Base class for permission denied errors raised by axes specifically for easier debugging.
Two different types of errors are used because of the behaviour Django has:
- If an authentication backend raises a PermissionDenied error the authentication flow is aborted.
- If another component raises a PermissionDenied error a HTTP 403 Forbidden response is returned.
"""
pass
class AxesSignalPermissionDenied(AxesPermissionDenied):
"""
Raised by signal handler on failed authentication attempts to send user a HTTP 403 Forbidden status code.
"""
pass
class AxesBackendPermissionDenied(AxesPermissionDenied):
"""
Raised by authentication backend on locked out requests to stop the Django authentication flow.
"""
pass
class AxesBackendRequestParameterRequired(ValueError):
"""
Raised by authentication backend on invalid or missing request parameter value.
"""
pass

196
axes/handlers.py Normal file
View file

@ -0,0 +1,196 @@
import logging
from django.utils.timezone import now
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.attempts import reset_user_attempts
from axes.exceptions import AxesSignalPermissionDenied
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
log = logging.getLogger(settings.AXES_LOGGER)
class AxesHandler: # pylint: disable=too-many-locals
"""
Signal handler implementation that records user login attempts to database and locks users out if necessary.
"""
def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=unused-argument
"""
When user login fails, save AccessAttempt record in database and lock user out if necessary.
:raises AxesSignalPermissionDenied: if user should is locked out
"""
if request is None:
log.warning('Attempt to authenticate with a custom backend failed.')
return
ip_address = get_client_ip(request)
username = get_client_username(request, credentials)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
return
failures = 0
attempts = get_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
# add a failed attempt for this user
failures += 1
get_axes_cache().set(cache_hash_key, failures, cache_timeout)
# has already attempted, update the info
if attempts:
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = http_accept
attempt.path_info = path_info
attempt.failures_since_start = failures
attempt.attempt_time = now()
attempt.save()
log.info(
'AXES: Repeated login failure by %s. Count = %d of %d',
get_client_str(username, ip_address, user_agent, path_info),
failures,
settings.AXES_FAILURE_LIMIT,
)
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
AccessAttempt.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=http_accept,
path_info=path_info,
failures_since_start=failures,
)
log.info(
'AXES: New login failure by %s. Creating access record.',
get_client_str(username, ip_address, user_agent, path_info),
)
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request, credentials)
):
log.warning(
'AXES: Locked out %s after repeated login failures.',
get_client_str(username, ip_address, user_agent, path_info),
)
# send signal when someone is locked out.
user_locked_out.send(
'axes',
request=request,
username=username,
ip_address=ip_address,
)
raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
log.info(
'AXES: Successful login by %s.',
get_client_str(username, ip_address, user_agent, path_info),
)
if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=True,
)
if settings.AXES_RESET_ON_SUCCESS:
count = reset_user_attempts(request, credentials)
log.info(
'AXES: Deleted %d failed login attempts by %s.',
count,
get_client_str(username, ip_address, user_agent, path_info),
)
def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs out, update the AccessLog related to the user.
"""
log.info('AXES: Successful logout by %s.', user)
if user and not settings.AXES_DISABLE_ACCESS_LOG:
AccessLog.objects.filter(
username=user.get_username(),
logout_time__isnull=True,
).update(
logout_time=now(),
)
def post_save_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument
"""
Update cache after saving AccessAttempts.
"""
cache_hash_key = get_cache_key(instance)
if not get_axes_cache().get(cache_hash_key):
cache_timeout = get_cache_timeout()
get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout)
def post_delete_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument
"""
Update cache after deleting AccessAttempts.
"""
cache_hash_key = get_cache_key(instance)
get_axes_cache().delete(cache_hash_key)

View file

@ -1 +0,0 @@
from __future__ import unicode_literals

View file

@ -1 +0,0 @@
from __future__ import unicode_literals

View file

@ -1,15 +1,12 @@
from __future__ import unicode_literals
from django.core.management.base import BaseCommand
from axes.models import AccessAttempt
class Command(BaseCommand):
args = ''
help = ('List registered login attempts')
help = 'List access attempts'
def handle(self, *args, **kwargs): # pylint: disable=unused-argument
def handle(self, *args, **options): # pylint: disable=unused-argument
for obj in AccessAttempt.objects.all():
self.stdout.write('{ip}\t{username}\t{failures}'.format(
ip=obj.ip_address,

View file

@ -1,27 +1,15 @@
from __future__ import unicode_literals
from django.core.management.base import BaseCommand
from axes.utils import reset
class Command(BaseCommand):
help = ("resets any lockouts or failed login records. If called with an "
"IP, resets only for that IP")
help = 'Reset all access attempts and lockouts'
def add_arguments(self, parser):
parser.add_argument('ip', nargs='*')
def handle(self, *args, **options): # pylint: disable=unused-argument
count = reset()
def handle(self, *args, **kwargs):
count = 0
if kwargs and kwargs.get('ip'):
for ip in kwargs['ip'][1:]:
count += reset(ip=ip)
if count:
self.stdout.write('{0} attempts removed.'.format(count))
else:
count = reset()
if kwargs['verbosity']:
if count:
self.stdout.write('{0} attempts removed.'.format(count))
else:
self.stdout.write('No attempts found.')
self.stdout.write('No attempts found.')

View file

@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand
from axes.utils import reset
class Command(BaseCommand):
help = 'Reset all access attempts and lockouts for given IP addresses'
def add_arguments(self, parser):
parser.add_argument('ip', nargs='+', type=str)
def handle(self, *args, **options):
count = 0
for ip in options['ip']:
count += reset(ip=ip)
if count:
self.stdout.write('{0} attempts removed.'.format(count))
else:
self.stdout.write('No attempts found.')

View file

@ -1,22 +0,0 @@
from __future__ import unicode_literals
from django.core.management.base import BaseCommand
from axes.utils import reset
class Command(BaseCommand):
help = ("Resets any lockouts or failed login records. If called with an "
"User name, resets only for that User name")
def add_arguments(self, parser):
parser.add_argument('username')
def handle(self, *args, **kwargs):
count = 0
count += reset(username=kwargs['username'])
if kwargs['verbosity']:
if count:
self.stdout.write('{0} attempts removed.'.format(count))
else:
self.stdout.write('No attempts found.')

View file

@ -0,0 +1,21 @@
from django.core.management.base import BaseCommand
from axes.utils import reset
class Command(BaseCommand):
help = 'Reset all access attempts and lockouts for given usernames'
def add_arguments(self, parser):
parser.add_argument('username', nargs='+', type=str)
def handle(self, *args, **options):
count = 0
for username in options['username']:
count += reset(username=username)
if count:
self.stdout.write('{0} attempts removed.'.format(count))
else:
self.stdout.write('No attempts found.')

39
axes/middleware.py Normal file
View file

@ -0,0 +1,39 @@
from axes.exceptions import AxesSignalPermissionDenied
from axes.utils import get_lockout_response
class AxesMiddleware:
"""
Middleware that maps lockout signals into readable HTTP 403 Forbidden responses.
Without this middleware the backend returns HTTP 403 errors with the
django.views.defaults.permission_denied view that renders the 403.html
template from the root template directory if found.
Refer to the Django documentation for further information:
https://docs.djangoproject.com/en/dev/ref/views/#the-403-http-forbidden-view
To customize the error rendering, you can for example inherit this middleware
and change the process_exception handler to your own liking.
"""
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
return self.get_response(request)
def process_exception(self, request, exception): # pylint: disable=inconsistent-return-statements
"""
Exception handler that processes exceptions raised by the axes signal handler when request fails with login.
Refer to axes.signals.log_user_login_failed for the error code.
:param request: HTTPRequest that will be locked out.
:param exception: Exception raised by Django views or signals. Only AxesSignalPermissionDenied will be handled.
:return: HTTPResponse that indicates the lockout or None.
"""
if isinstance(exception, AxesSignalPermissionDenied):
return get_lockout_response(request)

View file

@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models

View file

@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models

View file

@ -1,6 +1,3 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations

View file

@ -1,7 +1,3 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11.16 on 2018-10-24 22:38
from __future__ import unicode_literals
from django.db import migrations, models

View file

@ -1,5 +1,3 @@
# Generated by Django 2.1.4 on 2018-12-23 09:03
from django.db import migrations

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
from django.db import models
from django.utils.translation import gettext_lazy as _

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
import logging
from django.contrib.auth.signals import user_logged_in
@ -8,20 +6,10 @@ from django.contrib.auth.signals import user_login_failed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.dispatch import Signal
from django.utils import timezone
from django.utils.module_loading import import_string
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.attempts import reset_user_attempts
from axes.models import AccessLog, AccessAttempt
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
from axes.models import AccessAttempt
log = logging.getLogger(settings.AXES_LOGGER)
@ -29,156 +17,110 @@ log = logging.getLogger(settings.AXES_LOGGER)
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
@receiver(user_login_failed)
def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: disable=unused-argument
""" Create an AccessAttempt record if the login wasn't successful
class ProxyHandler:
"""
if request is None:
log.warning('Attempt to authenticate with a custom backend failed.')
return
Proxy interface for configurable Axes signal handler class.
ip_address = get_client_ip(request)
username = get_client_username(request, credentials)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
If you wish to implement a custom version of this handler,
you can override the settings.AXES_HANDLER configuration string
with a class that implements a compatible interface and methods.
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
return
Defaults to using axes.handlers.AxesHandler if not overridden.
Refer to axes.handlers.AxesHandler for default implementation.
"""
failures = 0
attempts = get_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
implementation = None # concrete handler that is bootstrapped by the Django application loader
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
@classmethod
def initialize(cls):
"""
Fetch and initialize concrete handler implementation and memoize it to avoid reinitialization.
# add a failed attempt for this user
failures += 1
get_axes_cache().set(cache_hash_key, failures, cache_timeout)
This method is re-entrant and can be called multiple times.
"""
# has already attempted, update the info
if attempts:
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = http_accept
attempt.path_info = path_info
attempt.failures_since_start = failures
attempt.attempt_time = timezone.now()
attempt.save()
if cls.implementation is None:
cls.implementation = import_string(settings.AXES_HANDLER)()
log.info(
'AXES: Repeated login failure by %s. Count = %d of %d',
get_client_str(username, ip_address, user_agent, path_info),
failures,
settings.AXES_FAILURE_LIMIT
)
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
AccessAttempt.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=http_accept,
path_info=path_info,
failures_since_start=failures,
)
@classmethod
def user_login_failed(cls, sender, credentials, request, **kwargs):
"""
Handle user login failure event.
log.info(
'AXES: New login failure by %s. Creating access record.',
get_client_str(username, ip_address, user_agent, path_info)
)
:param credentials: credentials used for authentication attempt
:param request: request used for failed authentication attempt
:return: None
"""
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request, credentials)
):
log.warning(
'AXES: locked out %s after repeated login attempts.',
get_client_str(username, ip_address, user_agent, path_info)
)
cls.implementation.user_login_failed(sender, credentials, request, **kwargs)
# send signal when someone is locked out.
user_locked_out.send(
'axes', request=request, username=username, ip_address=ip_address
)
@classmethod
def user_logged_in(cls, sender, request, user, **kwargs):
"""
Handle user login event.
:param credentials: credentials used for successful authentication
:param request: request used for successful authentication
:return: None
"""
cls.implementation.user_logged_in(sender, request, user, **kwargs)
@classmethod
def user_logged_out(cls, sender, request, user, **kwargs):
"""
Handle user logout event.
:param request: request used for logout
:param user: user used for logout
:return: None
"""
cls.implementation.user_logged_out(sender, request, user, **kwargs)
@classmethod
def post_save_access_attempt(cls, instance, **kwargs):
"""
Handle AccessAttempt save event.
:param instance: axes.models.AccessAttempt instance that will be saved
:return: None
"""
cls.implementation.post_save_access_attempt(instance, **kwargs)
@classmethod
def post_delete_access_attempt(cls, instance, **kwargs):
"""
Handle AccessAttempt delete event.
:param instance: axes.models.AccessAttempt instance that was deleted
:return: None
"""
cls.implementation.post_delete_access_attempt(instance, **kwargs)
@receiver(user_login_failed)
def handle_user_login_failed(*args, **kwargs):
ProxyHandler.user_login_failed(*args, **kwargs)
@receiver(user_logged_in)
def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unused-argument
""" When a user logs in, update the access log
"""
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
log.info(
'AXES: Successful login by %s.',
get_client_str(username, ip_address, user_agent, path_info)
)
if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=True,
)
if settings.AXES_RESET_ON_SUCCESS:
count = reset_user_attempts(request, credentials)
log.info(
'AXES: Deleted %d failed login attempts by %s.',
count,
get_client_str(username, ip_address, user_agent, path_info)
)
def handle_user_logged_in(*args, **kwargs):
ProxyHandler.user_logged_in(*args, **kwargs)
@receiver(user_logged_out)
def log_user_logged_out(sender, request, user, **kwargs): # pylint: disable=unused-argument
""" When a user logs out, update the access log
"""
log.info('AXES: Successful logout by %s.', user)
if user and not settings.AXES_DISABLE_ACCESS_LOG:
AccessLog.objects.filter(
username=user.get_username(),
logout_time__isnull=True,
).update(logout_time=timezone.now())
def handle_user_logged_out(*args, **kwargs):
ProxyHandler.user_logged_out(*args, **kwargs)
@receiver(post_save, sender=AccessAttempt)
def update_cache_after_save(instance, **kwargs): # pylint: disable=unused-argument
cache_hash_key = get_cache_key(instance)
if not get_axes_cache().get(cache_hash_key):
cache_timeout = get_cache_timeout()
get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout)
def handle_post_save_access_attempt(*args, **kwargs):
ProxyHandler.post_save_access_attempt(*args, **kwargs)
@receiver(post_delete, sender=AccessAttempt)
def delete_cache_after_delete(instance, **kwargs): # pylint: disable=unused-argument
cache_hash_key = get_cache_key(instance)
get_axes_cache().delete(cache_hash_key)
def handle_post_delete_access_attempt(*args, **kwargs):
ProxyHandler.post_delete_access_attempt(*args, **kwargs)

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
@ -15,29 +13,37 @@ CACHES = {
SITE_ID = 1
MIDDLEWARE = (
MIDDLEWARE = [
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
)
AUTHENTICATION_BACKENDS = (
'axes.backends.AxesModelBackend',
'axes.middleware.AxesMiddleware',
]
AUTHENTICATION_BACKENDS = [
'axes.backends.AxesBackend',
'django.contrib.auth.backends.ModelBackend',
)
]
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.MD5PasswordHasher',
]
ROOT_URLCONF = 'axes.test_urls'
INSTALLED_APPS = (
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.admin',
'axes',
)
]
TEMPLATES = [
{

View file

@ -1,11 +0,0 @@
from __future__ import unicode_literals
from .test_settings import * # pylint: disable=unused-wildcard-import
AXES_CACHE = 'axes'
CACHES = {
'axes': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'
}
}

View file

@ -1,8 +1,7 @@
from __future__ import unicode_literals
from django.conf.urls import url
from django.contrib import admin
urlpatterns = [
url(r'^admin/', admin.site.urls),
]

View file

@ -1 +0,0 @@
from __future__ import unicode_literals

View file

@ -1,6 +0,0 @@
from __future__ import unicode_literals
try:
from unittest.mock import patch # pylint: disable=unused-import
except ImportError:
from mock import patch # pylint: disable=unused-import

View file

@ -1,11 +1,9 @@
from __future__ import unicode_literals
import datetime
import hashlib
import json
import random
import string
import time
from unittest.mock import patch
from django.http import HttpRequest
from django.test import TestCase, override_settings
@ -18,24 +16,28 @@ from axes.conf import settings
from axes.attempts import get_cache_key
from axes.models import AccessAttempt, AccessLog
from axes.signals import user_locked_out
from axes.tests.compatibility import patch
from axes.utils import reset
@override_settings(AXES_COOLOFF_TIME=datetime.timedelta(seconds=2))
class AccessAttemptTest(TestCase):
"""Test case using custom settings for testing
"""
Test case using custom settings for testing.
"""
VALID_USERNAME = 'valid-username'
VALID_PASSWORD = 'valid-password'
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
def _login(self, is_valid_username=False, is_valid_password=False,
is_json=False, **kwargs):
"""Login a user. A valid credential is used when is_valid_username is True,
def _login(self, is_valid_username=False, is_valid_password=False, **kwargs):
"""
Login a user.
A valid credential is used when is_valid_username is True,
otherwise it will use a random string to make a failed login.
"""
if is_valid_username:
# Use a valid username
username = self.VALID_USERNAME
@ -59,22 +61,17 @@ class AccessAttemptTest(TestCase):
}
post_data.update(kwargs)
if is_json:
headers.update({
'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
'content_type': 'application/json',
})
post_data = json.dumps(post_data)
response = self.client.post(
reverse('admin:login'), post_data, **headers
return self.client.post(
reverse('admin:login'),
post_data,
**headers
)
return response
def setUp(self):
"""Create a valid user for login
"""
Create a valid user for login.
"""
self.user = User.objects.create_superuser(
username=self.VALID_USERNAME,
email='test@example.com',
@ -82,9 +79,10 @@ class AccessAttemptTest(TestCase):
)
def test_failure_limit_once(self):
"""Tests the login lock trying to login one more time
than failure limit
"""
Test the login lock trying to login one more time than failure limit.
"""
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login()
@ -97,9 +95,10 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_failure_limit_many(self):
"""Tests the login lock trying to login a lot of times more
than failure limit
"""
Test the login lock trying to login a lot of times more than failure limit.
"""
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
@ -112,14 +111,18 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_valid_login(self):
"""Tests a valid login for a real username
"""
Test a valid login for a real username.
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_valid_logout(self):
"""Tests a valid logout and make sure the logout_time is updated
"""
Test a valid logout and make sure the logout_time is updated.
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEqual(AccessLog.objects.latest('id').logout_time, None)
@ -128,8 +131,10 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, 'Logged out')
def test_cooling_off(self):
"""Tests if the cooling time allows a user to login
"""
Test if the cooling time allows a user to login.
"""
self.test_failure_limit_once()
# Wait for the cooling off period
@ -139,8 +144,10 @@ class AccessAttemptTest(TestCase):
self.test_valid_login()
def test_cooling_off_for_trusted_user(self):
"""Test the cooling time for a trusted user
"""
Test the cooling time for a trusted user.
"""
# Test successful login-logout, this makes the user trusted.
self.test_valid_logout()
@ -148,8 +155,10 @@ class AccessAttemptTest(TestCase):
self.test_cooling_off()
def test_long_user_agent_valid(self):
"""Tests if can handle a long user agent
"""
Test if can handle a long user agent.
"""
long_user_agent = 'ie6' * 1024
response = self._login(
is_valid_username=True,
@ -159,8 +168,10 @@ class AccessAttemptTest(TestCase):
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_long_user_agent_not_valid(self):
"""Tests if can handle a long user agent with failure
"""
Test if can handle a long user agent with failure.
"""
long_user_agent = 'ie6' * 1024
for _ in range(settings.AXES_FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)
@ -168,8 +179,10 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_reset_ip(self):
"""Tests if can reset an ip address
"""
Test resetting all attempts for an IP address.
"""
# Make a lockout
self.test_failure_limit_once()
@ -180,8 +193,10 @@ class AccessAttemptTest(TestCase):
self.test_valid_login()
def test_reset_all(self):
"""Tests if can reset all attempts
"""
Test resetting all attempts.
"""
# Make a lockout
self.test_failure_limit_once()
@ -193,7 +208,10 @@ class AccessAttemptTest(TestCase):
@patch('axes.utils.get_client_ip', return_value='127.0.0.1')
def test_get_cache_key(self, _):
""" Test the cache key format"""
"""
Test the cache key format.
"""
# Getting cache key from request
ip_address = '127.0.0.1'
cache_hash_key = 'axes-{}'.format(
@ -201,11 +219,13 @@ class AccessAttemptTest(TestCase):
)
request_factory = RequestFactory()
request = request_factory.post('/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test'
})
request = request_factory.post(
'/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test',
},
)
self.assertEqual(cache_hash_key, get_cache_key(request))
@ -220,12 +240,15 @@ class AccessAttemptTest(TestCase):
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
)
self.assertEqual(cache_hash_key, get_cache_key(attempt))
self.assertEqual(cache_hash_key, get_cache_key(attempt))
@patch('axes.utils.get_client_ip', return_value='127.0.0.1')
def test_get_cache_key_credentials(self, _):
""" Test the cache key format"""
"""
Test the cache key format.
"""
# Getting cache key from request
ip_address = '127.0.0.1'
cache_hash_key = 'axes-{}'.format(
@ -258,8 +281,10 @@ class AccessAttemptTest(TestCase):
self.assertEqual(cache_hash_key, get_cache_key(attempt))
def test_send_lockout_signal(self):
"""Test if the lockout signal is emitted
"""
Test if the lockout signal is emitted.
"""
# this "hack" is needed so we don't have to use global variables or python3 features
class Scope(object): pass
scope = Scope()
@ -284,9 +309,10 @@ class AccessAttemptTest(TestCase):
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_combination_user_and_ip(self):
"""Tests the login lock with a valid username and invalid password
when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True
"""
Test login failure when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True.
"""
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login(
@ -303,9 +329,10 @@ class AccessAttemptTest(TestCase):
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_only(self):
"""Tests the login lock with a valid username and invalid password
when AXES_ONLY_USER_FAILURES is True
"""
Test login failure when AXES_ONLY_USER_FAILURES is True.
"""
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login(
@ -342,9 +369,10 @@ class AccessAttemptTest(TestCase):
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_log_data_truncated(self):
"""Tests that query2str properly truncates data to the
max_length (default 1024)
"""
Test that query2str properly truncates data to the max_length (default 1024).
"""
# An impossibly large post dict
extra_data = {string.ascii_letters * x: x for x in range(0, 1000)}
self._login(**extra_data)
@ -352,14 +380,6 @@ class AccessAttemptTest(TestCase):
len(AccessAttempt.objects.latest('id').post_data), 1024
)
def test_json_response(self):
"""Tests response content type and status code for the ajax request
"""
self.test_failure_limit_once()
response = self._login(is_json=True)
self.assertEqual(response.status_code, 403)
self.assertEqual(response.get('Content-Type'), 'application/json')
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_valid_logout_without_success_log(self):
AccessLog.objects.all().delete()
@ -373,9 +393,9 @@ class AccessAttemptTest(TestCase):
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_valid_login_without_success_log(self):
"""
A valid login doesn't generate an AccessLog when
`DISABLE_SUCCESS_ACCESS_LOG=True`.
Test that a valid login does not generate an AccessLog when DISABLE_SUCCESS_ACCESS_LOG is True.
"""
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
@ -396,8 +416,7 @@ class AccessAttemptTest(TestCase):
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_non_valid_login_without_log(self):
"""
A non-valid login does generate an AccessLog when
`DISABLE_ACCESS_LOG=True`.
Test that a non-valid login does generate an AccessLog when DISABLE_ACCESS_LOG is True.
"""
AccessLog.objects.all().delete()
@ -420,10 +439,9 @@ class AccessAttemptTest(TestCase):
self.assertEqual(response.status_code, 200)
def test_custom_authentication_backend(self):
'''
``log_user_login_failed`` should shortcircuit if an attempt to authenticate
with a custom authentication backend fails.
'''
"""
Test that log_user_login_failed skips if an attempt to authenticate with a custom authentication backend fails.
"""
request = HttpRequest()
request.user = self.user
@ -431,8 +449,10 @@ class AccessAttemptTest(TestCase):
self.assertEqual(AccessLog.objects.all().count(), 0)
def _assert_resets_on_success(self):
"""Sets up for testing the AXES_RESET_ON_SUCCESS setting.
"""
Sets the AXES_RESET_ON_SUCCESS up for testing.
"""
# test until one try before the limit
for _ in range(settings.AXES_FAILURE_LIMIT - 1):
response = self._login()
@ -447,9 +467,10 @@ class AccessAttemptTest(TestCase):
# by default, AXES_RESET_ON_SUCCESS = False
def test_reset_on_success_default(self):
"""Tests that the failure attempts does not reset after one successful
attempt by default.
"""
Test that the failure attempts does not reset after one successful attempt by default.
"""
response = self._assert_resets_on_success()
# So, we shouldn't have found a lock-out yet.
@ -458,9 +479,10 @@ class AccessAttemptTest(TestCase):
@override_settings(AXES_RESET_ON_SUCCESS=True)
def test_reset_on_success(self):
"""Tests that the failure attempts resets after one successful
attempt when using the corresponding setting.
"""
Test that the failure attempts resets after one successful attempt when using the corresponding setting.
"""
response = self._assert_resets_on_success()
# So, we shouldn't have found a lock-out yet.

View file

@ -1,7 +1,3 @@
from __future__ import unicode_literals
import json
from django.test import TestCase, override_settings
from django.urls import reverse
from django.contrib.auth.models import User
@ -10,11 +6,13 @@ from axes.conf import settings
class AccessAttemptConfigTest(TestCase):
""" This set of tests checks for lockouts under different configurations
and circumstances to prevent false positives and false negatives.
"""
Test for lockouts under different configurations and circumstances to prevent false positives and false negatives.
Always block attempted logins for the same user from the same IP.
Always allow attempted logins for a different user from a different IP.
"""
IP_1 = '10.1.1.1'
IP_2 = '10.2.2.2'
USER_1 = 'valid-user-1'
@ -26,11 +24,13 @@ class AccessAttemptConfigTest(TestCase):
ALLOWED = 302
BLOCKED = 403
def _login(self, username, password, ip_addr='127.0.0.1',
is_json=False, **kwargs):
"""Login a user and get the response.
def _login(self, username, password, ip_addr='127.0.0.1', **kwargs):
"""
Login a user and get the response.
IP address can be configured to test IP blocking functionality.
"""
headers = {
'user_agent': 'test-browser'
}
@ -41,17 +41,12 @@ class AccessAttemptConfigTest(TestCase):
}
post_data.update(kwargs)
if is_json:
headers.update({
'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
'content_type': 'application/json',
})
post_data = json.dumps(post_data)
response = self.client.post(
reverse('admin:login'), post_data, REMOTE_ADDR=ip_addr, **headers
return self.client.post(
reverse('admin:login'),
post_data,
REMOTE_ADDR=ip_addr,
**headers
)
return response
def _lockout_user_from_ip(self, username, ip_addr):
for _ in range(settings.AXES_FAILURE_LIMIT):
@ -69,8 +64,10 @@ class AccessAttemptConfigTest(TestCase):
)
def setUp(self):
"""Create two valid users for authentication.
"""
Create two valid users for authentication.
"""
self.user = User.objects.create_superuser(
username=self.USER_1,
email='test_1@example.com',

View file

@ -0,0 +1,21 @@
from unittest.mock import patch, MagicMock
from django.test import TestCase
from axes.backends import AxesBackend
from axes.exceptions import AxesBackendRequestParameterRequired, AxesBackendPermissionDenied
class BackendTestCase(TestCase):
def test_authenticate_raises_on_missing_request(self):
request = None
with self.assertRaises(AxesBackendRequestParameterRequired):
AxesBackend().authenticate(request)
@patch('axes.backends.is_already_locked', return_value=True)
def test_authenticate_raises_on_locked_request(self, _):
request = MagicMock()
with self.assertRaises(AxesBackendPermissionDenied):
AxesBackend().authenticate(request)

40
axes/tests/test_checks.py Normal file
View file

@ -0,0 +1,40 @@
from django.core.checks import run_checks, Error
from django.conf import settings
from django.test import TestCase, override_settings
from axes.checks import Messages, Hints, Codes
class CacheCheckTestCase(TestCase):
@override_settings(
AXES_CACHE='nonexistent',
)
def test_cache_missing_produces_check_error(self):
errors = run_checks()
error = Error(
msg=Messages.CACHE_MISSING,
hint=Hints.CACHE_MISSING,
obj=settings.CACHES,
id=Codes.CACHE_MISSING,
)
self.assertIn(error, errors)
@override_settings(
AXES_CACHE='axes',
CACHES={
'axes': {
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
},
},
)
def test_cache_misconfiguration_produces_check_error(self):
errors = run_checks()
error = Error(
msg=Messages.CACHE_INVALID,
hint=Hints.CACHE_INVALID,
obj=settings.CACHES,
id=Codes.CACHE_INVALID,
)
self.assertIn(error, errors)

View file

@ -0,0 +1,40 @@
from unittest.mock import MagicMock, patch
from django.http import HttpResponse
from django.test import TestCase
from axes.decorators import axes_dispatch, axes_form_invalid
class DecoratorTestCase(TestCase):
SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched')
LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out')
def setUp(self):
self.request = MagicMock()
self.cls = MagicMock(return_value=self.request)
self.func = MagicMock(return_value=self.SUCCESS_RESPONSE)
@patch('axes.decorators.is_already_locked', return_value=True)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_dispatch_locks_out(self, _, __):
response = axes_dispatch(self.func)(self.request)
self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
@patch('axes.decorators.is_already_locked', return_value=False)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_dispatch_dispatches(self, _, __):
response = axes_dispatch(self.func)(self.request)
self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
@patch('axes.decorators.is_already_locked', return_value=True)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_form_invalid_locks_out(self, _, __):
response = axes_form_invalid(self.func)(self.cls)
self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
@patch('axes.decorators.is_already_locked', return_value=False)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_axes_form_invalid_dispatches(self, _, __):
response = axes_form_invalid(self.func)(self.cls)
self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)

View file

@ -0,0 +1,51 @@
from unittest.mock import MagicMock, patch
from django.test import TestCase
from axes.handlers import AxesHandler
from axes.signals import ProxyHandler
class ProxyHandlerTestCase(TestCase):
def setUp(self):
self.sender = MagicMock()
self.credentials = MagicMock()
self.request = MagicMock()
self.user = MagicMock()
self.instance = MagicMock()
@patch('axes.signals.ProxyHandler.implementation', None)
def test_initialize(self):
self.assertIsNone(ProxyHandler.implementation)
ProxyHandler.initialize()
self.assertIsInstance(ProxyHandler.implementation, AxesHandler)
@patch('axes.signals.ProxyHandler.implementation')
def test_user_login_failed(self, handler):
self.assertFalse(handler.user_login_failed.called)
ProxyHandler().user_login_failed(self.sender, self.credentials, self.request)
self.assertTrue(handler.user_login_failed.called)
@patch('axes.signals.ProxyHandler.implementation')
def test_user_logged_in(self, handler):
self.assertFalse(handler.user_logged_in.called)
ProxyHandler().user_logged_in(self.sender, self.request, self.user)
self.assertTrue(handler.user_logged_in.called)
@patch('axes.signals.ProxyHandler.implementation')
def test_user_logged_out(self, handler):
self.assertFalse(handler.user_logged_out.called)
ProxyHandler().user_logged_out(self.sender, self.request, self.user)
self.assertTrue(handler.user_logged_out.called)
@patch('axes.signals.ProxyHandler.implementation')
def test_post_save_access_attempt(self, handler):
self.assertFalse(handler.post_save_access_attempt.called)
ProxyHandler().post_save_access_attempt(self.instance)
self.assertTrue(handler.post_save_access_attempt.called)
@patch('axes.signals.ProxyHandler.implementation')
def test_post_delete_access_attempt(self, handler):
self.assertFalse(handler.post_delete_access_attempt.called)
ProxyHandler().post_delete_access_attempt(self.instance)
self.assertTrue(handler.post_delete_access_attempt.called)

View file

@ -0,0 +1,63 @@
from io import StringIO
from django.core.management import call_command
from django.test import TestCase
from axes.models import AccessAttempt
class ManagementCommandTestCase(TestCase):
def setUp(self):
AccessAttempt.objects.create(
username='jane.doe',
ip_address='10.0.0.1',
failures_since_start='4',
)
AccessAttempt.objects.create(
username='john.doe',
ip_address='10.0.0.2',
failures_since_start='15',
)
def test_axes_list_attempts(self):
out = StringIO()
call_command('axes_list_attempts', stdout=out)
expected = '10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n'
self.assertEqual(expected, out.getvalue())
def test_axes_reset(self):
out = StringIO()
call_command('axes_reset', stdout=out)
expected = '2 attempts removed.\n'
self.assertEqual(expected, out.getvalue())
def test_axes_reset_ip(self):
out = StringIO()
call_command('axes_reset_ip', '10.0.0.1', stdout=out)
expected = '1 attempts removed.\n'
self.assertEqual(expected, out.getvalue())
def test_axes_reset_ip_not_found(self):
out = StringIO()
call_command('axes_reset_ip', '10.0.0.3', stdout=out)
expected = 'No attempts found.\n'
self.assertEqual(expected, out.getvalue())
def test_axes_reset_username(self):
out = StringIO()
call_command('axes_reset_username', 'john.doe', stdout=out)
expected = '1 attempts removed.\n'
self.assertEqual(expected, out.getvalue())
def test_axes_reset_username_not_found(self):
out = StringIO()
call_command('axes_reset_username', 'ivan.renko', stdout=out)
expected = 'No attempts found.\n'
self.assertEqual(expected, out.getvalue())

View file

@ -0,0 +1,28 @@
from unittest.mock import patch, MagicMock
from django.http import HttpResponse
from django.test import TestCase
from axes.exceptions import AxesSignalPermissionDenied
from axes.middleware import AxesMiddleware
class MiddlewareTestCase(TestCase):
SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched')
LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out')
def setUp(self):
self.request = MagicMock()
self.get_response = MagicMock()
@patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_process_exception_axes(self, _):
exception = AxesSignalPermissionDenied()
response = AxesMiddleware(self.get_response).process_exception(self.request, exception)
self.assertEqual(response, self.LOCKOUT_RESPONSE)
@patch('axes.middleware.get_lockout_response', return_value=LOCKOUT_RESPONSE)
def test_process_exception_other(self, _):
exception = Exception()
response = AxesMiddleware(self.get_response).process_exception(self.request, exception)
self.assertEqual(response, None)

View file

@ -1,27 +1,28 @@
from django.apps.registry import apps
from django.db import connection
from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.executor import MigrationExecutor
from django.db.migrations.state import ProjectState
from django.test import TestCase
from django.utils import translation
class MigrationsCheck(TestCase):
def setUp(self):
from django.utils import translation
self.saved_locale = translation.get_language()
translation.deactivate_all()
def tearDown(self):
if self.saved_locale is not None:
from django.utils import translation
translation.activate(self.saved_locale)
def test_missing_migrations(self):
from django.db import connection
from django.apps.registry import apps
from django.db.migrations.executor import MigrationExecutor
executor = MigrationExecutor(connection)
from django.db.migrations.autodetector import MigrationAutodetector
from django.db.migrations.state import ProjectState
autodetector = MigrationAutodetector(
executor.loader.project_state(),
ProjectState.from_apps(apps),
)
changes = autodetector.changes(graph=executor.loader.graph)
self.assertEqual({}, changes)

View file

@ -1,5 +1,3 @@
from __future__ import unicode_literals
import datetime
from django.http import HttpRequest
@ -9,10 +7,17 @@ from django.utils import six
from axes.utils import iso8601, is_ipv6, get_client_str, get_client_username
def get_expected_client_str(*args, **kwargs):
client_str_template = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}'
return client_str_template.format(*args, **kwargs)
class UtilsTest(TestCase):
def test_iso8601(self):
"""Tests iso8601 correctly translates datetime.timdelta to ISO 8601
formatted duration."""
"""
Test iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration.
"""
EXPECTED = {
datetime.timedelta(days=1, hours=25, minutes=42, seconds=8):
'P2DT1H42M8S',
@ -46,8 +51,7 @@ class UtilsTest(TestCase):
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@ -72,8 +76,7 @@ class UtilsTest(TestCase):
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@ -99,8 +102,7 @@ class UtilsTest(TestCase):
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@ -126,8 +128,7 @@ class UtilsTest(TestCase):
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@ -171,22 +172,9 @@ class UtilsTest(TestCase):
self.assertEqual(expected_in_credentials, actual)
def sample_customize_username(request):
def sample_customize_username(request, credentials):
return 'prefixed-' + request.POST.get('username')
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
def test_custom_get_client_username(self):
provided = 'test-username'
expected = 'prefixed-' + provided
request = HttpRequest()
request.POST['username'] = provided
actual = get_client_username(request)
self.assertEqual(expected, actual)
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
def test_custom_get_client_username_from_request(self):
@ -222,18 +210,30 @@ class UtilsTest(TestCase):
self.assertEqual(expected_in_credentials, actual)
def sample_get_client_username_too_few_arguments():
def sample_get_client_username(request, credentials):
return 'example'
@override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username)
def test_get_client_username(self):
self.assertEqual('example', get_client_username(HttpRequest(), {}))
@override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username)
def test_get_client_username_too_many_arguments(self):
with self.assertRaises(TypeError):
actual = get_client_username(HttpRequest(), {}, None)
def sample_get_client_username_too_few_arguments(request):
pass
@override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username_too_few_arguments)
def test_get_client_username_too_few_arguments_invalid_callable(self):
def test_get_client_username_invalid_callable_too_few_arguments(self):
with self.assertRaises(TypeError):
actual = get_client_username(HttpRequest(), {})
def sample_get_client_username_too_many_arguments(one, two, three):
def sample_get_client_username_too_many_arguments(request, credentials, extra_argument):
pass
@override_settings(AXES_USERNAME_CALLABLE=sample_get_client_username_too_many_arguments)
def test_get_client_username_too_many_arguments_invalid_callable(self):
def test_get_client_username_invalid_callable_too_many_arguments(self):
with self.assertRaises(TypeError):
actual = get_client_username(HttpRequest(), {})

View file

@ -1,15 +1,10 @@
from __future__ import unicode_literals
try:
import win_inet_pton # pylint: disable=unused-import
except ImportError:
pass
from inspect import getargspec
from datetime import timedelta
from logging import getLogger
from socket import error, inet_pton, AF_INET6
from django.core.cache import caches
from django.http import HttpResponse, HttpResponseRedirect, JsonResponse
from django.shortcuts import render
from django.utils import six
import ipware.ip2
@ -19,18 +14,20 @@ from axes.models import AccessAttempt
logger = getLogger(__name__)
def get_axes_cache():
return caches[getattr(settings, 'AXES_CACHE', 'default')]
def query2str(items, max_length=1024):
"""Turns a dictionary into an easy-to-read list of key-value pairs.
If there's a field called "password" it will be excluded from the output.
The length of the output is limited to max_length to avoid a DoS attack
via excessively large payloads.
"""
Turns a dictionary into an easy-to-read list of key-value pairs.
If there is a field called password it will be excluded from the output.
The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads.
"""
return '\n'.join([
'%s=%s' % (k, v) for k, v in six.iteritems(items)
if k != settings.AXES_PASSWORD_FORM_FIELD
@ -41,7 +38,7 @@ def get_client_str(username, ip_address, user_agent=None, path_info=None):
if settings.AXES_VERBOSE:
if isinstance(path_info, tuple):
path_info = path_info[0]
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
details = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}'
return details.format(username, ip_address, user_agent, path_info)
if settings.AXES_ONLY_USER_FAILURES:
@ -73,34 +70,22 @@ def get_client_ip(request):
def get_client_username(request, credentials=None):
"""Resolve client username from the given request or credentials if supplied
"""
Resolve client username from the given request or credentials if supplied.
The order of preference for fetching the username is as follows:
1. If configured, use `AXES_USERNAME_CALLABLE`, and supply either `request` or `request, credentials` as arguments
depending on the function argument count (multiple signatures are supported for backwards compatibility)
2. If given, use `credentials` and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`)
3. Use request.POST and fetch username from `AXES_USERNAME_FORM_FIELD` (defaults to `username`)
1. If configured, use ``AXES_USERNAME_CALLABLE``, and supply ``request, credentials`` as arguments
2. If given, use ``credentials`` and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``)
3. Use request.POST and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``)
:param request: incoming Django `HttpRequest` or similar object from authentication backend or other source
:param credentials: incoming credentials `dict` or similar object from authentication backend or other source
:param request: incoming Django ``HttpRequest`` or similar object from authentication backend or other source
:param credentials: incoming credentials ``dict`` or similar object from authentication backend or other source
"""
if settings.AXES_USERNAME_CALLABLE:
num_args = len(
getargspec(settings.AXES_USERNAME_CALLABLE).args # pylint: disable=deprecated-method
)
if num_args == 2:
logger.debug('Using AXES_USERNAME_CALLABLE for username with two arguments: request, credentials')
return settings.AXES_USERNAME_CALLABLE(request, credentials)
if num_args == 1:
logger.debug('Using AXES_USERNAME_CALLABLE for username with one argument: request')
return settings.AXES_USERNAME_CALLABLE(request)
logger.error('Using AXES_USERNAME_CALLABLE for username failed: wrong number of arguments %s', num_args)
raise TypeError('Wrong number of arguments in function call to AXES_USERNAME_CALLABLE', num_args)
logger.debug('Using AXES_USERNAME_CALLABLE to get username')
return settings.AXES_USERNAME_CALLABLE(request, credentials)
if credentials:
logger.debug('Using `credentials` to get username with key AXES_USERNAME_FORM_FIELD')
@ -125,8 +110,8 @@ def is_ipv6(ip):
def reset(ip=None, username=None):
"""Reset records that match ip or username, and
return the count of removed attempts.
"""
Reset records that match IP or username, and return the count of removed attempts.
"""
attempts = AccessAttempt.objects.all()
@ -141,8 +126,10 @@ def reset(ip=None, username=None):
def iso8601(timestamp):
"""Returns datetime.timedelta translated to ISO 8601 formatted duration.
"""
Return datetime.timedelta translated to ISO 8601 formatted duration.
"""
seconds = timestamp.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
@ -165,3 +152,40 @@ def get_lockout_message():
if settings.AXES_COOLOFF_TIME:
return settings.AXES_COOLOFF_MESSAGE
return settings.AXES_PERMALOCK_MESSAGE
def get_lockout_response(request):
context = {
'failure_limit': settings.AXES_FAILURE_LIMIT,
'username': get_client_username(request) or ''
}
cool_off = settings.AXES_COOLOFF_TIME
if cool_off:
if isinstance(cool_off, (int, float)):
cool_off = timedelta(hours=cool_off)
context.update({
'cooloff_time': iso8601(cool_off)
})
status = 403
if request.is_ajax():
return JsonResponse(
context,
status=status,
)
if settings.AXES_LOCKOUT_TEMPLATE:
return render(
request,
settings.AXES_LOCKOUT_TEMPLATE,
context,
status=status,
)
if settings.AXES_LOCKOUT_URL:
return HttpResponseRedirect(settings.AXES_LOCKOUT_URL)
return HttpResponse(get_lockout_message(), status=status)

9
codecov.yml Normal file
View file

@ -0,0 +1,9 @@
coverage:
status:
patch: off
project:
default:
# Minimum test coverage required for pass
target: 90%
# Maximum test coverage change allowed for pass
threshold: 20%

View file

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*-
#
# Django Axes documentation build configuration file, created by
# sphinx-quickstart on Sat Jul 30 16:37:41 2016.
#

View file

@ -5,28 +5,151 @@ Configuration
Add ``axes`` to your ``INSTALLED_APPS``::
INSTALLED_APPS = (
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
# ...
'axes',
# ...
)
'django.contrib.messages',
'django.contrib.staticfiles',
Add ``axes.backends.AxesModelBackend`` to the top of ``AUTHENTICATION_BACKENDS``::
# ... other applications per your preference.
'axes',
]
Add ``axes.backends.AxesBackend`` to the top of ``AUTHENTICATION_BACKENDS``::
AUTHENTICATION_BACKENDS = [
'axes.backends.AxesModelBackend',
# ...
# AxesBackend should be the first backend in the list.
# It stops the authentication flow when a user is locked out.
'axes.backends.AxesBackend',
# ... other authentication backends per your preference.
# Django ModelBackend is the default authentication backend.
'django.contrib.auth.backends.ModelBackend',
# ...
]
Add ``axes.middleware.AxesMiddleware`` to your list of ``MIDDLEWARE``::
MIDDLEWARE = [
# The following is the list of default middleware in new Django projects.
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
# ... other middleware per your preference.
# AxesMiddleware should be the last middleware in the list.
# It pretty formats authentication errors into readable responses.
'axes.middleware.AxesMiddleware',
]
Run ``python manage.py migrate`` to sync the database.
How does Axes function?
-----------------------
When a user tries to log in in Django, the login is usually performed
by running a number of authentication backends that check user login
information by calling the ``django.contrib.auth.authenticate`` function.
If an authentication backend does not approve of a user login,
it can raise a ``django.core.exceptions.PermissionDenied`` exception.
If a login fails, Django then fires a
``from django.contrib.auth.signals.user_login_failed`` signal.
If this signal raises an exception, it is propagated through the
Django middleware stack where it can be caught, or alternatively
where it can bubble up to the default Django exception handlers.
A normal login flow for Django runs as follows::
1. Django or plugin login view is called by
for example user sending form data with browser.
2. django.contrib.auth.authenticate is called by
the view code to check the authentication request
for user and return a user object matching it.
3. AUTHENTICATION_BACKENDS are iterated over
and their authenticate methods called one-by-one.
4. An authentication backend either returns
a user object which results in that user
being logged in or returns None.
If a PermissionDenied error is raised
by any of the authentication backends
the whole request authentication flow
is aborted and signal handlers triggered.
Axes monitors logins with the ``user_login_failed`` signal handler
and after login attempts exceed the given maximum, starts blocking them.
The blocking is done by ``AxesBackend`` which checks every request
coming through the Django authentication flow and verifies they
are not blocked, and allows the requests to go through if the check passes.
If any of the checks fails, an exception is raised which interrupts
the login process and triggers the Django login failed signal handlers.
Another exception is raised by a Axes signal handler, which is
then caught by ``AxesMiddleware`` and converted into a readable
error because the user is currently locked out of the system.
Axes implements the lockout flow as follows::
1. Django or plugin login view is called.
2. django.contrib.auth.authenticate is called.
3. AUTHENTICATION_BACKENDS are iterated over
where axes.backends.AxesBackend is the first.
4. AxesBackend checks authentication request
for lockouts rules and either aborts the
authentication flow or lets the authentication
process proceed to the next
configured authentication backend.
[The lockout happens at this stage if appropriate]
5. User is locked out and signal handlers
are notified of the failed login attempt.
6. axes.signals.log_user_login_failed runs
and raises a AxesSignalPermissionDenied
exception that bubbles up the middleware stack.
7. AxesMiddleware processes the exception
and returns a readable error to the user.
This plugin assumes that the login views either call
the django.contrib.auth.authenticate method to log in users
or otherwise take care of notifying Axes of authentication
attempts or login failures the same way Django does.
The login flows can be customized and the Axes
authentication backend or middleware can be easily swapped.
Running checks
--------------
Use the ``python manage.py check`` command to verify the correct configuration in both
development and production environments. It is probably best to use this step as part
of your regular CI workflows to verify that your project is not misconfigured.
Axes uses the checks to verify your cache configuration to see that your caches
should be functional with the configuration of Axes. Many people have different configurations
for their development and production environments.
Known configuration problems
----------------------------
@ -62,7 +185,7 @@ add an extra cache to ``CACHES`` with a name of your choice::
}
}
The next step is to tell axes to use this cache through adding ``AXES_CACHE``
The next step is to tell Axes to use this cache through adding ``AXES_CACHE``
to your ``settings.py`` file::
AXES_CACHE = 'axes_cache'
@ -73,7 +196,7 @@ There are no known problems in other cache backends such as
Authentication backend problems
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
If you get ``AxesModelBackend.RequestParameterRequired`` exceptions,
If you get ``AxesBackendRequestParameterRequired`` exceptions,
make sure any auth libraries and middleware you use pass the request object to authenticate.
Notably in older versions of Django Rest Framework (DRF) (before 3.7.0), ``BasicAuthentication`` does not pass request.
`Here is an example workaround for DRF <https://gist.github.com/markddavidoff/7e442b1ea2a2e68d390e76731c35afe7>`_.
@ -98,7 +221,7 @@ Customizing Axes
You have a couple options available to you to customize ``django-axes`` a bit.
These should be defined in your ``settings.py`` file.
* ``AXES_CACHE``: The name of the cache for axes to use.
* ``AXES_CACHE``: The name of the cache for Axes to use.
Default: ``'default'``
* ``AXES_FAILURE_LIMIT``: The number of login attempts allowed before a
record is created for the failed logins. Default: ``3``
@ -112,7 +235,9 @@ These should be defined in your ``settings.py`` file.
old failed login attempts will be forgotten. Can be set to a python
timedelta object or an integer. If an integer, will be interpreted as a
number of hours. Default: ``None``
* ``AXES_LOGGER``: If set, specifies a logging mechanism for axes to use.
* ``AXES_HANDLER``: If set, overrides the default signal handler backend.
Default: ``'axes.handlers.AxesHandler'``
* ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use.
Default: ``'axes.watch_login'``
* ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a
user is locked out. Template receives cooloff_time and failure_limit as
@ -124,11 +249,11 @@ These should be defined in your ``settings.py`` file.
Default: ``True``
* ``AXES_USERNAME_FORM_FIELD``: the name of the form field that contains your
users usernames. Default: ``username``
* ``AXES_USERNAME_CALLABLE``: A callable function that takes either one or two arguments:
``AXES_USERNAME_CALLABLE(request)`` or ``AXES_USERNAME_CALLABLE(request, credentials)``.
* ``AXES_USERNAME_CALLABLE``: A callable function that takes two arguments:
``AXES_USERNAME_CALLABLE(request, credentials)``.
The ``request`` is a HttpRequest like object and the ``credentials`` is a dictionary like object.
``credentials`` are the ones that were passed to Django ``authenticate()`` in the login flow.
If no function is supplied, axes fetches the username from the ``credentials`` or ``request.POST``
If no function is supplied, Axes fetches the username from the ``credentials`` or ``request.POST``
dictionaries based on ``AXES_USERNAME_FORM_FIELD``. Default: ``None``
* ``AXES_PASSWORD_FORM_FIELD``: the name of the form or credentials field that contains your
users password. Default: ``password``

View file

@ -16,6 +16,7 @@ Contents
installation
configuration
migration
usage
requirements
development

37
docs/migration.rst Normal file
View file

@ -0,0 +1,37 @@
.. _migration:
Migration
=========
This page contains migration instructions between different django-axes
versions so that users might more confidently upgrade their installations.
From django-axes version 4 to version 5
---------------------------------------
Application version 5 has a few differences compared to django-axes 4.
You might need to search your own codebase and check if you need to change
API endpoints or names for compatibility reasons.
- Login and logout view monkey-patching was removed.
Login monitoring is now implemented with signals
and locking users out is implemented with a combination
of a custom authentication backend, middlware, and signals.
- ``AxesModelBackend`` was renamed to ``AxesBackend``
for better naming and preventing the risk of users accidentally
upgrading without noticing that the APIs have changed.
Documentation was improved. Exceptions were renamed.
- ``axes.backends.AxesModelBackend.RequestParameterRequired``
exception was renamed, retyped to ``ValueError`` from ``Exception``, and
moved to ``axes.exception.AxesBackendRequestParameterRequired``.
- ``AxesBackend`` now raises a
``axes.exceptions.AxesBackendPermissionDenied``
exception when user is locked out which triggers signal handler
to run on failed logins, checking user lockout statuses.
- Axes lockout signal handler now raises exception
``axes.exceptions.AxesSignalPermissionDenied`` on lockouts.
- ``AxesMiddleware`` was added to return lockout responses.
The middleware handles ``axes.exception.AxesSignalPermissionDenied``.
- ``AXES_USERNAME_CALLABLE`` is now always called with two arguments,
``request`` and ``credentials`` instead of ``request``.

View file

@ -2,6 +2,7 @@
Usage
=====
``django-axes`` listens to signals from ``django.contrib.auth.signals`` to
log access attempts:
@ -16,27 +17,29 @@ log the access attempts.
Quickstart
----------
Once ``axes`` is in your ``INSTALLED_APPS`` in your project settings file,
you can login and logout of your application via the ``django.contrib.auth``
views. The access attempts will be logged and visible in the "Access Attempts"
secion of the admin app.
Once ``axes`` is in your ``INSTALLED_APPS`` in your project settings file, you can
login and logout of your application via the ``django.contrib.auth`` views.
The attempts will be logged and visible in the "Access Attempts" section in admin.
By default, django-axes will lock out repeated attempts from the same IP
address. You can allow this IP to attempt again by deleting the relevant
``AccessAttempt`` records in the admin.
By default, Axes will lock out repeated access attempts from the same IP address.
You can allow this IP to attempt again by deleting relevant ``AccessAttempt`` records.
You can also use the ``axes_reset`` and ``axes_reset_user`` management commands
using Django's ``manage.py``.
Records can be deleted, for example, by using the Django admin application.
You can also use the ``axes_reset``, ``axes_reset_ip``, and ``axes_reset_user``
management commands with the Django ``manage.py`` command helpers:
* ``manage.py axes_reset`` will reset all lockouts and access records.
* ``manage.py axes_reset ip`` will clear lockout/records for ip
* ``manage.py axes_reset_user username`` will clear lockout/records for an username
* ``manage.py axes_reset_ip ip [ip ...]``
will clear lockouts and records for the given IP addresses.
* ``manage.py axes_reset_user username [username ...]``
will clear lockouts and records for the given usernames.
In your code, you can use ``from axes.utils import reset``.
In your code, you can use the ``axes.utils.reset`` function.
* ``reset()`` will reset all lockouts and access records.
* ``reset(ip=ip)`` will clear lockout/records for ip
* ``reset(username=username)`` will clear lockout/records for a username
* ``reset(ip=ip)`` will clear lockouts and records for the given IP address.
* ``reset(username=username)`` will clear lockouts and records for the given username.
Example usage
-------------
@ -58,63 +61,70 @@ them as per the example.
*views.py:* ::
from django.views.decorators.csrf import csrf_exempt
from django.utils.decorators import method_decorator
from django.http import JsonResponse, HttpResponse
from django.contrib.auth.signals import user_logged_in,\
user_logged_out,\
user_login_failed
import json
from django.utils.decorators import method_decorator
from django.contrib.auth import signals
from django.views import View
from django.views.decorators.csrf import csrf_exempt
from axes.decorators import axes_dispatch
from myapp.forms import LoginForm
from myapp.auth import custom_authenticate, custom_login
from axes.decorators import axes_dispatch
@method_decorator(axes_dispatch, name='dispatch')
@method_decorator(csrf_exempt, name='dispatch')
class Login(View):
''' Custom login view that takes JSON credentials '''
"""
Custom login view that takes JSON credentials
"""
http_method_names = ['post',]
http_method_names = ['post']
def post(self, request):
# decode post json to dict & validate
post_data = json.loads(request.body.decode('utf-8'))
form = LoginForm(post_data)
form = LoginForm(request.POST)
if not form.is_valid():
# inform axes of failed login
user_login_failed.send(
sender = User,
request = request,
credentials = {
'username': form.cleaned_data.get('username')
}
# inform django-axes of failed login
signals.user_login_failed.send(
sender=User,
request=request,
credentials={
'username': form.cleaned_data.get('username'),
},
)
return HttpResponse(status=400)
user = custom_authenticate(
request = request,
username = form.cleaned_data.get('username'),
password = form.cleaned_data.get('password'),
request=request,
username=form.cleaned_data.get('username'),
password=form.cleaned_data.get('password'),
)
if user is not None:
custom_login(request, user)
user_logged_in.send(
sender = User,
request = request,
user = user,
signals.user_logged_in.send(
sender=User,
request=request,
user=user,
)
return JsonResponse({'message':'success!'}, status=200)
else:
user_login_failed.send(
sender = User,
request = request,
credentials = {
'username':form.cleaned_data.get('username')
},
)
return HttpResponse(status=403)
return JsonResponse({
'message':'success'
}, status=200)
# inform django-axes of failed login
signals.user_login_failed.send(
sender=User,
request=request,
credentials={
'username': form.cleaned_data.get('username'),
},
)
return HttpResponse(status=403)
*urls.py:* ::

View file

@ -1,9 +1,10 @@
#!/usr/bin/env python
import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "axes.test_settings")
if __name__ == '__main__':
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'axes.test_settings')
from django.core.management import execute_from_command_line

View file

@ -1,6 +1,5 @@
-e .
coverage==4.5.2
mock==2.0.0 ; python_version <= "2.7"
prospector==1.1.6.2
sphinx_rtd_theme==0.4.2
tox==3.7.0

View file

@ -1,43 +0,0 @@
#!/usr/bin/env python
from __future__ import unicode_literals
import os
import sys
import django
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.test.utils import get_runner
def run_tests():
os.environ['DJANGO_SETTINGS_MODULE'] = 'axes.test_settings'
django.setup()
TestRunner = get_runner(settings)
test_runner = TestRunner()
failures = test_runner.run_tests(['axes.tests'])
sys.exit(bool(failures))
def run_tests_cache():
"""Check that using a wrong cache backend (LocMemCache) throws correctly
This is due to LocMemCache not working with AccessAttempt caching,
please see issue https://github.com/jazzband/django-axes/issues/288
"""
try:
os.environ['DJANGO_SETTINGS_MODULE'] = 'axes.test_settings_cache'
django.setup()
print('Using LocMemCache as a cache backend does not throw')
sys.exit(1)
except ImproperlyConfigured:
print('Using LocMemCache as a cache backend throws correctly')
sys.exit(0)
if __name__ == '__main__':
if 'cache' in sys.argv:
run_tests_cache()
run_tests()

View file

@ -1,7 +1,4 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
import codecs
from setuptools import setup, find_packages
@ -28,7 +25,6 @@ setup(
'django',
'django-appconf',
'django-ipware>=2.0.2',
'win_inet_pton ; python_version < "3.4" and sys_platform == "win32"',
],
include_package_data=True,
packages=find_packages(),
@ -44,10 +40,7 @@ setup(
'License :: OSI Approved :: MIT License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',

11
tox.ini
View file

@ -1,14 +1,10 @@
[tox]
envlist =
py{27,34,35,36,37}-django111
py{34,35,36,37}-django20
py{35,36,37}-django21
py{35,36,37}-djangomaster
py{35,36,37}-django{111,20,21}
py{36,37}-djangomaster
[travis]
python =
2.7: py27
3.4: py34
3.5: py35
3.6: py36
3.7: py37
@ -31,8 +27,7 @@ usedevelop = True
ignore_outcome =
djangomaster: True
commands =
coverage run -a runtests.py -v2
coverage run -a runtests.py -v2 cache
coverage run -a manage.py test -v2
coverage report
prospector
setenv =