Refactor utils and attempts internal API

Clean up internal implementations and tests while keeping the
APIs as similar as possible where feasible.

The goal of this change is to not change any documented
or stable APIs that might be in use by users, but to improve
the internal implementations for maintainability and usability.

Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
Aleksi Häkli 2019-02-16 19:05:59 +02:00 committed by Aleksi Häkli
parent bc24d12975
commit a4c4ba6fb7
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
6 changed files with 313 additions and 232 deletions

View file

@ -1,6 +1,6 @@
from collections import OrderedDict
from hashlib import md5
from logging import getLogger
from typing import Union
from django.contrib.auth import get_user_model
from django.db.models import QuerySet
@ -11,56 +11,18 @@ from axes.conf import settings
from axes.models import AccessAttempt
from axes.utils import (
get_axes_cache,
get_client_ip,
get_client_ip_address,
get_client_username,
get_client_user_agent,
get_cache_timeout,
get_cool_off,
get_client_parameters,
)
log = getLogger(settings.AXES_LOGGER)
def get_filter_kwargs(username: str, ip_address: str, user_agent: str) -> OrderedDict:
"""
Get query parameters for filtering AccessAttempt queryset.
This method returns an OrderedDict that guarantees iteration order for keys and values,
and can so be used in e.g. the generation of hash keys or other deterministic functions.
"""
query = OrderedDict() # type: OrderedDict
if settings.AXES_ONLY_USER_FAILURES:
query['username'] = username
else:
if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
query['username'] = username
query['ip_address'] = ip_address
else:
query['ip_address'] = ip_address
if settings.AXES_USE_USER_AGENT:
query['user_agent'] = user_agent
return query
def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""
username = get_client_username(request, credentials)
ip_address = get_client_ip(request)
user_agent = get_client_user_agent(request)
filter_kwargs = get_filter_kwargs(username, ip_address, user_agent)
return AccessAttempt.objects.filter(**filter_kwargs)
def get_cache_key(request_or_attempt, credentials: dict = None) -> str:
def get_cache_key(request_or_attempt: Union[HttpRequest, AccessAttempt], credentials: dict = None) -> str:
"""
Build cache key name from request or AccessAttempt object.
@ -75,10 +37,10 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str:
user_agent = request_or_attempt.user_agent
else:
username = get_client_username(request_or_attempt, credentials)
ip_address = get_client_ip(request_or_attempt)
ip_address = get_client_ip_address(request_or_attempt)
user_agent = get_client_user_agent(request_or_attempt)
filter_kwargs = get_filter_kwargs(username, ip_address, user_agent)
filter_kwargs = get_client_parameters(username, ip_address, user_agent)
cache_key_components = ''.join(filter_kwargs.values())
cache_key_digest = md5(cache_key_components.encode()).hexdigest()
@ -87,6 +49,20 @@ def get_cache_key(request_or_attempt, credentials: dict = None) -> str:
return cache_key
def filter_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
"""
Return a queryset of AccessAttempts that match the given request and credentials.
"""
username = get_client_username(request, credentials)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
filter_kwargs = get_client_parameters(username, ip_address, user_agent)
return AccessAttempt.objects.filter(**filter_kwargs)
def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySet:
"""
Get valid user attempts and delete expired attempts which have cool offs in the past.
@ -119,6 +95,10 @@ def get_user_attempts(request: HttpRequest, credentials: dict = None) -> QuerySe
def reset_user_attempts(request: HttpRequest, credentials: dict = None) -> int:
"""
Reset all user attempts that match the given request and credentials.
"""
attempts = filter_user_attempts(request, credentials)
count, _ = attempts.delete()
@ -144,7 +124,7 @@ def is_ip_blacklisted(request: HttpRequest) -> bool:
Check if the given request refers to a blacklisted IP.
"""
ip = get_client_ip(request)
ip = get_client_ip_address(request)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
return False

View file

@ -12,9 +12,17 @@ from axes.conf import settings
from axes.exceptions import AxesSignalPermissionDenied
from axes.models import AccessLog, AccessAttempt
from axes.signals import user_locked_out
from axes.utils import get_client_str, get_client_user_agent
from axes.utils import query2str
from axes.utils import get_axes_cache, get_client_ip, get_client_username, get_credentials
from axes.utils import (
get_axes_cache,
get_client_ip_address,
get_client_path_info,
get_client_http_accept,
get_client_str,
get_client_username,
get_client_user_agent,
get_credentials,
get_query_str,
)
log = getLogger(settings.AXES_LOGGER)
@ -37,10 +45,10 @@ class AxesHandler: # pylint: disable=too-many-locals
return
username = get_client_username(request, credentials)
ip_address = get_client_ip(request)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
path_info = get_client_path_info(request)
http_accept = get_client_http_accept(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
@ -76,11 +84,11 @@ class AxesHandler: # pylint: disable=too-many-locals
attempt.get_data = template.format(
attempt.get_data,
query2str(request.GET),
get_query_str(request.GET),
)
attempt.post_data = template.format(
attempt.post_data,
query2str(request.POST)
get_query_str(request.POST)
)
attempt.http_accept = http_accept
attempt.path_info = path_info
@ -96,14 +104,14 @@ class AxesHandler: # pylint: disable=too-many-locals
)
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record # everything here.
# used in counting failures is handled elsewhere, so we just record everything here.
AccessAttempt.objects.create(
username=username,
ip_address=ip_address,
user_agent=user_agent,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
get_data=get_query_str(request.GET),
post_data=get_query_str(request.POST),
http_accept=http_accept,
path_info=path_info,
failures_since_start=failures,
@ -136,10 +144,10 @@ class AxesHandler: # pylint: disable=too-many-locals
username = user.get_username()
credentials = get_credentials(username)
ip_address = get_client_ip(request)
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
path_info = get_client_path_info(request)
http_accept = get_client_http_accept(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
log.info(
@ -170,7 +178,16 @@ class AxesHandler: # pylint: disable=too-many-locals
When user logs out, update the AccessLog related to the user.
"""
log.info('AXES: Successful logout by %s.', user)
username = user.get_username()
ip_address = get_client_ip_address(request)
user_agent = get_client_user_agent(request)
path_info = get_client_path_info(request)
client_str = get_client_str(username, ip_address, user_agent, path_info)
log.info(
'AXES: Successful logout by %s.',
client_str,
)
if user and not settings.AXES_DISABLE_ACCESS_LOG:
AccessLog.objects.filter(

View file

@ -19,13 +19,12 @@ from axes.attempts import (
ip_in_blacklist,
ip_in_whitelist,
is_user_lockable,
get_filter_kwargs,
get_client_parameters,
get_user_attempts,
)
from axes.conf import settings
from axes.models import AccessAttempt, AccessLog
from axes.signals import user_locked_out
from axes.utils import reset
class AccessAttemptTest(TestCase):
@ -229,7 +228,7 @@ class AccessAttemptTest(TestCase):
self.test_failure_limit_once()
# Reset the ip so we can try again
reset(ip='127.0.0.1')
AccessAttempt.objects.filter(ip_address='127.0.0.1').delete()
# Make a login attempt again
self.test_valid_login()
@ -243,7 +242,7 @@ class AccessAttemptTest(TestCase):
self.test_failure_limit_once()
# Reset all attempts so we can try again
reset()
AccessAttempt.objects.all().delete()
# Make a login attempt again
self.test_valid_login()
@ -253,7 +252,7 @@ class AccessAttemptTest(TestCase):
)
def test_get_filter_kwargs_user(self):
self.assertEqual(
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username},
)
@ -264,7 +263,7 @@ class AccessAttemptTest(TestCase):
)
def test_get_filter_kwargs_ip(self):
self.assertEqual(
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'ip_address': self.ip_address},
)
@ -275,7 +274,7 @@ class AccessAttemptTest(TestCase):
)
def test_get_filter_kwargs_user_and_ip(self):
self.assertEqual(
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username, 'ip_address': self.ip_address},
)
@ -286,7 +285,7 @@ class AccessAttemptTest(TestCase):
)
def test_get_filter_kwargs_ip_and_agent(self):
self.assertEqual(
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'ip_address': self.ip_address, 'user_agent': self.user_agent},
)
@ -297,11 +296,11 @@ class AccessAttemptTest(TestCase):
)
def test_get_filter_kwargs_user_ip_agent(self):
self.assertEqual(
dict(get_filter_kwargs(self.username, self.ip_address, self.user_agent)),
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent},
)
@patch('axes.utils.get_client_ip', return_value='127.0.0.1')
@patch('axes.utils.get_client_ip_address', return_value='127.0.0.1')
def test_get_cache_key(self, _):
"""
Test the cache key format.
@ -338,7 +337,7 @@ class AccessAttemptTest(TestCase):
self.assertEqual(cache_hash_key, get_cache_key(attempt))
@patch('axes.utils.get_client_ip', return_value='127.0.0.1')
@patch('axes.utils.get_client_ip_address', return_value='127.0.0.1')
def test_get_cache_key_credentials(self, _):
"""
Test the cache key format.
@ -398,7 +397,7 @@ class AccessAttemptTest(TestCase):
self.test_failure_limit_once()
self.assertEqual(scope.signal_received, 1)
reset()
AccessAttempt.objects.all().delete()
# Make another lockout
self.test_failure_limit_once()
@ -446,7 +445,7 @@ class AccessAttemptTest(TestCase):
# reset the username only and make sure we can log in now even though
# our IP has failed each time
reset(username=AccessAttemptTest.VALID_USERNAME)
AccessAttempt.objects.filter(username=AccessAttemptTest.VALID_USERNAME).delete()
response = self._login(
is_valid_username=True,
is_valid_password=True,
@ -461,13 +460,13 @@ class AccessAttemptTest(TestCase):
is_valid_username=False,
is_valid_password=False,
)
# Check if we can still log in with valid user
# Check if we can still initialize in with valid user
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302, html=True)
def test_log_data_truncated(self):
"""
Test that query2str properly truncates data to the max_length (default 1024).
Test that get_query_str properly truncates data to the max_length (default 1024).
"""
# An impossibly large post dict

View file

@ -89,7 +89,7 @@ class AxesHandlerTestCase(TestCase):
log.warning.assert_called_with('AxesHandler.user_login_failed does not function without a request.')
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=['127.0.0.1'])
@patch('axes.handlers.get_client_ip', return_value='127.0.0.1')
@patch('axes.handlers.get_client_ip_address', return_value='127.0.0.1')
@patch('axes.handlers.ip_in_whitelist', return_value=True)
@patch('axes.handlers.log')
def test_user_login_failed_whitelist(self, log, _, __):

View file

@ -5,11 +5,15 @@ from django.http import HttpRequest, JsonResponse, HttpResponseRedirect, HttpRes
from django.test import TestCase, override_settings
from axes import get_version
from axes.utils import iso8601, is_ipv6, get_client_str, get_client_username, get_lockout_response
from axes.utils import get_cool_off_iso8601, get_client_str, get_client_username, get_lockout_response
def get_username(request: HttpRequest, credentials: dict) -> str:
return 'username'
def get_expected_client_str(*args, **kwargs):
client_str_template = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}'
client_str_template = '{{username: "{0}", ip_address: "{1}", user_agent: "{2}", path_info: "{3}"}}'
return client_str_template.format(*args, **kwargs)
@ -22,7 +26,7 @@ class AxesTestCase(TestCase):
class UtilsTestCase(TestCase):
def test_iso8601(self):
"""
Test iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration.
Test get_cool_off_iso8601 correctly translates datetime.timdelta to ISO 8601 formatted duration.
"""
expected = {
@ -46,46 +50,41 @@ class UtilsTestCase(TestCase):
for delta, iso_duration in expected.items():
with self.subTest(iso_duration):
self.assertEqual(iso8601(delta), iso_duration)
def test_is_ipv6(self):
self.assertTrue(is_ipv6('ff80::220:16ff:fec9:1'))
self.assertFalse(is_ipv6('67.255.125.204'))
self.assertFalse(is_ipv6('foo'))
self.assertEqual(get_cool_off_iso8601(delta), iso_duration)
@override_settings(AXES_VERBOSE=True)
def test_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip_address, user_agent, path_info)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_VERBOSE=True)
def test_verbose_ip_only_client_details_tuple(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = ('admin', 'login')
expected = get_expected_client_str(username, ip, user_agent, path_info[0])
actual = get_client_str(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip_address, user_agent, path_info[0])
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = ip
actual = get_client_str(username, ip, user_agent, path_info)
expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -93,12 +92,12 @@ class UtilsTestCase(TestCase):
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip_address, user_agent, path_info)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -106,12 +105,12 @@ class UtilsTestCase(TestCase):
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = username
actual = get_client_str(username, ip, user_agent, path_info)
expected = '{username: "test@example.com", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -119,12 +118,12 @@ class UtilsTestCase(TestCase):
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip_address, user_agent, path_info)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -132,12 +131,12 @@ class UtilsTestCase(TestCase):
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = '{0} from {1}'.format(username, ip)
actual = get_client_str(username, ip, user_agent, path_info)
expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -145,12 +144,12 @@ class UtilsTestCase(TestCase):
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = get_expected_client_str(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
expected = get_expected_client_str(username, ip_address, user_agent, path_info)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -158,12 +157,12 @@ class UtilsTestCase(TestCase):
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = ip + '(user-agent={0})'.format(user_agent)
actual = get_client_str(username, ip, user_agent, path_info)
expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)

View file

@ -1,7 +1,7 @@
from typing import Optional
from collections import OrderedDict
from datetime import timedelta
from logging import getLogger
from socket import error, inet_pton, AF_INET6
from typing import Optional
from django.core.cache import caches, BaseCache
from django.http import HttpResponse, HttpResponseRedirect, HttpRequest, JsonResponse
@ -9,6 +9,7 @@ from django.shortcuts import render
from django.utils.module_loading import import_string
import ipware.ip2
from django.utils.module_loading import import_string
from axes.conf import settings
from axes.models import AccessAttempt
@ -16,59 +17,109 @@ from axes.models import AccessAttempt
logger = getLogger(__name__)
def reset(ip: str = None, username: str = None) -> int:
"""
Reset records that match IP or username, and return the count of removed attempts.
"""
attempts = AccessAttempt.objects.all()
if ip:
attempts = attempts.filter(ip_address=ip)
if username:
attempts = attempts.filter(username=username)
count, _ = attempts.delete()
return count
def get_axes_cache() -> BaseCache:
"""
Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set.
"""
return caches[getattr(settings, 'AXES_CACHE', 'default')]
def query2str(dictionary: dict, max_length: int = 1024) -> str:
def get_cache_timeout() -> Optional[int]:
"""
Turns a dictionary into an easy-to-read list of key-value pairs.
Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME.
If there is a field called password it will be excluded from the output.
The cache timeout can be either None if not configured or integer of seconds if configured.
The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads.
Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours,
and this function offers a unified _integer or None_ representation of that configuration
for use with the Django cache backends.
"""
return '\n'.join([
'%s=%s' % (k, v) for k, v in dictionary.items()
if k != settings.AXES_PASSWORD_FORM_FIELD
][:int(max_length / 2)])[:max_length]
cool_off = get_cool_off()
if cool_off is None:
return None
return int(cool_off.total_seconds())
def get_client_str(username, ip_address, user_agent=None, path_info=None):
if settings.AXES_VERBOSE:
if path_info and isinstance(path_info, tuple):
path_info = path_info[0]
def get_cool_off() -> Optional[timedelta]:
"""
Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME.
details = '{{user: "{0}", ip: "{1}", user-agent: "{2}", path: "{3}"}}'
return details.format(username, ip_address, user_agent, path_info)
The return value is either None or timedelta.
if settings.AXES_ONLY_USER_FAILURES:
client = username
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
client = '{0} from {1}'.format(username, ip_address)
Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours,
and this function offers a unified _timedelta or None_ representation of that configuration
for use with the Axes internal implementations.
:exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
"""
cool_off = settings.AXES_COOLOFF_TIME
if isinstance(cool_off, int):
return timedelta(hours=cool_off)
return cool_off
def get_cool_off_iso8601(delta: timedelta) -> str:
"""
Return datetime.timedelta translated to ISO 8601 formatted duration for use in e.g. cool offs.
"""
seconds = delta.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
days_str = '{:.0f}D'.format(days) if days else ''
time_str = ''.join(
'{value:.0f}{designator}'.format(value=value, designator=designator)
for value, designator
in [
[hours, 'H'],
[minutes, 'M'],
[seconds, 'S'],
]
if value
)
if time_str:
template = 'P{days_str}T{time_str}'
else:
client = ip_address
template = 'P{days_str}'
if settings.AXES_USE_USER_AGENT:
client += '(user-agent={0})'.format(user_agent)
return client
return template.format(days_str=days_str, time_str=time_str)
def get_client_ip(request: HttpRequest) -> str:
client_ip_attribute = 'axes_client_ip'
def get_credentials(username: str = None, **kwargs) -> dict:
"""
Calculate credentials for Axes to use internally from given username and kwargs.
if not hasattr(request, client_ip_attribute):
client_ip, _ = ipware.ip2.get_client_ip(
request,
proxy_order=settings.AXES_PROXY_ORDER,
proxy_count=settings.AXES_PROXY_COUNT,
proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS,
request_header_order=settings.AXES_META_PRECEDENCE_ORDER,
)
setattr(request, client_ip_attribute, client_ip)
return getattr(request, client_ip_attribute)
Axes will set the username value into the key defined with ``settings.AXES_USERNAME_FORM_FIELD``
and update the credentials dictionary with the kwargs given on top of that.
"""
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
credentials.update(kwargs)
return credentials
def get_client_username(request: HttpRequest, credentials: dict = None) -> str:
@ -102,98 +153,132 @@ def get_client_username(request: HttpRequest, credentials: dict = None) -> str:
return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
def get_client_ip_address(request: HttpRequest) -> str:
"""
Get client IP address as configured by the user.
The primary method is to fetch the reques attribute configured with the
``settings.AXES_IP_ATTRIBUTE`` that can be set by the user on the view layer.
If the ``settings.AXES_IP_ATTRIBUTE`` is not set, the ``ipware`` package will be utilized to resolve the address
according to the client IP configurations that can be defined by the user to suit the reverse proxy configuration
that is used in the users HTTP proxy or *aaS service layers. Refer to the documentation for more information.
"""
client_ip_attribute = 'axes_client_ip'
if not hasattr(request, client_ip_attribute):
client_ip, _ = ipware.ip2.get_client_ip(
request,
proxy_order=settings.AXES_PROXY_ORDER,
proxy_count=settings.AXES_PROXY_COUNT,
proxy_trusted_ips=settings.AXES_PROXY_TRUSTED_IPS,
request_header_order=settings.AXES_META_PRECEDENCE_ORDER,
)
setattr(request, client_ip_attribute, client_ip)
return getattr(request, client_ip_attribute)
def get_client_user_agent(request: HttpRequest) -> str:
return request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
def get_credentials(username: str = None, **kwargs):
credentials = {settings.AXES_USERNAME_FORM_FIELD: username}
credentials.update(kwargs)
return credentials
def get_client_path_info(request: HttpRequest) -> str:
return request.META.get('PATH_INFO', '<unknown>')[:255]
def get_cool_off() -> Optional[timedelta]:
def get_client_http_accept(request: HttpRequest) -> str:
return request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
def get_client_parameters(username: str, ip_address: str, user_agent: str) -> OrderedDict:
"""
Return the login cool off time interpreted from settings.AXES_COOLOFF_TIME.
Get query parameters for filtering AccessAttempt queryset.
The return value is either None or timedelta.
Notice that the settings.AXES_COOLOFF_TIME is either None, timedelta, or integer of hours,
and this function offers a unified _timedelta or None_ representation of that configuration
for use with the Axes internal implementations.
:exception TypeError: if settings.AXES_COOLOFF_TIME is of wrong type.
This method returns an OrderedDict that guarantees iteration order for keys and values,
and can so be used in e.g. the generation of hash keys or other deterministic functions.
"""
cool_off = settings.AXES_COOLOFF_TIME
filter_kwargs = OrderedDict() # type: OrderedDict
if isinstance(cool_off, int):
return timedelta(hours=cool_off)
return cool_off
if settings.AXES_ONLY_USER_FAILURES:
# 1. Only individual usernames can be tracked with parametrization
filter_kwargs['username'] = username
else:
if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
# 2. A combination of username and IP address can be used as well
filter_kwargs['username'] = username
filter_kwargs['ip_address'] = ip_address
else:
# 3. Default case is to track the IP address only, which is the most secure option
filter_kwargs['ip_address'] = ip_address
if settings.AXES_USE_USER_AGENT:
# 4. The HTTP User-Agent can be used to track e.g. one browser
filter_kwargs['user_agent'] = user_agent
return filter_kwargs
def get_cache_timeout() -> Optional[int]:
def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str:
"""
Return the cache timeout interpreted from settings.AXES_COOLOFF_TIME.
Get a readable string that can be used in e.g. logging to distinguish client requests.
The cache timeout can be either None if not configured or integer of seconds if configured.
Notice that the settings.AXES_COOLOFF_TIME can be None, timedelta, or integer of hours,
and this function offers a unified _integer or None_ representation of that configuration
for use with the Django cache backends.
Example log format would be ``{username: "example", ip_address: "127.0.0.1", path_info: "/example/"}``
"""
cool_off = get_cool_off()
if cool_off is None:
return None
return int(cool_off.total_seconds())
if settings.AXES_VERBOSE:
# Verbose mode logs every attribute that is given
client_dict = OrderedDict()
client_dict['username'] = username
client_dict['ip_address'] = ip_address
client_dict['user_agent'] = user_agent
else:
# Other modes initialize the attributes that are used for the actual lockouts
client_dict = get_client_parameters(username, ip_address, user_agent)
# Path info is always included as last component in the client string for traceability purposes
if path_info and isinstance(path_info, (tuple, list)):
path_info = path_info[0]
client_dict['path_info'] = path_info
def is_ipv6(ip: str):
try:
inet_pton(AF_INET6, ip)
except (OSError, error):
return False
return True
def reset(ip: str = None, username: str = None):
"""
Reset records that match IP or username, and return the count of removed attempts.
"""
attempts = AccessAttempt.objects.all()
if ip:
attempts = attempts.filter(ip_address=ip)
if username:
attempts = attempts.filter(username=username)
count, _ = attempts.delete()
return count
def iso8601(delta: timedelta) -> str:
"""
Return datetime.timedelta translated to ISO 8601 formatted duration.
"""
seconds = delta.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
date = '{:.0f}D'.format(days) if days else ''
time_values = hours, minutes, seconds
time_designators = 'H', 'M', 'S'
time = ''.join([
('{:.0f}'.format(value) + designator)
for value, designator in zip(time_values, time_designators)
if value]
# Template the internal dictionary representation into a readable and concatenated key: "value" format
template = ', '.join(
'{key}: "{value}"'.format(key=key, value=value)
for key, value
in client_dict.items()
)
return 'P' + date + ('T' + time if time else '')
# Wrap the internal dict into a single {key: "value"} bracing in the output
# which requires double braces when done with the Python string templating system
# i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call
template = '{{' + template + '}}'
return template.format(client_dict)
def get_query_str(query: dict, max_length: int = 1024) -> str:
"""
Turns a query dictionary into an easy-to-read list of key-value pairs.
If a field is called either ``'password'`` or ``settings.AXES_PASSWORD_FORM_FIELD`` it will be excluded.
The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads.
"""
query_dict = query.copy()
query_dict.pop('password', None)
query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None)
query_str = '\n'.join(
'{key}={value}'.format(key=key, value=value)
for key, value
in query_dict.items()
)
return query_str[:max_length]
def get_lockout_message() -> str:
@ -202,23 +287,19 @@ def get_lockout_message() -> str:
return settings.AXES_PERMALOCK_MESSAGE
def get_lockout_response(request: HttpRequest) -> HttpResponse:
def get_lockout_response(request: HttpRequest, credentials: dict = None) -> HttpResponse:
status = 403
context = {
'failure_limit': settings.AXES_FAILURE_LIMIT,
'username': get_client_username(request) or ''
'username': get_client_username(request, credentials) or ''
}
cool_off = settings.AXES_COOLOFF_TIME
cool_off = get_cool_off()
if cool_off:
if isinstance(cool_off, (int, float)):
cool_off = timedelta(hours=cool_off)
context.update({
'cooloff_time': iso8601(cool_off)
'cool_off': get_cool_off_iso8601(cool_off)
})
status = 403
if request.is_ajax():
return JsonResponse(
context,
@ -234,6 +315,11 @@ def get_lockout_response(request: HttpRequest) -> HttpResponse:
)
if settings.AXES_LOCKOUT_URL:
return HttpResponseRedirect(settings.AXES_LOCKOUT_URL)
return HttpResponseRedirect(
settings.AXES_LOCKOUT_URL,
)
return HttpResponse(get_lockout_message(), status=status)
return HttpResponse(
get_lockout_message(),
status=status,
)