diff --git a/axes/attempts.py b/axes/attempts.py index ff7317b..3308f62 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -7,7 +7,7 @@ from django.utils.timezone import datetime, now from axes.conf import settings from axes.models import AccessAttempt -from axes.utils import ( +from axes.helpers import ( get_client_ip_address, get_client_username, get_client_user_agent, @@ -87,24 +87,6 @@ def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int: return count -def reset(ip: str = None, username: str = None) -> int: - """ - Reset records that match IP or username, and return the count of removed attempts. - - This utility method is meant to be used from the CLI or via Python API. - """ - - attempts = AccessAttempt.objects.all() - - if ip: - attempts = attempts.filter(ip_address=ip) - if username: - attempts = attempts.filter(username=username) - - count, _ = attempts.delete() - log.info('AXES: Reset %s access attempts from database.', count) - - return count def is_user_attempt_whitelisted(request: HttpRequest, credentials: dict = None) -> bool: diff --git a/axes/backends.py b/axes/backends.py index c6caa46..2745374 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -2,7 +2,7 @@ from django.contrib.auth.backends import ModelBackend from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired from axes.handlers.proxy import AxesProxyHandler -from axes.utils import get_credentials, get_lockout_message +from axes.helpers import get_credentials, get_lockout_message class AxesBackend(ModelBackend): diff --git a/axes/decorators.py b/axes/decorators.py index 6e523fb..1656a6f 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -1,7 +1,7 @@ from functools import wraps from axes.handlers.proxy import AxesProxyHandler -from axes.utils import get_lockout_response +from axes.helpers import get_lockout_response def axes_dispatch(func): diff --git a/axes/handlers/base.py b/axes/handlers/base.py index fd336e4..ddc885f 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -2,7 +2,7 @@ from django.http import HttpRequest from django.utils.timezone import datetime from axes.conf import settings -from axes.utils import ( +from axes.helpers import ( is_client_ip_address_blacklisted, is_client_ip_address_whitelisted, is_client_method_whitelisted, diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index 0557b98..ff63b1e 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -4,7 +4,7 @@ from axes.conf import settings from axes.exceptions import AxesSignalPermissionDenied from axes.handlers.base import AxesBaseHandler from axes.signals import user_locked_out -from axes.utils import ( +from axes.helpers import ( get_cache, get_cache_timeout, get_client_cache_key, diff --git a/axes/handlers/database.py b/axes/handlers/database.py index a97b826..aa6f24f 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -15,7 +15,7 @@ from axes.exceptions import AxesSignalPermissionDenied from axes.handlers.base import AxesBaseHandler from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out -from axes.utils import ( +from axes.helpers import ( get_client_ip_address, get_client_path_info, get_client_http_accept, diff --git a/axes/helpers.py b/axes/helpers.py new file mode 100644 index 0000000..1ef0d22 --- /dev/null +++ b/axes/helpers.py @@ -0,0 +1,393 @@ +from collections import OrderedDict +from datetime import timedelta +from hashlib import md5 +from logging import getLogger +from typing import Any, Optional, Type, Union + +from django.core.cache import caches, BaseCache +from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse, QueryDict +from django.shortcuts import render +from django.utils.module_loading import import_string + +import ipware.ip2 + +from axes.conf import settings + +log = getLogger(__name__) + + +def get_cache() -> BaseCache: + """ + Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set. + """ + + return caches[getattr(settings, 'AXES_CACHE', 'default')] + + +def get_cache_timeout() -> Optional[int]: + """ + Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. + + The cache timeout can be either None if not configured or integer of seconds if configured. + + Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours, + and this function offers a unified _integer or None_ representation of that configuration + for use with the Django cache backends. + """ + + cool_off = get_cool_off() + if cool_off is None: + return None + return int(cool_off.total_seconds()) + + +def get_cool_off() -> Optional[timedelta]: + """ + Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. + + The return value is either None or timedelta. + + Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours, + and this function offers a unified _timedelta or None_ representation of that configuration + for use with the Axes internal implementations. + + :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. + """ + + cool_off = settings.AXES_COOLOFF_TIME + + if isinstance(cool_off, int): + return timedelta(hours=cool_off) + return cool_off + + +def get_cool_off_iso8601(delta: timedelta) -> str: + """ + Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs. + """ + + seconds = delta.total_seconds() + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + + days_str = '{:.0f}D'.format(days) if days else '' + + time_str = ''.join( + '{value:.0f}{designator}'.format(value=value, designator=designator) + for value, designator + in [ + [hours, 'H'], + [minutes, 'M'], + [seconds, 'S'], + ] + if value + ) + + if time_str: + template = 'P{days_str}T{time_str}' + else: + template = 'P{days_str}' + + return template.format(days_str=days_str, time_str=time_str) + + +def get_credentials(username: str = None, **kwargs) -> dict: + """ + Calculate credentials for Axes to use internally from given username and kwargs. + + Axes will set the username value into the key defined with ``settings.AXES_USERNAME_FORM_FIELD`` + and update the credentials dictionary with the kwargs given on top of that. + """ + + credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + credentials.update(kwargs) + return credentials + + +def get_client_username(request: HttpRequest, credentials: dict = None) -> str: + """ + Resolve client username from the given request or credentials if supplied. + + The order of preference for fetching the username is as follows: + + 1. If configured, use ``AXES_USERNAME_CALLABLE``, and supply ``request, credentials`` as arguments + 2. If given, use ``credentials`` and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``) + 3. Use request.POST and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``) + + :param request: incoming Django ``HttpRequest`` or similar object from authentication backend or other source + :param credentials: incoming credentials ``dict`` or similar object from authentication backend or other source + """ + + if settings.AXES_USERNAME_CALLABLE: + log.debug('Using settings.AXES_USERNAME_CALLABLE to get username') + + if callable(settings.AXES_USERNAME_CALLABLE): + return settings.AXES_USERNAME_CALLABLE(request, credentials) + if isinstance(settings.AXES_USERNAME_CALLABLE, str): + return import_string(settings.AXES_USERNAME_CALLABLE)(request, credentials) + raise TypeError('settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None.') + + if credentials: + log.debug('Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD') + return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) + + log.debug('Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD') + return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) + + +def get_client_ip_address(request: HttpRequest) -> str: + """ + Get client IP address as configured by the user. + + The primary method is to fetch the reques attribute configured with the + ``settings.AXES_IP_ATTRIBUTE`` that can be set by the user on the view layer. + + If the ``settings.AXES_IP_ATTRIBUTE`` is not set, the ``ipware`` package will be utilized to resolve the address + according to the client IP configurations that can be defined by the user to suit the reverse proxy configuration + that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information. + """ + + client_ip_attribute = settings.AXES_CLIENT_IP_ATTRIBUTE + + if not hasattr(request, client_ip_attribute): + client_ip, _ = ipware.ip2.get_client_ip( + request, + proxy_order=settings.AXES_PROXY_ORDER, + proxy_count=settings.AXES_PROXY_COUNT, + proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, + request_header_order=settings.AXES_META_PRECEDENCE_ORDER, + ) + + setattr(request, client_ip_attribute, client_ip) + + return getattr(request, client_ip_attribute) + + +def get_client_user_agent(request: HttpRequest) -> str: + return request.META.get('HTTP_USER_AGENT', '')[:255] + + +def get_client_path_info(request: HttpRequest) -> str: + return request.META.get('PATH_INFO', '')[:255] + + +def get_client_http_accept(request: HttpRequest) -> str: + return request.META.get('HTTP_ACCEPT', '')[:1025] + + +def get_client_parameters(username: str, ip_address: str, user_agent: str) -> OrderedDict: + """ + Get query parameters for filtering AccessAttempt queryset. + + This method returns an OrderedDict that guarantees iteration order for keys and values, + and can so be used in e.g. the generation of hash keys or other deterministic functions. + """ + + filter_kwargs = OrderedDict() # type: OrderedDict + + if settings.AXES_ONLY_USER_FAILURES: + # 1. Only individual usernames can be tracked with parametrization + filter_kwargs['username'] = username + else: + if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: + # 2. A combination of username and IP address can be used as well + filter_kwargs['username'] = username + filter_kwargs['ip_address'] = ip_address + else: + # 3. Default case is to track the IP address only, which is the most secure option + filter_kwargs['ip_address'] = ip_address + + if settings.AXES_USE_USER_AGENT: + # 4. The HTTP User-Agent can be used to track e.g. one browser + filter_kwargs['user_agent'] = user_agent + + return filter_kwargs + + +def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str: + """ + Get a readable string that can be used in e.g. logging to distinguish client requests. + + Example log format would be ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}`` + """ + + client_dict = OrderedDict() # type: OrderedDict + + if settings.AXES_VERBOSE: + # Verbose mode logs every attribute that is available + client_dict['username'] = username + client_dict['ip_address'] = ip_address + client_dict['user_agent'] = user_agent + else: + # Other modes initialize the attributes that are used for the actual lockouts + client_dict = get_client_parameters(username, ip_address, user_agent) + + # Path info is always included as last component in the client string for traceability purposes + if path_info and isinstance(path_info, (tuple, list)): + path_info = path_info[0] + client_dict['path_info'] = path_info + + # Template the internal dictionary representation into a readable and concatenated key: "value" format + template = ', '.join( + '{key}: "{value}"'.format(key=key, value=value) + for key, value + in client_dict.items() + ) + + # Wrap the internal dict into a single {key: "value"} bracing in the output + # which requires double braces when done with the Python string templating system + # i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call + template = '{{' + template + '}}' + + return template.format(client_dict) + + +def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str: + """ + Turns a query dictionary into an easy-to-read list of key-value pairs. + + If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` it will be excluded. + + The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. + """ + + query_dict = query.copy() + query_dict.pop('password', None) + query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None) + + query_str = '\n'.join( + '{key}={value}'.format(key=key, value=value) + for key, value + in query_dict.items() + ) + + return query_str[:max_length] + + +def get_lockout_message() -> str: + if settings.AXES_COOLOFF_TIME: + return settings.AXES_COOLOFF_MESSAGE + return settings.AXES_PERMALOCK_MESSAGE + + +def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse: + status = 403 + context = { + 'failure_limit': settings.AXES_FAILURE_LIMIT, + 'username': get_client_username(request, credentials) or '' + } + + cool_off = get_cool_off() + if cool_off: + context.update({ + 'cool_off': get_cool_off_iso8601(cool_off) + }) + + if request.is_ajax(): + return JsonResponse( + context, + status=status, + ) + + if settings.AXES_LOCKOUT_TEMPLATE: + return render( + request, + settings.AXES_LOCKOUT_TEMPLATE, + context, + status=status, + ) + + if settings.AXES_LOCKOUT_URL: + return HttpResponseRedirect( + settings.AXES_LOCKOUT_URL, + ) + + return HttpResponse( + get_lockout_message(), + status=status, + ) + + +def is_ip_address_in_whitelist(ip_address: str) -> bool: + if not settings.AXES_IP_WHITELIST: + return False + + return ip_address in settings.AXES_IP_WHITELIST + + +def is_ip_address_in_blacklist(ip_address: str) -> bool: + if not settings.AXES_IP_BLACKLIST: + return False + + return ip_address in settings.AXES_IP_BLACKLIST + + +def is_client_ip_address_whitelisted(request: HttpRequest): + """ + Check if the given request refers to a whitelisted IP. + """ + + ip_address = get_client_ip_address(request) + + if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address): + return True + + if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(ip_address): + return True + + return False + + +def is_client_ip_address_blacklisted(request: HttpRequest) -> bool: + """ + Check if the given request refers to a blacklisted IP. + """ + + ip_address = get_client_ip_address(request) + + if is_ip_address_in_blacklist(ip_address): + return True + + if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(ip_address): + return True + + return False + + +def is_client_method_whitelisted(request: HttpRequest) -> bool: + """ + Check if the given request uses a whitelisted method. + """ + + if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET': + return True + + return False + + +def get_client_cache_key(request_or_attempt: Union[HttpRequest, Any], credentials: dict = None) -> str: + """ + Build cache key name from request or AccessAttempt object. + + :param request_or_attempt: HttpRequest or AccessAttempt object + :param credentials: credentials containing user information + :return cache_key: Hash key that is usable for Django cache backends + """ + + if isinstance(request_or_attempt, HttpRequest): + username = get_client_username(request_or_attempt, credentials) + ip_address = get_client_ip_address(request_or_attempt) + user_agent = get_client_user_agent(request_or_attempt) + else: + username = request_or_attempt.username + ip_address = request_or_attempt.ip_address + user_agent = request_or_attempt.user_agent + + filter_kwargs = get_client_parameters(username, ip_address, user_agent) + + cache_key_components = ''.join(filter_kwargs.values()) + cache_key_digest = md5(cache_key_components.encode()).hexdigest() + cache_key = 'axes-{}'.format(cache_key_digest) + + return cache_key diff --git a/axes/management/commands/axes_reset.py b/axes/management/commands/axes_reset.py index aadf039..f28c9f4 100644 --- a/axes/management/commands/axes_reset.py +++ b/axes/management/commands/axes_reset.py @@ -1,6 +1,6 @@ from django.core.management.base import BaseCommand -from axes.attempts import reset +from axes.utils import reset class Command(BaseCommand): diff --git a/axes/management/commands/axes_reset_ip.py b/axes/management/commands/axes_reset_ip.py index 899fdec..657a1ac 100644 --- a/axes/management/commands/axes_reset_ip.py +++ b/axes/management/commands/axes_reset_ip.py @@ -1,6 +1,6 @@ from django.core.management.base import BaseCommand -from axes.attempts import reset +from axes.utils import reset class Command(BaseCommand): diff --git a/axes/management/commands/axes_reset_username.py b/axes/management/commands/axes_reset_username.py index 6bbca5f..2e6ec7c 100644 --- a/axes/management/commands/axes_reset_username.py +++ b/axes/management/commands/axes_reset_username.py @@ -1,6 +1,6 @@ from django.core.management.base import BaseCommand -from axes.attempts import reset +from axes.utils import reset class Command(BaseCommand): diff --git a/axes/middleware.py b/axes/middleware.py index ca3d822..79c1f94 100644 --- a/axes/middleware.py +++ b/axes/middleware.py @@ -1,5 +1,5 @@ from axes.exceptions import AxesSignalPermissionDenied -from axes.utils import get_lockout_response +from axes.helpers import get_lockout_response class AxesMiddleware: diff --git a/axes/tests/base.py b/axes/tests/base.py index 2e75973..a2259ff 100644 --- a/axes/tests/base.py +++ b/axes/tests/base.py @@ -7,9 +7,9 @@ from django.http import HttpRequest from django.test import TestCase from django.urls import reverse -from axes.attempts import reset +from axes.utils import reset from axes.conf import settings -from axes.utils import get_cache, get_cool_off, get_credentials +from axes.helpers import get_cache, get_cool_off, get_credentials from axes.models import AccessLog, AccessAttempt diff --git a/axes/tests/test_attempts.py b/axes/tests/test_attempts.py index 9e5efe0..7560b30 100644 --- a/axes/tests/test_attempts.py +++ b/axes/tests/test_attempts.py @@ -3,12 +3,10 @@ from unittest.mock import patch from django.contrib.auth import get_user_model from django.http import HttpRequest -from axes.attempts import ( - is_user_attempt_whitelisted, - reset, -) +from axes.attempts import is_user_attempt_whitelisted from axes.models import AccessAttempt from axes.tests.base import AxesTestCase +from axes.utils import reset class ResetTestCase(AxesTestCase): diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index 2ac109c..2fd1d35 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -6,7 +6,7 @@ from django.utils.timezone import timedelta from axes.conf import settings from axes.handlers.proxy import AxesProxyHandler from axes.tests.base import AxesTestCase -from axes.utils import get_client_str +from axes.helpers import get_client_str @override_settings(AXES_HANDLER='axes.handlers.base.AxesBaseHandler') diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 98c66bf..7361d9d 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -8,7 +8,7 @@ from django.test import override_settings, RequestFactory from axes import get_version from axes.models import AccessAttempt from axes.tests.base import AxesTestCase -from axes.utils import ( +from axes.helpers import ( get_cache_timeout, get_client_str, get_client_username, @@ -501,7 +501,7 @@ class LockoutResponseTestCase(AxesTestCase): get_lockout_response(request=self.request) @override_settings(AXES_LOCKOUT_TEMPLATE='example.html') - @patch('axes.utils.render') + @patch('axes.helpers.render') def test_get_lockout_response_lockout_template(self, render): self.assertFalse(render.called) get_lockout_response(request=self.request) diff --git a/axes/utils.py b/axes/utils.py index 292240d..67bf815 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -1,393 +1,32 @@ -from collections import OrderedDict -from datetime import timedelta -from hashlib import md5 +""" +Axes utility functions that are publicly available. + +This module is separate for historical reasons +and offers a backwards compatible import path. +""" + from logging import getLogger -from typing import Any, Optional, Type, Union -from django.core.cache import caches, BaseCache -from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse, QueryDict -from django.shortcuts import render -from django.utils.module_loading import import_string +from axes.models import AccessAttempt -import ipware.ip2 - -from axes.conf import settings - -logger = getLogger(__name__) +log = getLogger(__name__) -def get_cache() -> BaseCache: +def reset(ip: str = None, username: str = None) -> int: """ - Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set. + Reset records that match IP or username, and return the count of removed attempts. + + This utility method is meant to be used from the CLI or via Python API. """ - return caches[getattr(settings, 'AXES_CACHE', 'default')] + attempts = AccessAttempt.objects.all() + if ip: + attempts = attempts.filter(ip_address=ip) + if username: + attempts = attempts.filter(username=username) -def get_cache_timeout() -> Optional[int]: - """ - Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. + count, _ = attempts.delete() + log.info('AXES: Reset %s access attempts from database.', count) - The cache timeout can be either None if not configured or integer of seconds if configured. - - Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours, - and this function offers a unified _integer or None_ representation of that configuration - for use with the Django cache backends. - """ - - cool_off = get_cool_off() - if cool_off is None: - return None - return int(cool_off.total_seconds()) - - -def get_cool_off() -> Optional[timedelta]: - """ - Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. - - The return value is either None or timedelta. - - Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours, - and this function offers a unified _timedelta or None_ representation of that configuration - for use with the Axes internal implementations. - - :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. - """ - - cool_off = settings.AXES_COOLOFF_TIME - - if isinstance(cool_off, int): - return timedelta(hours=cool_off) - return cool_off - - -def get_cool_off_iso8601(delta: timedelta) -> str: - """ - Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs. - """ - - seconds = delta.total_seconds() - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - days, hours = divmod(hours, 24) - - days_str = '{:.0f}D'.format(days) if days else '' - - time_str = ''.join( - '{value:.0f}{designator}'.format(value=value, designator=designator) - for value, designator - in [ - [hours, 'H'], - [minutes, 'M'], - [seconds, 'S'], - ] - if value - ) - - if time_str: - template = 'P{days_str}T{time_str}' - else: - template = 'P{days_str}' - - return template.format(days_str=days_str, time_str=time_str) - - -def get_credentials(username: str = None, **kwargs) -> dict: - """ - Calculate credentials for Axes to use internally from given username and kwargs. - - Axes will set the username value into the key defined with ``settings.AXES_USERNAME_FORM_FIELD`` - and update the credentials dictionary with the kwargs given on top of that. - """ - - credentials = {settings.AXES_USERNAME_FORM_FIELD: username} - credentials.update(kwargs) - return credentials - - -def get_client_username(request: HttpRequest, credentials: dict = None) -> str: - """ - Resolve client username from the given request or credentials if supplied. - - The order of preference for fetching the username is as follows: - - 1. If configured, use ``AXES_USERNAME_CALLABLE``, and supply ``request, credentials`` as arguments - 2. If given, use ``credentials`` and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``) - 3. Use request.POST and fetch username from ``AXES_USERNAME_FORM_FIELD`` (defaults to ``username``) - - :param request: incoming Django ``HttpRequest`` or similar object from authentication backend or other source - :param credentials: incoming credentials ``dict`` or similar object from authentication backend or other source - """ - - if settings.AXES_USERNAME_CALLABLE: - logger.debug('Using settings.AXES_USERNAME_CALLABLE to get username') - - if callable(settings.AXES_USERNAME_CALLABLE): - return settings.AXES_USERNAME_CALLABLE(request, credentials) - if isinstance(settings.AXES_USERNAME_CALLABLE, str): - return import_string(settings.AXES_USERNAME_CALLABLE)(request, credentials) - raise TypeError('settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None.') - - if credentials: - logger.debug('Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD') - return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) - - logger.debug('Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD') - return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) - - -def get_client_ip_address(request: HttpRequest) -> str: - """ - Get client IP address as configured by the user. - - The primary method is to fetch the reques attribute configured with the - ``settings.AXES_IP_ATTRIBUTE`` that can be set by the user on the view layer. - - If the ``settings.AXES_IP_ATTRIBUTE`` is not set, the ``ipware`` package will be utilized to resolve the address - according to the client IP configurations that can be defined by the user to suit the reverse proxy configuration - that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information. - """ - - client_ip_attribute = settings.AXES_CLIENT_IP_ATTRIBUTE - - if not hasattr(request, client_ip_attribute): - client_ip, _ = ipware.ip2.get_client_ip( - request, - proxy_order=settings.AXES_PROXY_ORDER, - proxy_count=settings.AXES_PROXY_COUNT, - proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, - request_header_order=settings.AXES_META_PRECEDENCE_ORDER, - ) - - setattr(request, client_ip_attribute, client_ip) - - return getattr(request, client_ip_attribute) - - -def get_client_user_agent(request: HttpRequest) -> str: - return request.META.get('HTTP_USER_AGENT', '')[:255] - - -def get_client_path_info(request: HttpRequest) -> str: - return request.META.get('PATH_INFO', '')[:255] - - -def get_client_http_accept(request: HttpRequest) -> str: - return request.META.get('HTTP_ACCEPT', '')[:1025] - - -def get_client_parameters(username: str, ip_address: str, user_agent: str) -> OrderedDict: - """ - Get query parameters for filtering AccessAttempt queryset. - - This method returns an OrderedDict that guarantees iteration order for keys and values, - and can so be used in e.g. the generation of hash keys or other deterministic functions. - """ - - filter_kwargs = OrderedDict() # type: OrderedDict - - if settings.AXES_ONLY_USER_FAILURES: - # 1. Only individual usernames can be tracked with parametrization - filter_kwargs['username'] = username - else: - if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: - # 2. A combination of username and IP address can be used as well - filter_kwargs['username'] = username - filter_kwargs['ip_address'] = ip_address - else: - # 3. Default case is to track the IP address only, which is the most secure option - filter_kwargs['ip_address'] = ip_address - - if settings.AXES_USE_USER_AGENT: - # 4. The HTTP User-Agent can be used to track e.g. one browser - filter_kwargs['user_agent'] = user_agent - - return filter_kwargs - - -def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str: - """ - Get a readable string that can be used in e.g. logging to distinguish client requests. - - Example log format would be ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}`` - """ - - client_dict = OrderedDict() # type: OrderedDict - - if settings.AXES_VERBOSE: - # Verbose mode logs every attribute that is available - client_dict['username'] = username - client_dict['ip_address'] = ip_address - client_dict['user_agent'] = user_agent - else: - # Other modes initialize the attributes that are used for the actual lockouts - client_dict = get_client_parameters(username, ip_address, user_agent) - - # Path info is always included as last component in the client string for traceability purposes - if path_info and isinstance(path_info, (tuple, list)): - path_info = path_info[0] - client_dict['path_info'] = path_info - - # Template the internal dictionary representation into a readable and concatenated key: "value" format - template = ', '.join( - '{key}: "{value}"'.format(key=key, value=value) - for key, value - in client_dict.items() - ) - - # Wrap the internal dict into a single {key: "value"} bracing in the output - # which requires double braces when done with the Python string templating system - # i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call - template = '{{' + template + '}}' - - return template.format(client_dict) - - -def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str: - """ - Turns a query dictionary into an easy-to-read list of key-value pairs. - - If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` it will be excluded. - - The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. - """ - - query_dict = query.copy() - query_dict.pop('password', None) - query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None) - - query_str = '\n'.join( - '{key}={value}'.format(key=key, value=value) - for key, value - in query_dict.items() - ) - - return query_str[:max_length] - - -def get_lockout_message() -> str: - if settings.AXES_COOLOFF_TIME: - return settings.AXES_COOLOFF_MESSAGE - return settings.AXES_PERMALOCK_MESSAGE - - -def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse: - status = 403 - context = { - 'failure_limit': settings.AXES_FAILURE_LIMIT, - 'username': get_client_username(request, credentials) or '' - } - - cool_off = get_cool_off() - if cool_off: - context.update({ - 'cool_off': get_cool_off_iso8601(cool_off) - }) - - if request.is_ajax(): - return JsonResponse( - context, - status=status, - ) - - if settings.AXES_LOCKOUT_TEMPLATE: - return render( - request, - settings.AXES_LOCKOUT_TEMPLATE, - context, - status=status, - ) - - if settings.AXES_LOCKOUT_URL: - return HttpResponseRedirect( - settings.AXES_LOCKOUT_URL, - ) - - return HttpResponse( - get_lockout_message(), - status=status, - ) - - -def is_ip_address_in_whitelist(ip_address: str) -> bool: - if not settings.AXES_IP_WHITELIST: - return False - - return ip_address in settings.AXES_IP_WHITELIST - - -def is_ip_address_in_blacklist(ip_address: str) -> bool: - if not settings.AXES_IP_BLACKLIST: - return False - - return ip_address in settings.AXES_IP_BLACKLIST - - -def is_client_ip_address_whitelisted(request: HttpRequest): - """ - Check if the given request refers to a whitelisted IP. - """ - - ip_address = get_client_ip_address(request) - - if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(ip_address): - return True - - if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(ip_address): - return True - - return False - - -def is_client_ip_address_blacklisted(request: HttpRequest) -> bool: - """ - Check if the given request refers to a blacklisted IP. - """ - - ip_address = get_client_ip_address(request) - - if is_ip_address_in_blacklist(ip_address): - return True - - if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(ip_address): - return True - - return False - - -def is_client_method_whitelisted(request: HttpRequest) -> bool: - """ - Check if the given request uses a whitelisted method. - """ - - if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET': - return True - - return False - - -def get_client_cache_key(request_or_attempt: Union[HttpRequest, Any], credentials: dict = None) -> str: - """ - Build cache key name from request or AccessAttempt object. - - :param request_or_attempt: HttpRequest or AccessAttempt object - :param credentials: credentials containing user information - :return cache_key: Hash key that is usable for Django cache backends - """ - - if isinstance(request_or_attempt, HttpRequest): - username = get_client_username(request_or_attempt, credentials) - ip_address = get_client_ip_address(request_or_attempt) - user_agent = get_client_user_agent(request_or_attempt) - else: - username = request_or_attempt.username - ip_address = request_or_attempt.ip_address - user_agent = request_or_attempt.user_agent - - filter_kwargs = get_client_parameters(username, ip_address, user_agent) - - cache_key_components = ''.join(filter_kwargs.values()) - cache_key_digest = md5(cache_key_components.encode()).hexdigest() - cache_key = 'axes-{}'.format(cache_key_digest) - - return cache_key + return count diff --git a/docs/3_usage.rst b/docs/3_usage.rst index a31e972..fec58d2 100644 --- a/docs/3_usage.rst +++ b/docs/3_usage.rst @@ -22,7 +22,7 @@ management commands with the Django ``manage.py`` command helpers: - ``python manage.py axes_reset_username username [username ...]`` will clear lockouts and records for the given usernames. -In your code, you can use the ``axes.attempts.reset`` function. +In your code, you can use the ``axes.utils.reset`` function. - ``reset()`` will reset all lockouts and access records. - ``reset(ip=ip)`` will clear lockouts and records for the given IP address. diff --git a/docs/7_migration.rst b/docs/7_migration.rst index 716a7f2..785d2d4 100644 --- a/docs/7_migration.rst +++ b/docs/7_migration.rst @@ -18,7 +18,6 @@ API endpoints or names for compatibility reasons. Login monitoring is now implemented with signal handlers and locking users out is implemented with a combination of a custom authentication backend, middleware, and signals. -- ``axes.utils.reset`` was moved to ``axes.attempts.reset``. - ``AxesModelBackend`` was renamed to ``AxesBackend`` for better naming and preventing the risk of users accidentally upgrading without noticing that the APIs have changed.