diff --git a/axes/decorators.py b/axes/decorators.py index a80bd27..5cb8458 100644 --- a/axes/decorators.py +++ b/axes/decorators.py @@ -23,6 +23,12 @@ log = logging.getLogger(LOGGER) if VERBOSE: log.info('AXES: BEGIN LOG') log.info('AXES: Using django-axes ' + axes.get_version()) + if AXES_ONLY_USER_FAILURES: + log.info('AXES: blocking by username only.') + elif LOCK_OUT_BY_COMBINATION_USER_AND_IP: + log.info('AXES: blocking by combination of username and IP.') + else: + log.info('AXES: blocking by IP only.') if BEHIND_REVERSE_PROXY: @@ -36,6 +42,62 @@ if BEHIND_REVERSE_PROXY: ) +def get_client_str(username, ip_address, user_agent=None, path_info=None): + + if VERBOSE: + if isinstance(path_info, tuple): + path_info = path_info[0] + details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" + return details.format(username, ip_address, user_agent, path_info) + + if AXES_ONLY_USER_FAILURES: + client = username + elif LOCK_OUT_BY_COMBINATION_USER_AND_IP: + client = '{0} from {1}'.format(username, ip_address) + else: + client = ip_address + + if USE_USER_AGENT: + return client + '(user-agent={0})'.format(user_agent) + + return client + + +def log_successful_attempt(username, ip_address, + user_agent=None, path_info=None): + client = get_client_str(username, ip_address, user_agent, path_info) + msg = 'AXES: Successful login by {0}. Creating access record.' + log.info(msg.format(client)) + + +def log_initial_attempt(username, ip_address, user_agent, path_info): + client = get_client_str(username, ip_address, user_agent, path_info) + msg = 'AXES: New login failure by {0}. Creating access record.' + log.info(msg.format(client)) + + +def log_repeated_attempt(username, ip_address, user_agent, path_info, + fail_count): + client = get_client_str(username, ip_address, user_agent, path_info) + fail_msg = 'AXES: Repeated login failure by {0}. Updating access record.' + count_msg = 'Count = {0} of {1}'.format(fail_count, FAILURE_LIMIT) + log.info('{0} {1}'.format(fail_msg.format(client), count_msg)) + + +def log_lockout(username, ip_address, user_agent, path_info): + client = get_client_str(username, ip_address, user_agent, path_info) + msg = 'AXES: locked out {0} after repeated login attempts.' + log.warn(msg.format(client)) + + +def log_decorated_call(func, args=None, kwargs=None): + log.info('AXES: Calling decorated function: %s' % func.__name__) + if args: + log.info('args: %s' % str(args)) + if kwargs: + log.info('kwargs: %s' % kwargs) + + def is_ipv6(ip): try: inet_pton(AF_INET6, ip) @@ -241,6 +303,22 @@ def get_user_attempts(request): return attempts +def is_login_failed(response): + return ( + response and + not response.has_header('location') and + response.status_code != 302 + ) + +def is_ajax_login_failed(response): + return ( + response and + response.status_code != 302 and + response.status_code != 200 + ) + + + def watch_login(func): """ Used to decorate the django.contrib.admin.site.login method. @@ -253,11 +331,7 @@ def watch_login(func): def decorated_login(request, *args, **kwargs): # share some useful information if func.__name__ != 'decorated_login' and VERBOSE: - log.info('AXES: Calling decorated function: %s' % func.__name__) - if args: - log.info('args: %s' % str(args)) - if kwargs: - log.info('kwargs: %s' % kwargs) + log_decorated_call(func, args, kwargs) # TODO: create a class to hold the attempts records and perform checks # with its methods? or just store attempts=get_user_attempts here and @@ -286,25 +360,31 @@ def watch_login(func): if request.method == 'POST': # see if the login was successful - login_unsuccessful = ( - response and - not response.has_header('location') and - response.status_code != 302 - ) + if request.is_ajax(): + login_unsuccessful = is_ajax_login_failed(response) + else: + login_unsuccessful = is_login_failed(response) user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] http_accept = request.META.get('HTTP_ACCEPT', '')[:1025] path_info = request.META.get('PATH_INFO', '')[:255] if not DISABLE_ACCESS_LOG: + username = request.POST.get(USERNAME_FORM_FIELD, None) + ip_address = get_ip(request) + if login_unsuccessful or not DISABLE_SUCCESS_ACCESS_LOG: AccessLog.objects.create( user_agent=user_agent, - ip_address=get_ip(request), - username=request.POST.get(USERNAME_FORM_FIELD, None), + ip_address=ip_address, + username=username, http_accept=http_accept, path_info=path_info, trusted=not login_unsuccessful, ) + if not login_unsuccessful and not DISABLE_SUCCESS_ACCESS_LOG: + log_successful_attempt(username, ip_address, + user_agent, path_info) + if check_request(request, login_unsuccessful): return response @@ -381,6 +461,8 @@ def is_already_locked(request): def check_request(request, login_unsuccessful): ip_address = get_ip(request) username = request.POST.get(USERNAME_FORM_FIELD, None) + user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] + path_info = request.META.get('PATH_INFO', '')[:255] failures = 0 attempts = get_user_attempts(request) cache_hash_key = get_cache_key(request) @@ -412,15 +494,14 @@ def check_request(request, login_unsuccessful): ) attempt.http_accept = \ request.META.get('HTTP_ACCEPT', '')[:1025] - attempt.path_info = \ - request.META.get('PATH_INFO', '')[:255] + attempt.path_info = path_info,path_info attempt.failures_since_start = failures attempt.attempt_time = datetime.now() attempt.save() - log.info( - 'AXES: Repeated login failure by %s. Updating access ' - 'record. Count = %s' % (attempt.ip_address, failures) - ) + + log_repeated_attempt(username, ip_address, + user_agent, path_info, failures) + else: create_new_failure_records(request, failures) else: @@ -455,9 +536,10 @@ def check_request(request, login_unsuccessful): # password if hasattr(request, 'user') and request.user.is_authenticated(): logout(request) - log.warn( - 'AXES: locked out %s after repeated login attempts.' % ip_address - ) + + username = request.POST.get(USERNAME_FORM_FIELD, None) + log_lockout(username, ip_address, user_agent, path_info) + # send signal when someone is locked out. user_locked_out.send( 'axes', request=request, username=username, ip_address=ip_address @@ -477,6 +559,7 @@ def create_new_failure_records(request, failures): ip = get_ip(request) ua = request.META.get('HTTP_USER_AGENT', '')[:255] username = request.POST.get(USERNAME_FORM_FIELD, None) + path_info = request.META.get('PATH_INFO', ''), # Record failed attempt. Whether or not the IP address or user agent is # used in counting failures is handled elsewhere, so we just record @@ -488,10 +571,12 @@ def create_new_failure_records(request, failures): get_data=query2str(request.GET), post_data=query2str(request.POST), http_accept=request.META.get('HTTP_ACCEPT', ''), - path_info=request.META.get('PATH_INFO', ''), + path_info=path_info, failures_since_start=failures, ) - log.info('AXES: New login failure by %s. Creating access record.' % (ip,)) + + username = request.POST.get(USERNAME_FORM_FIELD, None) + log_initial_attempt(username, ip, ua, path_info) def create_new_trusted_record(request): diff --git a/axes/tests.py b/axes/tests.py index c5033ce..b9de7ba 100644 --- a/axes/tests.py +++ b/axes/tests.py @@ -15,7 +15,7 @@ from django.core.urlresolvers import reverse from django.utils import six from django.test.client import RequestFactory -from axes.decorators import get_ip, get_cache_key +from axes.decorators import get_ip, get_cache_key, get_client_str from axes.settings import FAILURE_LIMIT from axes.models import AccessAttempt, AccessLog from axes.signals import user_locked_out @@ -215,7 +215,6 @@ class AccessAttemptTest(TestCase): """ Test the cache key format""" # Getting cache key from request ip = '127.0.0.1'.encode('utf-8') - ua = ''.encode('utf-8') cache_hash_key_checker = 'axes-{}'.format(md5((ip)).hexdigest()) @@ -329,7 +328,6 @@ class AccessAttemptTest(TestCase): response = self._login(is_valid_username=True, is_valid_password=True) self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302) - @patch('axes.decorators.cache.set', return_value=None) @patch('axes.decorators.cache.get', return_value=None) def test_log_data_truncated(self, cache_set_mock, cache_get_mock): @@ -941,6 +939,112 @@ class UtilsTest(TestCase): self.assertFalse(is_ipv6('67.255.125.204')) self.assertFalse(is_ipv6('foo')) + @patch('axes.decorators.VERBOSE', True) + def test_verbose_ip_only_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" + expected = details.format(username, ip, user_agent, path_info) + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.VERBOSE', False) + def test_non_verbose_ip_only_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + expected = ip + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.AXES_ONLY_USER_FAILURES', True) + @patch('axes.decorators.VERBOSE', True) + def test_verbose_user_only_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" + expected = details.format(username, ip, user_agent, path_info) + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.AXES_ONLY_USER_FAILURES', True) + @patch('axes.decorators.VERBOSE', False) + def test_non_verbose_user_only_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + expected = username + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.LOCK_OUT_BY_COMBINATION_USER_AND_IP', True) + @patch('axes.decorators.VERBOSE', True) + def test_verbose_user_ip_combo_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" + expected = details.format(username, ip, user_agent, path_info) + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.LOCK_OUT_BY_COMBINATION_USER_AND_IP', True) + @patch('axes.decorators.VERBOSE', False) + def test_non_verbose_user_ip_combo_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + expected = '{0} from {1}'.format(username, ip) + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.USE_USER_AGENT', True) + @patch('axes.decorators.VERBOSE', True) + def test_verbose_user_agent_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}" + expected = details.format(username, ip, user_agent, path_info) + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + + @patch('axes.decorators.USE_USER_AGENT', True) + @patch('axes.decorators.VERBOSE', False) + def test_non_verbose_user_agent_client_details(self): + username = 'test@example.com' + ip = '127.0.0.1' + user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' + path_info = '/admin/' + + expected = ip + '(user-agent={0})'.format(user_agent) + actual = get_client_str(username, ip, user_agent, path_info) + + self.assertEqual(expected, actual) + class GetIPProxyTest(TestCase): """Test get_ip returns correct addresses with proxy @@ -1005,6 +1109,7 @@ class GetIPProxyCustomHeaderTest(TestCase): self.request.META[settings.AXES_REVERSE_PROXY_HEADER] = header self.assertEqual(self.ip, get_ip(self.request)) + class GetIPNumProxiesTest(TestCase): """Test that get_ip returns the correct last IP when NUM_PROXIES is configured """