From cf136033a1338be4b9a41e13df8b79915407e5ec Mon Sep 17 00:00:00 2001 From: Ken Cochrane Date: Wed, 31 Dec 2014 17:00:45 -0500 Subject: [PATCH] refactored the code a little, and updated readme to include missing config --- README.md | 3 + defender/decorators.py | 266 +---------------------------------------- defender/middleware.py | 2 +- defender/tests.py | 23 ++-- defender/utils.py | 266 +++++++++++++++++++++++++++++++++++++++++ 5 files changed, 288 insertions(+), 272 deletions(-) create mode 100644 defender/utils.py diff --git a/README.md b/README.md index f6e1fc7..a6ec422 100644 --- a/README.md +++ b/README.md @@ -202,6 +202,9 @@ users usernames. Default: ``username`` reverse proxy IP address Default: ``HTTP_X_FORWARDED_FOR`` * ``DEFENDER_CACHE_PREFIX``: The cache prefix for your defender keys. Default: ``defender`` +* ``DEFENDER_LOCKOUT_URL``: The URL you want to redirect to if someone is +locked out. + * ``REDIS_HOST``: the host name for your redis server * ``REDIS_PORT``: the host port for your redis server * ``REDIS_PASSWORD``: the password for your redis server diff --git a/defender/decorators.py b/defender/decorators.py index 26f3c49..acfc1fc 100644 --- a/defender/decorators.py +++ b/defender/decorators.py @@ -1,271 +1,17 @@ import logging -import socket - -import redis from django.conf import settings -from django.http import HttpResponse -from django.http import HttpResponseRedirect -from django.shortcuts import render_to_response -from django.template import RequestContext -from django.utils.translation import ugettext_lazy from .models import AccessAttempt - -REDIS_HOST = settings.REDIS_HOST -REDIS_PORT = settings.REDIS_PORT -REDIS_PASSWORD = settings.REDIS_PASSWORD -REDIS_DB = settings.REDIS_DB - -# see if the user has overridden the failure limit -FAILURE_LIMIT = getattr(settings, 'DEFENDER_LOGIN_FAILURE_LIMIT', 3) - -USE_USER_AGENT = getattr(settings, 'DEFENDER_USE_USER_AGENT', False) +from . import utils # use a specific username field to retrieve from login POST data USERNAME_FORM_FIELD = getattr(settings, 'DEFENDER_USERNAME_FORM_FIELD', 'username') -# see if the django app is sitting behind a reverse proxy -BEHIND_REVERSE_PROXY = getattr(settings, - 'DEFENDER_BEHIND_REVERSE_PROXY', - False) -# the prefix for these keys in your cache. -CACHE_PREFIX = getattr(settings, - 'DEFENDER_CACHE_PREFIX', - 'defender') - -# if the django app is behind a reverse proxy, look for the -# ip address using this HTTP header value -REVERSE_PROXY_HEADER = getattr(settings, - 'DEFENDER_REVERSE_PROXY_HEADER', - 'HTTP_X_FORWARDED_FOR') - -# how long to wait before the bad login attempt gets forgotten. in seconds. -COOLOFF_TIME = getattr(settings, 'DEFENDER_COOLOFF_TIME', 300) # seconds - -LOCKOUT_TEMPLATE = getattr(settings, 'DEFENDER_LOCKOUT_TEMPLATE', None) - -ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. " - "Note that both fields are case-sensitive.") - -redis_server = redis.StrictRedis( - host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD) - log = logging.getLogger(__name__) -def is_valid_ip(ip_address): - """ Check Validity of an IP address """ - valid = True - try: - socket.inet_aton(ip_address.strip()) - except: - valid = False - return valid - - -def get_ip_address_from_request(request): - """ Makes the best attempt to get the client's real IP or return - the loopback """ - PRIVATE_IPS_PREFIX = ('10.', '172.', '192.', '127.') - ip_address = '' - x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '') - if x_forwarded_for and ',' not in x_forwarded_for: - if not x_forwarded_for.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( - x_forwarded_for): - ip_address = x_forwarded_for.strip() - else: - ips = [ip.strip() for ip in x_forwarded_for.split(',')] - for ip in ips: - if ip.startswith(PRIVATE_IPS_PREFIX): - continue - elif not is_valid_ip(ip): - continue - else: - ip_address = ip - break - if not ip_address: - x_real_ip = request.META.get('HTTP_X_REAL_IP', '') - if x_real_ip: - if not x_real_ip.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( - x_real_ip): - ip_address = x_real_ip.strip() - if not ip_address: - remote_addr = request.META.get('REMOTE_ADDR', '') - if remote_addr: - if not remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( - remote_addr): - ip_address = remote_addr.strip() - if remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( - remote_addr): - ip_address = remote_addr.strip() - if not ip_address: - ip_address = '127.0.0.1' - return ip_address - - -def get_ip(request): - """ get the ip address from the request """ - if not BEHIND_REVERSE_PROXY: - ip = get_ip_address_from_request(request) - else: - ip = request.META.get(REVERSE_PROXY_HEADER, '') - ip = ip.split(",", 1)[0].strip() - if ip == '': - ip = request.META.get('REMOTE_ADDR', '') - return ip - - -def get_lockout_url(): - """ get the lockout url from the settings """ - return getattr(settings, 'DEFENDER_LOCKOUT_URL', None) - - -def get_ip_attempt_cache_key(ip): - """ get the cache key by ip """ - return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip) - - -def get_username_attempt_cache_key(username): - """ get the cache key by username """ - return "{0}:failed:username:{1}".format(CACHE_PREFIX, username) - - -def get_ip_blocked_cache_key(ip): - """ get the cache key by ip """ - return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip) - - -def get_username_blocked_cache_key(username): - """ get the cache key by username """ - return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username) - - -def increment_key(key): - """ given a key increment the value """ - # TODO make this one transaction, not two different ones. - new_value = redis_server.incr(key, 1) - redis_server.expire(key, COOLOFF_TIME) - return new_value - - -def get_user_attempts(request): - """Returns number of access attempts for this ip, username - """ - ip = get_ip(request) - - username = request.POST.get(USERNAME_FORM_FIELD, None) - - # get by IP - ip_count = redis_server.get(get_ip_attempt_cache_key(ip)) - if not ip_count: - ip_count = 0 - - # get by username - username_count = redis_server.get(get_username_attempt_cache_key(username)) - if not username_count: - username_count = 0 - - # return the larger of the two. - return max(ip_count, username_count) - - -def block_ip(ip): - """ given the ip, block it""" - key = get_ip_blocked_cache_key(ip) - redis_server.set(key, 'blocked', COOLOFF_TIME) - - -def block_username(username): - """ given the username block it. """ - key = get_username_blocked_cache_key(username) - redis_server.set(key, 'blocked', COOLOFF_TIME) - - -def record_failed_attempt(ip, username): - """ record the failed login attempt """ - # increment the failed count, and get current number - ip_count = increment_key(get_ip_attempt_cache_key(ip)) - user_count = increment_key(get_username_attempt_cache_key(username)) - - # if either are over the limit, add to block - if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT: - block_ip(ip) - block_username(username) - return False - return True - - -def reset_failed_attempts(ip=None, username=None): - """ reset the failed attempts for these ip's and usernames - TODO: run all commands in one redis transaction - """ - if ip: - redis_server.delete(get_ip_attempt_cache_key(ip)) - redis_server.delete(get_ip_blocked_cache_key(ip)) - if username: - redis_server.delete(get_username_attempt_cache_key(username)) - redis_server.delete(get_username_blocked_cache_key(username)) - - -def lockout_response(request): - """ if we are locked out, here is the response """ - if LOCKOUT_TEMPLATE: - context = { - 'cooloff_time': COOLOFF_TIME, - 'failure_limit': FAILURE_LIMIT, - } - return render_to_response(LOCKOUT_TEMPLATE, context, - context_instance=RequestContext(request)) - - LOCKOUT_URL = get_lockout_url() - if LOCKOUT_URL: - return HttpResponseRedirect(LOCKOUT_URL) - - if COOLOFF_TIME: - return HttpResponse("Account locked: too many login attempts. " - "Please try again later.") - else: - return HttpResponse("Account locked: too many login attempts. " - "Contact an admin to unlock your account.") - - -def is_already_locked(request): - """ Is this IP/username already locked? """ - ip_address = get_ip(request) - username = request.POST.get(USERNAME_FORM_FIELD, None) - - # ip blocked? - ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address)) - - if not ip_blocked: - ip_blocked = False - - # username blocked? - user_blocked = redis_server.get(get_username_blocked_cache_key(username)) - if not user_blocked: - user_blocked = False - - return ip_blocked or user_blocked - - -def check_request(request, login_unsuccessful): - """ check the request, and process results""" - ip_address = get_ip(request) - username = request.POST.get(USERNAME_FORM_FIELD, None) - result = True - - if login_unsuccessful: - # add a failed attempt for this user - result = record_failed_attempt(ip_address, username) - else: - # user logged in -- forget the failed attempts - reset_failed_attempts(ip=ip_address, username=username) - - return result - - def watch_login(func): """ Used to decorate the django.contrib.admin.site.login method. @@ -275,8 +21,8 @@ def watch_login(func): # if the request is currently under lockout, do not proceed to the # login function, go directly to lockout url, do not pass go, do not # collect messages about this login attempt - if is_already_locked(request): - return lockout_response(request) + if utils.is_already_locked(request): + return utils.lockout_response(request) # call the login function response = func(request, *args, **kwargs) @@ -300,16 +46,16 @@ def watch_login(func): AccessAttempt.objects.create( user_agent=request.META.get('HTTP_USER_AGENT', '')[:255], - ip_address=get_ip(request), + ip_address=utils.get_ip(request), username=request.POST.get(USERNAME_FORM_FIELD, None), http_accept=request.META.get('HTTP_ACCEPT', ''), path_info=request.META.get('PATH_INFO', ''), login_valid=not login_unsuccessful, ) - if check_request(request, login_unsuccessful): + if utils.check_request(request, login_unsuccessful): return response - return lockout_response(request) + return utils.lockout_response(request) return response diff --git a/defender/middleware.py b/defender/middleware.py index 97fa5b5..3ef5986 100644 --- a/defender/middleware.py +++ b/defender/middleware.py @@ -1,7 +1,7 @@ from django.conf import settings from django.contrib.auth import views as auth_views -from defender.decorators import watch_login +from .decorators import watch_login class FailedLoginMiddleware(object): diff --git a/defender/tests.py b/defender/tests.py index e9fedd5..f43315a 100644 --- a/defender/tests.py +++ b/defender/tests.py @@ -1,6 +1,7 @@ import random import string import time + from mock import patch import mockredis @@ -9,7 +10,7 @@ from django.contrib.auth.models import User from django.core.urlresolvers import NoReverseMatch from django.core.urlresolvers import reverse -from defender.decorators import ( +from .utils import ( COOLOFF_TIME, FAILURE_LIMIT, reset_failed_attempts) @@ -36,7 +37,7 @@ class AccessAttemptTest(TestCase): return ''.join(random.choice(chars) for x in range(20)) - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def _login(self, is_valid=False, user_agent='test-browser'): """Login a user. A valid credential is used when is_valid is True, otherwise it will use a random string to make a failed login. @@ -51,7 +52,7 @@ class AccessAttemptTest(TestCase): return response - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def setUp(self): """Create a valid user for login """ @@ -65,7 +66,7 @@ class AccessAttemptTest(TestCase): """ clean up the db """ redis_client.flushdb() - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_failure_limit_once(self): """Tests the login lock trying to login one more time than failure limit @@ -80,7 +81,7 @@ class AccessAttemptTest(TestCase): response = self._login() self.assertContains(response, self.LOCKED_MESSAGE) - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_failure_limit_many(self): """Tests the login lock trying to login a lot of times more than failure limit @@ -97,14 +98,14 @@ class AccessAttemptTest(TestCase): response = self._login() self.assertContains(response, self.LOCKED_MESSAGE) - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_valid_login(self): """Tests a valid login for a real username """ response = self._login(is_valid=True) self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302) - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_cooling_off(self): """Tests if the cooling time allows a user to login """ @@ -116,14 +117,14 @@ class AccessAttemptTest(TestCase): # It should be possible to login again, make sure it is. self.test_valid_login() - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_cooling_off_for_trusted_user(self): """Test the cooling time for a trusted user """ # Try the cooling off time self.test_cooling_off() - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_long_user_agent_valid(self): """Tests if can handle a long user agent """ @@ -131,7 +132,7 @@ class AccessAttemptTest(TestCase): response = self._login(is_valid=True, user_agent=long_user_agent) self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302) - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_long_user_agent_not_valid(self): """Tests if can handle a long user agent with failure """ @@ -141,7 +142,7 @@ class AccessAttemptTest(TestCase): self.assertContains(response, self.LOCKED_MESSAGE) - @patch('defender.decorators.redis_server', redis_client) + @patch('defender.utils.redis_server', redis_client) def test_reset_ip(self): """Tests if can reset an ip address """ diff --git a/defender/utils.py b/defender/utils.py new file mode 100644 index 0000000..1db1a78 --- /dev/null +++ b/defender/utils.py @@ -0,0 +1,266 @@ +import logging +import socket + +import redis +from django.conf import settings +from django.http import HttpResponse +from django.http import HttpResponseRedirect +from django.shortcuts import render_to_response +from django.template import RequestContext +from django.utils.translation import ugettext_lazy + +REDIS_HOST = settings.REDIS_HOST +REDIS_PORT = settings.REDIS_PORT +REDIS_PASSWORD = settings.REDIS_PASSWORD +REDIS_DB = settings.REDIS_DB + +# see if the user has overridden the failure limit +FAILURE_LIMIT = getattr(settings, 'DEFENDER_LOGIN_FAILURE_LIMIT', 3) + +USE_USER_AGENT = getattr(settings, 'DEFENDER_USE_USER_AGENT', False) + +# use a specific username field to retrieve from login POST data +USERNAME_FORM_FIELD = getattr(settings, + 'DEFENDER_USERNAME_FORM_FIELD', + 'username') + +# see if the django app is sitting behind a reverse proxy +BEHIND_REVERSE_PROXY = getattr(settings, + 'DEFENDER_BEHIND_REVERSE_PROXY', + False) +# the prefix for these keys in your cache. +CACHE_PREFIX = getattr(settings, + 'DEFENDER_CACHE_PREFIX', + 'defender') + +# if the django app is behind a reverse proxy, look for the +# ip address using this HTTP header value +REVERSE_PROXY_HEADER = getattr(settings, + 'DEFENDER_REVERSE_PROXY_HEADER', + 'HTTP_X_FORWARDED_FOR') + +# how long to wait before the bad login attempt gets forgotten. in seconds. +COOLOFF_TIME = getattr(settings, 'DEFENDER_COOLOFF_TIME', 300) # seconds + +LOCKOUT_TEMPLATE = getattr(settings, 'DEFENDER_LOCKOUT_TEMPLATE', None) + +ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. " + "Note that both fields are case-sensitive.") + +redis_server = redis.StrictRedis( + host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD) + +log = logging.getLogger(__name__) + + +def is_valid_ip(ip_address): + """ Check Validity of an IP address """ + valid = True + try: + socket.inet_aton(ip_address.strip()) + except: + valid = False + return valid + + +def get_ip_address_from_request(request): + """ Makes the best attempt to get the client's real IP or return + the loopback """ + PRIVATE_IPS_PREFIX = ('10.', '172.', '192.', '127.') + ip_address = '' + x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '') + if x_forwarded_for and ',' not in x_forwarded_for: + if not x_forwarded_for.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( + x_forwarded_for): + ip_address = x_forwarded_for.strip() + else: + ips = [ip.strip() for ip in x_forwarded_for.split(',')] + for ip in ips: + if ip.startswith(PRIVATE_IPS_PREFIX): + continue + elif not is_valid_ip(ip): + continue + else: + ip_address = ip + break + if not ip_address: + x_real_ip = request.META.get('HTTP_X_REAL_IP', '') + if x_real_ip: + if not x_real_ip.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( + x_real_ip): + ip_address = x_real_ip.strip() + if not ip_address: + remote_addr = request.META.get('REMOTE_ADDR', '') + if remote_addr: + if not remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( + remote_addr): + ip_address = remote_addr.strip() + if remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip( + remote_addr): + ip_address = remote_addr.strip() + if not ip_address: + ip_address = '127.0.0.1' + return ip_address + + +def get_ip(request): + """ get the ip address from the request """ + if not BEHIND_REVERSE_PROXY: + ip = get_ip_address_from_request(request) + else: + ip = request.META.get(REVERSE_PROXY_HEADER, '') + ip = ip.split(",", 1)[0].strip() + if ip == '': + ip = request.META.get('REMOTE_ADDR', '') + return ip + + +def get_lockout_url(): + """ get the lockout url from the settings """ + return getattr(settings, 'DEFENDER_LOCKOUT_URL', None) + + +def get_ip_attempt_cache_key(ip): + """ get the cache key by ip """ + return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip) + + +def get_username_attempt_cache_key(username): + """ get the cache key by username """ + return "{0}:failed:username:{1}".format(CACHE_PREFIX, username) + + +def get_ip_blocked_cache_key(ip): + """ get the cache key by ip """ + return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip) + + +def get_username_blocked_cache_key(username): + """ get the cache key by username """ + return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username) + + +def increment_key(key): + """ given a key increment the value """ + # TODO make this one transaction, not two different ones. + new_value = redis_server.incr(key, 1) + redis_server.expire(key, COOLOFF_TIME) + return new_value + + +def get_user_attempts(request): + """Returns number of access attempts for this ip, username + """ + ip = get_ip(request) + + username = request.POST.get(USERNAME_FORM_FIELD, None) + + # get by IP + ip_count = redis_server.get(get_ip_attempt_cache_key(ip)) + if not ip_count: + ip_count = 0 + + # get by username + username_count = redis_server.get(get_username_attempt_cache_key(username)) + if not username_count: + username_count = 0 + + # return the larger of the two. + return max(ip_count, username_count) + + +def block_ip(ip): + """ given the ip, block it""" + key = get_ip_blocked_cache_key(ip) + redis_server.set(key, 'blocked', COOLOFF_TIME) + + +def block_username(username): + """ given the username block it. """ + key = get_username_blocked_cache_key(username) + redis_server.set(key, 'blocked', COOLOFF_TIME) + + +def record_failed_attempt(ip, username): + """ record the failed login attempt, if over limit return False, + if not over limit return True """ + # increment the failed count, and get current number + ip_count = increment_key(get_ip_attempt_cache_key(ip)) + user_count = increment_key(get_username_attempt_cache_key(username)) + + # if either are over the limit, add to block + if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT: + block_ip(ip) + block_username(username) + return False + return True + + +def reset_failed_attempts(ip=None, username=None): + """ reset the failed attempts for these ip's and usernames + TODO: run all commands in one redis transaction + """ + if ip: + redis_server.delete(get_ip_attempt_cache_key(ip)) + redis_server.delete(get_ip_blocked_cache_key(ip)) + if username: + redis_server.delete(get_username_attempt_cache_key(username)) + redis_server.delete(get_username_blocked_cache_key(username)) + + +def lockout_response(request): + """ if we are locked out, here is the response """ + if LOCKOUT_TEMPLATE: + context = { + 'cooloff_time': COOLOFF_TIME, + 'failure_limit': FAILURE_LIMIT, + } + return render_to_response(LOCKOUT_TEMPLATE, context, + context_instance=RequestContext(request)) + + LOCKOUT_URL = get_lockout_url() + if LOCKOUT_URL: + return HttpResponseRedirect(LOCKOUT_URL) + + if COOLOFF_TIME: + return HttpResponse("Account locked: too many login attempts. " + "Please try again later.") + else: + return HttpResponse("Account locked: too many login attempts. " + "Contact an admin to unlock your account.") + + +def is_already_locked(request): + """ Is this IP/username already locked? """ + ip_address = get_ip(request) + username = request.POST.get(USERNAME_FORM_FIELD, None) + + # ip blocked? + ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address)) + + if not ip_blocked: + ip_blocked = False + else: + # short circuit no need to check username if ip is already blocked. + return True + + # username blocked? + user_blocked = redis_server.get(get_username_blocked_cache_key(username)) + if not user_blocked: + user_blocked = False + + return ip_blocked or user_blocked + + +def check_request(request, login_unsuccessful): + """ check the request, and process results""" + ip_address = get_ip(request) + username = request.POST.get(USERNAME_FORM_FIELD, None) + + if not login_unsuccessful: + # user logged in -- forget the failed attempts + reset_failed_attempts(ip=ip_address, username=username) + return True + else: + # add a failed attempt for this user + return record_failed_attempt(ip_address, username)