|
|
|
|
@ -1,9 +1,11 @@
|
|
|
|
|
import random
|
|
|
|
|
import string
|
|
|
|
|
import time
|
|
|
|
|
from distutils.version import StrictVersion
|
|
|
|
|
|
|
|
|
|
from mock import patch
|
|
|
|
|
|
|
|
|
|
from django import get_version
|
|
|
|
|
from django.contrib.auth.models import User
|
|
|
|
|
from django.contrib.auth.models import AnonymousUser
|
|
|
|
|
from django.contrib.sessions.backends.db import SessionStore
|
|
|
|
|
@ -18,7 +20,6 @@ from .connection import parse_redis_url, get_redis_connection
|
|
|
|
|
from .models import AccessAttempt
|
|
|
|
|
from .test import DefenderTestCase, DefenderTransactionTestCase
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Django >= 1.7 compatibility
|
|
|
|
|
try:
|
|
|
|
|
LOGIN_FORM_KEY = '<form action="/admin/login/" method="post"'
|
|
|
|
|
@ -28,6 +29,7 @@ except NoReverseMatch:
|
|
|
|
|
ADMIN_LOGIN_URL = reverse('admin:index')
|
|
|
|
|
LOGIN_FORM_KEY = 'this_is_the_login_form'
|
|
|
|
|
|
|
|
|
|
DJANGO_VERSION = StrictVersion(get_version())
|
|
|
|
|
|
|
|
|
|
VALID_USERNAME = VALID_PASSWORD = 'valid'
|
|
|
|
|
|
|
|
|
|
@ -108,7 +110,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
def test_login_get(self):
|
|
|
|
|
""" visit the login page """
|
|
|
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
|
|
|
self.assertEquals(response.status_code, 200)
|
|
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
|
|
|
|
|
|
def test_failure_limit_by_ip_once(self):
|
|
|
|
|
""" Tests the login lock by ip when trying to login
|
|
|
|
|
@ -247,7 +249,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
request.session = SessionStore()
|
|
|
|
|
|
|
|
|
|
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
|
|
|
|
|
self.assertEquals(utils.get_ip(request), '192.168.24.24')
|
|
|
|
|
self.assertEqual(utils.get_ip(request), '192.168.24.24')
|
|
|
|
|
|
|
|
|
|
request_factory = RequestFactory()
|
|
|
|
|
request = request_factory.get(ADMIN_LOGIN_URL)
|
|
|
|
|
@ -255,7 +257,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
request.session = SessionStore()
|
|
|
|
|
|
|
|
|
|
request.META['REMOTE_ADDR'] = '24.24.24.24'
|
|
|
|
|
self.assertEquals(utils.get_ip(request), '24.24.24.24')
|
|
|
|
|
self.assertEqual(utils.get_ip(request), '24.24.24.24')
|
|
|
|
|
|
|
|
|
|
def test_get_ip(self):
|
|
|
|
|
""" Tests if can handle a long user agent
|
|
|
|
|
@ -265,7 +267,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
request.user = AnonymousUser()
|
|
|
|
|
request.session = SessionStore()
|
|
|
|
|
|
|
|
|
|
self.assertEquals(utils.get_ip(request), '127.0.0.1')
|
|
|
|
|
self.assertEqual(utils.get_ip(request), '127.0.0.1')
|
|
|
|
|
|
|
|
|
|
def test_long_user_agent_not_valid(self):
|
|
|
|
|
""" Tests if can handle a long user agent with failure
|
|
|
|
|
@ -301,13 +303,13 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
|
|
|
# But we should get one now, check redirect make sure it is valid.
|
|
|
|
|
response = self._login()
|
|
|
|
|
self.assertEquals(response.status_code, 302)
|
|
|
|
|
self.assertEquals(response['Location'], 'http://localhost/othe/login/')
|
|
|
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
|
self.assertEqual(response['Location'], 'http://localhost/othe/login/')
|
|
|
|
|
|
|
|
|
|
# doing a get should also get locked out message
|
|
|
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
|
|
|
self.assertEquals(response.status_code, 302)
|
|
|
|
|
self.assertEquals(response['Location'], 'http://localhost/othe/login/')
|
|
|
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
|
self.assertEqual(response['Location'], 'http://localhost/othe/login/')
|
|
|
|
|
|
|
|
|
|
@patch('defender.config.LOCKOUT_URL', '/o/login/')
|
|
|
|
|
def test_failed_login_redirect_to_URL_local(self):
|
|
|
|
|
@ -319,16 +321,23 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
# Check if we are in the same login page
|
|
|
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
|
|
|
|
|
|
# RFC 7231 allows relative URIs in Location header.
|
|
|
|
|
# Django from version 1.9 is support this:
|
|
|
|
|
# https://docs.djangoproject.com/en/1.9/releases/1.9/#http-redirects-no-longer-forced-to-absolute-uris
|
|
|
|
|
lockout_url = 'http://testserver/o/login/'
|
|
|
|
|
if DJANGO_VERSION >= StrictVersion('1.9'):
|
|
|
|
|
lockout_url = '/o/login/'
|
|
|
|
|
|
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
|
|
|
# But we should get one now, check redirect make sure it is valid.
|
|
|
|
|
response = self._login()
|
|
|
|
|
self.assertEquals(response.status_code, 302)
|
|
|
|
|
self.assertEquals(response['Location'], 'http://testserver/o/login/')
|
|
|
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
|
self.assertEqual(response['Location'], lockout_url)
|
|
|
|
|
|
|
|
|
|
# doing a get should also get locked out message
|
|
|
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
|
|
|
self.assertEquals(response.status_code, 302)
|
|
|
|
|
self.assertEquals(response['Location'], 'http://testserver/o/login/')
|
|
|
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
|
self.assertEqual(response['Location'], lockout_url)
|
|
|
|
|
|
|
|
|
|
@patch('defender.config.LOCKOUT_TEMPLATE', 'defender/lockout.html')
|
|
|
|
|
def test_failed_login_redirect_to_template(self):
|
|
|
|
|
@ -343,12 +352,12 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
# So, we shouldn't have gotten a lock-out yet.
|
|
|
|
|
# But we should get one now, check template make sure it is valid.
|
|
|
|
|
response = self._login()
|
|
|
|
|
self.assertEquals(response.status_code, 200)
|
|
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
|
self.assertTemplateUsed(response, 'defender/lockout.html')
|
|
|
|
|
|
|
|
|
|
# doing a get should also get locked out message
|
|
|
|
|
response = self.client.get(ADMIN_LOGIN_URL)
|
|
|
|
|
self.assertEquals(response.status_code, 200)
|
|
|
|
|
self.assertEqual(response.status_code, 200)
|
|
|
|
|
self.assertTemplateUsed(response, 'defender/lockout.html')
|
|
|
|
|
|
|
|
|
|
@patch('defender.config.COOLOFF_TIME', 0)
|
|
|
|
|
@ -372,81 +381,81 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
|
|
|
|
|
response = self._login()
|
|
|
|
|
self.assertContains(response, LOGIN_FORM_KEY)
|
|
|
|
|
self.assertEquals(AccessAttempt.objects.count(), 1)
|
|
|
|
|
self.assertEqual(AccessAttempt.objects.count(), 1)
|
|
|
|
|
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
|
|
|
|
|
|
|
|
|
|
def test_is_valid_ip(self):
|
|
|
|
|
""" Test the is_valid_ip() method """
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('192.168.0.1'), True)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('130.80.100.24'), True)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('8.8.8.8'), True)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('127.0.0.1'), True)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('fish'), False)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip(None), False)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip(''), False)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('0x41.0x41.0x41.0x41'), False)
|
|
|
|
|
self.assertEquals(utils.is_valid_ip('192.168.100.34.y'), False)
|
|
|
|
|
self.assertEquals(
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('192.168.0.1'), True)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('130.80.100.24'), True)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('8.8.8.8'), True)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('127.0.0.1'), True)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('fish'), False)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip(None), False)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip(''), False)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('0x41.0x41.0x41.0x41'), False)
|
|
|
|
|
self.assertEqual(utils.is_valid_ip('192.168.100.34.y'), False)
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
utils.is_valid_ip('2001:0db8:85a3:0000:0000:8a2e:0370:7334'), True)
|
|
|
|
|
self.assertEquals(
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
utils.is_valid_ip('2001:db8:85a3:0:0:8a2e:370:7334'), True)
|
|
|
|
|
self.assertEquals(
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
utils.is_valid_ip('2001:db8:85a3::8a2e:370:7334'), True)
|
|
|
|
|
self.assertEquals(
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
utils.is_valid_ip('::ffff:192.0.2.128'), True)
|
|
|
|
|
self.assertEquals(
|
|
|
|
|
self.assertEqual(
|
|
|
|
|
utils.is_valid_ip('::ffff:8.8.8.8'), True)
|
|
|
|
|
|
|
|
|
|
def test_parse_redis_url(self):
|
|
|
|
|
""" test the parse_redis_url method """
|
|
|
|
|
# full regular
|
|
|
|
|
conf = parse_redis_url("redis://user:password@localhost2:1234/2")
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), 'password')
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 2)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), 'password')
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 1234)
|
|
|
|
|
|
|
|
|
|
# full non local
|
|
|
|
|
conf = parse_redis_url("redis://user:pass@www.localhost.com:1234/2")
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'www.localhost.com')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), 'pass')
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'www.localhost.com')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 2)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), 'pass')
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 1234)
|
|
|
|
|
|
|
|
|
|
# no user name
|
|
|
|
|
conf = parse_redis_url("redis://password@localhost2:1234/2")
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), None)
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 2)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), None)
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 1234)
|
|
|
|
|
|
|
|
|
|
# no user name 2 with colon
|
|
|
|
|
conf = parse_redis_url("redis://:password@localhost2:1234/2")
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 2)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), 'password')
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 2)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), 'password')
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 1234)
|
|
|
|
|
|
|
|
|
|
# Empty
|
|
|
|
|
conf = parse_redis_url(None)
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'localhost')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 0)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), None)
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 6379)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'localhost')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 0)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), None)
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 6379)
|
|
|
|
|
|
|
|
|
|
# no db
|
|
|
|
|
conf = parse_redis_url("redis://:password@localhost2:1234")
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 0)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), 'password')
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 0)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), 'password')
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 1234)
|
|
|
|
|
|
|
|
|
|
# no password
|
|
|
|
|
conf = parse_redis_url("redis://localhost2:1234/0")
|
|
|
|
|
self.assertEquals(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEquals(conf.get('DB'), 0)
|
|
|
|
|
self.assertEquals(conf.get('PASSWORD'), None)
|
|
|
|
|
self.assertEquals(conf.get('PORT'), 1234)
|
|
|
|
|
self.assertEqual(conf.get('HOST'), 'localhost2')
|
|
|
|
|
self.assertEqual(conf.get('DB'), 0)
|
|
|
|
|
self.assertEqual(conf.get('PASSWORD'), None)
|
|
|
|
|
self.assertEqual(conf.get('PORT'), 1234)
|
|
|
|
|
|
|
|
|
|
def test_get_ip_address_from_request(self):
|
|
|
|
|
req = HttpRequest()
|
|
|
|
|
@ -558,7 +567,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
def test_use_celery(self):
|
|
|
|
|
""" Check that use celery works"""
|
|
|
|
|
|
|
|
|
|
self.assertEquals(AccessAttempt.objects.count(), 0)
|
|
|
|
|
self.assertEqual(AccessAttempt.objects.count(), 0)
|
|
|
|
|
|
|
|
|
|
for i in range(0, int(config.FAILURE_LIMIT)):
|
|
|
|
|
response = self._login()
|
|
|
|
|
@ -570,8 +579,8 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
response = self._login()
|
|
|
|
|
self.assertContains(response, self.LOCKED_MESSAGE)
|
|
|
|
|
|
|
|
|
|
self.assertEquals(AccessAttempt.objects.count(),
|
|
|
|
|
config.FAILURE_LIMIT+1)
|
|
|
|
|
self.assertEqual(AccessAttempt.objects.count(),
|
|
|
|
|
config.FAILURE_LIMIT + 1)
|
|
|
|
|
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
|
|
|
|
|
|
|
|
|
|
@patch('defender.config.LOCKOUT_BY_IP_USERNAME', True)
|
|
|
|
|
@ -614,7 +623,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|
|
|
|
# we shouldn't be blocked.
|
|
|
|
|
# same IP different, usernames
|
|
|
|
|
ip = '74.125.239.60'
|
|
|
|
|
for i in range(0, config.FAILURE_LIMIT+10):
|
|
|
|
|
for i in range(0, config.FAILURE_LIMIT + 10):
|
|
|
|
|
login_username = u"{0}{1}".format(username, i)
|
|
|
|
|
response = self._login(username=login_username, remote_addr=ip)
|
|
|
|
|
# Check if we are in the same login page
|
|
|
|
|
|