mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
Incorporating changes offered by kostrom in pull request #6 a flippin year ago...
This commit is contained in:
parent
6cb6b37a64
commit
bd88ac1e7c
4 changed files with 212 additions and 78 deletions
|
|
@ -41,9 +41,22 @@ if isinstance(COOLOFF_TIME, int):
|
|||
LOGGER = getattr(settings, 'AXES_LOGGER', 'axes.watch_login')
|
||||
|
||||
LOCKOUT_TEMPLATE = getattr(settings, 'AXES_LOCKOUT_TEMPLATE', None)
|
||||
LOCKOUT_URL = getattr(settings, 'AXES_LOCKOUT_URL', None)
|
||||
VERBOSE = getattr(settings, 'AXES_VERBOSE', True)
|
||||
|
||||
# whitelist and blacklist
|
||||
# todo: convert the strings to IPv4 on startup to avoid type conversion during processing
|
||||
ONLY_WHITELIST = getattr(settings, 'AXES_ONLY_ALLOW_WHITELIST', False)
|
||||
IP_WHITELIST = getattr(settings, 'AXES_IP_WHITELIST', None)
|
||||
IP_BLACKLIST = getattr(settings, 'AXES_IP_BLACKLIST', None)
|
||||
|
||||
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
|
||||
"Note that both fields are case-sensitive.")
|
||||
LOGIN_FORM_KEY = 'this_is_the_login_form'
|
||||
|
||||
|
||||
def get_lockout_url():
|
||||
return getattr(settings, 'AXES_LOCKOUT_URL', None)
|
||||
|
||||
|
||||
def query2str(items):
|
||||
"""Turns a dictionary into an easy-to-read list of key-value pairs.
|
||||
|
|
@ -58,39 +71,64 @@ def query2str(items):
|
|||
|
||||
return '\n'.join(kvs)
|
||||
|
||||
|
||||
def ip_in_whitelist(ip):
|
||||
if IP_WHITELIST is not None:
|
||||
return ip in IP_WHITELIST
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
def ip_in_blacklist(ip):
|
||||
if IP_BLACKLIST is not None:
|
||||
return ip in IP_BLACKLIST
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
log = logging.getLogger(LOGGER)
|
||||
if VERBOSE:
|
||||
log.info('AXES: BEGIN LOG')
|
||||
log.info('Using django-axes ' + axes.get_version())
|
||||
|
||||
|
||||
def get_user_attempt(request):
|
||||
def get_user_attempts(request):
|
||||
"""
|
||||
Returns access attempt record if it exists.
|
||||
Otherwise return None.
|
||||
"""
|
||||
|
||||
ip = request.META.get('REMOTE_ADDR', '')
|
||||
username = request.POST.get('username', None)
|
||||
|
||||
if USE_USER_AGENT:
|
||||
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')
|
||||
attempts = AccessAttempt.objects.filter(
|
||||
user_agent=ua,
|
||||
ip_address=ip
|
||||
user_agent=ua, ip_address=ip, username=username, trusted=True
|
||||
)
|
||||
else:
|
||||
attempts = AccessAttempt.objects.filter(
|
||||
ip_address=ip
|
||||
ip_address=ip, username=username, trusted=True
|
||||
)
|
||||
|
||||
if not attempts:
|
||||
return None
|
||||
if len(attempts) == 0:
|
||||
params = {'ip_address': ip, 'trusted': False}
|
||||
if USE_USER_AGENT:
|
||||
params['user_agent'] = ua
|
||||
|
||||
attempt = attempts[0]
|
||||
current_time = datetime.now()
|
||||
if COOLOFF_TIME and attempt.attempt_time + COOLOFF_TIME < current_time:
|
||||
attempt.delete()
|
||||
return None
|
||||
attempts = AccessAttempt.objects.filter(**params)
|
||||
if username and not ip_in_whitelist(ip):
|
||||
del params['ip_address']
|
||||
params['username'] = username
|
||||
attempts |= AccessAttempt.objects.filter(**params)
|
||||
|
||||
return attempt
|
||||
if COOLOFF_TIME:
|
||||
for attempt in attempts:
|
||||
if attempt.attempt_time + COOLOFF_TIME < datetime.now() \
|
||||
and attempt.trusted is False:
|
||||
attempt.delete()
|
||||
|
||||
return attempts
|
||||
|
||||
|
||||
def watch_login(func):
|
||||
|
|
@ -107,6 +145,20 @@ def watch_login(func):
|
|||
if kwargs:
|
||||
log.info('kwargs: %s' % kwargs)
|
||||
|
||||
# TODO: create a class to hold the attempts records and perform checks
|
||||
# with its methods? or just store attempts=get_user_attempts here and
|
||||
# pass it to the functions
|
||||
# also no need to keep accessing these:
|
||||
# ip = request.META.get('REMOTE_ADDR', '')
|
||||
# ua = request.META.get('HTTP_USER_AGENT', '<unknown>')
|
||||
# username = request.POST.get('username', None)
|
||||
|
||||
# if the request is currently under lockout, do not proceed to the
|
||||
# login function, go directly to lockout url, do not pass go, do not
|
||||
# collect messages about this login attempt
|
||||
if is_already_locked(request):
|
||||
return lockout_response(request)
|
||||
|
||||
# call the login function
|
||||
response = func(request, *args, **kwargs)
|
||||
|
||||
|
|
@ -143,6 +195,7 @@ def lockout_response(request):
|
|||
return render_to_response(LOCKOUT_TEMPLATE, context,
|
||||
context_instance=RequestContext(request))
|
||||
|
||||
LOCKOUT_URL = get_lockout_url()
|
||||
if LOCKOUT_URL:
|
||||
return HttpResponseRedirect(LOCKOUT_URL)
|
||||
|
||||
|
|
@ -154,22 +207,30 @@ def lockout_response(request):
|
|||
"Contact an admin to unlock your account.")
|
||||
|
||||
|
||||
def is_already_locked(request):
|
||||
ip = request.META.get('REMOTE_ADDR', '')
|
||||
|
||||
if ONLY_WHITELIST:
|
||||
if not ip_in_whitelist(ip):
|
||||
return True
|
||||
|
||||
if ip_in_blacklist(ip):
|
||||
return True
|
||||
|
||||
attempts = get_user_attempts(request)
|
||||
for attempt in attempts:
|
||||
if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def check_request(request, login_unsuccessful):
|
||||
failures = 0
|
||||
attempt = get_user_attempt(request)
|
||||
attempts = get_user_attempts(request)
|
||||
|
||||
if attempt:
|
||||
failures = attempt.failures_since_start
|
||||
|
||||
# no matter what, we want to lock them out
|
||||
# if they're past the number of attempts allowed
|
||||
if failures > FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
|
||||
# We log them out in case they actually managed to enter
|
||||
# the correct password.
|
||||
logout(request)
|
||||
log.warn('AXES: locked out %s after repeated login attempts.' %
|
||||
attempt.ip_address)
|
||||
return False
|
||||
for attempt in attempts:
|
||||
failures = max(failures, attempt.failures_since_start)
|
||||
|
||||
if login_unsuccessful:
|
||||
# add a failed attempt for this user
|
||||
|
|
@ -177,47 +238,108 @@ def check_request(request, login_unsuccessful):
|
|||
|
||||
# Create an AccessAttempt record if the login wasn't successful
|
||||
# has already attempted, update the info
|
||||
if attempt:
|
||||
attempt.get_data = '%s\n---------\n%s' % (
|
||||
attempt.get_data,
|
||||
query2str(request.GET.items()),
|
||||
)
|
||||
attempt.post_data = '%s\n---------\n%s' % (
|
||||
attempt.post_data,
|
||||
query2str(request.POST.items())
|
||||
)
|
||||
attempt.http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')
|
||||
attempt.path_info = request.META.get('PATH_INFO', '<unknown>')
|
||||
attempt.failures_since_start = failures
|
||||
attempt.attempt_time = datetime.now()
|
||||
attempt.save()
|
||||
log.info('AXES: Repeated login failure by %s. Updating access '
|
||||
'record. Count = %s' %
|
||||
(attempt.ip_address, failures))
|
||||
if len(attempts):
|
||||
for attempt in attempts:
|
||||
attempt.get_data = '%s\n---------\n%s' % (
|
||||
attempt.get_data,
|
||||
query2str(request.GET.items()),
|
||||
)
|
||||
attempt.post_data = '%s\n---------\n%s' % (
|
||||
attempt.post_data,
|
||||
query2str(request.POST.items())
|
||||
)
|
||||
attempt.http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')
|
||||
attempt.path_info = request.META.get('PATH_INFO', '<unknown>')
|
||||
attempt.failures_since_start = failures
|
||||
attempt.attempt_time = datetime.now()
|
||||
attempt.save()
|
||||
log.info('AXES: Repeated login failure by %s. Updating access '
|
||||
'record. Count = %s' %
|
||||
(attempt.ip_address, failures))
|
||||
else:
|
||||
ip = request.META.get('REMOTE_ADDR', '')
|
||||
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')
|
||||
attempt = AccessAttempt.objects.create(
|
||||
user_agent=ua,
|
||||
ip_address=ip,
|
||||
get_data=query2str(request.GET.items()),
|
||||
post_data=query2str(request.POST.items()),
|
||||
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
|
||||
path_info=request.META.get('PATH_INFO', '<unknown>'),
|
||||
failures_since_start=failures
|
||||
)
|
||||
log.info('AXES: New login failure by %s. Creating access record.' %
|
||||
ip)
|
||||
create_new_failure_records(request, failures)
|
||||
else:
|
||||
# user logged in -- forget the failed attempts
|
||||
if attempt:
|
||||
failures = 0
|
||||
trusted_record_exists = False
|
||||
for attempt in attempts:
|
||||
if not attempt.trusted:
|
||||
attempt.delete()
|
||||
else:
|
||||
trusted_record_exists = True
|
||||
attempt.failures_since_start = 0
|
||||
attempt.save()
|
||||
|
||||
if trusted_record_exists is False:
|
||||
create_new_trusted_record(request)
|
||||
|
||||
# no matter what, we want to lock them out if they're past the number of
|
||||
# attempts allowed
|
||||
if failures >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
|
||||
# We log them out in case they actually managed to enter the correct
|
||||
# password
|
||||
logout(request)
|
||||
log.warn('AXES: locked out %s after repeated login attempts.' %
|
||||
(attempt.ip_address,))
|
||||
|
||||
# if a trusted login has violated lockout, revoke trust
|
||||
for attempt in [a for a in attempts if a.trusted]:
|
||||
attempt.delete()
|
||||
create_new_failure_records(request, failures)
|
||||
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
|
||||
"Note that both fields are case-sensitive.")
|
||||
LOGIN_FORM_KEY = 'this_is_the_login_form'
|
||||
|
||||
def create_new_failure_records(request, failures):
|
||||
ip = request.META.get('REMOTE_ADDR', '')
|
||||
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')
|
||||
username = request.POST.get('username', None)
|
||||
|
||||
params = {
|
||||
'user_agent': ua,
|
||||
'ip_address': ip,
|
||||
'username': None,
|
||||
'get_data': query2str(request.GET.items()),
|
||||
'post_data': query2str(request.POST.items()),
|
||||
'http_accept': request.META.get('HTTP_ACCEPT', '<unknown>'),
|
||||
'path_info': request.META.get('PATH_INFO', '<unknown>'),
|
||||
'failures_since_start': failures,
|
||||
}
|
||||
|
||||
# record failed attempt from this IP
|
||||
AccessAttempt.objects.create(**params)
|
||||
|
||||
# record failed attempt on this username from untrusted IP
|
||||
params.update({
|
||||
'ip_address': None,
|
||||
'username': username,
|
||||
})
|
||||
AccessAttempt.objects.create(**params)
|
||||
|
||||
log.info('AXES: New login failure by %s. Creating access record.' % (ip,))
|
||||
|
||||
|
||||
def create_new_trusted_record(request):
|
||||
ip = request.META.get('REMOTE_ADDR', '')
|
||||
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')
|
||||
username = request.POST.get('username', None)
|
||||
|
||||
if not username:
|
||||
return False
|
||||
|
||||
AccessAttempt.objects.create(
|
||||
user_agent=ua,
|
||||
ip_address=ip,
|
||||
username=username,
|
||||
get_data=query2str(request.GET.items()),
|
||||
post_data=query2str(request.POST.items()),
|
||||
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
|
||||
path_info=request.META.get('PATH_INFO', '<unknown>'),
|
||||
failures_since_start=0,
|
||||
trusted=True
|
||||
)
|
||||
|
||||
|
||||
def _display_login_form(request, error_message=''):
|
||||
|
|
|
|||
|
|
@ -9,7 +9,12 @@ FAILURES_DESC = 'Failed Logins'
|
|||
|
||||
class AccessAttempt(models.Model):
|
||||
user_agent = models.CharField(max_length=255)
|
||||
ip_address = models.IPAddressField('IP Address')
|
||||
ip_address = models.IPAddressField('IP Address', null=True)
|
||||
username = models.CharField(max_length=255, null=True)
|
||||
|
||||
# Once a user logs in from an ip, that combination is trusted and not
|
||||
# locked out in case of a distributed attack
|
||||
trusted = models.BooleanField(default=False)
|
||||
get_data = models.TextField('GET Data')
|
||||
post_data = models.TextField('POST Data')
|
||||
http_accept = models.CharField('HTTP Accept', max_length=255)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ class AccessAttemptTest(TestCase):
|
|||
NOT_GONNA_BE_USERNAME = "whywouldyouohwhy"
|
||||
|
||||
def setUp(self):
|
||||
settings.AXES_LOCKOUT_URL = None
|
||||
for i in range(0, random.randrange(10, 50)):
|
||||
username = "person%s" % i
|
||||
email = "%s@example.org" % username
|
||||
|
|
@ -42,7 +43,7 @@ class AccessAttemptTest(TestCase):
|
|||
return response
|
||||
|
||||
def test_login_max(self, correct_username=False):
|
||||
for i in range(0, FAILURE_LIMIT):
|
||||
for i in range(0, FAILURE_LIMIT - 1):
|
||||
response = self._attempt_login(correct_username=correct_username)
|
||||
self.assertContains(response, "this_is_the_login_form")
|
||||
# So, we shouldn't have gotten a lock-out yet.
|
||||
|
|
@ -51,7 +52,7 @@ class AccessAttemptTest(TestCase):
|
|||
self.assertContains(response, "Account locked")
|
||||
|
||||
def test_login_max_with_more(self, correct_username=False):
|
||||
for i in range(0, FAILURE_LIMIT):
|
||||
for i in range(0, FAILURE_LIMIT - 1):
|
||||
response = self._attempt_login(correct_username=correct_username)
|
||||
self.assertContains(response, "this_is_the_login_form")
|
||||
# So, we shouldn't have gotten a lock-out yet.
|
||||
|
|
|
|||
|
|
@ -1,19 +1,25 @@
|
|||
from axes.models import AccessAttempt
|
||||
|
||||
|
||||
def reset(ip=None, silent=False):
|
||||
if not ip:
|
||||
attempts = AccessAttempt.objects.all()
|
||||
if attempts:
|
||||
attempts.delete()
|
||||
else:
|
||||
if not silent:
|
||||
print 'No attempts found.'
|
||||
def reset(ip=None, username=None, silent=False):
|
||||
# no need to reset trusted records. If they fail, they go to untrusted
|
||||
params = {
|
||||
'trusted': False,
|
||||
}
|
||||
|
||||
if ip:
|
||||
params['ip_address'] = ip
|
||||
|
||||
attempts = AccessAttempt.objects.filter(**params)
|
||||
if username:
|
||||
if 'ip_address' in params:
|
||||
del params['ip_address']
|
||||
|
||||
params['username'] = username
|
||||
attempts |= AccessAttempt.objects.filter(**params)
|
||||
|
||||
if attempts:
|
||||
attempts.delete()
|
||||
else:
|
||||
try:
|
||||
attempt = AccessAttempt.objects.get(ip_address=ip)
|
||||
except:
|
||||
if not silent:
|
||||
print 'No matching attempt found.'
|
||||
else:
|
||||
attempt.delete()
|
||||
if not silent:
|
||||
print 'No attempts found.'
|
||||
|
|
|
|||
Loading…
Reference in a new issue