more refactoring and unit tests

This commit is contained in:
Ken Cochrane 2015-01-01 12:51:46 -05:00
parent bd9698951f
commit a8e721dd3d
7 changed files with 212 additions and 120 deletions

View file

@ -73,7 +73,7 @@ to improve the login.
requirements
============
- django >= 1.6 (may work is previous versions, but not officially supported)
- django: 1.4.x, 1.6.x, 1.7.x
- redis
@ -113,8 +113,8 @@ Cache backend:
cache keys:
-----------
- prefix:failed:ip:[ip] (count, expires)
- prefix:failed:username:[username] (count, expires)
- prefix:failed:ip:[ip] (count, TTL)
- prefix:failed:username:[username] (count, TTL)
- prefix:blocked:ip:[ip] (true, TTL)
- prefix:blocked:username:[username] (true, TTL)
@ -211,11 +211,9 @@ reverse proxy IP address Default: ``HTTP_X_FORWARDED_FOR``
Default: ``defender``
* ``DEFENDER_LOCKOUT_URL``: The URL you want to redirect to if someone is
locked out.
* ``REDIS_HOST``: the host name for your redis server
* ``REDIS_PORT``: the host port for your redis server
* ``REDIS_PASSWORD``: the password for your redis server
* ``REDIS_DB``: the db number for your redis server
* ``DEFENDER_REDIS_URL``: the redis url for defender.
Default: ``redis://localhost:6379/0``
(Example with password: ``redis://:mypassword@localhost:6379/0``)
Running Tests

45
defender/config.py Normal file
View file

@ -0,0 +1,45 @@
from django.conf import settings
from django.utils.translation import ugettext_lazy
def get_setting(variable, default=None):
""" get the 'variable' from settings if not there use the
provided default """
return getattr(settings, variable, default)
# redis server host
DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL')
MOCK_REDIS = get_setting('DEFENDER_MOCK_REDIS', False)
# see if the user has overridden the failure limit
FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT', 3)
USE_USER_AGENT = get_setting('DEFENDER_USE_USER_AGENT', False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = get_setting('DEFENDER_BEHIND_REVERSE_PROXY', False)
# the prefix for these keys in your cache.
CACHE_PREFIX = get_setting('DEFENDER_CACHE_PREFIX', 'defender')
# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = get_setting('DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')
# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = get_setting('DEFENDER_COOLOFF_TIME', 300) # seconds
LOCKOUT_TEMPLATE = get_setting('DEFENDER_LOCKOUT_TEMPLATE')
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')
LOCKOUT_URL = get_setting('DEFENDER_LOCKOUT_URL')

52
defender/connection.py Normal file
View file

@ -0,0 +1,52 @@
import redis
try:
import urlparse
except ImportError:
import urllib.parse as urlparse
from . import config
# Register database schemes in URLs.
urlparse.uses_netloc.append("redis")
def get_redis_connection():
""" Get the redis connection """
redis_config = parse_redis_url(config.DEFENDER_REDIS_URL)
return redis.StrictRedis(
host=redis_config.get('HOST'),
port=redis_config.get('PORT'),
db=redis_config.get('DB'),
password=redis_config.get('PASSWORD'))
def parse_redis_url(url):
"""Parses a redis URL."""
# create config with some sane defaults
config = {
"DB": 0,
"PASSWORD": None,
"HOST": "localhost",
"PORT": 6379,
}
if not url:
return config
url = urlparse.urlparse(url)
print url
# Remove query strings.
path = url.path[1:]
path = path.split('?', 2)[0]
if path:
config.update({"DB": int(path)})
if url.password:
config.update({"PASSWORD": url.password})
if url.hostname:
config.update({"HOST": url.hostname})
if url.port:
config.update({"PORT": int(url.port)})
return config

View file

@ -8,11 +8,6 @@ DATABASES = {
SITE_ID = 1
REDIS_HOST = 'localhost'
REDIS_PORT = '1234'
REDIS_PASSWORD = 'mypassword'
REDIS_DB = 1
MIDDLEWARE_CLASSES = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
@ -38,4 +33,6 @@ LOGIN_REDIRECT_URL = '/admin'
DEFENDER_LOGIN_FAILURE_LIMIT = 10
DEFENDER_COOLOFF_TIME = 2
MOCK_REDIS = True
DEFENDER_REDIS_URL = None
# use mock redis in unit tests locally.
DEFENDER_MOCK_REDIS = True

View file

@ -11,13 +11,15 @@ from django.core.urlresolvers import NoReverseMatch
from django.core.urlresolvers import reverse
from django.conf import settings
from .connection import parse_redis_url
from . import utils
from . import config
if settings.MOCK_REDIS:
if config.MOCK_REDIS:
redis_client = mockredis.mock_strict_redis_client()
else:
from .utils import redis_server
redis_client = redis_server
from .connection import get_redis_connection
redis_client = get_redis_connection()
# Django >= 1.7 compatibility
try:
@ -40,7 +42,7 @@ class AccessAttemptTest(TestCase):
return ''.join(random.choice(chars) for x in range(20))
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def _login(self, is_valid=False, user_agent='test-browser'):
""" Login a user. A valid credential is used when is_valid is True,
otherwise it will use a random string to make a failed login.
@ -55,7 +57,7 @@ class AccessAttemptTest(TestCase):
return response
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def setUp(self):
""" Create a valid user for login
"""
@ -69,12 +71,12 @@ class AccessAttemptTest(TestCase):
""" clean up the db """
redis_client.flushdb()
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_failure_limit_once(self):
""" Tests the login lock trying to login one more time
than failure limit
"""
for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -88,12 +90,12 @@ class AccessAttemptTest(TestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_failure_limit_many(self):
""" Tests the login lock trying to login a lot of times more
than failure limit
"""
for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -109,35 +111,35 @@ class AccessAttemptTest(TestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_valid_login(self):
""" Tests a valid login for a real username
"""
response = self._login(is_valid=True)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_cooling_off(self):
""" Tests if the cooling time allows a user to login
"""
self.test_failure_limit_once()
# Wait for the cooling off period
time.sleep(utils.COOLOFF_TIME)
time.sleep(config.COOLOFF_TIME)
if settings.MOCK_REDIS:
if config.MOCK_REDIS:
# mock redis require that we expire on our own
redis_client.do_expire()
# It should be possible to login again, make sure it is.
self.test_valid_login()
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_cooling_off_for_trusted_user(self):
""" Test the cooling time for a trusted user
"""
# Try the cooling off time
self.test_cooling_off()
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_long_user_agent_valid(self):
""" Tests if can handle a long user agent
"""
@ -145,17 +147,17 @@ class AccessAttemptTest(TestCase):
response = self._login(is_valid=True, user_agent=long_user_agent)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_long_user_agent_not_valid(self):
""" Tests if can handle a long user agent with failure
"""
long_user_agent = 'ie6' * 1024
for i in range(0, utils.FAILURE_LIMIT + 1):
for i in range(0, config.FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.utils.redis_server', redis_client)
@patch('defender.connection.get_redis_connection', redis_client)
def test_reset_ip(self):
""" Tests if can reset an ip address
"""
@ -168,13 +170,13 @@ class AccessAttemptTest(TestCase):
# Make a login attempt again
self.test_valid_login()
@patch('defender.utils.LOCKOUT_URL', 'http://localhost/othe/login/')
@patch('defender.utils.redis_server', redis_client)
@patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/')
@patch('defender.connection.get_redis_connection', redis_client)
def test_failed_login_redirect_to_URL(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -190,13 +192,13 @@ class AccessAttemptTest(TestCase):
self.assertEquals(response.status_code, 302)
self.assertEquals(response['Location'], 'http://localhost/othe/login/')
@patch('defender.utils.LOCKOUT_URL', '/o/login/')
@patch('defender.utils.redis_server', redis_client)
@patch('defender.config.LOCKOUT_URL', '/o/login/')
@patch('defender.connection.get_redis_connection', redis_client)
def test_failed_login_redirect_to_URL_local(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
for i in range(0, utils.FAILURE_LIMIT):
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -222,3 +224,56 @@ class AccessAttemptTest(TestCase):
self.assertEquals(utils.is_valid_ip('fish'), False)
self.assertEquals(utils.is_valid_ip(None), False)
self.assertEquals(utils.is_valid_ip(''), False)
def test_parse_redis_url(self):
""" """
# full regular
conf = parse_redis_url("redis://user:password@localhost2:1234/2")
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), 'password')
self.assertEquals(conf.get('PORT'), 1234)
# full non local
conf = parse_redis_url("redis://user:pass@www.localhost.com:1234/2")
self.assertEquals(conf.get('HOST'), 'www.localhost.com')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), 'pass')
self.assertEquals(conf.get('PORT'), 1234)
# no user name
conf = parse_redis_url("redis://password@localhost2:1234/2")
print conf
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), None)
self.assertEquals(conf.get('PORT'), 1234)
# no user name 2 with colon
conf = parse_redis_url("redis://:password@localhost2:1234/2")
print conf
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 2)
self.assertEquals(conf.get('PASSWORD'), 'password')
self.assertEquals(conf.get('PORT'), 1234)
# Empty
conf = parse_redis_url(None)
self.assertEquals(conf.get('HOST'), 'localhost')
self.assertEquals(conf.get('DB'), 0)
self.assertEquals(conf.get('PASSWORD'), None)
self.assertEquals(conf.get('PORT'), 6379)
# no db
conf = parse_redis_url("redis://:password@localhost2:1234")
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 0)
self.assertEquals(conf.get('PASSWORD'), 'password')
self.assertEquals(conf.get('PORT'), 1234)
# no password
conf = parse_redis_url("redis://localhost2:1234/0")
self.assertEquals(conf.get('HOST'), 'localhost2')
self.assertEquals(conf.get('DB'), 0)
self.assertEquals(conf.get('PASSWORD'), None)
self.assertEquals(conf.get('PORT'), 1234)

View file

@ -9,11 +9,6 @@ DATABASES = {
SITE_ID = 1
REDIS_HOST = 'localhost'
REDIS_PORT = '6379'
REDIS_PASSWORD = None
REDIS_DB = 1
MIDDLEWARE_CLASSES = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
@ -39,4 +34,6 @@ LOGIN_REDIRECT_URL = '/admin'
DEFENDER_LOGIN_FAILURE_LIMIT = 10
DEFENDER_COOLOFF_TIME = 2
MOCK_REDIS = False
DEFENDER_REDIS_URL = "redis://localhost:6379/1"
# don't use mock redis in unit tests, we will use real redis on travis.
DEFENDER_MOCK_REDIS = False

View file

@ -1,68 +1,16 @@
import logging
import socket
import redis
from django.conf import settings
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy
from .models import AccessAttempt
from .connection import get_redis_connection
from . import config
def get_setting(variable, default=None):
""" get the 'variable' from settings if not there use the
provided default """
return getattr(settings, variable, default)
# redis server host
REDIS_HOST = get_setting('REDIS_HOST')
# redis server port
REDIS_PORT = get_setting('REDIS_PORT')
# redis server password
REDIS_PASSWORD = get_setting('REDIS_PASSWORD')
# redis db
REDIS_DB = get_setting('REDIS_DB')
# see if the user has overridden the failure limit
FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT', 3)
USE_USER_AGENT = get_setting('DEFENDER_USE_USER_AGENT', False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = get_setting('DEFENDER_BEHIND_REVERSE_PROXY', False)
# the prefix for these keys in your cache.
CACHE_PREFIX = get_setting('DEFENDER_CACHE_PREFIX', 'defender')
# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = get_setting('DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')
# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = get_setting('DEFENDER_COOLOFF_TIME', 300) # seconds
LOCKOUT_TEMPLATE = get_setting('DEFENDER_LOCKOUT_TEMPLATE')
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')
LOCKOUT_URL = get_setting('DEFENDER_LOCKOUT_URL')
redis_server = redis.StrictRedis(
host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)
redis_server = get_redis_connection()
log = logging.getLogger(__name__)
@ -119,10 +67,10 @@ def get_ip_address_from_request(request):
def get_ip(request):
""" get the ip address from the request """
if not BEHIND_REVERSE_PROXY:
if not config.BEHIND_REVERSE_PROXY:
ip = get_ip_address_from_request(request)
else:
ip = request.META.get(REVERSE_PROXY_HEADER, '')
ip = request.META.get(config.REVERSE_PROXY_HEADER, '')
ip = ip.split(",", 1)[0].strip()
if ip == '':
ip = request.META.get('REMOTE_ADDR', '')
@ -131,29 +79,29 @@ def get_ip(request):
def get_ip_attempt_cache_key(ip):
""" get the cache key by ip """
return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip)
return "{0}:failed:ip:{1}".format(config.CACHE_PREFIX, ip)
def get_username_attempt_cache_key(username):
""" get the cache key by username """
return "{0}:failed:username:{1}".format(CACHE_PREFIX, username)
return "{0}:failed:username:{1}".format(config.CACHE_PREFIX, username)
def get_ip_blocked_cache_key(ip):
""" get the cache key by ip """
return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip)
return "{0}:blocked:ip:{1}".format(config.CACHE_PREFIX, ip)
def get_username_blocked_cache_key(username):
""" get the cache key by username """
return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username)
return "{0}:blocked:username:{1}".format(config.CACHE_PREFIX, username)
def increment_key(key):
""" given a key increment the value """
# TODO make this one transaction, not two different ones.
new_value = redis_server.incr(key, 1)
redis_server.expire(key, COOLOFF_TIME)
redis_server.expire(key, config.COOLOFF_TIME)
return new_value
@ -162,7 +110,7 @@ def get_user_attempts(request):
"""
ip = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
# get by IP
ip_count = redis_server.get(get_ip_attempt_cache_key(ip))
@ -181,13 +129,13 @@ def get_user_attempts(request):
def block_ip(ip):
""" given the ip, block it """
key = get_ip_blocked_cache_key(ip)
redis_server.set(key, 'blocked', COOLOFF_TIME)
redis_server.set(key, 'blocked', config.COOLOFF_TIME)
def block_username(username):
""" given the username block it. """
key = get_username_blocked_cache_key(username)
redis_server.set(key, 'blocked', COOLOFF_TIME)
redis_server.set(key, 'blocked', config.COOLOFF_TIME)
def record_failed_attempt(ip, username):
@ -198,7 +146,7 @@ def record_failed_attempt(ip, username):
user_count = increment_key(get_username_attempt_cache_key(username))
# if either are over the limit, add to block
if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT:
if ip_count > config.FAILURE_LIMIT or user_count > config.FAILURE_LIMIT:
block_ip(ip)
block_username(username)
return False
@ -219,18 +167,18 @@ def reset_failed_attempts(ip=None, username=None):
def lockout_response(request):
""" if we are locked out, here is the response """
if LOCKOUT_TEMPLATE:
if config.LOCKOUT_TEMPLATE:
context = {
'cooloff_time': COOLOFF_TIME,
'failure_limit': FAILURE_LIMIT,
'cooloff_time': config.COOLOFF_TIME,
'failure_limit': config.FAILURE_LIMIT,
}
return render_to_response(LOCKOUT_TEMPLATE, context,
return render_to_response(config.LOCKOUT_TEMPLATE, context,
context_instance=RequestContext(request))
if LOCKOUT_URL:
return HttpResponseRedirect(LOCKOUT_URL)
if config.LOCKOUT_URL:
return HttpResponseRedirect(config.LOCKOUT_URL)
if COOLOFF_TIME:
if config.COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.")
else:
@ -241,7 +189,7 @@ def lockout_response(request):
def is_already_locked(request):
""" Is this IP/username already locked? """
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
# ip blocked?
ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address))
@ -263,7 +211,7 @@ def is_already_locked(request):
def check_request(request, login_unsuccessful):
""" check the request, and process results"""
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
if not login_unsuccessful:
# user logged in -- forget the failed attempts
@ -280,7 +228,7 @@ def add_login_attempt(request, login_valid):
user_agent=request.META.get('HTTP_USER_AGENT',
'<unknown>')[:255],
ip_address=get_ip(request),
username=request.POST.get(USERNAME_FORM_FIELD, None),
username=request.POST.get(config.USERNAME_FORM_FIELD, None),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
login_valid=login_valid,