Merge pull request #197 from jorlugaqui/cache-attemps

WP: Cache failures in cache
This commit is contained in:
Camilo Nova 2016-12-08 10:20:47 -05:00 committed by GitHub
commit ca55a6ef16
4 changed files with 162 additions and 17 deletions

View file

@ -1,6 +1,7 @@
import json
import logging
from socket import inet_pton, AF_INET6, error
from hashlib import md5
from django.contrib.auth import get_user_model
from django.contrib.auth import logout
@ -9,6 +10,7 @@ from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.utils import six
from django.utils import timezone as datetime
from django.core.cache import cache
from axes.models import AccessAttempt
from axes.models import AccessLog
@ -167,6 +169,8 @@ def _get_user_attempts(request):
def get_user_attempts(request):
objects_deleted = False
attempts = _get_user_attempts(request)
cache_hash_key = get_cache_key(request)
cache_timeout = get_cache_timeout()
if COOLOFF_TIME:
for attempt in attempts:
@ -174,9 +178,15 @@ def get_user_attempts(request):
if attempt.trusted:
attempt.failures_since_start = 0
attempt.save()
cache.set(cache_hash_key, 0, cache_timeout)
else:
attempt.delete()
objects_deleted = True
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
cache.set(cache_hash_key,
failures_cached - 1,
cache_timeout)
# If objects were deleted, we need to update the queryset to reflect this,
# so force a reload.
@ -310,9 +320,15 @@ def is_already_locked(request):
if not is_user_lockable(request):
return False
for attempt in get_user_attempts(request):
if attempt.failures_since_start >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE:
return True
cache_hash_key = get_cache_key(request)
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
return failures_cached >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE
else:
for attempt in get_user_attempts(request):
if attempt.failures_since_start >= FAILURE_LIMIT and \
LOCK_OUT_AT_FAILURE:
return True
return False
@ -322,13 +338,20 @@ def check_request(request, login_unsuccessful):
username = request.POST.get(USERNAME_FORM_FIELD, None)
failures = 0
attempts = get_user_attempts(request)
cache_hash_key = get_cache_key(request)
cache_timeout = get_cache_timeout()
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
if login_unsuccessful:
# add a failed attempt for this user
failures += 1
cache.set(cache_hash_key, failures, cache_timeout)
# Create an AccessAttempt record if the login wasn't successful
# has already attempted, update the info
@ -362,10 +385,16 @@ def check_request(request, login_unsuccessful):
for attempt in attempts:
if not attempt.trusted:
attempt.delete()
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
cache.set(cache_hash_key,
failures_cached - 1,
cache_timeout)
else:
trusted_record_exists = True
attempt.failures_since_start = 0
attempt.save()
cache.set(cache_hash_key, 0, cache_timeout)
if trusted_record_exists is False:
create_new_trusted_record(request)
@ -438,3 +467,38 @@ def create_new_trusted_record(request):
failures_since_start=0,
trusted=True
)
def get_cache_key(request_or_object):
"""
Build cache key name from request or AccessAttempt object.
:param request_or_object: Request or AccessAttempt object
:return cache-key: String, key to be used in cache system
"""
ua = None
ip = None
if isinstance(request_or_object, AccessAttempt):
ip = request_or_object.ip_address
ua = request_or_object.user_agent
else:
ip = get_ip(request_or_object)
ua = request_or_object.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip = ip.encode('utf-8')
if ua:
ua = ua.encode('utf-8')
cache_hash_key = 'axes-{}'.format(md5(ip+ua).hexdigest())
else:
cache_hash_key = 'axes-{}'.format(md5(ip).hexdigest())
return cache_hash_key
def get_cache_timeout():
"Returns timeout according to COOLOFF_TIME."
cache_timeout = None
if COOLOFF_TIME:
cache_timeout = COOLOFF_TIME.total_seconds()
return cache_timeout

View file

@ -2,8 +2,10 @@ from django.dispatch import receiver
from django.dispatch import Signal
from django.utils.timezone import now
from django.contrib.auth.signals import user_logged_out
from django.db.models.signals import post_save, post_delete
from django.core.cache import cache
from axes.models import AccessLog
from axes.models import AccessLog, AccessAttempt
from axes.settings import DISABLE_ACCESS_LOG
@ -26,3 +28,19 @@ if not DISABLE_ACCESS_LOG:
access_log = access_logs[0]
access_log.logout_time = now()
access_log.save()
@receiver(post_save, sender=AccessAttempt)
def update_cache_after_save(instance, **kwargs):
from axes.decorators import get_cache_timeout, get_cache_key
cache_hash_key = get_cache_key(instance)
if not cache.get(cache_hash_key):
cache_timeout = get_cache_timeout()
cache.set(cache_hash_key, instance.failures_since_start, cache_timeout)
@receiver(post_delete, sender=AccessAttempt)
def delete_cache_after_delete(instance, **kwargs):
from axes.decorators import get_cache_key
cache_hash_key = get_cache_key(instance)
cache.delete(cache_hash_key)

View file

@ -4,6 +4,7 @@ import time
import json
import datetime
from hashlib import md5
from mock import patch
from django.conf import settings
@ -13,8 +14,9 @@ from django.contrib.auth.models import User
from django.core.urlresolvers import NoReverseMatch
from django.core.urlresolvers import reverse
from django.utils import six
from django.test.client import RequestFactory
from axes.decorators import get_ip
from axes.decorators import get_ip, get_cache_key
from axes.settings import COOLOFF_TIME
from axes.settings import FAILURE_LIMIT
from axes.models import AccessAttempt, AccessLog
@ -87,7 +89,9 @@ class AccessAttemptTest(TestCase):
password=self.VALID_PASSWORD,
)
def test_failure_limit_once(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_failure_limit_once(self, cache_get_mock, cache_set_mock):
"""Tests the login lock trying to login one more time
than failure limit
"""
@ -117,13 +121,17 @@ class AccessAttemptTest(TestCase):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_valid_login(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_valid_login(self, cache_set_mock, cache_get_mock):
"""Tests a valid login for a real username
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
def test_valid_logout(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_valid_logout(self, cache_set_mock, cache_get_mock):
"""Tests a valid logout and make sure the logout_time is updated
"""
response = self._login(is_valid_username=True, is_valid_password=True)
@ -153,7 +161,9 @@ class AccessAttemptTest(TestCase):
# Try the cooling off time
self.test_cooling_off()
def test_long_user_agent_valid(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_long_user_agent_valid(self, cache_set_mock, cache_get_mock):
"""Tests if can handle a long user agent
"""
long_user_agent = 'ie6' * 1024
@ -197,6 +207,39 @@ class AccessAttemptTest(TestCase):
# Make a login attempt again
self.test_valid_login()
@patch('axes.decorators.get_ip', return_value='127.0.0.1')
def test_get_cache_key(self, get_ip_mock):
""" Test the cache key format"""
# Getting cache key from request
ip = '127.0.0.1'.encode('utf-8')
ua = '<unknown>'.encode('utf-8')
cache_hash_key_checker = 'axes-{}'.format(md5((ip+ua)).hexdigest())
request_factory = RequestFactory()
request = request_factory.post('/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test'
})
cache_hash_key = get_cache_key(request)
self.assertEqual(cache_hash_key_checker, cache_hash_key)
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
ip_address='127.0.0.1',
username=self.VALID_USERNAME,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
)
cache_hash_key = get_cache_key(attempt)
self.assertEqual(cache_hash_key_checker, cache_hash_key)
def test_send_lockout_signal(self):
"""Test if the lockout signal is emitted
"""
@ -222,7 +265,10 @@ class AccessAttemptTest(TestCase):
self.assertEquals(scope.signal_received, 2)
@patch('axes.decorators.LOCK_OUT_BY_COMBINATION_USER_AND_IP', True)
def test_lockout_by_combination_user_and_ip(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_lockout_by_combination_user_and_ip(self, cache_set_mock,
cache_get_mock):
"""Tests the login lock with a valid username and invalid password
when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True
"""
@ -241,7 +287,9 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_only(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_lockout_by_user_only(self, cache_set_mock, cache_get_mock):
"""Tests the login lock with a valid username and invalid password
when AXES_ONLY_USER_FAILURES is True
"""
@ -278,7 +326,10 @@ class AccessAttemptTest(TestCase):
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
def test_log_data_truncated(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_log_data_truncated(self, cache_set_mock, cache_get_mock):
"""Tests that query2str properly truncates data to the
max_length (default 1024)
"""
@ -308,7 +359,10 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, 'Logged out')
@patch('axes.decorators.DISABLE_SUCCESS_ACCESS_LOG', True)
def test_non_valid_login_without_success_log(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_non_valid_login_without_success_log(self, cache_set_mock,
cache_get_mock):
"""
A non-valid login does generate an AccessLog when
`DISABLE_SUCCESS_ACCESS_LOG=True`.
@ -344,7 +398,9 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, 'Logged out')
@patch('axes.decorators.DISABLE_ACCESS_LOG', True)
def test_non_valid_login_without_log(self):
@patch('axes.decorators.cache.set', return_value=None)
@patch('axes.decorators.cache.get', return_value=None)
def test_non_valid_login_without_log(self, cache_set_mock, cache_get_mock):
"""
A non-valid login does generate an AccessLog when
`DISABLE_ACCESS_LOG=True`.

View file

@ -1,3 +1,5 @@
from django.core.cache import cache
from axes.models import AccessAttempt
@ -15,8 +17,13 @@ def reset(ip=None, username=None):
if attempts:
count = attempts.count()
attempts.delete()
from axes.decorators import get_cache_key
for attempt in attempts:
cache_hash_key = get_cache_key(attempt)
if cache.get(cache_hash_key):
cache.delete(cache_hash_key)
attempts.delete()
return count