Merge pull request #229 from HuntedCodes/logging-conf

Logging respects configuration settings
This commit is contained in:
Camilo Nova 2017-05-16 15:01:59 -05:00 committed by GitHub
commit 4835e90990
2 changed files with 216 additions and 26 deletions

View file

@ -23,6 +23,12 @@ log = logging.getLogger(LOGGER)
if VERBOSE:
log.info('AXES: BEGIN LOG')
log.info('AXES: Using django-axes ' + axes.get_version())
if AXES_ONLY_USER_FAILURES:
log.info('AXES: blocking by username only.')
elif LOCK_OUT_BY_COMBINATION_USER_AND_IP:
log.info('AXES: blocking by combination of username and IP.')
else:
log.info('AXES: blocking by IP only.')
if BEHIND_REVERSE_PROXY:
@ -36,6 +42,62 @@ if BEHIND_REVERSE_PROXY:
)
def get_client_str(username, ip_address, user_agent=None, path_info=None):
if VERBOSE:
if isinstance(path_info, tuple):
path_info = path_info[0]
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
return details.format(username, ip_address, user_agent, path_info)
if AXES_ONLY_USER_FAILURES:
client = username
elif LOCK_OUT_BY_COMBINATION_USER_AND_IP:
client = '{0} from {1}'.format(username, ip_address)
else:
client = ip_address
if USE_USER_AGENT:
return client + '(user-agent={0})'.format(user_agent)
return client
def log_successful_attempt(username, ip_address,
user_agent=None, path_info=None):
client = get_client_str(username, ip_address, user_agent, path_info)
msg = 'AXES: Successful login by {0}. Creating access record.'
log.info(msg.format(client))
def log_initial_attempt(username, ip_address, user_agent, path_info):
client = get_client_str(username, ip_address, user_agent, path_info)
msg = 'AXES: New login failure by {0}. Creating access record.'
log.info(msg.format(client))
def log_repeated_attempt(username, ip_address, user_agent, path_info,
fail_count):
client = get_client_str(username, ip_address, user_agent, path_info)
fail_msg = 'AXES: Repeated login failure by {0}. Updating access record.'
count_msg = 'Count = {0} of {1}'.format(fail_count, FAILURE_LIMIT)
log.info('{0} {1}'.format(fail_msg.format(client), count_msg))
def log_lockout(username, ip_address, user_agent, path_info):
client = get_client_str(username, ip_address, user_agent, path_info)
msg = 'AXES: locked out {0} after repeated login attempts.'
log.warn(msg.format(client))
def log_decorated_call(func, args=None, kwargs=None):
log.info('AXES: Calling decorated function: %s' % func.__name__)
if args:
log.info('args: %s' % str(args))
if kwargs:
log.info('kwargs: %s' % kwargs)
def is_ipv6(ip):
try:
inet_pton(AF_INET6, ip)
@ -241,6 +303,22 @@ def get_user_attempts(request):
return attempts
def is_login_failed(response):
return (
response and
not response.has_header('location') and
response.status_code != 302
)
def is_ajax_login_failed(response):
return (
response and
response.status_code != 302 and
response.status_code != 200
)
def watch_login(func):
"""
Used to decorate the django.contrib.admin.site.login method.
@ -253,11 +331,7 @@ def watch_login(func):
def decorated_login(request, *args, **kwargs):
# share some useful information
if func.__name__ != 'decorated_login' and VERBOSE:
log.info('AXES: Calling decorated function: %s' % func.__name__)
if args:
log.info('args: %s' % str(args))
if kwargs:
log.info('kwargs: %s' % kwargs)
log_decorated_call(func, args, kwargs)
# TODO: create a class to hold the attempts records and perform checks
# with its methods? or just store attempts=get_user_attempts here and
@ -286,25 +360,31 @@ def watch_login(func):
if request.method == 'POST':
# see if the login was successful
login_unsuccessful = (
response and
not response.has_header('location') and
response.status_code != 302
)
if request.is_ajax():
login_unsuccessful = is_ajax_login_failed(response)
else:
login_unsuccessful = is_login_failed(response)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
if not DISABLE_ACCESS_LOG:
username = request.POST.get(USERNAME_FORM_FIELD, None)
ip_address = get_ip(request)
if login_unsuccessful or not DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=get_ip(request),
username=request.POST.get(USERNAME_FORM_FIELD, None),
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=not login_unsuccessful,
)
if not login_unsuccessful and not DISABLE_SUCCESS_ACCESS_LOG:
log_successful_attempt(username, ip_address,
user_agent, path_info)
if check_request(request, login_unsuccessful):
return response
@ -381,6 +461,8 @@ def is_already_locked(request):
def check_request(request, login_unsuccessful):
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
failures = 0
attempts = get_user_attempts(request)
cache_hash_key = get_cache_key(request)
@ -412,15 +494,14 @@ def check_request(request, login_unsuccessful):
)
attempt.http_accept = \
request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
attempt.path_info = \
request.META.get('PATH_INFO', '<unknown>')[:255]
attempt.path_info = path_info,path_info
attempt.failures_since_start = failures
attempt.attempt_time = datetime.now()
attempt.save()
log.info(
'AXES: Repeated login failure by %s. Updating access '
'record. Count = %s' % (attempt.ip_address, failures)
)
log_repeated_attempt(username, ip_address,
user_agent, path_info, failures)
else:
create_new_failure_records(request, failures)
else:
@ -455,9 +536,10 @@ def check_request(request, login_unsuccessful):
# password
if hasattr(request, 'user') and request.user.is_authenticated():
logout(request)
log.warn(
'AXES: locked out %s after repeated login attempts.' % ip_address
)
username = request.POST.get(USERNAME_FORM_FIELD, None)
log_lockout(username, ip_address, user_agent, path_info)
# send signal when someone is locked out.
user_locked_out.send(
'axes', request=request, username=username, ip_address=ip_address
@ -477,6 +559,7 @@ def create_new_failure_records(request, failures):
ip = get_ip(request)
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
username = request.POST.get(USERNAME_FORM_FIELD, None)
path_info = request.META.get('PATH_INFO', '<unknown>'),
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
@ -488,10 +571,12 @@ def create_new_failure_records(request, failures):
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
path_info=path_info,
failures_since_start=failures,
)
log.info('AXES: New login failure by %s. Creating access record.' % (ip,))
username = request.POST.get(USERNAME_FORM_FIELD, None)
log_initial_attempt(username, ip, ua, path_info)
def create_new_trusted_record(request):

View file

@ -15,7 +15,7 @@ from django.core.urlresolvers import reverse
from django.utils import six
from django.test.client import RequestFactory
from axes.decorators import get_ip, get_cache_key
from axes.decorators import get_ip, get_cache_key, get_client_str
from axes.settings import FAILURE_LIMIT
from axes.models import AccessAttempt, AccessLog
from axes.signals import user_locked_out
@ -215,7 +215,6 @@ class AccessAttemptTest(TestCase):
""" Test the cache key format"""
# Getting cache key from request
ip = '127.0.0.1'.encode('utf-8')
ua = '<unknown>'.encode('utf-8')
cache_hash_key_checker = 'axes-{}'.format(md5((ip)).hexdigest())
@ -329,7 +328,6 @@ class AccessAttemptTest(TestCase):
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_log_data_truncated(self, cache_set_mock, cache_get_mock):
@ -941,6 +939,112 @@ class UtilsTest(TestCase):
self.assertFalse(is_ipv6('67.255.125.204'))
self.assertFalse(is_ipv6('foo'))
@patch('axes.decorators.VERBOSE', True)
def test_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.VERBOSE', False)
def test_non_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = ip
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.AXES_ONLY_USER_FAILURES', True)
@patch('axes.decorators.VERBOSE', True)
def test_verbose_user_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.AXES_ONLY_USER_FAILURES', True)
@patch('axes.decorators.VERBOSE', False)
def test_non_verbose_user_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = username
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.LOCK_OUT_BY_COMBINATION_USER_AND_IP', True)
@patch('axes.decorators.VERBOSE', True)
def test_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.LOCK_OUT_BY_COMBINATION_USER_AND_IP', True)
@patch('axes.decorators.VERBOSE', False)
def test_non_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = '{0} from {1}'.format(username, ip)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.USE_USER_AGENT', True)
@patch('axes.decorators.VERBOSE', True)
def test_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@patch('axes.decorators.USE_USER_AGENT', True)
@patch('axes.decorators.VERBOSE', False)
def test_non_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = ip + '(user-agent={0})'.format(user_agent)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
class GetIPProxyTest(TestCase):
"""Test get_ip returns correct addresses with proxy
@ -1005,6 +1109,7 @@ class GetIPProxyCustomHeaderTest(TestCase):
self.request.META[settings.AXES_REVERSE_PROXY_HEADER] = header
self.assertEqual(self.ip, get_ip(self.request))
class GetIPNumProxiesTest(TestCase):
"""Test that get_ip returns the correct last IP when NUM_PROXIES is configured
"""