mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
Merge pull request #21 from kencochrane/master
New Lockout signal, Log all access attempts and unlock-able user accounts.
This commit is contained in:
commit
ce00422a72
4 changed files with 108 additions and 15 deletions
|
|
@ -1,5 +1,5 @@
|
|||
from django.contrib import admin
|
||||
from axes.models import AccessAttempt
|
||||
from axes.models import AccessAttempt, AccessLog
|
||||
|
||||
|
||||
class AccessAttemptAdmin(admin.ModelAdmin):
|
||||
|
|
@ -21,3 +21,20 @@ class AccessAttemptAdmin(admin.ModelAdmin):
|
|||
)
|
||||
|
||||
admin.site.register(AccessAttempt, AccessAttemptAdmin)
|
||||
|
||||
class AccessLogAdmin(admin.ModelAdmin):
|
||||
list_display = ('attempt_time','logout_time', 'ip_address',
|
||||
'user_agent', 'path_info')
|
||||
list_filter = ['attempt_time', 'logout_time', 'ip_address', 'path_info']
|
||||
search_fields = ['ip_address', 'user_agent', 'path_info']
|
||||
date_hierarchy = 'attempt_time'
|
||||
fieldsets = (
|
||||
(None, {
|
||||
'fields': ('path_info',)
|
||||
}),
|
||||
('Meta Data', {
|
||||
'fields': ('user_agent', 'ip_address', 'http_accept')
|
||||
})
|
||||
)
|
||||
|
||||
admin.site.register(AccessLog, AccessLogAdmin)
|
||||
|
|
@ -23,7 +23,8 @@ except ImportError:
|
|||
# Fallback for Django < 1.4.
|
||||
from datetime import datetime
|
||||
|
||||
from axes.models import AccessAttempt
|
||||
from axes.models import AccessAttempt, AccessLog
|
||||
from axes.signals import user_locked_out
|
||||
import axes
|
||||
|
||||
# see if the user has overridden the failure limit
|
||||
|
|
@ -91,13 +92,34 @@ if VERBOSE:
|
|||
log.info('AXES: BEGIN LOG')
|
||||
log.info('Using django-axes ' + axes.get_version())
|
||||
|
||||
def is_user_lockable(request):
|
||||
""" Check if the user has a profile with nolockout
|
||||
If so, then return the value to see if this user is special
|
||||
and doesn't get their account locked out """
|
||||
username = request.POST.get('username', None)
|
||||
try:
|
||||
user = User.objects.get(username=username)
|
||||
except User.DoesNotExist:
|
||||
# not a valid user
|
||||
return True
|
||||
try:
|
||||
profile = user.get_profile()
|
||||
except:
|
||||
# no profile
|
||||
return True
|
||||
|
||||
if hasattr(profile, 'nolockout'):
|
||||
# need to revert since we need to return
|
||||
# false for users that can't be blocked
|
||||
return not profile.nolockout
|
||||
else:
|
||||
return True
|
||||
|
||||
def get_user_attempts(request):
|
||||
"""
|
||||
Returns access attempt record if it exists.
|
||||
Otherwise return None.
|
||||
"""
|
||||
|
||||
ip = request.META.get('REMOTE_ADDR', '')
|
||||
username = request.POST.get('username', None)
|
||||
|
||||
|
|
@ -177,6 +199,7 @@ def watch_login(func):
|
|||
not response.has_header('location') and
|
||||
response.status_code != 302
|
||||
)
|
||||
log_access_request(request, login_unsuccessful)
|
||||
if check_request(request, login_unsuccessful):
|
||||
return response
|
||||
|
||||
|
|
@ -218,14 +241,29 @@ def is_already_locked(request):
|
|||
return True
|
||||
|
||||
attempts = get_user_attempts(request)
|
||||
user_lockable = is_user_lockable(request)
|
||||
for attempt in attempts:
|
||||
if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
|
||||
if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE and user_lockable:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def log_access_request(request, login_unsuccessful):
|
||||
""" Log the access attempt """
|
||||
access_log = AccessLog()
|
||||
access_log.user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')
|
||||
access_log.ip_address = request.META.get('REMOTE_ADDR', '')
|
||||
access_log.username = request.POST.get('username', None)
|
||||
access_log.http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')
|
||||
access_log.path_info = request.META.get('PATH_INFO', '<unknown>')
|
||||
access_log.trusted = not login_unsuccessful
|
||||
access_log.save()
|
||||
|
||||
|
||||
def check_request(request, login_unsuccessful):
|
||||
ip_address = request.META.get('REMOTE_ADDR', '')
|
||||
username = request.POST.get('username', None)
|
||||
failures = 0
|
||||
attempts = get_user_attempts(request)
|
||||
|
||||
|
|
@ -273,14 +311,17 @@ def check_request(request, login_unsuccessful):
|
|||
if trusted_record_exists is False:
|
||||
create_new_trusted_record(request)
|
||||
|
||||
user_lockable = is_user_lockable(request)
|
||||
# no matter what, we want to lock them out if they're past the number of
|
||||
# attempts allowed
|
||||
if failures >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
|
||||
# attempts allowed, unless the user is set to notlockable
|
||||
if failures >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE and user_lockable:
|
||||
# We log them out in case they actually managed to enter the correct
|
||||
# password
|
||||
logout(request)
|
||||
log.warn('AXES: locked out %s after repeated login attempts.' %
|
||||
(attempt.ip_address,))
|
||||
(ip_address,))
|
||||
# send signal when someone is locked out.
|
||||
user_locked_out.send("axes", request=request, username=username, ip_address=ip_address)
|
||||
|
||||
# if a trusted login has violated lockout, revoke trust
|
||||
for attempt in [a for a in attempts if a.trusted]:
|
||||
|
|
|
|||
|
|
@ -1,13 +1,12 @@
|
|||
from django.db import models
|
||||
|
||||
import signals
|
||||
FAILURES_DESC = 'Failed Logins'
|
||||
|
||||
#XXX TODO
|
||||
# set unique by user_agent, ip
|
||||
# make user agent, ip indexed fields
|
||||
|
||||
|
||||
class AccessAttempt(models.Model):
|
||||
class CommonAccess(models.Model):
|
||||
user_agent = models.CharField(max_length=255)
|
||||
ip_address = models.IPAddressField('IP Address', null=True)
|
||||
username = models.CharField(max_length=255, null=True)
|
||||
|
|
@ -15,13 +14,19 @@ class AccessAttempt(models.Model):
|
|||
# Once a user logs in from an ip, that combination is trusted and not
|
||||
# locked out in case of a distributed attack
|
||||
trusted = models.BooleanField(default=False)
|
||||
get_data = models.TextField('GET Data')
|
||||
post_data = models.TextField('POST Data')
|
||||
http_accept = models.CharField('HTTP Accept', max_length=255)
|
||||
path_info = models.CharField('Path', max_length=255)
|
||||
failures_since_start = models.PositiveIntegerField(FAILURES_DESC)
|
||||
attempt_time = models.DateTimeField(auto_now_add=True)
|
||||
|
||||
class Meta:
|
||||
abstract = True
|
||||
ordering = ['-attempt_time']
|
||||
|
||||
class AccessAttempt(CommonAccess):
|
||||
get_data = models.TextField('GET Data')
|
||||
post_data = models.TextField('POST Data')
|
||||
failures_since_start = models.PositiveIntegerField(FAILURES_DESC)
|
||||
|
||||
def __unicode__(self):
|
||||
return u'Attempted Access: %s' % self.attempt_time
|
||||
|
||||
|
|
@ -29,5 +34,8 @@ class AccessAttempt(models.Model):
|
|||
def failures(self):
|
||||
return self.failures_since_start
|
||||
|
||||
class Meta:
|
||||
ordering = ['-attempt_time']
|
||||
class AccessLog(CommonAccess):
|
||||
logout_time = models.DateTimeField(null=True, blank=True)
|
||||
|
||||
def __unicode__(self):
|
||||
return u'Access Log for %s @ %s' % (self.user, self.attempt_time)
|
||||
27
axes/signals.py
Normal file
27
axes/signals.py
Normal file
|
|
@ -0,0 +1,27 @@
|
|||
from django.dispatch import Signal, receiver
|
||||
from django.contrib.auth.signals import user_logged_out
|
||||
from django.core.exceptions import ObjectDoesNotExist
|
||||
from axes.models import AccessLog
|
||||
|
||||
# django 1.4 has a new timezone aware now() use if available.
|
||||
try:
|
||||
from django.utils.timezone import now
|
||||
except ImportError:
|
||||
# fall back to none timezone aware now()
|
||||
from datetime import datetime
|
||||
now = datetime.now
|
||||
|
||||
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
|
||||
|
||||
@receiver(user_logged_out)
|
||||
def log_user_lockout(sender, request, user, signal, *args, **kwargs):
|
||||
""" When a user logs out, update the access log"""
|
||||
if not user:
|
||||
return
|
||||
|
||||
access_log = AccessLog.objects.filter(username=user.username,
|
||||
logout_time__isnull=True).order_by("-attempt_time")[0]
|
||||
|
||||
if access_log:
|
||||
access_log.logout_time = now()
|
||||
access_log.save()
|
||||
Loading…
Reference in a new issue