mirror of
https://github.com/jazzband/django-defender.git
synced 2026-05-02 04:34:47 +00:00
The `is_already_locked` method assumes how the username is stored in the request. This patch adds helpers that don't to allow for more flexible implementation.
706 lines
27 KiB
Python
706 lines
27 KiB
Python
import random
|
|
import string
|
|
import time
|
|
|
|
from mock import patch
|
|
|
|
from django.contrib.auth.models import User
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from django.contrib.sessions.backends.db import SessionStore
|
|
from django.core.urlresolvers import NoReverseMatch
|
|
from django.core.urlresolvers import reverse
|
|
from django.http import HttpRequest
|
|
from django.test.client import RequestFactory
|
|
|
|
from . import utils
|
|
from . import config
|
|
from .connection import parse_redis_url, get_redis_connection
|
|
from .models import AccessAttempt
|
|
from .test import DefenderTestCase, DefenderTransactionTestCase
|
|
|
|
|
|
# Django >= 1.7 compatibility
|
|
try:
|
|
LOGIN_FORM_KEY = '<form action="/admin/login/" method="post"'
|
|
' id="login-form">'
|
|
ADMIN_LOGIN_URL = reverse('admin:login')
|
|
except NoReverseMatch:
|
|
ADMIN_LOGIN_URL = reverse('admin:index')
|
|
LOGIN_FORM_KEY = 'this_is_the_login_form'
|
|
|
|
|
|
VALID_USERNAME = VALID_PASSWORD = 'valid'
|
|
|
|
|
|
class AccessAttemptTest(DefenderTestCase):
|
|
""" Test case using custom settings for testing
|
|
"""
|
|
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
|
|
PERMANENT_LOCKED_MESSAGE = (
|
|
LOCKED_MESSAGE + ' Contact an admin to unlock your account.'
|
|
)
|
|
|
|
def _get_random_str(self):
|
|
""" Returns a random str """
|
|
chars = string.ascii_uppercase + string.digits
|
|
|
|
return ''.join(random.choice(chars) for x in range(20))
|
|
|
|
def _login(self, username=None, password=None, user_agent='test-browser',
|
|
remote_addr='127.0.0.1'):
|
|
""" Login a user. If the username or password is not provided
|
|
it will use a random string instead. Use the VALID_USERNAME and
|
|
VALID_PASSWORD to make a valid login.
|
|
"""
|
|
if username is None:
|
|
username = self._get_random_str()
|
|
|
|
if password is None:
|
|
password = self._get_random_str()
|
|
|
|
response = self.client.post(ADMIN_LOGIN_URL, {
|
|
'username': username,
|
|
'password': password,
|
|
LOGIN_FORM_KEY: 1,
|
|
}, HTTP_USER_AGENT=user_agent, REMOTE_ADDR=remote_addr)
|
|
|
|
return response
|
|
|
|
def setUp(self):
|
|
""" Create a valid user for login
|
|
"""
|
|
self.user = User.objects.create_superuser(
|
|
username=VALID_USERNAME,
|
|
email='test@example.com',
|
|
password=VALID_PASSWORD,
|
|
)
|
|
|
|
def test_data_integrity_of_get_blocked_ips(self):
|
|
""" Test whether data retrieved from redis via
|
|
get_blocked_ips() is the same as the data saved
|
|
"""
|
|
data_in = ['127.0.0.1', '4.2.2.1']
|
|
for ip in data_in:
|
|
utils.block_ip(ip)
|
|
data_out = utils.get_blocked_ips()
|
|
self.assertEqual(sorted(data_in), sorted(data_out))
|
|
|
|
# send in None, should have same values.
|
|
utils.block_ip(None)
|
|
data_out = utils.get_blocked_ips()
|
|
self.assertEqual(sorted(data_in), sorted(data_out))
|
|
|
|
def test_data_integrity_of_get_blocked_usernames(self):
|
|
""" Test whether data retrieved from redis via
|
|
get_blocked_usernames() is the same as the data saved
|
|
"""
|
|
data_in = ['foo', 'bar']
|
|
for username in data_in:
|
|
utils.block_username(username)
|
|
data_out = utils.get_blocked_usernames()
|
|
self.assertEqual(sorted(data_in), sorted(data_out))
|
|
|
|
# send in None, should have same values.
|
|
utils.block_username(None)
|
|
data_out = utils.get_blocked_usernames()
|
|
self.assertEqual(sorted(data_in), sorted(data_out))
|
|
|
|
def test_login_get(self):
|
|
""" visit the login page """
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertEquals(response.status_code, 200)
|
|
|
|
def test_failure_limit_by_ip_once(self):
|
|
""" Tests the login lock by ip when trying to login
|
|
one more time than failure limit
|
|
"""
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now
|
|
response = self._login()
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_failure_limit_by_ip_many(self):
|
|
""" Tests the login lock by ip when trying to
|
|
login a lot of times more than failure limit
|
|
"""
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now
|
|
for i in range(0, random.randrange(1, 10)):
|
|
# try to log in a bunch of times
|
|
response = self._login()
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_failure_limit_by_username_once(self):
|
|
""" Tests the login lock by username when trying to login
|
|
one more time than failure limit
|
|
"""
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
ip = '74.125.239.{0}.'.format(i)
|
|
response = self._login(username=VALID_USERNAME, remote_addr=ip)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now
|
|
response = self._login()
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_valid_login(self):
|
|
""" Tests a valid login for a real username
|
|
"""
|
|
response = self._login(username=VALID_USERNAME,
|
|
password=VALID_PASSWORD)
|
|
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
|
|
|
|
def test_reset_after_valid_login(self):
|
|
""" Tests the counter gets reset after a valid login
|
|
"""
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
self._login(username=VALID_USERNAME)
|
|
|
|
# now login with a valid username and password
|
|
self._login(username=VALID_USERNAME, password=VALID_PASSWORD)
|
|
|
|
# and we should be able to try again without hitting the failure limit
|
|
response = self._login(username=VALID_USERNAME)
|
|
self.assertNotContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_blocked_ip_cannot_login(self):
|
|
""" Test an user with blocked ip cannot login with another username
|
|
"""
|
|
for i in range(0, config.FAILURE_LIMIT + 1):
|
|
response = self._login(username=VALID_USERNAME)
|
|
|
|
# try to login with a different user
|
|
response = self._login(username='myuser')
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_blocked_username_cannot_login(self):
|
|
""" Test an user with blocked username cannot login using
|
|
another ip
|
|
"""
|
|
for i in range(0, config.FAILURE_LIMIT + 1):
|
|
ip = '74.125.239.{0}.'.format(i)
|
|
response = self._login(username=VALID_USERNAME, remote_addr=ip)
|
|
|
|
# try to login with a different ip
|
|
response = self._login(username=VALID_USERNAME, remote_addr='8.8.8.8')
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_cooling_off(self):
|
|
""" Tests if the cooling time allows a user to login
|
|
"""
|
|
self.test_failure_limit_by_ip_once()
|
|
# Wait for the cooling off period
|
|
time.sleep(config.COOLOFF_TIME)
|
|
|
|
if config.MOCK_REDIS:
|
|
# mock redis require that we expire on our own
|
|
get_redis_connection().do_expire() # pragma: no cover
|
|
# It should be possible to login again, make sure it is.
|
|
self.test_valid_login()
|
|
|
|
def test_cooling_off_for_trusted_user(self):
|
|
""" Test the cooling time for a trusted user
|
|
"""
|
|
# Try the cooling off time
|
|
self.test_cooling_off()
|
|
|
|
def test_long_user_agent_valid(self):
|
|
""" Tests if can handle a long user agent
|
|
"""
|
|
long_user_agent = 'ie6' * 1024
|
|
response = self._login(username=VALID_USERNAME, password=VALID_PASSWORD,
|
|
user_agent=long_user_agent)
|
|
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
|
|
|
|
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
|
|
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_FORWARDED_FOR')
|
|
def test_get_ip_reverse_proxy(self):
|
|
""" Tests if can handle a long user agent
|
|
"""
|
|
request_factory = RequestFactory()
|
|
request = request_factory.get(ADMIN_LOGIN_URL)
|
|
request.user = AnonymousUser()
|
|
request.session = SessionStore()
|
|
|
|
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
|
|
self.assertEquals(utils.get_ip(request), '192.168.24.24')
|
|
|
|
request_factory = RequestFactory()
|
|
request = request_factory.get(ADMIN_LOGIN_URL)
|
|
request.user = AnonymousUser()
|
|
request.session = SessionStore()
|
|
|
|
request.META['REMOTE_ADDR'] = '24.24.24.24'
|
|
self.assertEquals(utils.get_ip(request), '24.24.24.24')
|
|
|
|
def test_get_ip(self):
|
|
""" Tests if can handle a long user agent
|
|
"""
|
|
request_factory = RequestFactory()
|
|
request = request_factory.get(ADMIN_LOGIN_URL)
|
|
request.user = AnonymousUser()
|
|
request.session = SessionStore()
|
|
|
|
self.assertEquals(utils.get_ip(request), '127.0.0.1')
|
|
|
|
def test_long_user_agent_not_valid(self):
|
|
""" Tests if can handle a long user agent with failure
|
|
"""
|
|
long_user_agent = 'ie6' * 1024
|
|
for i in range(0, config.FAILURE_LIMIT + 1):
|
|
response = self._login(user_agent=long_user_agent)
|
|
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_reset_ip(self):
|
|
""" Tests if can reset an ip address
|
|
"""
|
|
# Make a lockout
|
|
self.test_failure_limit_by_ip_once()
|
|
|
|
# Reset the ip so we can try again
|
|
utils.reset_failed_attempts(ip_address='127.0.0.1')
|
|
|
|
# Make a login attempt again
|
|
self.test_valid_login()
|
|
|
|
@patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/')
|
|
def test_failed_login_redirect_to_URL(self):
|
|
""" Test to make sure that after lockout we send to the correct
|
|
redirect URL """
|
|
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now, check redirect make sure it is valid.
|
|
response = self._login()
|
|
self.assertEquals(response.status_code, 302)
|
|
self.assertEquals(response['Location'], 'http://localhost/othe/login/')
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertEquals(response.status_code, 302)
|
|
self.assertEquals(response['Location'], 'http://localhost/othe/login/')
|
|
|
|
@patch('defender.config.LOCKOUT_URL', '/o/login/')
|
|
def test_failed_login_redirect_to_URL_local(self):
|
|
""" Test to make sure that after lockout we send to the correct
|
|
redirect URL """
|
|
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now, check redirect make sure it is valid.
|
|
response = self._login()
|
|
self.assertEquals(response.status_code, 302)
|
|
self.assertEquals(response['Location'], 'http://testserver/o/login/')
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertEquals(response.status_code, 302)
|
|
self.assertEquals(response['Location'], 'http://testserver/o/login/')
|
|
|
|
@patch('defender.config.LOCKOUT_TEMPLATE', 'defender/lockout.html')
|
|
def test_failed_login_redirect_to_template(self):
|
|
""" Test to make sure that after lockout we send to the correct
|
|
template """
|
|
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now, check template make sure it is valid.
|
|
response = self._login()
|
|
self.assertEquals(response.status_code, 200)
|
|
self.assertTemplateUsed(response, 'defender/lockout.html')
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertEquals(response.status_code, 200)
|
|
self.assertTemplateUsed(response, 'defender/lockout.html')
|
|
|
|
@patch('defender.config.COOLOFF_TIME', 0)
|
|
def test_failed_login_no_cooloff(self):
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now, check redirect make sure it is valid.
|
|
response = self._login()
|
|
self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE)
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE)
|
|
|
|
def test_login_attempt_model(self):
|
|
""" test the login model"""
|
|
|
|
response = self._login()
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
self.assertEquals(AccessAttempt.objects.count(), 1)
|
|
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
|
|
|
|
def test_is_valid_ip(self):
|
|
""" Test the is_valid_ip() method """
|
|
self.assertEquals(utils.is_valid_ip('192.168.0.1'), True)
|
|
self.assertEquals(utils.is_valid_ip('130.80.100.24'), True)
|
|
self.assertEquals(utils.is_valid_ip('8.8.8.8'), True)
|
|
self.assertEquals(utils.is_valid_ip('127.0.0.1'), True)
|
|
self.assertEquals(utils.is_valid_ip('fish'), False)
|
|
self.assertEquals(utils.is_valid_ip(None), False)
|
|
self.assertEquals(utils.is_valid_ip(''), False)
|
|
self.assertEquals(utils.is_valid_ip('0x41.0x41.0x41.0x41'), False)
|
|
self.assertEquals(utils.is_valid_ip('192.168.100.34.y'), False)
|
|
self.assertEquals(
|
|
utils.is_valid_ip('2001:0db8:85a3:0000:0000:8a2e:0370:7334'), True)
|
|
self.assertEquals(
|
|
utils.is_valid_ip('2001:db8:85a3:0:0:8a2e:370:7334'), True)
|
|
self.assertEquals(
|
|
utils.is_valid_ip('2001:db8:85a3::8a2e:370:7334'), True)
|
|
self.assertEquals(
|
|
utils.is_valid_ip('::ffff:192.0.2.128'), True)
|
|
self.assertEquals(
|
|
utils.is_valid_ip('::ffff:8.8.8.8'), True)
|
|
|
|
def test_parse_redis_url(self):
|
|
""" test the parse_redis_url method """
|
|
# full regular
|
|
conf = parse_redis_url("redis://user:password@localhost2:1234/2")
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
self.assertEquals(conf.get('PASSWORD'), 'password')
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
# full non local
|
|
conf = parse_redis_url("redis://user:pass@www.localhost.com:1234/2")
|
|
self.assertEquals(conf.get('HOST'), 'www.localhost.com')
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
self.assertEquals(conf.get('PASSWORD'), 'pass')
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
# no user name
|
|
conf = parse_redis_url("redis://password@localhost2:1234/2")
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
self.assertEquals(conf.get('PASSWORD'), None)
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
# no user name 2 with colon
|
|
conf = parse_redis_url("redis://:password@localhost2:1234/2")
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
self.assertEquals(conf.get('PASSWORD'), 'password')
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
# Empty
|
|
conf = parse_redis_url(None)
|
|
self.assertEquals(conf.get('HOST'), 'localhost')
|
|
self.assertEquals(conf.get('DB'), 0)
|
|
self.assertEquals(conf.get('PASSWORD'), None)
|
|
self.assertEquals(conf.get('PORT'), 6379)
|
|
|
|
# no db
|
|
conf = parse_redis_url("redis://:password@localhost2:1234")
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
self.assertEquals(conf.get('DB'), 0)
|
|
self.assertEquals(conf.get('PASSWORD'), 'password')
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
# no password
|
|
conf = parse_redis_url("redis://localhost2:1234/0")
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
self.assertEquals(conf.get('DB'), 0)
|
|
self.assertEquals(conf.get('PASSWORD'), None)
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
def test_get_ip_address_from_request(self):
|
|
req = HttpRequest()
|
|
req.META['REMOTE_ADDR'] = '1.2.3.4'
|
|
ip = utils.get_ip_address_from_request(req)
|
|
self.assertEqual(ip, '1.2.3.4')
|
|
|
|
req = HttpRequest()
|
|
req.META['REMOTE_ADDR'] = '1.2.3.4 '
|
|
ip = utils.get_ip_address_from_request(req)
|
|
self.assertEqual(ip, '1.2.3.4')
|
|
|
|
req = HttpRequest()
|
|
req.META['REMOTE_ADDR'] = '192.168.100.34.y'
|
|
ip = utils.get_ip_address_from_request(req)
|
|
self.assertEqual(ip, '127.0.0.1')
|
|
|
|
req = HttpRequest()
|
|
req.META['REMOTE_ADDR'] = 'cat'
|
|
ip = utils.get_ip_address_from_request(req)
|
|
self.assertEqual(ip, '127.0.0.1')
|
|
|
|
req = HttpRequest()
|
|
ip = utils.get_ip_address_from_request(req)
|
|
self.assertEqual(ip, '127.0.0.1')
|
|
|
|
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
|
|
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_PROXIED')
|
|
def test_get_ip_reverse_proxy_custom_header(self):
|
|
req = HttpRequest()
|
|
req.META['HTTP_X_PROXIED'] = '1.2.3.4'
|
|
self.assertEqual(utils.get_ip(req), '1.2.3.4')
|
|
|
|
req = HttpRequest()
|
|
req.META['HTTP_X_PROXIED'] = '1.2.3.4, 5.6.7.8, 127.0.0.1'
|
|
self.assertEqual(utils.get_ip(req), '1.2.3.4')
|
|
|
|
req = HttpRequest()
|
|
req.META['REMOTE_ADDR'] = '1.2.3.4'
|
|
self.assertEqual(utils.get_ip(req), '1.2.3.4')
|
|
|
|
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
|
|
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_REAL_IP')
|
|
def test_get_user_attempts(self):
|
|
ip_attempts = random.randint(3, 12)
|
|
username_attempts = random.randint(3, 12)
|
|
for i in range(0, ip_attempts):
|
|
utils.increment_key(utils.get_ip_attempt_cache_key('1.2.3.4'))
|
|
for i in range(0, username_attempts):
|
|
utils.increment_key(utils.get_username_attempt_cache_key('foobar'))
|
|
req = HttpRequest()
|
|
req.POST['username'] = 'foobar'
|
|
req.META['HTTP_X_REAL_IP'] = '1.2.3.4'
|
|
self.assertEqual(
|
|
utils.get_user_attempts(req), max(ip_attempts, username_attempts)
|
|
)
|
|
|
|
req = HttpRequest()
|
|
req.POST['username'] = 'foobar'
|
|
req.META['HTTP_X_REAL_IP'] = '5.6.7.8'
|
|
self.assertEqual(
|
|
utils.get_user_attempts(req), username_attempts
|
|
)
|
|
|
|
req = HttpRequest()
|
|
req.POST['username'] = 'barfoo'
|
|
req.META['HTTP_X_REAL_IP'] = '1.2.3.4'
|
|
self.assertEqual(
|
|
utils.get_user_attempts(req), ip_attempts
|
|
)
|
|
|
|
def test_admin(self):
|
|
""" test the admin pages for this app """
|
|
from .admin import AccessAttemptAdmin
|
|
AccessAttemptAdmin
|
|
|
|
def test_decorator_middleware(self):
|
|
# because watch_login is called twice in this test (once by the
|
|
# middleware and once by the decorator) we have half as many attempts
|
|
# before getting locked out.
|
|
# this is getting called twice, once for each decorator, not sure how
|
|
# to dynamically remove one of the middlewares during a test so we
|
|
# divide the failure limit by 2.
|
|
|
|
for i in range(0, int(config.FAILURE_LIMIT)):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now
|
|
response = self._login()
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
# doing a get should also get locked out message
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
def test_get_view(self):
|
|
""" Check that the decorator doesn't tamper with GET requests"""
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertNotContains(response, self.LOCKED_MESSAGE)
|
|
|
|
@patch('defender.config.USE_CELERY', True)
|
|
def test_use_celery(self):
|
|
""" Check that use celery works"""
|
|
|
|
self.assertEquals(AccessAttempt.objects.count(), 0)
|
|
|
|
for i in range(0, int(config.FAILURE_LIMIT)):
|
|
response = self._login()
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now
|
|
response = self._login()
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
self.assertEquals(AccessAttempt.objects.count(),
|
|
config.FAILURE_LIMIT+1)
|
|
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
|
|
|
|
@patch('defender.config.LOCKOUT_BY_IP_USERNAME', True)
|
|
def test_lockout_by_ip_and_username(self):
|
|
"""Check that lockout still works when locking out by IP and Username combined"""
|
|
|
|
username = 'testy'
|
|
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login(username=username)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# But we should get one now
|
|
response = self._login(username=username)
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
# We shouldn't get a lockout message when attempting to use no username
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# We shouldn't get a lockout message when attempting to use a different username
|
|
response = self._login()
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# We shouldn't get a lockout message when attempting to use a different ip address
|
|
ip = '74.125.239.60'
|
|
response = self._login(username=VALID_USERNAME, remote_addr=ip)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
@patch('defender.config.DISABLE_IP_LOCKOUT', True)
|
|
def test_disable_ip_lockout(self):
|
|
"""Check that lockout still works when we disable IP Lock out"""
|
|
|
|
username = 'testy'
|
|
|
|
# try logging in with the same IP, but different username
|
|
# we shouldn't be blocked.
|
|
# same IP different, usernames
|
|
ip = '74.125.239.60'
|
|
for i in range(0, config.FAILURE_LIMIT+10):
|
|
login_username = u"{0}{1}".format(username, i)
|
|
response = self._login(username=login_username, remote_addr=ip)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
# same username with same IP
|
|
for i in range(0, config.FAILURE_LIMIT):
|
|
response = self._login(username=username)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# But we should get one now
|
|
# same username and Ip, over the limit for username.
|
|
response = self._login(username=username)
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
# We shouldn't get a lockout message when attempting to use no username
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# We shouldn't get a lockout message when attempting to use a different username
|
|
response = self._login()
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# We shouldn't get a lockout message when attempting to use a different ip address
|
|
second_ip = '74.125.239.99'
|
|
response = self._login(username=VALID_USERNAME, remote_addr=second_ip)
|
|
# Check if we are in the same login page
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
# we should have no ip's blocked
|
|
data_out = utils.get_blocked_ips()
|
|
self.assertEqual(data_out, [])
|
|
|
|
# even if we try to manually block one it still won't be in there.
|
|
utils.block_ip(second_ip)
|
|
|
|
# we should still have no ip's blocked
|
|
data_out = utils.get_blocked_ips()
|
|
self.assertEqual(data_out, [])
|
|
|
|
|
|
class DefenderTestCaseTest(DefenderTestCase):
|
|
"""Make sure that we're cleaning the cache between tests"""
|
|
key = 'test_key'
|
|
|
|
def test_first_incr(self):
|
|
utils.REDIS_SERVER.incr(self.key)
|
|
result = int(utils.REDIS_SERVER.get(self.key))
|
|
self.assertEqual(result, 1)
|
|
|
|
def test_second_incr(self):
|
|
utils.REDIS_SERVER.incr(self.key)
|
|
result = int(utils.REDIS_SERVER.get(self.key))
|
|
self.assertEqual(result, 1)
|
|
|
|
|
|
class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
|
|
"""Make sure that we're cleaning the cache between tests"""
|
|
key = 'test_key'
|
|
|
|
def test_first_incr(self):
|
|
utils.REDIS_SERVER.incr(self.key)
|
|
result = int(utils.REDIS_SERVER.get(self.key))
|
|
self.assertEqual(result, 1)
|
|
|
|
def test_second_incr(self):
|
|
utils.REDIS_SERVER.incr(self.key)
|
|
result = int(utils.REDIS_SERVER.get(self.key))
|
|
self.assertEqual(result, 1)
|
|
|
|
|
|
class TestUtils(DefenderTestCase):
|
|
def test_username_blocking(self):
|
|
username = 'foo'
|
|
self.assertFalse(utils.is_user_already_locked(username))
|
|
utils.block_username(username)
|
|
self.assertTrue(utils.is_user_already_locked(username))
|
|
utils.unblock_username(username)
|
|
self.assertFalse(utils.is_user_already_locked(username))
|
|
|
|
def test_ip_address_blocking(self):
|
|
ip = '1.2.3.4'
|
|
self.assertFalse(utils.is_source_ip_already_locked(ip))
|
|
utils.block_ip(ip)
|
|
self.assertTrue(utils.is_source_ip_already_locked(ip))
|
|
utils.unblock_ip(ip)
|
|
self.assertFalse(utils.is_source_ip_already_locked(ip))
|