From a4c4ba6fb73d3dc5a07fed499bc051172f21a3c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Aleksi=20Ha=CC=88kli?= Date: Sat, 16 Feb 2019 19:05:59 +0200 Subject: [PATCH] Refactor utils and attempts internal API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Clean up internal implementations and tests while keeping the APIs as similar as possible where feasible. The goal of this change is to not change any documented or stable APIs that might be in use by users, but to improve the internal implementations for maintainability and usability. Signed-off-by: Aleksi Häkli --- axes/attempts.py | 70 +++---- axes/handlers.py | 47 +++-- axes/tests/test_access_attempt.py | 29 ++- axes/tests/test_handlers.py | 2 +- axes/tests/test_utils.py | 71 ++++--- axes/utils.py | 326 +++++++++++++++++++----------- 6 files changed, 313 insertions(+), 232 deletions(-) diff --git a/axes/attempts.py b/axes/attempts.py index d76c50b..a5a7993 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -1,6 +1,6 @@ -from collections import OrderedDict from hashlib import md5 from logging import getLogger +from typing import Union from django.contrib.auth import get_user_model from django.db.models import QuerySet @@ -11,56 +11,18 @@ from axes.conf import settings from axes.models import AccessAttempt from axes.utils import ( get_axes_cache, - get_client_ip, + get_client_ip_address, get_client_username, get_client_user_agent, get_cache_timeout, get_cool_off, + get_client_parameters, ) log = getLogger(settings.AXES_LOGGER) -def get_filter_kwargs(username: str, ip_address: str, user_agent: str) -> OrderedDict: - """ - Get query parameters for filtering AccessAttempt queryset. - - This method returns an OrderedDict that guarantees iteration order for keys and values, - and can so be used in e.g. the generation of hash keys or other deterministic functions. - """ - - query = OrderedDict() # type: OrderedDict - - if settings.AXES_ONLY_USER_FAILURES: - query['username'] = username - else: - if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: - query['username'] = username - query['ip_address'] = ip_address - else: - query['ip_address'] = ip_address - - if settings.AXES_USE_USER_AGENT: - query['user_agent'] = user_agent - - return query - - -def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: - """ - Return a queryset of AccessAttempts that match the given request and credentials. - """ - - username = get_client_username(request, credentials) - ip_address = get_client_ip(request) - user_agent = get_client_user_agent(request) - - filter_kwargs = get_filter_kwargs(username, ip_address, user_agent) - - return AccessAttempt.objects.filter(**filter_kwargs) - - -def get_cache_key(request_or_attempt, credentials: dict = None) -> str: +def get_cache_key(request_or_attempt: Union[HttpRequest, AccessAttempt], credentials: dict = None) -> str: """ Build cache key name from request or AccessAttempt object. @@ -75,10 +37,10 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str: user_agent = request_or_attempt.user_agent else: username = get_client_username(request_or_attempt, credentials) - ip_address = get_client_ip(request_or_attempt) + ip_address = get_client_ip_address(request_or_attempt) user_agent = get_client_user_agent(request_or_attempt) - filter_kwargs = get_filter_kwargs(username, ip_address, user_agent) + filter_kwargs = get_client_parameters(username, ip_address, user_agent) cache_key_components = ''.join(filter_kwargs.values()) cache_key_digest = md5(cache_key_components.encode()).hexdigest() @@ -87,6 +49,20 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str: return cache_key +def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: + """ + Return a queryset of AccessAttempts that match the given request and credentials. + """ + + username = get_client_username(request, credentials) + ip_address = get_client_ip_address(request) + user_agent = get_client_user_agent(request) + + filter_kwargs = get_client_parameters(username, ip_address, user_agent) + + return AccessAttempt.objects.filter(**filter_kwargs) + + def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet: """ Get valid user attempts and delete expired attempts which have cool offs in the past. @@ -119,6 +95,10 @@ def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySe def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int: + """ + Reset all user attempts that match the given request and credentials. + """ + attempts = filter_user_attempts(request, credentials) count, _ = attempts.delete() @@ -144,7 +124,7 @@ def is_ip_blacklisted(request: HttpRequest) -> bool: Check if the given request refers to a blacklisted IP. """ - ip = get_client_ip(request) + ip = get_client_ip_address(request) if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip): return False diff --git a/axes/handlers.py b/axes/handlers.py index 4d566ba..3a707f7 100644 --- a/axes/handlers.py +++ b/axes/handlers.py @@ -12,9 +12,17 @@ from axes.conf import settings from axes.exceptions import AxesSignalPermissionDenied from axes.models import AccessLog, AccessAttempt from axes.signals import user_locked_out -from axes.utils import get_client_str, get_client_user_agent -from axes.utils import query2str -from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials +from axes.utils import ( + get_axes_cache, + get_client_ip_address, + get_client_path_info, + get_client_http_accept, + get_client_str, + get_client_username, + get_client_user_agent, + get_credentials, + get_query_str, +) log = getLogger(settings.AXES_LOGGER) @@ -37,10 +45,10 @@ class AxesHandler: # pylint: disable=too-many-locals return username = get_client_username(request, credentials) - ip_address = get_client_ip(request) + ip_address = get_client_ip_address(request) user_agent = get_client_user_agent(request) - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + path_info = get_client_path_info(request) + http_accept = get_client_http_accept(request) client_str = get_client_str(username, ip_address, user_agent, path_info) if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address): @@ -76,11 +84,11 @@ class AxesHandler: # pylint: disable=too-many-locals attempt.get_data = template.format( attempt.get_data, - query2str(request.GET), + get_query_str(request.GET), ) attempt.post_data = template.format( attempt.post_data, - query2str(request.POST) + get_query_str(request.POST) ) attempt.http_accept = http_accept attempt.path_info = path_info @@ -96,14 +104,14 @@ class AxesHandler: # pylint: disable=too-many-locals ) else: # Record failed attempt. Whether or not the IP address or user agent is - # used in counting failures is handled elsewhere, so we just record # everything here. + # used in counting failures is handled elsewhere, so we just record everything here. AccessAttempt.objects.create( username=username, ip_address=ip_address, user_agent=user_agent, - get_data=query2str(request.GET), - post_data=query2str(request.POST), + get_data=get_query_str(request.GET), + post_data=get_query_str(request.POST), http_accept=http_accept, path_info=path_info, failures_since_start=failures, @@ -136,10 +144,10 @@ class AxesHandler: # pylint: disable=too-many-locals username = user.get_username() credentials = get_credentials(username) - ip_address = get_client_ip(request) + ip_address = get_client_ip_address(request) user_agent = get_client_user_agent(request) - path_info = request.META.get('PATH_INFO', '')[:255] - http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] + path_info = get_client_path_info(request) + http_accept = get_client_http_accept(request) client_str = get_client_str(username, ip_address, user_agent, path_info) log.info( @@ -170,7 +178,16 @@ class AxesHandler: # pylint: disable=too-many-locals When user logs out, update the AccessLog related to the user. """ - log.info('AXES: Successful logout by %s.', user) + username = user.get_username() + ip_address = get_client_ip_address(request) + user_agent = get_client_user_agent(request) + path_info = get_client_path_info(request) + client_str = get_client_str(username, ip_address, user_agent, path_info) + + log.info( + 'AXES: Successful logout by %s.', + client_str, + ) if user and not settings.AXES_DISABLE_ACCESS_LOG: AccessLog.objects.filter( diff --git a/axes/tests/test_access_attempt.py b/axes/tests/test_access_attempt.py index 3462974..1911d4f 100644 --- a/axes/tests/test_access_attempt.py +++ b/axes/tests/test_access_attempt.py @@ -19,13 +19,12 @@ from axes.attempts import ( ip_in_blacklist, ip_in_whitelist, is_user_lockable, - get_filter_kwargs, + get_client_parameters, get_user_attempts, ) from axes.conf import settings from axes.models import AccessAttempt, AccessLog from axes.signals import user_locked_out -from axes.utils import reset class AccessAttemptTest(TestCase): @@ -229,7 +228,7 @@ class AccessAttemptTest(TestCase): self.test_failure_limit_once() # Reset the ip so we can try again - reset(ip='127.0.0.1') + AccessAttempt.objects.filter(ip_address='127.0.0.1').delete() # Make a login attempt again self.test_valid_login() @@ -243,7 +242,7 @@ class AccessAttemptTest(TestCase): self.test_failure_limit_once() # Reset all attempts so we can try again - reset() + AccessAttempt.objects.all().delete() # Make a login attempt again self.test_valid_login() @@ -253,7 +252,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_user(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'username': self.username}, ) @@ -264,7 +263,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_ip(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'ip_address': self.ip_address}, ) @@ -275,7 +274,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_user_and_ip(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'username': self.username, 'ip_address': self.ip_address}, ) @@ -286,7 +285,7 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_ip_and_agent(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'ip_address': self.ip_address, 'user_agent': self.user_agent}, ) @@ -297,11 +296,11 @@ class AccessAttemptTest(TestCase): ) def test_get_filter_kwargs_user_ip_agent(self): self.assertEqual( - dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)), + dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), {'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent}, ) - @patch('axes.utils.get_client_ip', return_value='127.0.0.1') + @patch('axes.utils.get_client_ip_address', return_value='127.0.0.1') def test_get_cache_key(self, _): """ Test the cache key format. @@ -338,7 +337,7 @@ class AccessAttemptTest(TestCase): self.assertEqual(cache_hash_key, get_cache_key(attempt)) - @patch('axes.utils.get_client_ip', return_value='127.0.0.1') + @patch('axes.utils.get_client_ip_address', return_value='127.0.0.1') def test_get_cache_key_credentials(self, _): """ Test the cache key format. @@ -398,7 +397,7 @@ class AccessAttemptTest(TestCase): self.test_failure_limit_once() self.assertEqual(scope.signal_received, 1) - reset() + AccessAttempt.objects.all().delete() # Make another lockout self.test_failure_limit_once() @@ -446,7 +445,7 @@ class AccessAttemptTest(TestCase): # reset the username only and make sure we can log in now even though # our IP has failed each time - reset(username=AccessAttemptTest.VALID_USERNAME) + AccessAttempt.objects.filter(username=AccessAttemptTest.VALID_USERNAME).delete() response = self._login( is_valid_username=True, is_valid_password=True, @@ -461,13 +460,13 @@ class AccessAttemptTest(TestCase): is_valid_username=False, is_valid_password=False, ) - # Check if we can still log in with valid user + # Check if we can still initialize in with valid user response = self._login(is_valid_username=True, is_valid_password=True) self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True) def test_log_data_truncated(self): """ - Test that query2str properly truncates data to the max_length (default 1024). + Test that get_query_str properly truncates data to the max_length (default 1024). """ # An impossibly large post dict diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index 6d05181..d89d78b 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -89,7 +89,7 @@ class AxesHandlerTestCase(TestCase): log.warning.assert_called_with('AxesHandler.user_login_failed does not function without a request.') @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=['127.0.0.1']) - @patch('axes.handlers.get_client_ip', return_value='127.0.0.1') + @patch('axes.handlers.get_client_ip_address', return_value='127.0.0.1') @patch('axes.handlers.ip_in_whitelist', return_value=True) @patch('axes.handlers.log') def test_user_login_failed_whitelist(self, log, _, __): diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 2b5f4d3..9f661ec 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -5,11 +5,15 @@ from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpRes from django.test import TestCase, override_settings from axes import get_version -from axes.utils import iso8601, is_ipv6, get_client_str, get_client_username, get_lockout_response +from axes.utils import get_cool_off_iso8601, get_client_str, get_client_username, get_lockout_response + + +def get_username(request: HttpRequest, credentials: dict) -> str: + return 'username' def get_expected_client_str(*args, **kwargs): - client_str_template = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}' + client_str_template = '{{username: "{0}", ip_address: "{1}", user_agent: "{2}", path_info: "{3}"}}' return client_str_template.format(*args, **kwargs) @@ -22,7 +26,7 @@ class AxesTestCase(TestCase): class UtilsTestCase(TestCase): def test_iso8601(self): """ - Test iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration. + Test get_cool_off_iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration. """ expected = { @@ -46,46 +50,41 @@ class UtilsTestCase(TestCase): for delta, iso_duration in expected.items(): with self.subTest(iso_duration): - self.assertEqual(iso8601(delta), iso_duration) - - def test_is_ipv6(self): - self.assertTrue(is_ipv6('ff80::220:16ff:fec9:1')) - self.assertFalse(is_ipv6('67.255.125.204')) - self.assertFalse(is_ipv6('foo')) + self.assertEqual(get_cool_off_iso8601(delta), iso_duration) @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details_tuple(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = ('admin', 'login') - expected = get_expected_client_str(username, ip, user_agent, path_info[0]) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info[0]) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @override_settings(AXES_VERBOSE=False) def test_non_verbose_ip_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = ip - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -93,12 +92,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_user_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -106,12 +105,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_only_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = username - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{username: "test@example.com", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -119,12 +118,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_user_ip_combo_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -132,12 +131,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_ip_combo_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = '{0} from {1}'.format(username, ip) - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -145,12 +144,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_user_agent_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = get_expected_client_str(username, ip, user_agent, path_info) - actual = get_client_str(username, ip, user_agent, path_info) + expected = get_expected_client_str(username, ip_address, user_agent, path_info) + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -158,12 +157,12 @@ class UtilsTestCase(TestCase): @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_agent_client_details(self): username = 'test@example.com' - ip = '127.0.0.1' + ip_address = '127.0.0.1' user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' path_info = '/admin/' - expected = ip + '(user-agent={0})'.format(user_agent) - actual = get_client_str(username, ip, user_agent, path_info) + expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}' + actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) diff --git a/axes/utils.py b/axes/utils.py index 1fc4b7f..666b4c0 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -1,7 +1,7 @@ -from typing import Optional +from collections import OrderedDict from datetime import timedelta from logging import getLogger -from socket import error, inet_pton, AF_INET6 +from typing import Optional from django.core.cache import caches, BaseCache from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse @@ -9,6 +9,7 @@ from django.shortcuts import render from django.utils.module_loading import import_string import ipware.ip2 +from django.utils.module_loading import import_string from axes.conf import settings from axes.models import AccessAttempt @@ -16,59 +17,109 @@ from axes.models import AccessAttempt logger = getLogger(__name__) +def reset(ip: str = None, username: str = None) -> int: + """ + Reset records that match IP or username, and return the count of removed attempts. + """ + + attempts = AccessAttempt.objects.all() + if ip: + attempts = attempts.filter(ip_address=ip) + if username: + attempts = attempts.filter(username=username) + + count, _ = attempts.delete() + + return count + + def get_axes_cache() -> BaseCache: + """ + Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set. + """ + return caches[getattr(settings, 'AXES_CACHE', 'default')] -def query2str(dictionary: dict, max_length: int = 1024) -> str: +def get_cache_timeout() -> Optional[int]: """ - Turns a dictionary into an easy-to-read list of key-value pairs. + Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. - If there is a field called password it will be excluded from the output. + The cache timeout can be either None if not configured or integer of seconds if configured. - The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. + Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours, + and this function offers a unified _integer or None_ representation of that configuration + for use with the Django cache backends. """ - return '\n'.join([ - '%s=%s' % (k, v) for k, v in dictionary.items() - if k != settings.AXES_PASSWORD_FORM_FIELD - ][:int(max_length / 2)])[:max_length] + cool_off = get_cool_off() + if cool_off is None: + return None + return int(cool_off.total_seconds()) -def get_client_str(username, ip_address, user_agent=None, path_info=None): - if settings.AXES_VERBOSE: - if path_info and isinstance(path_info, tuple): - path_info = path_info[0] +def get_cool_off() -> Optional[timedelta]: + """ + Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. - details = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}' - return details.format(username, ip_address, user_agent, path_info) + The return value is either None or timedelta. - if settings.AXES_ONLY_USER_FAILURES: - client = username - elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: - client = '{0} from {1}'.format(username, ip_address) + Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours, + and this function offers a unified _timedelta or None_ representation of that configuration + for use with the Axes internal implementations. + + :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. + """ + + cool_off = settings.AXES_COOLOFF_TIME + + if isinstance(cool_off, int): + return timedelta(hours=cool_off) + return cool_off + + +def get_cool_off_iso8601(delta: timedelta) -> str: + """ + Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs. + """ + + seconds = delta.total_seconds() + minutes, seconds = divmod(seconds, 60) + hours, minutes = divmod(minutes, 60) + days, hours = divmod(hours, 24) + + days_str = '{:.0f}D'.format(days) if days else '' + + time_str = ''.join( + '{value:.0f}{designator}'.format(value=value, designator=designator) + for value, designator + in [ + [hours, 'H'], + [minutes, 'M'], + [seconds, 'S'], + ] + if value + ) + + if time_str: + template = 'P{days_str}T{time_str}' else: - client = ip_address + template = 'P{days_str}' - if settings.AXES_USE_USER_AGENT: - client += '(user-agent={0})'.format(user_agent) - - return client + return template.format(days_str=days_str, time_str=time_str) -def get_client_ip(request: HttpRequest) -> str: - client_ip_attribute = 'axes_client_ip' +def get_credentials(username: str = None, **kwargs) -> dict: + """ + Calculate credentials for Axes to use internally from given username and kwargs. - if not hasattr(request, client_ip_attribute): - client_ip, _ = ipware.ip2.get_client_ip( - request, - proxy_order=settings.AXES_PROXY_ORDER, - proxy_count=settings.AXES_PROXY_COUNT, - proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, - request_header_order=settings.AXES_META_PRECEDENCE_ORDER, - ) - setattr(request, client_ip_attribute, client_ip) - return getattr(request, client_ip_attribute) + Axes will set the username value into the key defined with ``settings.AXES_USERNAME_FORM_FIELD`` + and update the credentials dictionary with the kwargs given on top of that. + """ + + credentials = {settings.AXES_USERNAME_FORM_FIELD: username} + credentials.update(kwargs) + return credentials def get_client_username(request: HttpRequest, credentials: dict = None) -> str: @@ -102,98 +153,132 @@ def get_client_username(request: HttpRequest, credentials: dict = None) -> str: return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) +def get_client_ip_address(request: HttpRequest) -> str: + """ + Get client IP address as configured by the user. + + The primary method is to fetch the reques attribute configured with the + ``settings.AXES_IP_ATTRIBUTE`` that can be set by the user on the view layer. + + If the ``settings.AXES_IP_ATTRIBUTE`` is not set, the ``ipware`` package will be utilized to resolve the address + according to the client IP configurations that can be defined by the user to suit the reverse proxy configuration + that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information. + """ + + client_ip_attribute = 'axes_client_ip' + + if not hasattr(request, client_ip_attribute): + client_ip, _ = ipware.ip2.get_client_ip( + request, + proxy_order=settings.AXES_PROXY_ORDER, + proxy_count=settings.AXES_PROXY_COUNT, + proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS, + request_header_order=settings.AXES_META_PRECEDENCE_ORDER, + ) + + setattr(request, client_ip_attribute, client_ip) + + return getattr(request, client_ip_attribute) + + def get_client_user_agent(request: HttpRequest) -> str: return request.META.get('HTTP_USER_AGENT', '')[:255] -def get_credentials(username: str = None, **kwargs): - credentials = {settings.AXES_USERNAME_FORM_FIELD: username} - credentials.update(kwargs) - return credentials +def get_client_path_info(request: HttpRequest) -> str: + return request.META.get('PATH_INFO', '')[:255] -def get_cool_off() -> Optional[timedelta]: +def get_client_http_accept(request: HttpRequest) -> str: + return request.META.get('HTTP_ACCEPT', '')[:1025] + + +def get_client_parameters(username: str, ip_address: str, user_agent: str) -> OrderedDict: """ - Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME. + Get query parameters for filtering AccessAttempt queryset. - The return value is either None or timedelta. - - Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours, - and this function offers a unified _timedelta or None_ representation of that configuration - for use with the Axes internal implementations. - - :exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type. + This method returns an OrderedDict that guarantees iteration order for keys and values, + and can so be used in e.g. the generation of hash keys or other deterministic functions. """ - cool_off = settings.AXES_COOLOFF_TIME + filter_kwargs = OrderedDict() # type: OrderedDict - if isinstance(cool_off, int): - return timedelta(hours=cool_off) - return cool_off + if settings.AXES_ONLY_USER_FAILURES: + # 1. Only individual usernames can be tracked with parametrization + filter_kwargs['username'] = username + else: + if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: + # 2. A combination of username and IP address can be used as well + filter_kwargs['username'] = username + filter_kwargs['ip_address'] = ip_address + else: + # 3. Default case is to track the IP address only, which is the most secure option + filter_kwargs['ip_address'] = ip_address + + if settings.AXES_USE_USER_AGENT: + # 4. The HTTP User-Agent can be used to track e.g. one browser + filter_kwargs['user_agent'] = user_agent + + return filter_kwargs -def get_cache_timeout() -> Optional[int]: +def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str: """ - Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME. + Get a readable string that can be used in e.g. logging to distinguish client requests. - The cache timeout can be either None if not configured or integer of seconds if configured. - - Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours, - and this function offers a unified _integer or None_ representation of that configuration - for use with the Django cache backends. + Example log format would be ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}`` """ - cool_off = get_cool_off() - if cool_off is None: - return None - return int(cool_off.total_seconds()) + if settings.AXES_VERBOSE: + # Verbose mode logs every attribute that is given + client_dict = OrderedDict() + client_dict['username'] = username + client_dict['ip_address'] = ip_address + client_dict['user_agent'] = user_agent + else: + # Other modes initialize the attributes that are used for the actual lockouts + client_dict = get_client_parameters(username, ip_address, user_agent) + # Path info is always included as last component in the client string for traceability purposes + if path_info and isinstance(path_info, (tuple, list)): + path_info = path_info[0] + client_dict['path_info'] = path_info -def is_ipv6(ip: str): - try: - inet_pton(AF_INET6, ip) - except (OSError, error): - return False - return True - - -def reset(ip: str = None, username: str = None): - """ - Reset records that match IP or username, and return the count of removed attempts. - """ - - attempts = AccessAttempt.objects.all() - if ip: - attempts = attempts.filter(ip_address=ip) - if username: - attempts = attempts.filter(username=username) - - count, _ = attempts.delete() - - return count - - -def iso8601(delta: timedelta) -> str: - """ - Return datetime.timedelta translated to ISO 8601 formatted duration. - """ - - seconds = delta.total_seconds() - minutes, seconds = divmod(seconds, 60) - hours, minutes = divmod(minutes, 60) - days, hours = divmod(hours, 24) - - date = '{:.0f}D'.format(days) if days else '' - - time_values = hours, minutes, seconds - time_designators = 'H', 'M', 'S' - - time = ''.join([ - ('{:.0f}'.format(value) + designator) - for value, designator in zip(time_values, time_designators) - if value] + # Template the internal dictionary representation into a readable and concatenated key: "value" format + template = ', '.join( + '{key}: "{value}"'.format(key=key, value=value) + for key, value + in client_dict.items() ) - return 'P' + date + ('T' + time if time else '') + + # Wrap the internal dict into a single {key: "value"} bracing in the output + # which requires double braces when done with the Python string templating system + # i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call + template = '{{' + template + '}}' + + return template.format(client_dict) + + +def get_query_str(query: dict, max_length: int = 1024) -> str: + """ + Turns a query dictionary into an easy-to-read list of key-value pairs. + + If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` it will be excluded. + + The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. + """ + + query_dict = query.copy() + query_dict.pop('password', None) + query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None) + + query_str = '\n'.join( + '{key}={value}'.format(key=key, value=value) + for key, value + in query_dict.items() + ) + + return query_str[:max_length] def get_lockout_message() -> str: @@ -202,23 +287,19 @@ def get_lockout_message() -> str: return settings.AXES_PERMALOCK_MESSAGE -def get_lockout_response(request: HttpRequest) -> HttpResponse: +def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse: + status = 403 context = { 'failure_limit': settings.AXES_FAILURE_LIMIT, - 'username': get_client_username(request) or '' + 'username': get_client_username(request, credentials) or '' } - cool_off = settings.AXES_COOLOFF_TIME + cool_off = get_cool_off() if cool_off: - if isinstance(cool_off, (int, float)): - cool_off = timedelta(hours=cool_off) - context.update({ - 'cooloff_time': iso8601(cool_off) + 'cool_off': get_cool_off_iso8601(cool_off) }) - status = 403 - if request.is_ajax(): return JsonResponse( context, @@ -234,6 +315,11 @@ def get_lockout_response(request: HttpRequest) -> HttpResponse: ) if settings.AXES_LOCKOUT_URL: - return HttpResponseRedirect(settings.AXES_LOCKOUT_URL) + return HttpResponseRedirect( + settings.AXES_LOCKOUT_URL, + ) - return HttpResponse(get_lockout_message(), status=status) + return HttpResponse( + get_lockout_message(), + status=status, + )