diff --git a/axes/attempts.py b/axes/attempts.py index d76c50b..a5a7993 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -1,6 +1,6 @@ -from collections import OrderedDict from hashlib import md5 from logging import getLogger +from typing import Union from django.contrib.auth import get_user_model from django.db.models import QuerySet @@ -11,56 +11,18 @@ from axes.conf import settings from axes.models import AccessAttempt from axes.utils import ( get_axes_cache, - get_client_ip, + get_client_ip_address, get_client_username, get_client_user_agent, get_cache_timeout, get_cool_off, + get_client_parameters, ) log = getLogger(settings.AXES_LOGGER) -def get_filter_kwargs(username: str, ip_address: str, user_agent: str) -> OrderedDict: - """ - Get query parameters for filtering AccessAttempt queryset. - - This method returns an OrderedDict that guarantees iteration order for keys and values, - and can so be used in e.g. the generation of hash keys or other deterministic functions. - """ - - query = OrderedDict() # type: OrderedDict - - if settings.AXES_ONLY_USER_FAILURES: - query['username'] = username - else: - if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: - query['username'] = username - query['ip_address'] = ip_address - else: - query['ip_address'] = ip_address - - if settings.AXES_USE_USER_AGENT: - query['user_agent'] = user_agent - - return query - - -def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: - """ - Return a queryset of AccessAttempts that match the given request and credentials. - """ - - username = get_client_username(request, credentials) - ip_address = get_client_ip(request) - user_agent = get_client_user_agent(request) - - filter_kwargs = get_filter_kwargs(username, ip_address, user_agent) - - return AccessAttempt.objects.filter(**filter_kwargs) - - -def get_cache_key(request_or_attempt, credentials: dict = None) -> str: +def get_cache_key(request_or_attempt: Union[HttpRequest, AccessAttempt], credentials: dict = None) -> str: """ Build cache key name from request or AccessAttempt object. @@ -75,10 +37,10 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str: user_agent = request_or_attempt.user_agent else: username = get_client_username(request_or_attempt, credentials) - ip_address = get_client_ip(request_or_attempt) + ip_address = get_client_ip_address(request_or_attempt) user_agent = get_client_user_agent(request_or_attempt) - filter_kwargs = get_filter_kwargs(username, ip_address, user_agent) + filter_kwargs = get_client_parameters(username, ip_address, user_agent) cache_key_components = ''.join(filter_kwargs.values()) cache_key_digest = md5(cache_key_components.encode()).hexdigest() @@ -87,6 +49,20 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str: return cache_key +def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: + """ + Return a queryset of AccessAttempts that match the given request and credentials. + """ + + username = get_client_username(request, credentials) + ip_address = get_client_ip_address(request) + user_agent = get_client_user_agent(request) + + filter_kwargs = get_client_parameters(username, ip_address, user_agent) + + return AccessAttempt.objects.filter(**filter_kwargs) + + def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: """ Get valid user attempts and delete expired attempts which have cool offs in the past. @@ -119,6 +95,10 @@ def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySe def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int: + """ + Reset all user attempts that match the given request and credentials. + """ + attempts = filter_user_attempts(request, credentials) count, _ = attempts.delete() @@ -144,7 +124,7 @@ def is_ip_blacklisted(request: HttpRequest) -> bool: Check if the given request refers to a blacklisted IP. """ - ip = get_client_ip(request) + ip = get_client_ip_address(request) if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip): return False diff --git a/axes/handlers.py b/axes/handlers.py index 4d566ba..3a707f7 100644 --- a/axes/handlers.py +++ b/axes/handlers.py @@ -12,9 +12,17 @@ from axes.conf import settings from axes.exceptions import AxesSignalPermissionDenied from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out -from axes.utils import get_client_str, get_client_user_agent -from axes.utils import query2str -from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials +from axes.utils import ( + get_axes_cache, + get_client_ip_address, + get_client_path_info, + get_client_http_accept, + get_client_str, + get_client_username, + get_client_user_agent, + get_credentials, + get_query_str, +) log = getLogger(settings.AXES_LOGGER) @@ -37,10 +45,10 @@ class AxesHandler: # pylint: disable=too-many-locals return username = get_client_username(request, credentials) - ip_address = get_client_ip(request) + ip_address = get_client_ip_address(request) user_agent = get_client_user_agent(request) - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + path_info = get_client_path_info(request) + http_accept = get_client_http_accept(request) client_str = get_client_str(username, ip_address, user_agent, path_info) if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): @@ -76,11 +84,11 @@ class AxesHandler: # pylint: disable=too-many-locals attempt.get_data = template.format( attempt.get_data, - query2str(request.GET), + get_query_str(request.GET), ) attempt.post_data = template.format( attempt.post_data, - query2str(request.POST) + get_query_str(request.POST) ) attempt.http_accept = http_accept attempt.path_info = path_info @@ -96,14 +104,14 @@ class AxesHandler: # pylint: disable=too-many-locals ) else: # Record failed attempt. Whether or not the IP address or user agent is - # used in counting failures is handled elsewhere, so we just record # everything here. + # used in counting failures is handled elsewhere, so we just record everything here. AccessAttempt.objects.create( username=username, ip_address=ip_address, user_agent=user_agent, - get_data=query2str(request.GET), - post_data=query2str(request.POST), + get_data=get_query_str(request.GET), + post_data=get_query_str(request.POST), http_accept=http_accept, path_info=path_info, failures_since_start=failures, @@ -136,10 +144,10 @@ class AxesHandler: # pylint: disable=too-many-locals username = user.get_username() credentials = get_credentials(username) - ip_address = get_client_ip(request) + ip_address = get_client_ip_address(request) user_agent = get_client_user_agent(request) - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + path_info = get_client_path_info(request) + http_accept = get_client_http_accept(request) client_str = get_client_str(username, ip_address, user_agent, path_info) log.info( @@ -170,7 +178,16 @@ class AxesHandler: # pylint: disable=too-many-locals When user logs out, update the AccessLog related to the user. """ - log.info('AXES: Successful logout by %s.', user) + username = user.get_username() + ip_address = get_client_ip_address(request) + user_agent = get_client_user_agent(request) + path_info = get_client_path_info(request) + client_str = get_client_str(username, ip_address, user_agent, path_info) + + log.info( + 'AXES: Successful logout by %s.', + client_str, + ) if user and not settings.AXES_DISABLE_ACCESS_LOG: AccessLog.objects.filter( diff --git a/axes/tests/test_access_attempt.py b/axes/tests/test_access_attempt.py index 3462974..1911d4f 100644 --- a/axes/tests/test_access_attempt.py +++ b/axes/tests/test_access_attempt.py @@ -19,13 +19,12 @@ from axes.attempts import ( ip_in_blacklist, ip_in_whitelist, is_user_lockable, - get_filter_kwargs, + get_client_parameters, get_user_attempts, ) from axes.conf import settings from axes.models import AccessAttempt, AccessLog from axes.signals import user_locked_out -from axes.utils import reset class AccessAttemptTest(TestCase): @@ -229,7 +228,7 @@ class AccessAttemptTest(TestCase): self.test_failure_limit_once() # Reset the ip so we can try again - reset(ip='127.0.0.1') + AccessAttempt.objects.filter(ip_address='127.0.0.1').delete() # Make a login attempt again self.test_valid_login() @@ -243,7 +242,7 @@ class AccessAttemptTest(TestCase): self.test_failure_limit_once() # Reset all attempts so we can try again - reset() + AccessAttempt.objects.all().delete() # Make a login attempt again self.test_valid_login() @@ -253,7 +252,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_user(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'username': self.username}, ) @@ -264,7 +263,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_ip(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'ip_address': self.ip_address}, ) @@ -275,7 +274,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_user_and_ip(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'username': self.username, 'ip_address': self.ip_address}, ) @@ -286,7 +285,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_ip_and_agent(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'ip_address': self.ip_address, 'user_agent': self.user_agent}, ) @@ -297,11 +296,11 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_user_ip_agent(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent}, ) - @patch('axes.utils.get_client_ip', return_value='127.0.0.1') + @patch('axes.utils.get_client_ip_address', return_value='127.0.0.1') def test_get_cache_key(self, _): """ Test the cache key format. @@ -338,7 +337,7 @@ class AccessAttemptTest(TestCase): self.assertEqual(cache_hash_key, get_cache_key(attempt)) - @patch('axes.utils.get_client_ip', return_value='127.0.0.1') + @patch('axes.utils.get_client_ip_address', return_value='127.0.0.1') def test_get_cache_key_credentials(self, _): """ Test the cache key format. @@ -398,7 +397,7 @@ class AccessAttemptTest(TestCase): self.test_failure_limit_once() self.assertEqual(scope.signal_received, 1) - reset() + AccessAttempt.objects.all().delete() # Make another lockout self.test_failure_limit_once() @@ -446,7 +445,7 @@ class AccessAttemptTest(TestCase): # reset the username only and make sure we can log in now even though # our IP has failed each time - reset(username=AccessAttemptTest.VALID_USERNAME) + AccessAttempt.objects.filter(username=AccessAttemptTest.VALID_USERNAME).delete() response = self._login( is_valid_username=True, is_valid_password=True, @@ -461,13 +460,13 @@ class AccessAttemptTest(TestCase): is_valid_username=False, is_valid_password=False, ) - # Check if we can still log in with valid user + # Check if we can still initialize in with valid user response = self._login(is_valid_username=True, is_valid_password=True) self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True) def test_log_data_truncated(self): """ - Test that query2str properly truncates data to the max_length (default 1024). + Test that get_query_str properly truncates data to the max_length (default 1024). """ # An impossibly large post dict diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index 6d05181..d89d78b 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -89,7 +89,7 @@ class AxesHandlerTestCase(TestCase): log.warning.assert_called_with('AxesHandler.user_login_failed does not function without a request.') @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=['127.0.0.1']) - @patch('axes.handlers.get_client_ip', return_value='127.0.0.1') + @patch('axes.handlers.get_client_ip_address', return_value='127.0.0.1') @patch('axes.handlers.ip_in_whitelist', return_value=True) @patch('axes.handlers.log') def test_user_login_failed_whitelist(self, log, _, __): diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 2b5f4d3..9f661ec 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -5,11 +5,15 @@ from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpRes from django.test import TestCase, override_settings from axes import get_version -from axes.utils import iso8601, is_ipv6, get_client_str, get_client_username, get_lockout_response +from axes.utils import get_cool_off_iso8601, get_client_str, get_client_username, get_lockout_response + + +def get_username(request: HttpRequest, credentials: dict) -> str: + return 'username' def get_expected_client_str(*args, **kwargs): - client_str_template = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}' + client_str_template = '{{username: "{0}", ip_address: "{1}", user_agent: "{2}", path_info: "{3}"}}' return client_str_template.format(*args, **kwargs) @@ -22,7 +26,7 @@ class AxesTestCase(TestCase): class UtilsTestCase(TestCase): def test_iso8601(self): """ - Test iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration. + Test get_cool_off_iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration. """ expected = { @@ -46,46 +50,41 @@ class UtilsTestCase(TestCase): for delta, iso_duration in expected.items(): with self.subTest(iso_duration): - self.assertEqual(iso8601(delta), iso_duration) - - def test_is_ipv6(self): - self.assertTrue(is_ipv6('ff80::220:16ff:fec9:1')) - self.assertFalse(is_ipv6('67.255.125.204')) - self.assertFalse(is_ipv6('foo')) + self.assertEqual(get_cool_off_iso8601(delta), iso_duration) @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details_tuple(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = ('admin', 'login') - expected = get_expected_client_str(username, ip, user_agent, path_info[0]) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info[0]) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @override_settings(AXES_VERBOSE=False) def test_non_verbose_ip_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = ip - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -93,12 +92,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_user_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -106,12 +105,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = username - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{username: "test@example.com", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -119,12 +118,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_user_ip_combo_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -132,12 +131,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_ip_combo_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = '{0} from {1}'.format(username, ip) - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -145,12 +144,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_user_agent_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -158,12 +157,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_agent_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = ip + '(user-agent={0})'.format(user_agent) - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) diff --git a/axes/utils.py b/axes/utils.py index 1fc4b7f..666b4c0 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -1,7 +1,7 @@ -from typing import Optional +from collections import OrderedDict from datetime import timedelta from logging import getLogger -from socket import error, inet_pton, AF_INET6 +from typing import Optional from django.core.cache import caches, BaseCache from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse @@ -9,6 +9,7 @@ from django.shortcuts import render from django.utils.module_loading import import_string import ipware.ip2 +from django.utils.module_loading import import_string from axes.conf import settings from axes.models import AccessAttempt @@ -16,59 +17,109 @@ from axes.models import AccessAttempt logger = getLogger(__name__) +def reset(ip: str = None, username: str = None) -> int: + """ + Reset records that match IP or username, and return the count of removed attempts. + """ + + attempts = AccessAttempt.objects.all() + if ip: + attempts = attempts.filter(ip_address=ip) + if username: + attempts = attempts.filter(username=username) + + count, _ = attempts.delete() + + return count + + def get_axes_cache() -> BaseCache: + """ + Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set. + """ + return caches[getattr(settings, 'AXES_CACHE', 'default')] -def query2str(dictionary: dict, max_length: int = 1024) -> str: +def get_cache_timeout() -> Optional[int]: """ - Turns a dictionary into an easy-to-read list of key-value pairs. + Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. - If there is a field called password it will be excluded from the output. + The cache timeout can be either None if not configured or integer of seconds if configured. - The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. + Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours, + and this function offers a unified _integer or None_ representation of that configuration + for use with the Django cache backends. """ - return '\n'.join([ - '%s=%s' % (k, v) for k, v in dictionary.items() - if k != settings.AXES_PASSWORD_FORM_FIELD - ][:int(max_length / 2)])[:max_length] + cool_off = get_cool_off() + if cool_off is None: + return None + return int(cool_off.total_seconds()) -def get_client_str(username, ip_address, user_agent=None, path_info=None): - if settings.AXES_VERBOSE: - if path_info and isinstance(path_info, tuple): - path_info = path_info[0] +def get_cool_off() -> Optional[timedelta]: + """ + Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. - details = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}' - return details.format(username, ip_address, user_agent, path_info) + The return value is either None or timedelta. - if settings.AXES_ONLY_USER_FAILURES: - client = username - elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: - client = '{0} from {1}'.format(username, ip_address) + Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours, + and this function offers a unified _timedelta or None_ representation of that configuration + for use with the Axes internal implementations. + + :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. + """ + + cool_off = settings.AXES_COOLOFF_TIME + + if isinstance(cool_off, int): + return timedelta(hours=cool_off) + return cool_off + + +def get_cool_off_iso8601(delta: timedelta) -> str: + """ + Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs. + """ + + seconds = delta.total_seconds() + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + + days_str = '{:.0f}D'.format(days) if days else '' + + time_str = ''.join( + '{value:.0f}{designator}'.format(value=value, designator=designator) + for value, designator + in [ + [hours, 'H'], + [minutes, 'M'], + [seconds, 'S'], + ] + if value + ) + + if time_str: + template = 'P{days_str}T{time_str}' else: - client = ip_address + template = 'P{days_str}' - if settings.AXES_USE_USER_AGENT: - client += '(user-agent={0})'.format(user_agent) - - return client + return template.format(days_str=days_str, time_str=time_str) -def get_client_ip(request: HttpRequest) -> str: - client_ip_attribute = 'axes_client_ip' +def get_credentials(username: str = None, **kwargs) -> dict: + """ + Calculate credentials for Axes to use internally from given username and kwargs. - if not hasattr(request, client_ip_attribute): - client_ip, _ = ipware.ip2.get_client_ip( - request, - proxy_order=settings.AXES_PROXY_ORDER, - proxy_count=settings.AXES_PROXY_COUNT, - proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, - request_header_order=settings.AXES_META_PRECEDENCE_ORDER, - ) - setattr(request, client_ip_attribute, client_ip) - return getattr(request, client_ip_attribute) + Axes will set the username value into the key defined with ``settings.AXES_USERNAME_FORM_FIELD`` + and update the credentials dictionary with the kwargs given on top of that. + """ + + credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + credentials.update(kwargs) + return credentials def get_client_username(request: HttpRequest, credentials: dict = None) -> str: @@ -102,98 +153,132 @@ def get_client_username(request: HttpRequest, credentials: dict = None) -> str: return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) +def get_client_ip_address(request: HttpRequest) -> str: + """ + Get client IP address as configured by the user. + + The primary method is to fetch the reques attribute configured with the + ``settings.AXES_IP_ATTRIBUTE`` that can be set by the user on the view layer. + + If the ``settings.AXES_IP_ATTRIBUTE`` is not set, the ``ipware`` package will be utilized to resolve the address + according to the client IP configurations that can be defined by the user to suit the reverse proxy configuration + that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information. + """ + + client_ip_attribute = 'axes_client_ip' + + if not hasattr(request, client_ip_attribute): + client_ip, _ = ipware.ip2.get_client_ip( + request, + proxy_order=settings.AXES_PROXY_ORDER, + proxy_count=settings.AXES_PROXY_COUNT, + proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, + request_header_order=settings.AXES_META_PRECEDENCE_ORDER, + ) + + setattr(request, client_ip_attribute, client_ip) + + return getattr(request, client_ip_attribute) + + def get_client_user_agent(request: HttpRequest) -> str: return request.META.get('HTTP_USER_AGENT', '')[:255] -def get_credentials(username: str = None, **kwargs): - credentials = {settings.AXES_USERNAME_FORM_FIELD: username} - credentials.update(kwargs) - return credentials +def get_client_path_info(request: HttpRequest) -> str: + return request.META.get('PATH_INFO', '')[:255] -def get_cool_off() -> Optional[timedelta]: +def get_client_http_accept(request: HttpRequest) -> str: + return request.META.get('HTTP_ACCEPT', '')[:1025] + + +def get_client_parameters(username: str, ip_address: str, user_agent: str) -> OrderedDict: """ - Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. + Get query parameters for filtering AccessAttempt queryset. - The return value is either None or timedelta. - - Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours, - and this function offers a unified _timedelta or None_ representation of that configuration - for use with the Axes internal implementations. - - :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. + This method returns an OrderedDict that guarantees iteration order for keys and values, + and can so be used in e.g. the generation of hash keys or other deterministic functions. """ - cool_off = settings.AXES_COOLOFF_TIME + filter_kwargs = OrderedDict() # type: OrderedDict - if isinstance(cool_off, int): - return timedelta(hours=cool_off) - return cool_off + if settings.AXES_ONLY_USER_FAILURES: + # 1. Only individual usernames can be tracked with parametrization + filter_kwargs['username'] = username + else: + if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: + # 2. A combination of username and IP address can be used as well + filter_kwargs['username'] = username + filter_kwargs['ip_address'] = ip_address + else: + # 3. Default case is to track the IP address only, which is the most secure option + filter_kwargs['ip_address'] = ip_address + + if settings.AXES_USE_USER_AGENT: + # 4. The HTTP User-Agent can be used to track e.g. one browser + filter_kwargs['user_agent'] = user_agent + + return filter_kwargs -def get_cache_timeout() -> Optional[int]: +def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str: """ - Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. + Get a readable string that can be used in e.g. logging to distinguish client requests. - The cache timeout can be either None if not configured or integer of seconds if configured. - - Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours, - and this function offers a unified _integer or None_ representation of that configuration - for use with the Django cache backends. + Example log format would be ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}`` """ - cool_off = get_cool_off() - if cool_off is None: - return None - return int(cool_off.total_seconds()) + if settings.AXES_VERBOSE: + # Verbose mode logs every attribute that is given + client_dict = OrderedDict() + client_dict['username'] = username + client_dict['ip_address'] = ip_address + client_dict['user_agent'] = user_agent + else: + # Other modes initialize the attributes that are used for the actual lockouts + client_dict = get_client_parameters(username, ip_address, user_agent) + # Path info is always included as last component in the client string for traceability purposes + if path_info and isinstance(path_info, (tuple, list)): + path_info = path_info[0] + client_dict['path_info'] = path_info -def is_ipv6(ip: str): - try: - inet_pton(AF_INET6, ip) - except (OSError, error): - return False - return True - - -def reset(ip: str = None, username: str = None): - """ - Reset records that match IP or username, and return the count of removed attempts. - """ - - attempts = AccessAttempt.objects.all() - if ip: - attempts = attempts.filter(ip_address=ip) - if username: - attempts = attempts.filter(username=username) - - count, _ = attempts.delete() - - return count - - -def iso8601(delta: timedelta) -> str: - """ - Return datetime.timedelta translated to ISO 8601 formatted duration. - """ - - seconds = delta.total_seconds() - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - days, hours = divmod(hours, 24) - - date = '{:.0f}D'.format(days) if days else '' - - time_values = hours, minutes, seconds - time_designators = 'H', 'M', 'S' - - time = ''.join([ - ('{:.0f}'.format(value) + designator) - for value, designator in zip(time_values, time_designators) - if value] + # Template the internal dictionary representation into a readable and concatenated key: "value" format + template = ', '.join( + '{key}: "{value}"'.format(key=key, value=value) + for key, value + in client_dict.items() ) - return 'P' + date + ('T' + time if time else '') + + # Wrap the internal dict into a single {key: "value"} bracing in the output + # which requires double braces when done with the Python string templating system + # i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call + template = '{{' + template + '}}' + + return template.format(client_dict) + + +def get_query_str(query: dict, max_length: int = 1024) -> str: + """ + Turns a query dictionary into an easy-to-read list of key-value pairs. + + If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` it will be excluded. + + The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. + """ + + query_dict = query.copy() + query_dict.pop('password', None) + query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None) + + query_str = '\n'.join( + '{key}={value}'.format(key=key, value=value) + for key, value + in query_dict.items() + ) + + return query_str[:max_length] def get_lockout_message() -> str: @@ -202,23 +287,19 @@ def get_lockout_message() -> str: return settings.AXES_PERMALOCK_MESSAGE -def get_lockout_response(request: HttpRequest) -> HttpResponse: +def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse: + status = 403 context = { 'failure_limit': settings.AXES_FAILURE_LIMIT, - 'username': get_client_username(request) or '' + 'username': get_client_username(request, credentials) or '' } - cool_off = settings.AXES_COOLOFF_TIME + cool_off = get_cool_off() if cool_off: - if isinstance(cool_off, (int, float)): - cool_off = timedelta(hours=cool_off) - context.update({ - 'cooloff_time': iso8601(cool_off) + 'cool_off': get_cool_off_iso8601(cool_off) }) - status = 403 - if request.is_ajax(): return JsonResponse( context, @@ -234,6 +315,11 @@ def get_lockout_response(request: HttpRequest) -> HttpResponse: ) if settings.AXES_LOCKOUT_URL: - return HttpResponseRedirect(settings.AXES_LOCKOUT_URL) + return HttpResponseRedirect( + settings.AXES_LOCKOUT_URL, + ) - return HttpResponse(get_lockout_message(), status=status) + return HttpResponse( + get_lockout_message(), + status=status, + )