From b36e5513d97b06b54e971f330341892b708a5cb9 Mon Sep 17 00:00:00 2001 From: Joey Wilhelm Date: Fri, 9 Oct 2015 16:08:27 -0700 Subject: [PATCH] Improved performance & DoS prevention on query2str --- axes/decorators.py | 22 +++++++++------------- axes/tests.py | 17 +++++++++++++---- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/axes/decorators.py b/axes/decorators.py index 8ff03ae..24d1d32 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -160,15 +160,11 @@ def query2str(items, max_length=1024): If there's a field called "password" it will be excluded from the output. - The length of the output is limited to max_length to avoid a DoS attack. + The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads. """ - kvs = [] - for k, v in items: - if k != PASSWORD_FORM_FIELD: - kvs.append(six.u('%s=%s') % (k, v)) - - return '\n'.join(kvs)[:max_length] + return '\n'.join([six.u('%s=%s' % (k, v)) for k, v in six.iteritems(items) + if k != PASSWORD_FORM_FIELD][:int(max_length/2)])[:max_length] def ip_in_whitelist(ip): @@ -398,11 +394,11 @@ def check_request(request, login_unsuccessful): for attempt in attempts: attempt.get_data = '%s\n---------\n%s' % ( attempt.get_data, - query2str(request.GET.items()), + query2str(request.GET), ) attempt.post_data = '%s\n---------\n%s' % ( attempt.post_data, - query2str(request.POST.items()) + query2str(request.POST) ) attempt.http_accept = request.META.get('HTTP_ACCEPT', '') attempt.path_info = request.META.get('PATH_INFO', '') @@ -461,8 +457,8 @@ def create_new_failure_records(request, failures): 'user_agent': ua, 'ip_address': ip, 'username': username, - 'get_data': query2str(request.GET.items()), - 'post_data': query2str(request.POST.items()), + 'get_data': query2str(request.GET), + 'post_data': query2str(request.POST), 'http_accept': request.META.get('HTTP_ACCEPT', ''), 'path_info': request.META.get('PATH_INFO', ''), 'failures_since_start': failures, @@ -485,8 +481,8 @@ def create_new_trusted_record(request): user_agent=ua, ip_address=ip, username=username, - get_data=query2str(request.GET.items()), - post_data=query2str(request.POST.items()), + get_data=query2str(request.GET), + post_data=query2str(request.POST), http_accept=request.META.get('HTTP_ACCEPT', ''), path_info=request.META.get('PATH_INFO', ''), failures_since_start=0, diff --git a/axes/tests.py b/axes/tests.py index 04b0381..db11a97 100644 --- a/axes/tests.py +++ b/axes/tests.py @@ -10,7 +10,7 @@ from django.core.urlresolvers import reverse from axes.decorators import COOLOFF_TIME from axes.decorators import FAILURE_LIMIT -from axes.models import AccessLog +from axes.models import AccessAttempt, AccessLog from axes.signals import user_locked_out from axes.utils import reset @@ -23,7 +23,7 @@ class AccessAttemptTest(TestCase): LOCKED_MESSAGE = 'Account locked: too many login attempts.' LOGIN_FORM_KEY = '' - def _login(self, is_valid_username=False, is_valid_password=False, user_agent='test-browser'): + def _login(self, is_valid_username=False, is_valid_password=False, user_agent='test-browser', **kwargs): """Login a user. A valid credential is used when is_valid_username is True, otherwise it will use a random string to make a failed login. """ @@ -45,11 +45,13 @@ class AccessAttemptTest(TestCase): else: password = 'invalid-password' - response = self.client.post(admin_login, { + post_data = { 'username': username, 'password': password, 'this_is_the_login_form': 1, - }, HTTP_USER_AGENT=user_agent) + } + post_data.update(kwargs) + response = self.client.post(admin_login, post_data, HTTP_USER_AGENT=user_agent) return response @@ -205,3 +207,10 @@ class AccessAttemptTest(TestCase): # But we should get one now response = self._login(is_valid_username=True, is_valid_password=False) self.assertContains(response, self.LOCKED_MESSAGE) + + def test_log_data_truncated(self): + """Tests that query2str properly truncates data to the max_length (default 1024) + """ + extra_data = {string.ascii_letters * x: x for x in range(0, 1000)} # An impossibly large post dict + self._login(**extra_data) + self.assertEquals(len(AccessAttempt.objects.latest('id').post_data), 1024)