🔥 Cleaning

This commit is contained in:
Camilo Nova 2016-06-24 09:42:18 -05:00
parent 036b47706a
commit 08f40bc13b
7 changed files with 79 additions and 73 deletions

View file

@ -58,6 +58,8 @@ LOGGER = getattr(settings, 'AXES_LOGGER', 'axes.watch_login')
LOCKOUT_TEMPLATE = getattr(settings, 'AXES_LOCKOUT_TEMPLATE', None)
LOCKOUT_URL = getattr(settings, 'AXES_LOCKOUT_URL', None)
VERBOSE = getattr(settings, 'AXES_VERBOSE', True)
# whitelist and blacklist
@ -157,10 +159,6 @@ def get_ip(request):
return ip
def get_lockout_url():
return getattr(settings, 'AXES_LOCKOUT_URL', None)
def query2str(items, max_length=1024):
"""Turns a dictionary into an easy-to-read list of key-value pairs.
@ -198,15 +196,16 @@ def is_user_lockable(request):
field: request.POST.get(USERNAME_FORM_FIELD)
}
user = get_user_model().objects.get(**kwargs)
if hasattr(user, 'nolockout'):
# need to invert since we need to return
# false for users that can't be blocked
return not user.nolockout
except get_user_model().DoesNotExist:
# not a valid user
return True
if hasattr(user, 'nolockout'):
# need to invert since we need to return
# false for users that can't be blocked
return not user.nolockout
# Default behavior for a user to be lockable
return True
@ -216,7 +215,6 @@ def _get_user_attempts(request):
Otherwise return None.
"""
ip = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
if USE_USER_AGENT:
@ -240,6 +238,7 @@ def _get_user_attempts(request):
return attempts
def get_user_attempts(request):
objects_deleted = False
attempts = _get_user_attempts(request)
@ -307,14 +306,13 @@ def watch_login(func):
if request.method == 'POST':
# see if the login was successful
login_unsuccessful = (
response and
not response.has_header('location') and
response.status_code != 302
)
access_log = AccessLog.objects.create(
AccessLog.objects.create(
user_agent=request.META.get('HTTP_USER_AGENT', '<unknown>')[:255],
ip_address=get_ip(request),
username=request.POST.get(USERNAME_FORM_FIELD, None),
@ -339,26 +337,32 @@ def lockout_response(request):
}
if request.is_ajax():
context.update({'cooloff_time': iso8601(COOLOFF_TIME)})
return HttpResponse(json.dumps(context),
content_type='application/json',
status=403)
if COOLOFF_TIME:
context.update({'cooloff_time': iso8601(COOLOFF_TIME)})
return HttpResponse(
json.dumps(context),
content_type='application/json',
status=403,
)
elif LOCKOUT_TEMPLATE:
if COOLOFF_TIME:
context.update({'cooloff_time': iso8601(COOLOFF_TIME)})
if LOCKOUT_TEMPLATE:
context.update({'cooloff_time': COOLOFF_TIME})
return render(request, LOCKOUT_TEMPLATE, context, status=403)
LOCKOUT_URL = get_lockout_url()
if LOCKOUT_URL:
elif LOCKOUT_URL:
return HttpResponseRedirect(LOCKOUT_URL)
if COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.", status=403)
else:
return HttpResponse("Account locked: too many login attempts. "
"Contact an admin to unlock your account.",
status=403)
msg = 'Account locked: too many login attempts. {0}'
if COOLOFF_TIME:
msg = msg.format('Please try again later.')
else:
msg = msg.format('Contact an admin to unlock your account.')
return HttpResponse(msg, status=403)
def is_already_locked(request):
@ -367,21 +371,16 @@ def is_already_locked(request):
if NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
return False
if ONLY_WHITELIST:
if not ip_in_whitelist(ip):
return True
if ONLY_WHITELIST and not ip_in_whitelist(ip):
return True
if ip_in_blacklist(ip):
return True
user_lockable = is_user_lockable(request)
if not user_lockable:
if not is_user_lockable(request):
return False
attempts = get_user_attempts(request)
for attempt in attempts:
for attempt in get_user_attempts(request):
if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
return True
@ -449,10 +448,13 @@ def check_request(request, login_unsuccessful):
# password
if hasattr(request, 'user') and request.user.is_authenticated():
logout(request)
log.warn('AXES: locked out %s after repeated login attempts.' %
(ip_address,))
log.warn(
'AXES: locked out %s after repeated login attempts.' % (ip_address,)
)
# send signal when someone is locked out.
user_locked_out.send("axes", request=request, username=username, ip_address=ip_address)
user_locked_out.send(
'axes', request=request, username=username, ip_address=ip_address
)
# if a trusted login has violated lockout, revoke trust
for attempt in [a for a in attempts if a.trusted]:
@ -469,18 +471,16 @@ def create_new_failure_records(request, failures):
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
username = request.POST.get(USERNAME_FORM_FIELD, None)
params = {
'user_agent': ua,
'ip_address': ip,
'username': username,
'get_data': query2str(request.GET),
'post_data': query2str(request.POST),
'http_accept': request.META.get('HTTP_ACCEPT', '<unknown>'),
'path_info': request.META.get('PATH_INFO', '<unknown>'),
'failures_since_start': failures,
}
AccessAttempt.objects.create(**params)
AccessAttempt.objects.create(
user_agent=ua,
ip_address=ip,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=failures,
)
log.info('AXES: New login failure by %s. Creating access record.' % (ip,))

View file

@ -1,6 +1,7 @@
from django.db import models
from django.utils import six
class CommonAccess(models.Model):
user_agent = models.CharField(
max_length=255,

View file

@ -2,7 +2,6 @@ from django.dispatch import receiver
from django.dispatch import Signal
from django.utils.timezone import now
from django.contrib.auth.signals import user_logged_out
from django.core.exceptions import ObjectDoesNotExist
from axes.models import AccessLog
@ -17,18 +16,12 @@ def log_user_lockout(sender, request, user, signal, *args, **kwargs):
if not user:
return
try:
username = user.get_username()
except AttributeError:
# Django < 1.5
username = user.username
access_logs = AccessLog.objects.filter(
username=username,
username=user.get_username(),
logout_time__isnull=True,
).order_by('-attempt_time')
if access_logs:
access_log = access_logs[0]
if access_logs.exists():
access_log = access_logs.first()
access_log.logout_time = now()
access_log.save()

View file

@ -1,6 +1,7 @@
from django.conf.urls import patterns, include
from django.contrib import admin
urlpatterns = patterns('',
urlpatterns = patterns(
'',
(r'^admin/', include(admin.site.urls)),
)
)

View file

@ -83,7 +83,8 @@ class AccessAttemptTest(TestCase):
"""Tests the login lock trying to login one more time
than failure limit
"""
for i in range(1, FAILURE_LIMIT): # test until one try before the limit
# test until one try before the limit
for i in range(1, FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY)
@ -148,7 +149,11 @@ class AccessAttemptTest(TestCase):
"""Tests if can handle a long user agent
"""
long_user_agent = 'ie6' * 1024
response = self._login(is_valid_username=True, is_valid_password=True, user_agent=long_user_agent)
response = self._login(
is_valid_username=True,
is_valid_password=True,
user_agent=long_user_agent,
)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
def test_long_user_agent_not_valid(self):
@ -213,8 +218,12 @@ class AccessAttemptTest(TestCase):
"""Tests the login lock with a valid username and invalid password
when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True
"""
for i in range(1, FAILURE_LIMIT): # test until one try before the limit
response = self._login(is_valid_username=True, is_valid_password=False)
# test until one try before the limit
for i in range(1, FAILURE_LIMIT):
response = self._login(
is_valid_username=True,
is_valid_password=False,
)
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY)
@ -226,9 +235,12 @@ class AccessAttemptTest(TestCase):
def test_log_data_truncated(self):
"""Tests that query2str properly truncates data to the max_length (default 1024)
"""
extra_data = {string.ascii_letters * x: x for x in range(0, 1000)} # An impossibly large post dict
# An impossibly large post dict
extra_data = {string.ascii_letters * x: x for x in range(0, 1000)}
self._login(**extra_data)
self.assertEquals(len(AccessAttempt.objects.latest('id').post_data), 1024)
self.assertEquals(
len(AccessAttempt.objects.latest('id').post_data), 1024
)
def test_json_response(self):
"""Tests response content type and status code for the ajax request

View file

@ -20,10 +20,10 @@ def reset(ip=None, username=None):
return count
def iso8601(value):
def iso8601(timestamp):
"""Returns datetime.timedelta translated to ISO 8601 formatted duration.
"""
seconds = value.total_seconds()
seconds = timestamp.total_seconds()
minutes, seconds = divmod(seconds, 60)
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
@ -33,8 +33,8 @@ def iso8601(value):
time_values = hours, minutes, seconds
time_designators = 'H', 'M', 'S'
time = ''.join(
[('{:.0f}'.format(value) + designator)
time = ''.join([
('{:.0f}'.format(value) + designator)
for value, designator in zip(time_values, time_designators)
if value]
)

View file

@ -1 +0,0 @@
# Create your views here.