From 08f40bc13b6cbfd9bcbc541c0e6c5460d22c1f9a Mon Sep 17 00:00:00 2001 From: Camilo Nova Date: Fri, 24 Jun 2016 09:42:18 -0500 Subject: [PATCH] :fire: Cleaning --- axes/decorators.py | 100 ++++++++++++++++++++++----------------------- axes/models.py | 1 + axes/signals.py | 13 ++---- axes/test_urls.py | 5 ++- axes/tests.py | 24 ++++++++--- axes/utils.py | 8 ++-- axes/views.py | 1 - 7 files changed, 79 insertions(+), 73 deletions(-) delete mode 100644 axes/views.py diff --git a/axes/decorators.py b/axes/decorators.py index 7ad2adb..eab4774 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -58,6 +58,8 @@ LOGGER = getattr(settings, 'AXES_LOGGER', 'axes.watch_login') LOCKOUT_TEMPLATE = getattr(settings, 'AXES_LOCKOUT_TEMPLATE', None) +LOCKOUT_URL = getattr(settings, 'AXES_LOCKOUT_URL', None) + VERBOSE = getattr(settings, 'AXES_VERBOSE', True) # whitelist and blacklist @@ -157,10 +159,6 @@ def get_ip(request): return ip -def get_lockout_url(): - return getattr(settings, 'AXES_LOCKOUT_URL', None) - - def query2str(items, max_length=1024): """Turns a dictionary into an easy-to-read list of key-value pairs. @@ -198,15 +196,16 @@ def is_user_lockable(request): field: request.POST.get(USERNAME_FORM_FIELD) } user = get_user_model().objects.get(**kwargs) + + if hasattr(user, 'nolockout'): + # need to invert since we need to return + # false for users that can't be blocked + return not user.nolockout + except get_user_model().DoesNotExist: # not a valid user return True - if hasattr(user, 'nolockout'): - # need to invert since we need to return - # false for users that can't be blocked - return not user.nolockout - # Default behavior for a user to be lockable return True @@ -216,7 +215,6 @@ def _get_user_attempts(request): Otherwise return None. """ ip = get_ip(request) - username = request.POST.get(USERNAME_FORM_FIELD, None) if USE_USER_AGENT: @@ -240,6 +238,7 @@ def _get_user_attempts(request): return attempts + def get_user_attempts(request): objects_deleted = False attempts = _get_user_attempts(request) @@ -307,14 +306,13 @@ def watch_login(func): if request.method == 'POST': # see if the login was successful - login_unsuccessful = ( response and not response.has_header('location') and response.status_code != 302 ) - access_log = AccessLog.objects.create( + AccessLog.objects.create( user_agent=request.META.get('HTTP_USER_AGENT', '')[:255], ip_address=get_ip(request), username=request.POST.get(USERNAME_FORM_FIELD, None), @@ -339,26 +337,32 @@ def lockout_response(request): } if request.is_ajax(): - context.update({'cooloff_time': iso8601(COOLOFF_TIME)}) - return HttpResponse(json.dumps(context), - content_type='application/json', - status=403) + if COOLOFF_TIME: + context.update({'cooloff_time': iso8601(COOLOFF_TIME)}) + + return HttpResponse( + json.dumps(context), + content_type='application/json', + status=403, + ) + + elif LOCKOUT_TEMPLATE: + if COOLOFF_TIME: + context.update({'cooloff_time': iso8601(COOLOFF_TIME)}) - if LOCKOUT_TEMPLATE: - context.update({'cooloff_time': COOLOFF_TIME}) return render(request, LOCKOUT_TEMPLATE, context, status=403) - LOCKOUT_URL = get_lockout_url() - if LOCKOUT_URL: + elif LOCKOUT_URL: return HttpResponseRedirect(LOCKOUT_URL) - if COOLOFF_TIME: - return HttpResponse("Account locked: too many login attempts. " - "Please try again later.", status=403) else: - return HttpResponse("Account locked: too many login attempts. " - "Contact an admin to unlock your account.", - status=403) + msg = 'Account locked: too many login attempts. {0}' + if COOLOFF_TIME: + msg = msg.format('Please try again later.') + else: + msg = msg.format('Contact an admin to unlock your account.') + + return HttpResponse(msg, status=403) def is_already_locked(request): @@ -367,21 +371,16 @@ def is_already_locked(request): if NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip): return False - if ONLY_WHITELIST: - if not ip_in_whitelist(ip): - return True + if ONLY_WHITELIST and not ip_in_whitelist(ip): + return True if ip_in_blacklist(ip): return True - user_lockable = is_user_lockable(request) - - if not user_lockable: + if not is_user_lockable(request): return False - attempts = get_user_attempts(request) - - for attempt in attempts: + for attempt in get_user_attempts(request): if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE: return True @@ -449,10 +448,13 @@ def check_request(request, login_unsuccessful): # password if hasattr(request, 'user') and request.user.is_authenticated(): logout(request) - log.warn('AXES: locked out %s after repeated login attempts.' % - (ip_address,)) + log.warn( + 'AXES: locked out %s after repeated login attempts.' % (ip_address,) + ) # send signal when someone is locked out. - user_locked_out.send("axes", request=request, username=username, ip_address=ip_address) + user_locked_out.send( + 'axes', request=request, username=username, ip_address=ip_address + ) # if a trusted login has violated lockout, revoke trust for attempt in [a for a in attempts if a.trusted]: @@ -469,18 +471,16 @@ def create_new_failure_records(request, failures): ua = request.META.get('HTTP_USER_AGENT', '')[:255] username = request.POST.get(USERNAME_FORM_FIELD, None) - params = { - 'user_agent': ua, - 'ip_address': ip, - 'username': username, - 'get_data': query2str(request.GET), - 'post_data': query2str(request.POST), - 'http_accept': request.META.get('HTTP_ACCEPT', ''), - 'path_info': request.META.get('PATH_INFO', ''), - 'failures_since_start': failures, - } - - AccessAttempt.objects.create(**params) + AccessAttempt.objects.create( + user_agent=ua, + ip_address=ip, + username=username, + get_data=query2str(request.GET), + post_data=query2str(request.POST), + http_accept=request.META.get('HTTP_ACCEPT', ''), + path_info=request.META.get('PATH_INFO', ''), + failures_since_start=failures, + ) log.info('AXES: New login failure by %s. Creating access record.' % (ip,)) diff --git a/axes/models.py b/axes/models.py index 4ad0373..56ca5f7 100644 --- a/axes/models.py +++ b/axes/models.py @@ -1,6 +1,7 @@ from django.db import models from django.utils import six + class CommonAccess(models.Model): user_agent = models.CharField( max_length=255, diff --git a/axes/signals.py b/axes/signals.py index 54e6b3d..523edcb 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -2,7 +2,6 @@ from django.dispatch import receiver from django.dispatch import Signal from django.utils.timezone import now from django.contrib.auth.signals import user_logged_out -from django.core.exceptions import ObjectDoesNotExist from axes.models import AccessLog @@ -17,18 +16,12 @@ def log_user_lockout(sender, request, user, signal, *args, **kwargs): if not user: return - try: - username = user.get_username() - except AttributeError: - # Django < 1.5 - username = user.username - access_logs = AccessLog.objects.filter( - username=username, + username=user.get_username(), logout_time__isnull=True, ).order_by('-attempt_time') - if access_logs: - access_log = access_logs[0] + if access_logs.exists(): + access_log = access_logs.first() access_log.logout_time = now() access_log.save() diff --git a/axes/test_urls.py b/axes/test_urls.py index 0d120ce..88ad7e7 100644 --- a/axes/test_urls.py +++ b/axes/test_urls.py @@ -1,6 +1,7 @@ from django.conf.urls import patterns, include from django.contrib import admin -urlpatterns = patterns('', +urlpatterns = patterns( + '', (r'^admin/', include(admin.site.urls)), -) \ No newline at end of file +) diff --git a/axes/tests.py b/axes/tests.py index 3ad23fb..fdf9f6b 100644 --- a/axes/tests.py +++ b/axes/tests.py @@ -83,7 +83,8 @@ class AccessAttemptTest(TestCase): """Tests the login lock trying to login one more time than failure limit """ - for i in range(1, FAILURE_LIMIT): # test until one try before the limit + # test until one try before the limit + for i in range(1, FAILURE_LIMIT): response = self._login() # Check if we are in the same login page self.assertContains(response, self.LOGIN_FORM_KEY) @@ -148,7 +149,11 @@ class AccessAttemptTest(TestCase): """Tests if can handle a long user agent """ long_user_agent = 'ie6' * 1024 - response = self._login(is_valid_username=True, is_valid_password=True, user_agent=long_user_agent) + response = self._login( + is_valid_username=True, + is_valid_password=True, + user_agent=long_user_agent, + ) self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302) def test_long_user_agent_not_valid(self): @@ -213,8 +218,12 @@ class AccessAttemptTest(TestCase): """Tests the login lock with a valid username and invalid password when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True """ - for i in range(1, FAILURE_LIMIT): # test until one try before the limit - response = self._login(is_valid_username=True, is_valid_password=False) + # test until one try before the limit + for i in range(1, FAILURE_LIMIT): + response = self._login( + is_valid_username=True, + is_valid_password=False, + ) # Check if we are in the same login page self.assertContains(response, self.LOGIN_FORM_KEY) @@ -226,9 +235,12 @@ class AccessAttemptTest(TestCase): def test_log_data_truncated(self): """Tests that query2str properly truncates data to the max_length (default 1024) """ - extra_data = {string.ascii_letters * x: x for x in range(0, 1000)} # An impossibly large post dict + # An impossibly large post dict + extra_data = {string.ascii_letters * x: x for x in range(0, 1000)} self._login(**extra_data) - self.assertEquals(len(AccessAttempt.objects.latest('id').post_data), 1024) + self.assertEquals( + len(AccessAttempt.objects.latest('id').post_data), 1024 + ) def test_json_response(self): """Tests response content type and status code for the ajax request diff --git a/axes/utils.py b/axes/utils.py index 7f140d3..8a050ed 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -20,10 +20,10 @@ def reset(ip=None, username=None): return count -def iso8601(value): +def iso8601(timestamp): """Returns datetime.timedelta translated to ISO 8601 formatted duration. """ - seconds = value.total_seconds() + seconds = timestamp.total_seconds() minutes, seconds = divmod(seconds, 60) hours, minutes = divmod(minutes, 60) days, hours = divmod(hours, 24) @@ -33,8 +33,8 @@ def iso8601(value): time_values = hours, minutes, seconds time_designators = 'H', 'M', 'S' - time = ''.join( - [('{:.0f}'.format(value) + designator) + time = ''.join([ + ('{:.0f}'.format(value) + designator) for value, designator in zip(time_values, time_designators) if value] ) diff --git a/axes/views.py b/axes/views.py deleted file mode 100644 index 60f00ef..0000000 --- a/axes/views.py +++ /dev/null @@ -1 +0,0 @@ -# Create your views here.