Set Axes request attributes in middleware

Fixes #415

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
Aleksi Häkli 2019-03-03 21:56:57 +02:00
parent e323b1989c
commit 3bece1aaaa
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
15 changed files with 174 additions and 170 deletions

View file

@ -2,15 +2,13 @@ from logging import getLogger
from django.contrib.auth import get_user_model
from django.db.models import QuerySet
from django.http import HttpRequest
from django.utils.timezone import datetime, now
from axes.conf import settings
from axes.request import AxesHttpRequest
from axes.models import AccessAttempt
from axes.helpers import (
get_client_ip_address,
get_client_username,
get_client_user_agent,
get_client_parameters,
get_cool_off,
)
@ -29,21 +27,19 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime:
return attempt_time - get_cool_off()
def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
def filter_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""
username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
filter_kwargs = get_client_parameters(username, ip_address, user_agent)
filter_kwargs = get_client_parameters(username, request.axes_ip_address, request.axes_user_agent)
return AccessAttempt.objects.filter(**filter_kwargs)
def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> QuerySet:
def get_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> QuerySet:
"""
Get valid user attempts that match the given request and credentials.
"""
@ -54,7 +50,7 @@ def get_user_attempts(request: HttpRequest, credentials: dict = None, attempt_ti
log.debug('AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured')
return attempts
threshold = get_cool_off_threshold(attempt_time)
threshold = get_cool_off_threshold(request.axes_attempt_time)
log.debug('AXES: Getting access attempts that are newer than %s', threshold)
return attempts.filter(attempt_time__gte=threshold)
@ -74,7 +70,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int:
return count
def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
def reset_user_attempts(request: AxesHttpRequest, credentials: dict = None) -> int:
"""
Reset all user attempts that match the given request and credentials.
"""
@ -87,9 +83,7 @@ def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
return count
def is_user_attempt_whitelisted(request: HttpRequest, credentials: dict = None) -> bool:
def is_user_attempt_whitelisted(request: AxesHttpRequest, credentials: dict = None) -> bool:
"""
Check if the given request or credentials refer to a whitelisted username.

View file

@ -3,6 +3,7 @@ from django.contrib.auth.backends import ModelBackend
from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired
from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_credentials, get_lockout_message
from axes.request import AxesHttpRequest
class AxesBackend(ModelBackend):
@ -10,7 +11,7 @@ class AxesBackend(ModelBackend):
Authentication backend that forbids login attempts for locked out users.
"""
def authenticate(self, request, username=None, password=None, **kwargs):
def authenticate(self, request: AxesHttpRequest, username: str = None, password: str = None, **kwargs):
"""
Check user lock out status and raises PermissionDenied if user is not allowed to log in.

View file

@ -69,9 +69,6 @@ class AxesAppConf(AppConf):
IP_BLACKLIST = None
# if no attribute is set by your backend, a value is calculated dynamically with the ipware package
CLIENT_IP_ATTRIBUTE = 'axes_client_ip'
# message to show when locked out and have cooloff enabled
COOLOFF_MESSAGE = _('Account locked: too many login attempts. Please try again later')

View file

@ -2,10 +2,11 @@ from functools import wraps
from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_lockout_response
from axes.request import AxesHttpRequest
def axes_dispatch(func):
def inner(request, *args, **kwargs):
def inner(request: AxesHttpRequest, *args, **kwargs):
if AxesProxyHandler.is_allowed(request):
return func(request, *args, **kwargs)
@ -22,5 +23,4 @@ def axes_form_invalid(func):
return get_lockout_response(self.request)
return inner

View file

@ -1,12 +1,10 @@
from django.http import HttpRequest
from django.utils.timezone import datetime
from axes.conf import settings
from axes.helpers import (
is_client_ip_address_blacklisted,
is_client_ip_address_whitelisted,
is_client_method_whitelisted,
)
from axes.request import AxesHttpRequest
class AxesBaseHandler: # pylint: disable=unused-argument
@ -28,7 +26,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument
and define the class to be used with ``settings.AXES_HANDLER = 'dotted.full.path.to.YourClass'``.
"""
def is_allowed(self, request: HttpRequest, credentials: dict = None) -> bool:
def is_allowed(self, request: AxesHttpRequest, credentials: dict = None) -> bool:
"""
Check if the user is allowed to access or use given functionality such as a login view or authentication.
@ -54,17 +52,17 @@ class AxesBaseHandler: # pylint: disable=unused-argument
return True
def user_login_failed(self, sender, credentials: dict, request: HttpRequest = None, **kwargs):
def user_login_failed(self, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs):
"""
Handle the Django user_login_failed authentication signal.
"""
def user_logged_in(self, sender, request: HttpRequest, user, **kwargs):
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs):
"""
Handle the Django user_logged_in authentication signal.
"""
def user_logged_out(self, sender, request: HttpRequest, user, **kwargs):
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs):
"""
Handle the Django user_logged_out authentication signal.
"""
@ -79,7 +77,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument
Handle the Axes AccessAttempt object post delete signal.
"""
def is_blacklisted(self, request: HttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_blacklisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
"""
Check if the request or given credentials are blacklisted from access.
"""
@ -89,7 +87,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument
return False
def is_whitelisted(self, request: HttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_whitelisted(self, request: AxesHttpRequest, credentials: dict = None) -> bool: # pylint: disable=unused-argument
"""
Check if the request or given credentials are whitelisted for access.
"""
@ -102,17 +100,17 @@ class AxesBaseHandler: # pylint: disable=unused-argument
return False
def is_locked(self, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> bool:
def is_locked(self, request: AxesHttpRequest, credentials: dict = None) -> bool:
"""
Check if the request or given credentials are locked.
"""
if settings.AXES_LOCK_OUT_AT_FAILURE:
return self.get_failures(request, credentials, attempt_time) >= settings.AXES_FAILURE_LIMIT
return self.get_failures(request, credentials) >= settings.AXES_FAILURE_LIMIT
return False
def get_failures(self, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> int:
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
"""
Check the number of failures associated to the given request and credentials.
"""

View file

@ -2,17 +2,15 @@ from logging import getLogger
from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.request import AxesHttpRequest
from axes.handlers.base import AxesBaseHandler
from axes.signals import user_locked_out
from axes.helpers import (
get_cache,
get_cache_timeout,
get_client_cache_key,
get_client_ip_address,
get_client_path_info,
get_client_str,
get_client_username,
get_client_user_agent,
get_credentials,
)
@ -28,11 +26,17 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals
self.cache = get_cache()
self.cache_timeout = get_cache_timeout()
def get_failures(self, request, credentials=None, attempt_time=None) -> int:
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
cache_key = get_client_cache_key(request, credentials)
return self.cache.get(cache_key, default=0)
def user_login_failed(self, sender, credentials, request=None, **kwargs): # pylint: disable=too-many-locals
def user_login_failed(
self,
sender,
credentials: dict,
request: AxesHttpRequest = None,
**kwargs
): # pylint: disable=too-many-locals
"""
When user login fails, save attempt record in cache and lock user out if necessary.
@ -44,10 +48,7 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals
return
username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
if self.is_whitelisted(request, credentials):
log.info('AXES: Login failed from whitelisted client %s.', client_str)
@ -78,22 +79,19 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals
'axes',
request=request,
username=username,
ip_address=ip_address,
ip_address=request.axes_ip_address,
)
raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
log.info('AXES: Successful login by %s.', client_str)
@ -103,11 +101,8 @@ class AxesCacheHandler(AxesBaseHandler): # pylint: disable=too-many-locals
self.cache.delete(cache_key)
log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str)
def user_logged_out(self, sender, request, user, **kwargs):
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs):
username = user.get_username()
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
log.info('AXES: Successful logout by %s.', client_str)

View file

@ -2,7 +2,6 @@ from logging import getLogger
from django.db.models import Max, Value
from django.db.models.functions import Concat
from django.utils.timezone import now
from axes.attempts import (
clean_expired_user_attempts,
@ -13,15 +12,12 @@ from axes.attempts import (
from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.handlers.base import AxesBaseHandler
from axes.request import AxesHttpRequest
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.helpers import (
get_client_ip_address,
get_client_path_info,
get_client_http_accept,
get_client_str,
get_client_username,
get_client_user_agent,
get_credentials,
get_query_str,
)
@ -35,38 +31,38 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
Signal handler implementation that records user login attempts to database and locks users out if necessary.
"""
def get_failures(self, request, credentials=None, attempt_time=None) -> int:
attempts = get_user_attempts(request, credentials, attempt_time)
def get_failures(self, request: AxesHttpRequest, credentials: dict = None) -> int:
attempts = get_user_attempts(request, credentials)
return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0
def is_locked(self, request, credentials=None, attempt_time=None):
def is_locked(self, request: AxesHttpRequest, credentials: dict = None):
if is_user_attempt_whitelisted(request, credentials):
return False
return super().is_locked(request, credentials, attempt_time)
return super().is_locked(request, credentials)
def user_login_failed(self, sender, credentials, request=None, **kwargs): # pylint: disable=too-many-locals
def user_login_failed(
self,
sender,
credentials: dict,
request: AxesHttpRequest = None,
**kwargs
): # pylint: disable=too-many-locals
"""
When user login fails, save AccessAttempt record in database and lock user out if necessary.
:raises AxesSignalPermissionDenied: if user should be locked out.
"""
attempt_time = now()
# 1. database query: Clean up expired user attempts from the database before logging new attempts
clean_expired_user_attempts(attempt_time)
if request is None:
log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.')
return
# 1. database query: Clean up expired user attempts from the database before logging new attempts
clean_expired_user_attempts(request.axes_attempt_time)
username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
http_accept = get_client_http_accept(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
get_data = get_query_str(request.GET)
post_data = get_query_str(request.POST)
@ -76,7 +72,7 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
return
# 2. database query: Calculate the current maximum failure number from the existing attempts
failures_since_start = 1 + self.get_failures(request, credentials, attempt_time)
failures_since_start = 1 + self.get_failures(request, credentials)
# 3. database query: Insert or update access records with the new failure data
if failures_since_start > 1:
@ -93,14 +89,14 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
separator = '\n---------\n'
attempts = get_user_attempts(request, credentials, attempt_time)
attempts = get_user_attempts(request, credentials)
attempts.update(
get_data=Concat('get_data', Value(separator + get_data)),
post_data=Concat('post_data', Value(separator + post_data)),
http_accept=http_accept,
path_info=path_info,
http_accept=request.axes_http_accept,
path_info=request.axes_path_info,
failures_since_start=failures_since_start,
attempt_time=attempt_time,
attempt_time=request.axes_attempt_time,
)
else:
# Record failed attempt with all the relevant information.
@ -114,14 +110,14 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
AccessAttempt.objects.create(
username=username,
ip_address=ip_address,
user_agent=user_agent,
ip_address=request.axes_ip_address,
user_agent=request.axes_user_agent,
get_data=get_data,
post_data=post_data,
http_accept=http_accept,
path_info=path_info,
http_accept=request.axes_http_accept,
path_info=request.axes_path_info,
failures_since_start=failures_since_start,
attempt_time=attempt_time,
attempt_time=request.axes_attempt_time,
)
if failures_since_start >= settings.AXES_FAILURE_LIMIT:
@ -131,28 +127,22 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
'axes',
request=request,
username=username,
ip_address=ip_address,
ip_address=request.axes_ip_address,
)
raise AxesSignalPermissionDenied('Locked out due to repeated login failures.')
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
attempt_time = now()
# 1. database query: Clean up expired user attempts from the database
clean_expired_user_attempts(attempt_time)
clean_expired_user_attempts(request.axes_attempt_time)
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
http_accept = get_client_http_accept(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
log.info('AXES: Successful login by %s.', client_str)
@ -160,11 +150,11 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
# 2. database query: Insert new access logs with login time
AccessLog.objects.create(
username=username,
ip_address=ip_address,
user_agent=user_agent,
http_accept=http_accept,
path_info=path_info,
attempt_time=attempt_time,
ip_address=request.axes_ip_address,
user_agent=request.axes_user_agent,
http_accept=request.axes_http_accept,
path_info=request.axes_path_info,
attempt_time=request.axes_attempt_time,
trusted=True,
)
@ -173,21 +163,16 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
count = reset_user_attempts(request, credentials)
log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str)
def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_out(self, sender, request: AxesHttpRequest, user, **kwargs): # pylint: disable=unused-argument
"""
When user logs out, update the AccessLog related to the user.
"""
attempt_time = now()
# 1. database query: Clean up expired user attempts from the database
clean_expired_user_attempts(attempt_time)
clean_expired_user_attempts(request.axes_attempt_time)
username = user.get_username()
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
log.info('AXES: Successful logout by %s.', client_str)
@ -197,5 +182,5 @@ class AxesDatabaseHandler(AxesBaseHandler): # pylint: disable=too-many-locals
username=username,
logout_time__isnull=True,
).update(
logout_time=attempt_time,
logout_time=request.axes_attempt_time,
)

View file

@ -1,11 +1,10 @@
from logging import getLogger
from django.http import HttpRequest
from django.utils.module_loading import import_string
from django.utils.timezone import datetime
from axes.conf import settings
from axes.handlers.base import AxesBaseHandler
from axes.request import AxesHttpRequest
log = getLogger(settings.AXES_LOGGER)
@ -37,23 +36,23 @@ class AxesProxyHandler(AxesBaseHandler):
return cls.implementation
@classmethod
def is_locked(cls, request: HttpRequest, credentials: dict = None, attempt_time: datetime = None) -> bool:
def is_locked(cls, request: AxesHttpRequest, credentials: dict = None) -> bool:
return cls.get_implementation().is_locked(request, credentials)
@classmethod
def is_allowed(cls, request: HttpRequest, credentials: dict = None) -> bool:
def is_allowed(cls, request: AxesHttpRequest, credentials: dict = None) -> bool:
return cls.get_implementation().is_allowed(request, credentials)
@classmethod
def user_login_failed(cls, sender, credentials: dict, request: HttpRequest = None, **kwargs):
def user_login_failed(cls, sender, credentials: dict, request: AxesHttpRequest = None, **kwargs):
return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs)
@classmethod
def user_logged_in(cls, sender, request: HttpRequest, user, **kwargs):
def user_logged_in(cls, sender, request: AxesHttpRequest, user, **kwargs):
return cls.get_implementation().user_logged_in(sender, request, user, **kwargs)
@classmethod
def user_logged_out(cls, sender, request: HttpRequest, user, **kwargs):
def user_logged_out(cls, sender, request: AxesHttpRequest, user, **kwargs):
return cls.get_implementation().user_logged_out(sender, request, user, **kwargs)
@classmethod

View file

@ -1,17 +1,19 @@
from collections import OrderedDict
from datetime import timedelta
from hashlib import md5
from ipaddress import ip_address
from logging import getLogger
from typing import Any, Optional, Type, Union
from django.core.cache import caches, BaseCache
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse, QueryDict
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonResponse, QueryDict
from django.shortcuts import render
from django.utils.module_loading import import_string
import ipware.ip2
from axes.conf import settings
from axes.request import AxesHttpRequest
log = getLogger(__name__)
@ -105,7 +107,7 @@ def get_credentials(username: str = None, **kwargs) -> dict:
return credentials
def get_client_username(request: HttpRequest, credentials: dict = None) -> str:
def get_client_username(request: AxesHttpRequest, credentials: dict = None) -> str:
"""
Resolve client username from the given request or credentials if supplied.
@ -148,20 +150,15 @@ def get_client_ip_address(request: HttpRequest) -> str:
that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information.
"""
client_ip_attribute = settings.AXES_CLIENT_IP_ATTRIBUTE
client_ip_address, _ = ipware.ip2.get_client_ip(
request,
proxy_order=settings.AXES_PROXY_ORDER,
proxy_count=settings.AXES_PROXY_COUNT,
proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS,
request_header_order=settings.AXES_META_PRECEDENCE_ORDER,
)
if not hasattr(request, client_ip_attribute):
client_ip, _ = ipware.ip2.get_client_ip(
request,
proxy_order=settings.AXES_PROXY_ORDER,
proxy_count=settings.AXES_PROXY_COUNT,
proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS,
request_header_order=settings.AXES_META_PRECEDENCE_ORDER,
)
setattr(request, client_ip_attribute, client_ip)
return getattr(request, client_ip_attribute)
return str(ip_address(client_ip_address))
def get_client_user_agent(request: HttpRequest) -> str:
@ -271,7 +268,7 @@ def get_lockout_message() -> str:
return settings.AXES_PERMALOCK_MESSAGE
def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse:
def get_lockout_response(request: AxesHttpRequest, credentials: dict = None) -> HttpResponse:
status = 403
context = {
'failure_limit': settings.AXES_FAILURE_LIMIT,
@ -323,39 +320,35 @@ def is_ip_address_in_blacklist(ip_address: str) -> bool:
return ip_address in settings.AXES_IP_BLACKLIST
def is_client_ip_address_whitelisted(request: HttpRequest):
def is_client_ip_address_whitelisted(request: AxesHttpRequest):
"""
Check if the given request refers to a whitelisted IP.
"""
ip_address = get_client_ip_address(request)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address):
if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address):
return True
if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(ip_address):
if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address):
return True
return False
def is_client_ip_address_blacklisted(request: HttpRequest) -> bool:
def is_client_ip_address_blacklisted(request: AxesHttpRequest) -> bool:
"""
Check if the given request refers to a blacklisted IP.
"""
ip_address = get_client_ip_address(request)
if is_ip_address_in_blacklist(ip_address):
if is_ip_address_in_blacklist(request.axes_ip_address):
return True
if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(ip_address):
if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(request.axes_ip_address):
return True
return False
def is_client_method_whitelisted(request: HttpRequest) -> bool:
def is_client_method_whitelisted(request: AxesHttpRequest) -> bool:
"""
Check if the given request uses a whitelisted method.
"""

View file

@ -1,5 +1,15 @@
from django.http import HttpRequest
from django.utils.timezone import now
from axes.exceptions import AxesSignalPermissionDenied
from axes.helpers import get_lockout_response
from axes.helpers import (
get_client_ip_address,
get_client_user_agent,
get_client_path_info,
get_client_http_accept,
get_lockout_response,
)
from axes.request import AxesHttpRequest
class AxesMiddleware:
@ -21,10 +31,16 @@ class AxesMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
def __call__(self, request: HttpRequest):
request.axes_attempt_time = now()
request.axes_ip_address = get_client_ip_address(request)
request.axes_user_agent = get_client_user_agent(request)
request.axes_path_info = get_client_path_info(request)
request.axes_http_accept = get_client_http_accept(request)
return self.get_response(request)
def process_exception(self, request, exception): # pylint: disable=inconsistent-return-statements
def process_exception(self, request: AxesHttpRequest, exception): # pylint: disable=inconsistent-return-statements
"""
Exception handler that processes exceptions raised by the axes signal handler when request fails with login.

19
axes/request.py Normal file
View file

@ -0,0 +1,19 @@
from datetime import datetime # noqa
from django.http import HttpRequest
class AxesHttpRequest(HttpRequest):
"""
Type definition for the HTTP request Axes uses.
"""
def __init__(self):
super().__init__()
# TODO: Move attribute definitions to class level in Python 3.6+
self.axes_attempt_time = None # type: datetime
self.axes_ip_address = None # type: str
self.axes_user_agent = None # type: str
self.axes_path_info = None # type: str
self.axes_http_accept = None # type: str

View file

@ -6,11 +6,20 @@ from django.contrib.auth import get_user_model
from django.http import HttpRequest
from django.test import TestCase
from django.urls import reverse
from django.utils.timezone import now
from axes.utils import reset
from axes.conf import settings
from axes.helpers import get_cache, get_cool_off, get_credentials
from axes.models import AccessLog, AccessAttempt
from axes.helpers import (
get_cache,
get_client_http_accept,
get_client_ip_address,
get_client_path_info,
get_client_user_agent,
get_cool_off,
get_credentials,
)
from axes.models import AccessAttempt
class AxesTestCase(TestCase):
@ -61,6 +70,12 @@ class AxesTestCase(TestCase):
self.request.META['HTTP_USER_AGENT'] = self.user_agent
self.request.META['PATH_INFO'] = self.path_info
self.request.axes_attempt_time = now()
self.request.axes_ip_address = get_client_ip_address(self.request)
self.request.axes_user_agent = get_client_user_agent(self.request)
self.request.axes_path_info = get_client_path_info(self.request)
self.request.axes_http_accept = get_client_http_accept(self.request)
self.credentials = get_credentials(self.username)
def tearDown(self):

View file

@ -49,16 +49,6 @@ class AppsTestCase(AxesTestCase):
class AccessLogTestCase(AxesTestCase):
def test_authenticate_invalid_parameters(self):
"""
Test that logging is not done if an attempt to authenticate with a custom authentication backend fails.
"""
request = HttpRequest()
request.META['REMOTE_ADDR'] = '127.0.0.1'
authenticate(request=request, foo='bar')
self.assertEqual(AccessLog.objects.all().count(), 0)
def test_access_log_on_logout(self):
"""
Test a valid logout and make sure the logout_time is updated.

View file

@ -2,7 +2,7 @@ from datetime import timedelta
from hashlib import md5
from unittest.mock import patch
from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpResponse
from django.http import JsonResponse, HttpResponseRedirect, HttpResponse
from django.test import override_settings, RequestFactory
from axes import get_version
@ -20,7 +20,9 @@ from axes.helpers import (
is_client_ip_address_whitelisted,
is_ip_address_in_blacklist,
is_ip_address_in_whitelist,
is_client_method_whitelisted)
is_client_method_whitelisted,
)
from axes.request import AxesHttpRequest
class VersionTestCase(AxesTestCase):
@ -331,7 +333,7 @@ class UsernameTestCase(AxesTestCase):
def test_default_get_client_username(self):
expected = 'test-username'
request = HttpRequest()
request = AxesHttpRequest()
request.POST['username'] = expected
actual = get_client_username(request)
@ -343,7 +345,7 @@ class UsernameTestCase(AxesTestCase):
expected = 'test-username'
expected_in_credentials = 'test-credentials-username'
request = HttpRequest()
request = AxesHttpRequest()
request.POST['username'] = expected
credentials = {
'username': expected_in_credentials
@ -363,7 +365,7 @@ class UsernameTestCase(AxesTestCase):
expected = 'prefixed-' + provided
provided_in_credentials = 'test-credentials-username'
request = HttpRequest()
request = AxesHttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
@ -381,7 +383,7 @@ class UsernameTestCase(AxesTestCase):
provided_in_credentials = 'test-credentials-username'
expected_in_credentials = 'prefixed-' + provided_in_credentials
request = HttpRequest()
request = AxesHttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
@ -391,40 +393,41 @@ class UsernameTestCase(AxesTestCase):
@override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover
def test_get_client_username(self):
self.assertEqual(get_client_username(HttpRequest(), {}), 'example')
self.assertEqual(get_client_username(AxesHttpRequest(), {}), 'example')
@override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover
def test_get_client_username_invalid_callable_too_few_arguments(self):
with self.assertRaises(TypeError):
get_client_username(HttpRequest(), {})
get_client_username(AxesHttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover
def test_get_client_username_invalid_callable_too_many_arguments(self):
with self.assertRaises(TypeError):
get_client_username(HttpRequest(), {})
get_client_username(AxesHttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE=True)
def test_get_client_username_not_callable(self):
with self.assertRaises(TypeError):
get_client_username(HttpRequest(), {})
get_client_username(AxesHttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username')
def test_get_client_username_str(self):
self.assertEqual(
get_client_username(HttpRequest(), {}),
get_client_username(AxesHttpRequest(), {}),
'username',
)
def get_username(request: HttpRequest, credentials: dict) -> str:
def get_username(request: AxesHttpRequest, credentials: dict) -> str:
return 'username'
class IPWhitelistTestCase(AxesTestCase):
def setUp(self):
self.request = HttpRequest()
self.request = AxesHttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
self.request.axes_ip_address = '127.0.0.1'
@override_settings(AXES_IP_WHITELIST=None)
def test_ip_in_whitelist_none(self):
@ -480,7 +483,7 @@ class IPWhitelistTestCase(AxesTestCase):
class MethodWhitelistTestCase(AxesTestCase):
def setUp(self):
self.request = HttpRequest()
self.request = AxesHttpRequest()
self.request.method = 'GET'
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
@ -494,7 +497,7 @@ class MethodWhitelistTestCase(AxesTestCase):
class LockoutResponseTestCase(AxesTestCase):
def setUp(self):
self.request = HttpRequest()
self.request = AxesHttpRequest()
@override_settings(AXES_COOLOFF_TIME=42)
def test_get_lockout_response_cool_off(self):

View file

@ -66,7 +66,6 @@ The following ``settings.py`` options are available for customizing Axes behavio
Default: ``False``
* ``AXES_NEVER_LOCKOUT_WHITELIST``: If ``True``, users can always login from whitelisted IP addresses.
Default: ``False``
* ``AXES_CLIENT_IP_ATTRIBUTE``: A string that is used to lookup and set client IP on the request object. Default: ``'axes_client_ip'``
* ``AXES_IP_BLACKLIST``: An iterable of IPs to be blacklisted. Takes precedence over whitelists. For example: ``AXES_IP_BLACKLIST = ['0.0.0.0']``. Default: ``None``
* ``AXES_IP_WHITELIST``: An iterable of IPs to be whitelisted. For example: ``AXES_IP_WHITELIST = ['0.0.0.0']``. Default: ``None``
* ``AXES_DISABLE_ACCESS_LOG``: If ``True``, disable all access logging, so the admin interface will be empty. Default: ``False``