Pluggable signal handler backend for lockouts

Implements a proxied API which enables overriding
a handler backend with a user supplied implementation.

Fixes #399

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
Aleksi Häkli 2019-02-07 20:20:49 +02:00
parent fcef40748a
commit a9e9b0d984
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
5 changed files with 298 additions and 162 deletions

View file

@ -5,4 +5,5 @@ class AppConfig(apps.AppConfig):
name = 'axes'
def ready(self):
from axes import signals # pylint: disable=unused-import,unused-variable
from axes import signals
signals.ProxyHandler.initialize()

View file

@ -35,6 +35,8 @@ class MyAppConf(AppConf):
DISABLE_SUCCESS_ACCESS_LOG = False
HANDLER = 'axes.handlers.AxesHandler'
LOGGER = 'axes.watch_login'
LOCKOUT_TEMPLATE = None

196
axes/handlers.py Normal file
View file

@ -0,0 +1,196 @@
import logging
from django.utils.timezone import now
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.attempts import reset_user_attempts
from axes.exceptions import AxesSignalPermissionDenied
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
log = logging.getLogger(settings.AXES_LOGGER)
class AxesHandler: # pylint: disable=too-many-locals
"""
Signal handler implementation that records user login attempts to database and locks users out if necessary.
"""
def user_login_failed(self, sender, credentials, request, **kwargs): # pylint: disable=unused-argument
"""
When user login fails, save AccessAttempt record in database and lock user out if necessary.
:raises AxesSignalPermissionDenied: if user should is locked out
"""
if request is None:
log.warning('Attempt to authenticate with a custom backend failed.')
return
ip_address = get_client_ip(request)
username = get_client_username(request, credentials)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
return
failures = 0
attempts = get_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
# add a failed attempt for this user
failures += 1
get_axes_cache().set(cache_hash_key, failures, cache_timeout)
# has already attempted, update the info
if attempts:
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = http_accept
attempt.path_info = path_info
attempt.failures_since_start = failures
attempt.attempt_time = now()
attempt.save()
log.info(
'AXES: Repeated login failure by %s. Count = %d of %d',
get_client_str(username, ip_address, user_agent, path_info),
failures,
settings.AXES_FAILURE_LIMIT,
)
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
AccessAttempt.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=http_accept,
path_info=path_info,
failures_since_start=failures,
)
log.info(
'AXES: New login failure by %s. Creating access record.',
get_client_str(username, ip_address, user_agent, path_info),
)
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request, credentials)
):
log.warning(
'AXES: Locked out %s after repeated login failures.',
get_client_str(username, ip_address, user_agent, path_info),
)
# send signal when someone is locked out.
user_locked_out.send(
'axes',
request=request,
username=username,
ip_address=ip_address,
)
raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
log.info(
'AXES: Successful login by %s.',
get_client_str(username, ip_address, user_agent, path_info),
)
if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=True,
)
if settings.AXES_RESET_ON_SUCCESS:
count = reset_user_attempts(request, credentials)
log.info(
'AXES: Deleted %d failed login attempts by %s.',
count,
get_client_str(username, ip_address, user_agent, path_info),
)
def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs out, update the AccessLog related to the user.
"""
log.info('AXES: Successful logout by %s.', user)
if user and not settings.AXES_DISABLE_ACCESS_LOG:
AccessLog.objects.filter(
username=user.get_username(),
logout_time__isnull=True,
).update(
logout_time=now(),
)
def post_save_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument
"""
Update cache after saving AccessAttempts.
"""
cache_hash_key = get_cache_key(instance)
if not get_axes_cache().get(cache_hash_key):
cache_timeout = get_cache_timeout()
get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout)
def post_delete_access_attempt(self, instance, **kwargs): # pylint: disable=unused-argument
"""
Update cache after deleting AccessAttempts.
"""
cache_hash_key = get_cache_key(instance)
get_axes_cache().delete(cache_hash_key)

View file

@ -6,21 +6,10 @@ from django.contrib.auth.signals import user_login_failed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.dispatch import Signal
from django.utils import timezone
from django.utils.module_loading import import_string
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.attempts import reset_user_attempts
from axes.exceptions import AxesSignalPermissionDenied
from axes.models import AccessLog, AccessAttempt
from axes.utils import get_client_str
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
from axes.models import AccessAttempt
log = logging.getLogger(settings.AXES_LOGGER)
@ -28,164 +17,110 @@ log = logging.getLogger(settings.AXES_LOGGER)
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
class ProxyHandler:
"""
Proxy interface for configurable Axes signal handler class.
If you wish to implement a custom version of this handler,
you can override the settings.AXES_HANDLER configuration string
with a class that implements a compatible interface and methods.
Defaults to using axes.handlers.AxesHandler if not overridden.
Refer to axes.handlers.AxesHandler for default implementation.
"""
implementation = None # concrete handler that is bootstrapped by the Django application loader
@classmethod
def initialize(cls):
"""
Fetch and initialize concrete handler implementation and memoize it to avoid reinitialization.
This method is re-entrant and can be called multiple times.
"""
if cls.implementation is None:
cls.implementation = import_string(settings.AXES_HANDLER)()
@classmethod
def user_login_failed(cls, sender, credentials, request, **kwargs):
"""
Handle user login failure event.
:param credentials: credentials used for authentication attempt
:param request: request used for failed authentication attempt
:return: None
"""
cls.implementation.user_login_failed(sender, credentials, request, **kwargs)
@classmethod
def user_logged_in(cls, sender, request, user, **kwargs):
"""
Handle user login event.
:param credentials: credentials used for successful authentication
:param request: request used for successful authentication
:return: None
"""
cls.implementation.user_logged_in(sender, request, user, **kwargs)
@classmethod
def user_logged_out(cls, sender, request, user, **kwargs):
"""
Handle user logout event.
:param request: request used for logout
:param user: user used for logout
:return: None
"""
cls.implementation.user_logged_out(sender, request, user, **kwargs)
@classmethod
def post_save_access_attempt(cls, instance, **kwargs):
"""
Handle AccessAttempt save event.
:param instance: axes.models.AccessAttempt instance that will be saved
:return: None
"""
cls.implementation.post_save_access_attempt(instance, **kwargs)
@classmethod
def post_delete_access_attempt(cls, instance, **kwargs):
"""
Handle AccessAttempt delete event.
:param instance: axes.models.AccessAttempt instance that was deleted
:return: None
"""
cls.implementation.post_delete_access_attempt(instance, **kwargs)
@receiver(user_login_failed)
def log_user_login_failed(sender, credentials, request, **kwargs): # pylint: disable=unused-argument
"""
When user login fails, create an AccessAttempt record.
"""
if request is None:
log.warning('Attempt to authenticate with a custom backend failed.')
return
ip_address = get_client_ip(request)
username = get_client_username(request, credentials)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
return
failures = 0
attempts = get_user_attempts(request, credentials)
cache_hash_key = get_cache_key(request, credentials)
cache_timeout = get_cache_timeout()
failures_cached = get_axes_cache().get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
# add a failed attempt for this user
failures += 1
get_axes_cache().set(cache_hash_key, failures, cache_timeout)
# has already attempted, update the info
if attempts:
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = http_accept
attempt.path_info = path_info
attempt.failures_since_start = failures
attempt.attempt_time = timezone.now()
attempt.save()
log.info(
'AXES: Repeated login failure by %s. Count = %d of %d',
get_client_str(username, ip_address, user_agent, path_info),
failures,
settings.AXES_FAILURE_LIMIT
)
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
AccessAttempt.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=http_accept,
path_info=path_info,
failures_since_start=failures,
)
log.info(
'AXES: New login failure by %s. Creating access record.',
get_client_str(username, ip_address, user_agent, path_info)
)
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request, credentials)
):
log.warning(
'AXES: locked out %s after repeated login attempts.',
get_client_str(username, ip_address, user_agent, path_info)
)
# send signal when someone is locked out.
user_locked_out.send(
'axes', request=request, username=username, ip_address=ip_address
)
raise AxesSignalPermissionDenied('User locked out due to failed login attempts')
def handle_user_login_failed(*args, **kwargs):
ProxyHandler.user_login_failed(*args, **kwargs)
@receiver(user_logged_in)
def log_user_logged_in(sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the access log.
"""
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
log.info(
'AXES: Successful login by %s.',
get_client_str(username, ip_address, user_agent, path_info)
)
if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=True,
)
if settings.AXES_RESET_ON_SUCCESS:
count = reset_user_attempts(request, credentials)
log.info(
'AXES: Deleted %d failed login attempts by %s.',
count,
get_client_str(username, ip_address, user_agent, path_info)
)
def handle_user_logged_in(*args, **kwargs):
ProxyHandler.user_logged_in(*args, **kwargs)
@receiver(user_logged_out)
def log_user_logged_out(sender, request, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs out, update the access log.
"""
log.info('AXES: Successful logout by %s.', user)
if user and not settings.AXES_DISABLE_ACCESS_LOG:
AccessLog.objects.filter(
username=user.get_username(),
logout_time__isnull=True,
).update(logout_time=timezone.now())
def handle_user_logged_out(*args, **kwargs):
ProxyHandler.user_logged_out(*args, **kwargs)
@receiver(post_save, sender=AccessAttempt)
def update_cache_after_save(instance, **kwargs): # pylint: disable=unused-argument
cache_hash_key = get_cache_key(instance)
if not get_axes_cache().get(cache_hash_key):
cache_timeout = get_cache_timeout()
get_axes_cache().set(cache_hash_key, instance.failures_since_start, cache_timeout)
def handle_post_save_access_attempt(*args, **kwargs):
ProxyHandler.post_save_access_attempt(*args, **kwargs)
@receiver(post_delete, sender=AccessAttempt)
def delete_cache_after_delete(instance, **kwargs): # pylint: disable=unused-argument
cache_hash_key = get_cache_key(instance)
get_axes_cache().delete(cache_hash_key)
def handle_post_delete_access_attempt(*args, **kwargs):
ProxyHandler.post_delete_access_attempt(*args, **kwargs)

View file

@ -235,6 +235,8 @@ These should be defined in your ``settings.py`` file.
old failed login attempts will be forgotten. Can be set to a python
timedelta object or an integer. If an integer, will be interpreted as a
number of hours. Default: ``None``
* ``AXES_HANDLER``: If set, overrides the default signal handler backend.
Default: ``'axes.handlers.AxesHandler'``
* ``AXES_LOGGER``: If set, specifies a logging mechanism for Axes to use.
Default: ``'axes.watch_login'``
* ``AXES_LOCKOUT_TEMPLATE``: If set, specifies a template to render when a