django-defender/defender/tests.py
calmkart 71312eb841
FIX: support for special character in redis password(such like '@') (#155)
* FIX: if special character in redis password, we can set DEFENDER_REDIS_PASSWORD_QUOTE to True, and use quote password

* MOD:add test cases with password_quota = True
2020-03-13 08:13:54 -04:00

1087 lines
41 KiB
Python

import random
import string
import time
from distutils.version import StrictVersion
# Python 3 has mock in the stdlib
try:
from mock import patch
except ImportError:
from unittest.mock import patch
from django import get_version
from django.contrib.auth.models import User
from django.contrib.auth.models import AnonymousUser
from django.contrib.sessions.backends.db import SessionStore
from django.http import HttpRequest, HttpResponse
from django.test.client import RequestFactory
from redis.client import Redis
try:
from django.urls import reverse
except ImportError:
from django.core.urlresolvers import reverse
from . import utils
from . import config
from .signals import (
ip_block as ip_block_signal,
ip_unblock as ip_unblock_signal,
username_block as username_block_signal,
username_unblock as username_unblock_signal,
)
from .connection import parse_redis_url, get_redis_connection
from .decorators import watch_login
from .models import AccessAttempt
from .test import DefenderTestCase, DefenderTransactionTestCase
LOGIN_FORM_KEY = '<form action="/admin/login/" method="post" id="login-form">'
ADMIN_LOGIN_URL = reverse("admin:login")
DJANGO_VERSION = StrictVersion(get_version())
VALID_USERNAME = VALID_PASSWORD = "valid"
UPPER_USERNAME = "VALID"
class AccessAttemptTest(DefenderTestCase):
""" Test case using custom settings for testing
"""
LOCKED_MESSAGE = "Account locked: too many login attempts."
PERMANENT_LOCKED_MESSAGE = (
LOCKED_MESSAGE + " Contact an admin to unlock your account."
)
def _get_random_str(self):
""" Returns a random str """
chars = string.ascii_uppercase + string.digits
return "".join(random.choice(chars) for _ in range(20))
def _login(
self,
username=None,
password=None,
user_agent="test-browser",
remote_addr="127.0.0.1",
):
""" Login a user. If the username or password is not provided
it will use a random string instead. Use the VALID_USERNAME and
VALID_PASSWORD to make a valid login.
"""
if username is None:
username = self._get_random_str()
if password is None:
password = self._get_random_str()
response = self.client.post(
ADMIN_LOGIN_URL,
{"username": username, "password": password, LOGIN_FORM_KEY: 1,},
HTTP_USER_AGENT=user_agent,
REMOTE_ADDR=remote_addr,
)
return response
def setUp(self):
""" Create a valid user for login
"""
self.user = User.objects.create_superuser(
username=VALID_USERNAME, email="test@example.com", password=VALID_PASSWORD,
)
def test_data_integrity_of_get_blocked_ips(self):
""" Test whether data retrieved from redis via
get_blocked_ips() is the same as the data saved
"""
data_in = ["127.0.0.1", "4.2.2.1"]
for ip in data_in:
utils.block_ip(ip)
data_out = utils.get_blocked_ips()
self.assertEqual(sorted(data_in), sorted(data_out))
# send in None, should have same values.
utils.block_ip(None)
data_out = utils.get_blocked_ips()
self.assertEqual(sorted(data_in), sorted(data_out))
def test_data_integrity_of_get_blocked_usernames(self):
""" Test whether data retrieved from redis via
get_blocked_usernames() is the same as the data saved
"""
data_in = ["foo", "bar"]
for username in data_in:
utils.block_username(username)
data_out = utils.get_blocked_usernames()
self.assertEqual(sorted(data_in), sorted(data_out))
# send in None, should have same values.
utils.block_username(None)
data_out = utils.get_blocked_usernames()
self.assertEqual(sorted(data_in), sorted(data_out))
def test_login_get(self):
""" visit the login page """
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 200)
def test_failure_limit_by_ip_once(self):
""" Tests the login lock by ip when trying to login
one more time than failure limit
"""
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
def test_failure_limit_by_ip_many(self):
""" Tests the login lock by ip when trying to
login a lot of times more than failure limit
"""
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
for i in range(0, random.randrange(1, 10)):
# try to log in a bunch of times
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
def test_failure_limit_by_username_once(self):
""" Tests the login lock by username when trying to login
one more time than failure limit
"""
for i in range(0, config.FAILURE_LIMIT):
ip = "74.125.239.{0}.".format(i)
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
@patch("defender.config.USERNAME_FAILURE_LIMIT", 3)
def test_username_failure_limit(self):
""" Tests that the username failure limit setting is
respected when trying to login one more time than failure limit
"""
for i in range(0, config.USERNAME_FAILURE_LIMIT):
ip = "74.125.239.{0}.".format(i)
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(username=VALID_USERNAME, remote_addr=ip)
self.assertContains(response, self.LOCKED_MESSAGE)
# doing a get should not get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_ip_failure_limit(self):
""" Tests that the IP failure limit setting is
respected when trying to login one more time than failure limit
"""
for i in range(0, config.IP_FAILURE_LIMIT):
username = "john-doe__%d" % i
response = self._login(username=username)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(username=VALID_USERNAME)
self.assertContains(response, self.LOCKED_MESSAGE)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
def test_valid_login(self):
""" Tests a valid login for a real username
"""
response = self._login(username=VALID_USERNAME, password=VALID_PASSWORD)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
def test_reset_after_valid_login(self):
""" Tests the counter gets reset after a valid login
"""
for i in range(0, config.FAILURE_LIMIT):
self._login(username=VALID_USERNAME)
# now login with a valid username and password
self._login(username=VALID_USERNAME, password=VALID_PASSWORD)
# and we should be able to try again without hitting the failure limit
response = self._login(username=VALID_USERNAME)
self.assertNotContains(response, self.LOCKED_MESSAGE)
def test_blocked_ip_cannot_login(self):
""" Test an user with blocked ip cannot login with another username
"""
for i in range(0, config.FAILURE_LIMIT + 1):
self._login(username=VALID_USERNAME)
# try to login with a different user
response = self._login(username="myuser")
self.assertContains(response, self.LOCKED_MESSAGE)
def test_blocked_username_cannot_login(self):
""" Test an user with blocked username cannot login using
another ip
"""
for i in range(0, config.FAILURE_LIMIT + 1):
ip = "74.125.239.{0}.".format(i)
self._login(username=VALID_USERNAME, remote_addr=ip)
# try to login with a different ip
response = self._login(username=VALID_USERNAME, remote_addr="8.8.8.8")
self.assertContains(response, self.LOCKED_MESSAGE)
def test_blocked_username_uppercase_saved_lower(self):
"""
Test that a uppercase username is saved in lowercase
within the cache.
"""
for i in range(0, config.FAILURE_LIMIT + 2):
ip = "74.125.239.{0}.".format(i)
self._login(username=UPPER_USERNAME, remote_addr=ip)
self.assertNotIn(UPPER_USERNAME, utils.get_blocked_usernames())
self.assertIn(UPPER_USERNAME.lower(), utils.get_blocked_usernames())
def test_empty_username_cannot_be_blocked(self):
"""
Test that an empty username, or one that is None, cannot be blocked.
"""
for username in ["", None]:
for i in range(0, config.FAILURE_LIMIT + 2):
ip = "74.125.239.{0}.".format(i)
self._login(username=username, remote_addr=ip)
self.assertNotIn(username, utils.get_blocked_usernames())
def test_lowercase(self):
"""
Test that the lowercase(None) returns None.
"""
self.assertEquals(utils.lower_username(None), None)
def test_cooling_off(self):
""" Tests if the cooling time allows a user to login
"""
self.test_failure_limit_by_ip_once()
# Wait for the cooling off period
time.sleep(config.COOLOFF_TIME)
if config.MOCK_REDIS:
# mock redis require that we expire on our own
get_redis_connection().do_expire() # pragma: no cover
# It should be possible to login again, make sure it is.
self.test_valid_login()
def test_cooling_off_for_trusted_user(self):
""" Test the cooling time for a trusted user
"""
# Try the cooling off time
self.test_cooling_off()
def test_long_user_agent_valid(self):
""" Tests if can handle a long user agent
"""
long_user_agent = "ie6" * 1024
response = self._login(
username=VALID_USERNAME, password=VALID_PASSWORD, user_agent=long_user_agent
)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_FORWARDED_FOR")
def test_get_ip_reverse_proxy(self):
""" Tests if can handle a long user agent
"""
request_factory = RequestFactory()
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
self.assertEqual(utils.get_ip(request), "192.168.24.24")
request_factory = RequestFactory()
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
request.META["REMOTE_ADDR"] = "24.24.24.24"
self.assertEqual(utils.get_ip(request), "24.24.24.24")
def test_get_ip(self):
""" Tests if can handle a long user agent
"""
request_factory = RequestFactory()
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
self.assertEqual(utils.get_ip(request), "127.0.0.1")
def test_long_user_agent_not_valid(self):
""" Tests if can handle a long user agent with failure
"""
long_user_agent = "ie6" * 1024
for i in range(0, config.FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)
self.assertContains(response, self.LOCKED_MESSAGE)
def test_reset_ip(self):
""" Tests if can reset an ip address
"""
# Make a lockout
self.test_failure_limit_by_ip_once()
# Reset the ip so we can try again
utils.reset_failed_attempts(ip_address="127.0.0.1")
# Make a login attempt again
self.test_valid_login()
@patch("defender.config.LOCKOUT_URL", "http://localhost/othe/login/")
def test_failed_login_redirect_to_url(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now, check redirect make sure it is valid.
response = self._login()
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "http://localhost/othe/login/")
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], "http://localhost/othe/login/")
@patch("defender.config.LOCKOUT_URL", "/o/login/")
def test_failed_login_redirect_to_url_local(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# RFC 7231 allows relative URIs in Location header.
# Django from version 1.9 is support this:
# https://docs.djangoproject.com/en/1.9/releases/1.9/#http-redirects-no-longer-forced-to-absolute-uris
lockout_url = "http://testserver/o/login/"
if DJANGO_VERSION >= StrictVersion("1.9"):
lockout_url = "/o/login/"
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now, check redirect make sure it is valid.
response = self._login()
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], lockout_url)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 302)
self.assertEqual(response["Location"], lockout_url)
@patch("defender.config.LOCKOUT_TEMPLATE", "defender/lockout.html")
def test_failed_login_redirect_to_template(self):
""" Test to make sure that after lockout we send to the correct
template """
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now, check template make sure it is valid.
response = self._login()
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "defender/lockout.html")
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, "defender/lockout.html")
@patch("defender.config.COOLOFF_TIME", 0)
def test_failed_login_no_cooloff(self):
""" failed login no cooloff """
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now, check redirect make sure it is valid.
response = self._login()
self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE)
def test_login_attempt_model(self):
""" test the login model """
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
self.assertEqual(AccessAttempt.objects.count(), 1)
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
def test_is_valid_ip(self):
""" Test the is_valid_ip() method """
self.assertEqual(utils.is_valid_ip("192.168.0.1"), True)
self.assertEqual(utils.is_valid_ip("130.80.100.24"), True)
self.assertEqual(utils.is_valid_ip("8.8.8.8"), True)
self.assertEqual(utils.is_valid_ip("127.0.0.1"), True)
self.assertEqual(utils.is_valid_ip("fish"), False)
self.assertEqual(utils.is_valid_ip(None), False)
self.assertEqual(utils.is_valid_ip(""), False)
self.assertEqual(utils.is_valid_ip("0x41.0x41.0x41.0x41"), False)
self.assertEqual(utils.is_valid_ip("192.168.100.34.y"), False)
self.assertEqual(
utils.is_valid_ip("2001:0db8:85a3:0000:0000:8a2e:0370:7334"), True
)
self.assertEqual(utils.is_valid_ip("2001:db8:85a3:0:0:8a2e:370:7334"), True)
self.assertEqual(utils.is_valid_ip("2001:db8:85a3::8a2e:370:7334"), True)
self.assertEqual(utils.is_valid_ip("::ffff:192.0.2.128"), True)
self.assertEqual(utils.is_valid_ip("::ffff:8.8.8.8"), True)
def test_parse_redis_url(self):
""" test the parse_redis_url method """
# full regular
conf = parse_redis_url("redis://user:password@localhost2:1234/2", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# full non local
conf = parse_redis_url(
"redis://user:pass@www.localhost.com:1234/2", False)
self.assertEqual(conf.get("HOST"), "www.localhost.com")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "pass")
self.assertEqual(conf.get("PORT"), 1234)
# no user name
conf = parse_redis_url("redis://password@localhost2:1234/2", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 1234)
# no user name 2 with colon
conf = parse_redis_url("redis://:password@localhost2:1234/2", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# Empty
conf = parse_redis_url(None, False)
self.assertEqual(conf.get("HOST"), "localhost")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 6379)
# no db
conf = parse_redis_url("redis://:password@localhost2:1234", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# no password
conf = parse_redis_url("redis://localhost2:1234/0", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 1234)
# password with special character and set the password_quote = True
conf = parse_redis_url("redis://:calmkart%23%40%21@localhost:6379/0", True)
self.assertEqual(conf.get("HOST"), "localhost")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "calmkart#@!")
self.assertEqual(conf.get("PORT"), 6379)
# password without special character and set the password_quote = True
conf = parse_redis_url("redis://:password@localhost2:1234", True)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
@patch("defender.config.DEFENDER_REDIS_NAME", "default")
def test_get_redis_connection_django_conf(self):
""" get the redis connection """
redis_client = get_redis_connection()
self.assertIsInstance(redis_client, Redis)
@patch("defender.config.DEFENDER_REDIS_NAME", "bad-key")
def test_get_redis_connection_django_conf_wrong_key(self):
""" see if we get the correct error """
error_msg = "The cache bad-key was not found on the django " "cache settings."
self.assertRaisesMessage(KeyError, error_msg, get_redis_connection)
def test_get_ip_address_from_request(self):
""" get ip from request, make sure it is correct """
req = HttpRequest()
req.META["REMOTE_ADDR"] = "1.2.3.4"
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, "1.2.3.4")
req = HttpRequest()
req.META["REMOTE_ADDR"] = "1.2.3.4 "
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, "1.2.3.4")
req = HttpRequest()
req.META["REMOTE_ADDR"] = "192.168.100.34.y"
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, "127.0.0.1")
req = HttpRequest()
req.META["REMOTE_ADDR"] = "cat"
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, "127.0.0.1")
req = HttpRequest()
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, "127.0.0.1")
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_PROXIED")
def test_get_ip_reverse_proxy_custom_header(self):
""" make sure the ip is correct behind reverse proxy """
req = HttpRequest()
req.META["HTTP_X_PROXIED"] = "1.2.3.4"
self.assertEqual(utils.get_ip(req), "1.2.3.4")
req = HttpRequest()
req.META["HTTP_X_PROXIED"] = "1.2.3.4, 5.6.7.8, 127.0.0.1"
self.assertEqual(utils.get_ip(req), "1.2.3.4")
req = HttpRequest()
req.META["REMOTE_ADDR"] = "1.2.3.4"
self.assertEqual(utils.get_ip(req), "1.2.3.4")
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_REAL_IP")
def test_get_user_attempts(self):
""" Get the user attempts make sure they are correct """
ip_attempts = random.randint(3, 12)
username_attempts = random.randint(3, 12)
for i in range(0, ip_attempts):
utils.increment_key(utils.get_ip_attempt_cache_key("1.2.3.4"))
for i in range(0, username_attempts):
utils.increment_key(utils.get_username_attempt_cache_key("foobar"))
req = HttpRequest()
req.POST["username"] = "foobar"
req.META["HTTP_X_REAL_IP"] = "1.2.3.4"
self.assertEqual(
utils.get_user_attempts(req), max(ip_attempts, username_attempts)
)
req = HttpRequest()
req.POST["username"] = "foobar"
req.META["HTTP_X_REAL_IP"] = "5.6.7.8"
self.assertEqual(utils.get_user_attempts(req), username_attempts)
req = HttpRequest()
req.POST["username"] = "barfoo"
req.META["HTTP_X_REAL_IP"] = "1.2.3.4"
self.assertEqual(utils.get_user_attempts(req), ip_attempts)
def test_admin(self):
""" test the admin pages for this app """
from .admin import AccessAttemptAdmin
AccessAttemptAdmin
def test_unblock_view_user_with_plus(self):
"""
There is an available admin view for unblocking a user
with a plus sign in the username.
Regression test for #GH76.
"""
reverse(
"defender_unblock_username_view", kwargs={"username": "user+test@test.tld"}
)
def test_unblock_view_user_with_special_symbols(self):
"""
There is an available admin view for unblocking a user
with a exclamation mark sign in the username.
"""
reverse(
"defender_unblock_username_view", kwargs={"username": "user!test@test.tld"}
)
def test_decorator_middleware(self):
# because watch_login is called twice in this test (once by the
# middleware and once by the decorator) we have half as many attempts
# before getting locked out.
# this is getting called twice, once for each decorator, not sure how
# to dynamically remove one of the middlewares during a test so we
# divide the failure limit by 2.
for i in range(0, int(config.FAILURE_LIMIT)):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
def test_get_view(self):
""" Check that the decorator doesn't tamper with GET requests """
for i in range(0, config.FAILURE_LIMIT):
response = self.client.get(ADMIN_LOGIN_URL)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
response = self.client.get(ADMIN_LOGIN_URL)
self.assertNotContains(response, self.LOCKED_MESSAGE)
@patch("defender.config.USE_CELERY", True)
def test_use_celery(self):
""" Check that use celery works """
self.assertEqual(AccessAttempt.objects.count(), 0)
for i in range(0, int(config.FAILURE_LIMIT)):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
self.assertEqual(AccessAttempt.objects.count(), config.FAILURE_LIMIT + 1)
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
@patch("defender.config.LOCKOUT_BY_IP_USERNAME", True)
def test_lockout_by_ip_and_username(self):
""" Check that lockout still works when locking out by
IP and Username combined """
username = "testy"
for i in range(0, config.FAILURE_LIMIT):
response = self._login(username=username)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(username=username)
self.assertContains(response, self.LOCKED_MESSAGE)
# We shouldn't get a lockout message when attempting to use no username
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a
# different username
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a
# different ip address
ip = "74.125.239.60"
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@patch("defender.config.DISABLE_IP_LOCKOUT", True)
def test_disable_ip_lockout(self):
""" Check that lockout still works when we disable IP Lock out """
username = "testy"
# try logging in with the same IP, but different username
# we shouldn't be blocked.
# same IP different, usernames
ip = "74.125.239.60"
for i in range(0, config.FAILURE_LIMIT + 10):
login_username = "{0}{1}".format(username, i)
response = self._login(username=login_username, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# same username with same IP
for i in range(0, config.FAILURE_LIMIT):
response = self._login(username=username)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# But we should get one now
# same username and Ip, over the limit for username.
response = self._login(username=username)
self.assertContains(response, self.LOCKED_MESSAGE)
# We shouldn't get a lockout message when attempting to use no username
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a
# different username
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a
# different ip address
second_ip = "74.125.239.99"
response = self._login(username=VALID_USERNAME, remote_addr=second_ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# we should have no ip's blocked
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
# even if we try to manually block one it still won't be in there.
utils.block_ip(second_ip)
# we should still have no ip's blocked
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
@patch("defender.config.DISABLE_USERNAME_LOCKOUT", True)
def test_disable_username_lockout(self):
""" Check lockouting still works when we disable username lockout """
username = "testy"
# try logging in with the same username, but different IPs.
# we shouldn't be locked.
for i in range(0, config.FAILURE_LIMIT + 10):
ip = "74.125.126.{0}".format(i)
response = self._login(username=username, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# same ip and same username
ip = "74.125.127.1"
for i in range(0, config.FAILURE_LIMIT):
response = self._login(username=username, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# But we should get one now
# same username and Ip, over the limit.
response = self._login(username=username, remote_addr=ip)
self.assertContains(response, self.LOCKED_MESSAGE)
# We shouldn't get a lockout message when attempting to use no username
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a
# different ip address to be sure that username is not blocked.
second_ip = "74.125.127.2"
response = self._login(username=username, remote_addr=second_ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# we should have no usernames are blocked
data_out = utils.get_blocked_usernames()
self.assertEqual(data_out, [])
# even if we try to manually block one it still won't be in there.
utils.block_username(username)
# we should still have no ip's blocked
data_out = utils.get_blocked_usernames()
self.assertEqual(data_out, [])
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_login_blocked_for_non_standard_login_views_without_msg(self):
"""
Check that a view wich returns the expected status code is causing
the user to be locked out when we do not expect a specific message
to be returned.
"""
@watch_login(status_code=401)
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse(status=401)
request_factory = RequestFactory()
request = request_factory.post("api/login")
request.user = AnonymousUser()
request.session = SessionStore()
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
for _ in range(3):
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, ["192.168.24.24"])
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_login_blocked_for_non_standard_login_views_with_msg(self):
"""
Check that a view wich returns the expected status code and the
expected message is causing the IP to be locked out.
"""
@watch_login(status_code=401, msg="Invalid credentials")
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse("Sorry, Invalid credentials", status=401)
request_factory = RequestFactory()
request = request_factory.post("api/login")
request.user = AnonymousUser()
request.session = SessionStore()
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
for _ in range(3):
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, ["192.168.24.24"])
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_login_non_blocked_for_non_standard_login_views_different_msg(self):
"""
Check that a view wich returns the expected status code but not the
expected message is not causing the IP to be locked out.
"""
@watch_login(status_code=401, msg="Invalid credentials")
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse("Ups, wrong credentials", status=401)
request_factory = RequestFactory()
request = request_factory.post("api/login")
request.user = AnonymousUser()
request.session = SessionStore()
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
for _ in range(4):
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
class SignalTest(DefenderTestCase):
""" Test that signals are properly sent when blocking usernames and IPs.
"""
def test_should_send_signal_when_blocking_ip(self):
self.blocked_ip = None
def handler(sender, ip_address, **kwargs):
self.blocked_ip = ip_address
ip_block_signal.connect(handler)
utils.block_ip("8.8.8.8")
self.assertEqual(self.blocked_ip, "8.8.8.8")
def test_should_send_signal_when_unblocking_ip(self):
self.blocked_ip = "8.8.8.8"
def handler(sender, ip_address, **kwargs):
self.blocked_ip = None
ip_unblock_signal.connect(handler)
utils.unblock_ip("8.8.8.8")
self.assertIsNone(self.blocked_ip)
def test_should_not_send_signal_when_ip_already_blocked(self):
self.blocked_ip = None
def handler(sender, ip_address, **kwargs):
self.blocked_ip = ip_address
ip_block_signal.connect(handler)
key = utils.get_ip_blocked_cache_key("8.8.8.8")
utils.REDIS_SERVER.set(key, "blocked")
utils.block_ip("8.8.8.8")
self.assertIsNone(self.blocked_ip)
def test_should_send_signal_when_blocking_username(self):
self.blocked_username = None
def handler(sender, username, **kwargs):
self.blocked_username = username
username_block_signal.connect(handler)
utils.block_username("richard_hendricks")
self.assertEqual(self.blocked_username, "richard_hendricks")
def test_should_send_signal_when_unblocking_username(self):
self.blocked_username = "richard_hendricks"
def handler(sender, username, **kwargs):
self.blocked_username = None
username_unblock_signal.connect(handler)
utils.unblock_username("richard_hendricks")
self.assertIsNone(self.blocked_username)
def test_should_not_send_signal_when_username_already_blocked(self):
self.blocked_username = None
def handler(sender, username, **kwargs):
self.blocked_username = username
username_block_signal.connect(handler)
key = utils.get_username_blocked_cache_key("richard hendricks")
utils.REDIS_SERVER.set(key, "blocked")
utils.block_ip("richard hendricks")
self.assertIsNone(self.blocked_username)
class DefenderTestCaseTest(DefenderTestCase):
""" Make sure that we're cleaning the cache between tests """
key = "test_key"
def test_first_incr(self):
""" first increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
def test_second_incr(self):
""" second increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
""" Make sure that we're cleaning the cache between tests """
key = "test_key"
def test_first_incr(self):
""" first increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
def test_second_incr(self):
""" second increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
class TestUtils(DefenderTestCase):
""" Unit tests for util methods """
def test_username_blocking(self):
""" test username blocking """
username = "foo"
self.assertFalse(utils.is_user_already_locked(username))
utils.block_username(username)
self.assertTrue(utils.is_user_already_locked(username))
utils.unblock_username(username)
self.assertFalse(utils.is_user_already_locked(username))
def test_ip_address_blocking(self):
""" ip address blocking """
ip = "1.2.3.4"
self.assertFalse(utils.is_source_ip_already_locked(ip))
utils.block_ip(ip)
self.assertTrue(utils.is_source_ip_already_locked(ip))
utils.unblock_ip(ip)
self.assertFalse(utils.is_source_ip_already_locked(ip))
def test_username_argument_precedence(self):
""" test that the optional username argument has highest precedence
when provided """
request_factory = RequestFactory()
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
username = "johndoe"
utils.block_username(request.user.username)
self.assertFalse(utils.is_already_locked(request, username=username))
utils.check_request(request, True, username=username)
self.assertEqual(utils.get_user_attempts(request, username=username), 1)
utils.add_login_attempt_to_db(request, True, username=username)
self.assertEqual(AccessAttempt.objects.filter(username=username).count(), 1)