diff --git a/axes/decorators.py b/axes/decorators.py index 146bfb7..fa81a21 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -41,9 +41,22 @@ if isinstance(COOLOFF_TIME, int): LOGGER = getattr(settings, 'AXES_LOGGER', 'axes.watch_login') LOCKOUT_TEMPLATE = getattr(settings, 'AXES_LOCKOUT_TEMPLATE', None) -LOCKOUT_URL = getattr(settings, 'AXES_LOCKOUT_URL', None) VERBOSE = getattr(settings, 'AXES_VERBOSE', True) +# whitelist and blacklist +# todo: convert the strings to IPv4 on startup to avoid type conversion during processing +ONLY_WHITELIST = getattr(settings, 'AXES_ONLY_ALLOW_WHITELIST', False) +IP_WHITELIST = getattr(settings, 'AXES_IP_WHITELIST', None) +IP_BLACKLIST = getattr(settings, 'AXES_IP_BLACKLIST', None) + +ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. " + "Note that both fields are case-sensitive.") +LOGIN_FORM_KEY = 'this_is_the_login_form' + + +def get_lockout_url(): + return getattr(settings, 'AXES_LOCKOUT_URL', None) + def query2str(items): """Turns a dictionary into an easy-to-read list of key-value pairs. @@ -58,39 +71,64 @@ def query2str(items): return '\n'.join(kvs) + +def ip_in_whitelist(ip): + if IP_WHITELIST is not None: + return ip in IP_WHITELIST + else: + return False + + +def ip_in_blacklist(ip): + if IP_BLACKLIST is not None: + return ip in IP_BLACKLIST + else: + return False + + log = logging.getLogger(LOGGER) if VERBOSE: log.info('AXES: BEGIN LOG') log.info('Using django-axes ' + axes.get_version()) -def get_user_attempt(request): +def get_user_attempts(request): """ Returns access attempt record if it exists. Otherwise return None. """ + ip = request.META.get('REMOTE_ADDR', '') + username = request.POST.get('username', None) + if USE_USER_AGENT: ua = request.META.get('HTTP_USER_AGENT', '') attempts = AccessAttempt.objects.filter( - user_agent=ua, - ip_address=ip + user_agent=ua, ip_address=ip, username=username, trusted=True ) else: attempts = AccessAttempt.objects.filter( - ip_address=ip + ip_address=ip, username=username, trusted=True ) - if not attempts: - return None + if len(attempts) == 0: + params = {'ip_address': ip, 'trusted': False} + if USE_USER_AGENT: + params['user_agent'] = ua - attempt = attempts[0] - current_time = datetime.now() - if COOLOFF_TIME and attempt.attempt_time + COOLOFF_TIME < current_time: - attempt.delete() - return None + attempts = AccessAttempt.objects.filter(**params) + if username and not ip_in_whitelist(ip): + del params['ip_address'] + params['username'] = username + attempts |= AccessAttempt.objects.filter(**params) - return attempt + if COOLOFF_TIME: + for attempt in attempts: + if attempt.attempt_time + COOLOFF_TIME < datetime.now() \ + and attempt.trusted is False: + attempt.delete() + + return attempts def watch_login(func): @@ -107,6 +145,20 @@ def watch_login(func): if kwargs: log.info('kwargs: %s' % kwargs) + # TODO: create a class to hold the attempts records and perform checks + # with its methods? or just store attempts=get_user_attempts here and + # pass it to the functions + # also no need to keep accessing these: + # ip = request.META.get('REMOTE_ADDR', '') + # ua = request.META.get('HTTP_USER_AGENT', '') + # username = request.POST.get('username', None) + + # if the request is currently under lockout, do not proceed to the + # login function, go directly to lockout url, do not pass go, do not + # collect messages about this login attempt + if is_already_locked(request): + return lockout_response(request) + # call the login function response = func(request, *args, **kwargs) @@ -143,6 +195,7 @@ def lockout_response(request): return render_to_response(LOCKOUT_TEMPLATE, context, context_instance=RequestContext(request)) + LOCKOUT_URL = get_lockout_url() if LOCKOUT_URL: return HttpResponseRedirect(LOCKOUT_URL) @@ -154,22 +207,30 @@ def lockout_response(request): "Contact an admin to unlock your account.") +def is_already_locked(request): + ip = request.META.get('REMOTE_ADDR', '') + + if ONLY_WHITELIST: + if not ip_in_whitelist(ip): + return True + + if ip_in_blacklist(ip): + return True + + attempts = get_user_attempts(request) + for attempt in attempts: + if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE: + return True + + return False + + def check_request(request, login_unsuccessful): failures = 0 - attempt = get_user_attempt(request) + attempts = get_user_attempts(request) - if attempt: - failures = attempt.failures_since_start - - # no matter what, we want to lock them out - # if they're past the number of attempts allowed - if failures > FAILURE_LIMIT and LOCK_OUT_AT_FAILURE: - # We log them out in case they actually managed to enter - # the correct password. - logout(request) - log.warn('AXES: locked out %s after repeated login attempts.' % - attempt.ip_address) - return False + for attempt in attempts: + failures = max(failures, attempt.failures_since_start) if login_unsuccessful: # add a failed attempt for this user @@ -177,47 +238,108 @@ def check_request(request, login_unsuccessful): # Create an AccessAttempt record if the login wasn't successful # has already attempted, update the info - if attempt: - attempt.get_data = '%s\n---------\n%s' % ( - attempt.get_data, - query2str(request.GET.items()), - ) - attempt.post_data = '%s\n---------\n%s' % ( - attempt.post_data, - query2str(request.POST.items()) - ) - attempt.http_accept = request.META.get('HTTP_ACCEPT', '') - attempt.path_info = request.META.get('PATH_INFO', '') - attempt.failures_since_start = failures - attempt.attempt_time = datetime.now() - attempt.save() - log.info('AXES: Repeated login failure by %s. Updating access ' - 'record. Count = %s' % - (attempt.ip_address, failures)) + if len(attempts): + for attempt in attempts: + attempt.get_data = '%s\n---------\n%s' % ( + attempt.get_data, + query2str(request.GET.items()), + ) + attempt.post_data = '%s\n---------\n%s' % ( + attempt.post_data, + query2str(request.POST.items()) + ) + attempt.http_accept = request.META.get('HTTP_ACCEPT', '') + attempt.path_info = request.META.get('PATH_INFO', '') + attempt.failures_since_start = failures + attempt.attempt_time = datetime.now() + attempt.save() + log.info('AXES: Repeated login failure by %s. Updating access ' + 'record. Count = %s' % + (attempt.ip_address, failures)) else: - ip = request.META.get('REMOTE_ADDR', '') - ua = request.META.get('HTTP_USER_AGENT', '') - attempt = AccessAttempt.objects.create( - user_agent=ua, - ip_address=ip, - get_data=query2str(request.GET.items()), - post_data=query2str(request.POST.items()), - http_accept=request.META.get('HTTP_ACCEPT', ''), - path_info=request.META.get('PATH_INFO', ''), - failures_since_start=failures - ) - log.info('AXES: New login failure by %s. Creating access record.' % - ip) + create_new_failure_records(request, failures) else: # user logged in -- forget the failed attempts - if attempt: + failures = 0 + trusted_record_exists = False + for attempt in attempts: + if not attempt.trusted: + attempt.delete() + else: + trusted_record_exists = True + attempt.failures_since_start = 0 + attempt.save() + + if trusted_record_exists is False: + create_new_trusted_record(request) + + # no matter what, we want to lock them out if they're past the number of + # attempts allowed + if failures >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE: + # We log them out in case they actually managed to enter the correct + # password + logout(request) + log.warn('AXES: locked out %s after repeated login attempts.' % + (attempt.ip_address,)) + + # if a trusted login has violated lockout, revoke trust + for attempt in [a for a in attempts if a.trusted]: attempt.delete() + create_new_failure_records(request, failures) + + return False return True -ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. " - "Note that both fields are case-sensitive.") -LOGIN_FORM_KEY = 'this_is_the_login_form' + +def create_new_failure_records(request, failures): + ip = request.META.get('REMOTE_ADDR', '') + ua = request.META.get('HTTP_USER_AGENT', '') + username = request.POST.get('username', None) + + params = { + 'user_agent': ua, + 'ip_address': ip, + 'username': None, + 'get_data': query2str(request.GET.items()), + 'post_data': query2str(request.POST.items()), + 'http_accept': request.META.get('HTTP_ACCEPT', ''), + 'path_info': request.META.get('PATH_INFO', ''), + 'failures_since_start': failures, + } + + # record failed attempt from this IP + AccessAttempt.objects.create(**params) + + # record failed attempt on this username from untrusted IP + params.update({ + 'ip_address': None, + 'username': username, + }) + AccessAttempt.objects.create(**params) + + log.info('AXES: New login failure by %s. Creating access record.' % (ip,)) + + +def create_new_trusted_record(request): + ip = request.META.get('REMOTE_ADDR', '') + ua = request.META.get('HTTP_USER_AGENT', '') + username = request.POST.get('username', None) + + if not username: + return False + + AccessAttempt.objects.create( + user_agent=ua, + ip_address=ip, + username=username, + get_data=query2str(request.GET.items()), + post_data=query2str(request.POST.items()), + http_accept=request.META.get('HTTP_ACCEPT', ''), + path_info=request.META.get('PATH_INFO', ''), + failures_since_start=0, + trusted=True + ) def _display_login_form(request, error_message=''): diff --git a/axes/models.py b/axes/models.py index eae53ca..1c3c5e3 100644 --- a/axes/models.py +++ b/axes/models.py @@ -9,7 +9,12 @@ FAILURES_DESC = 'Failed Logins' class AccessAttempt(models.Model): user_agent = models.CharField(max_length=255) - ip_address = models.IPAddressField('IP Address') + ip_address = models.IPAddressField('IP Address', null=True) + username = models.CharField(max_length=255, null=True) + + # Once a user logs in from an ip, that combination is trusted and not + # locked out in case of a distributed attack + trusted = models.BooleanField(default=False) get_data = models.TextField('GET Data') post_data = models.TextField('POST Data') http_accept = models.CharField('HTTP Accept', max_length=255) diff --git a/axes/tests.py b/axes/tests.py index 15f464d..3928883 100644 --- a/axes/tests.py +++ b/axes/tests.py @@ -17,6 +17,7 @@ class AccessAttemptTest(TestCase): NOT_GONNA_BE_USERNAME = "whywouldyouohwhy" def setUp(self): + settings.AXES_LOCKOUT_URL = None for i in range(0, random.randrange(10, 50)): username = "person%s" % i email = "%s@example.org" % username @@ -42,7 +43,7 @@ class AccessAttemptTest(TestCase): return response def test_login_max(self, correct_username=False): - for i in range(0, FAILURE_LIMIT): + for i in range(0, FAILURE_LIMIT - 1): response = self._attempt_login(correct_username=correct_username) self.assertContains(response, "this_is_the_login_form") # So, we shouldn't have gotten a lock-out yet. @@ -51,7 +52,7 @@ class AccessAttemptTest(TestCase): self.assertContains(response, "Account locked") def test_login_max_with_more(self, correct_username=False): - for i in range(0, FAILURE_LIMIT): + for i in range(0, FAILURE_LIMIT - 1): response = self._attempt_login(correct_username=correct_username) self.assertContains(response, "this_is_the_login_form") # So, we shouldn't have gotten a lock-out yet. diff --git a/axes/utils.py b/axes/utils.py index 3a5de6f..be39a1c 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -1,19 +1,25 @@ from axes.models import AccessAttempt -def reset(ip=None, silent=False): - if not ip: - attempts = AccessAttempt.objects.all() - if attempts: - attempts.delete() - else: - if not silent: - print 'No attempts found.' +def reset(ip=None, username=None, silent=False): + # no need to reset trusted records. If they fail, they go to untrusted + params = { + 'trusted': False, + } + + if ip: + params['ip_address'] = ip + + attempts = AccessAttempt.objects.filter(**params) + if username: + if 'ip_address' in params: + del params['ip_address'] + + params['username'] = username + attempts |= AccessAttempt.objects.filter(**params) + + if attempts: + attempts.delete() else: - try: - attempt = AccessAttempt.objects.get(ip_address=ip) - except: - if not silent: - print 'No matching attempt found.' - else: - attempt.delete() + if not silent: + print 'No attempts found.'