refactored the code a little, and updated readme to include missing config

This commit is contained in:
Ken Cochrane 2014-12-31 17:00:45 -05:00
parent 1974643c14
commit cf136033a1
5 changed files with 288 additions and 272 deletions

View file

@ -202,6 +202,9 @@ users usernames. Default: ``username``
reverse proxy IP address Default: ``HTTP_X_FORWARDED_FOR``
* ``DEFENDER_CACHE_PREFIX``: The cache prefix for your defender keys.
Default: ``defender``
* ``DEFENDER_LOCKOUT_URL``: The URL you want to redirect to if someone is
locked out.
* ``REDIS_HOST``: the host name for your redis server
* ``REDIS_PORT``: the host port for your redis server
* ``REDIS_PASSWORD``: the password for your redis server

View file

@ -1,271 +1,17 @@
import logging
import socket
import redis
from django.conf import settings
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy
from .models import AccessAttempt
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_DB = settings.REDIS_DB
# see if the user has overridden the failure limit
FAILURE_LIMIT = getattr(settings, 'DEFENDER_LOGIN_FAILURE_LIMIT', 3)
USE_USER_AGENT = getattr(settings, 'DEFENDER_USE_USER_AGENT', False)
from . import utils
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = getattr(settings,
'DEFENDER_USERNAME_FORM_FIELD',
'username')
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = getattr(settings,
'DEFENDER_BEHIND_REVERSE_PROXY',
False)
# the prefix for these keys in your cache.
CACHE_PREFIX = getattr(settings,
'DEFENDER_CACHE_PREFIX',
'defender')
# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = getattr(settings,
'DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')
# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = getattr(settings, 'DEFENDER_COOLOFF_TIME', 300) # seconds
LOCKOUT_TEMPLATE = getattr(settings, 'DEFENDER_LOCKOUT_TEMPLATE', None)
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")
redis_server = redis.StrictRedis(
host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)
log = logging.getLogger(__name__)
def is_valid_ip(ip_address):
""" Check Validity of an IP address """
valid = True
try:
socket.inet_aton(ip_address.strip())
except:
valid = False
return valid
def get_ip_address_from_request(request):
""" Makes the best attempt to get the client's real IP or return
the loopback """
PRIVATE_IPS_PREFIX = ('10.', '172.', '192.', '127.')
ip_address = ''
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '')
if x_forwarded_for and ',' not in x_forwarded_for:
if not x_forwarded_for.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
x_forwarded_for):
ip_address = x_forwarded_for.strip()
else:
ips = [ip.strip() for ip in x_forwarded_for.split(',')]
for ip in ips:
if ip.startswith(PRIVATE_IPS_PREFIX):
continue
elif not is_valid_ip(ip):
continue
else:
ip_address = ip
break
if not ip_address:
x_real_ip = request.META.get('HTTP_X_REAL_IP', '')
if x_real_ip:
if not x_real_ip.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
x_real_ip):
ip_address = x_real_ip.strip()
if not ip_address:
remote_addr = request.META.get('REMOTE_ADDR', '')
if remote_addr:
if not remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
remote_addr):
ip_address = remote_addr.strip()
if remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
remote_addr):
ip_address = remote_addr.strip()
if not ip_address:
ip_address = '127.0.0.1'
return ip_address
def get_ip(request):
""" get the ip address from the request """
if not BEHIND_REVERSE_PROXY:
ip = get_ip_address_from_request(request)
else:
ip = request.META.get(REVERSE_PROXY_HEADER, '')
ip = ip.split(",", 1)[0].strip()
if ip == '':
ip = request.META.get('REMOTE_ADDR', '')
return ip
def get_lockout_url():
""" get the lockout url from the settings """
return getattr(settings, 'DEFENDER_LOCKOUT_URL', None)
def get_ip_attempt_cache_key(ip):
""" get the cache key by ip """
return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip)
def get_username_attempt_cache_key(username):
""" get the cache key by username """
return "{0}:failed:username:{1}".format(CACHE_PREFIX, username)
def get_ip_blocked_cache_key(ip):
""" get the cache key by ip """
return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip)
def get_username_blocked_cache_key(username):
""" get the cache key by username """
return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username)
def increment_key(key):
""" given a key increment the value """
# TODO make this one transaction, not two different ones.
new_value = redis_server.incr(key, 1)
redis_server.expire(key, COOLOFF_TIME)
return new_value
def get_user_attempts(request):
"""Returns number of access attempts for this ip, username
"""
ip = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
# get by IP
ip_count = redis_server.get(get_ip_attempt_cache_key(ip))
if not ip_count:
ip_count = 0
# get by username
username_count = redis_server.get(get_username_attempt_cache_key(username))
if not username_count:
username_count = 0
# return the larger of the two.
return max(ip_count, username_count)
def block_ip(ip):
""" given the ip, block it"""
key = get_ip_blocked_cache_key(ip)
redis_server.set(key, 'blocked', COOLOFF_TIME)
def block_username(username):
""" given the username block it. """
key = get_username_blocked_cache_key(username)
redis_server.set(key, 'blocked', COOLOFF_TIME)
def record_failed_attempt(ip, username):
""" record the failed login attempt """
# increment the failed count, and get current number
ip_count = increment_key(get_ip_attempt_cache_key(ip))
user_count = increment_key(get_username_attempt_cache_key(username))
# if either are over the limit, add to block
if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT:
block_ip(ip)
block_username(username)
return False
return True
def reset_failed_attempts(ip=None, username=None):
""" reset the failed attempts for these ip's and usernames
TODO: run all commands in one redis transaction
"""
if ip:
redis_server.delete(get_ip_attempt_cache_key(ip))
redis_server.delete(get_ip_blocked_cache_key(ip))
if username:
redis_server.delete(get_username_attempt_cache_key(username))
redis_server.delete(get_username_blocked_cache_key(username))
def lockout_response(request):
""" if we are locked out, here is the response """
if LOCKOUT_TEMPLATE:
context = {
'cooloff_time': COOLOFF_TIME,
'failure_limit': FAILURE_LIMIT,
}
return render_to_response(LOCKOUT_TEMPLATE, context,
context_instance=RequestContext(request))
LOCKOUT_URL = get_lockout_url()
if LOCKOUT_URL:
return HttpResponseRedirect(LOCKOUT_URL)
if COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.")
else:
return HttpResponse("Account locked: too many login attempts. "
"Contact an admin to unlock your account.")
def is_already_locked(request):
""" Is this IP/username already locked? """
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
# ip blocked?
ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address))
if not ip_blocked:
ip_blocked = False
# username blocked?
user_blocked = redis_server.get(get_username_blocked_cache_key(username))
if not user_blocked:
user_blocked = False
return ip_blocked or user_blocked
def check_request(request, login_unsuccessful):
""" check the request, and process results"""
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
result = True
if login_unsuccessful:
# add a failed attempt for this user
result = record_failed_attempt(ip_address, username)
else:
# user logged in -- forget the failed attempts
reset_failed_attempts(ip=ip_address, username=username)
return result
def watch_login(func):
"""
Used to decorate the django.contrib.admin.site.login method.
@ -275,8 +21,8 @@ def watch_login(func):
# if the request is currently under lockout, do not proceed to the
# login function, go directly to lockout url, do not pass go, do not
# collect messages about this login attempt
if is_already_locked(request):
return lockout_response(request)
if utils.is_already_locked(request):
return utils.lockout_response(request)
# call the login function
response = func(request, *args, **kwargs)
@ -300,16 +46,16 @@ def watch_login(func):
AccessAttempt.objects.create(
user_agent=request.META.get('HTTP_USER_AGENT',
'<unknown>')[:255],
ip_address=get_ip(request),
ip_address=utils.get_ip(request),
username=request.POST.get(USERNAME_FORM_FIELD, None),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
login_valid=not login_unsuccessful,
)
if check_request(request, login_unsuccessful):
if utils.check_request(request, login_unsuccessful):
return response
return lockout_response(request)
return utils.lockout_response(request)
return response

View file

@ -1,7 +1,7 @@
from django.conf import settings
from django.contrib.auth import views as auth_views
from defender.decorators import watch_login
from .decorators import watch_login
class FailedLoginMiddleware(object):

View file

@ -1,6 +1,7 @@
import random
import string
import time
from mock import patch
import mockredis
@ -9,7 +10,7 @@ from django.contrib.auth.models import User
from django.core.urlresolvers import NoReverseMatch
from django.core.urlresolvers import reverse
from defender.decorators import (
from .utils import (
COOLOFF_TIME, FAILURE_LIMIT, reset_failed_attempts)
@ -36,7 +37,7 @@ class AccessAttemptTest(TestCase):
return ''.join(random.choice(chars) for x in range(20))
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def _login(self, is_valid=False, user_agent='test-browser'):
"""Login a user. A valid credential is used when is_valid is True,
otherwise it will use a random string to make a failed login.
@ -51,7 +52,7 @@ class AccessAttemptTest(TestCase):
return response
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def setUp(self):
"""Create a valid user for login
"""
@ -65,7 +66,7 @@ class AccessAttemptTest(TestCase):
""" clean up the db """
redis_client.flushdb()
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_failure_limit_once(self):
"""Tests the login lock trying to login one more time
than failure limit
@ -80,7 +81,7 @@ class AccessAttemptTest(TestCase):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_failure_limit_many(self):
"""Tests the login lock trying to login a lot of times more
than failure limit
@ -97,14 +98,14 @@ class AccessAttemptTest(TestCase):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_valid_login(self):
"""Tests a valid login for a real username
"""
response = self._login(is_valid=True)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_cooling_off(self):
"""Tests if the cooling time allows a user to login
"""
@ -116,14 +117,14 @@ class AccessAttemptTest(TestCase):
# It should be possible to login again, make sure it is.
self.test_valid_login()
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_cooling_off_for_trusted_user(self):
"""Test the cooling time for a trusted user
"""
# Try the cooling off time
self.test_cooling_off()
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_long_user_agent_valid(self):
"""Tests if can handle a long user agent
"""
@ -131,7 +132,7 @@ class AccessAttemptTest(TestCase):
response = self._login(is_valid=True, user_agent=long_user_agent)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_long_user_agent_not_valid(self):
"""Tests if can handle a long user agent with failure
"""
@ -141,7 +142,7 @@ class AccessAttemptTest(TestCase):
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.decorators.redis_server', redis_client)
@patch('defender.utils.redis_server', redis_client)
def test_reset_ip(self):
"""Tests if can reset an ip address
"""

266
defender/utils.py Normal file
View file

@ -0,0 +1,266 @@
import logging
import socket
import redis
from django.conf import settings
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_DB = settings.REDIS_DB
# see if the user has overridden the failure limit
FAILURE_LIMIT = getattr(settings, 'DEFENDER_LOGIN_FAILURE_LIMIT', 3)
USE_USER_AGENT = getattr(settings, 'DEFENDER_USE_USER_AGENT', False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = getattr(settings,
'DEFENDER_USERNAME_FORM_FIELD',
'username')
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = getattr(settings,
'DEFENDER_BEHIND_REVERSE_PROXY',
False)
# the prefix for these keys in your cache.
CACHE_PREFIX = getattr(settings,
'DEFENDER_CACHE_PREFIX',
'defender')
# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = getattr(settings,
'DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')
# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = getattr(settings, 'DEFENDER_COOLOFF_TIME', 300) # seconds
LOCKOUT_TEMPLATE = getattr(settings, 'DEFENDER_LOCKOUT_TEMPLATE', None)
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")
redis_server = redis.StrictRedis(
host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)
log = logging.getLogger(__name__)
def is_valid_ip(ip_address):
""" Check Validity of an IP address """
valid = True
try:
socket.inet_aton(ip_address.strip())
except:
valid = False
return valid
def get_ip_address_from_request(request):
""" Makes the best attempt to get the client's real IP or return
the loopback """
PRIVATE_IPS_PREFIX = ('10.', '172.', '192.', '127.')
ip_address = ''
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '')
if x_forwarded_for and ',' not in x_forwarded_for:
if not x_forwarded_for.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
x_forwarded_for):
ip_address = x_forwarded_for.strip()
else:
ips = [ip.strip() for ip in x_forwarded_for.split(',')]
for ip in ips:
if ip.startswith(PRIVATE_IPS_PREFIX):
continue
elif not is_valid_ip(ip):
continue
else:
ip_address = ip
break
if not ip_address:
x_real_ip = request.META.get('HTTP_X_REAL_IP', '')
if x_real_ip:
if not x_real_ip.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
x_real_ip):
ip_address = x_real_ip.strip()
if not ip_address:
remote_addr = request.META.get('REMOTE_ADDR', '')
if remote_addr:
if not remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
remote_addr):
ip_address = remote_addr.strip()
if remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
remote_addr):
ip_address = remote_addr.strip()
if not ip_address:
ip_address = '127.0.0.1'
return ip_address
def get_ip(request):
""" get the ip address from the request """
if not BEHIND_REVERSE_PROXY:
ip = get_ip_address_from_request(request)
else:
ip = request.META.get(REVERSE_PROXY_HEADER, '')
ip = ip.split(",", 1)[0].strip()
if ip == '':
ip = request.META.get('REMOTE_ADDR', '')
return ip
def get_lockout_url():
""" get the lockout url from the settings """
return getattr(settings, 'DEFENDER_LOCKOUT_URL', None)
def get_ip_attempt_cache_key(ip):
""" get the cache key by ip """
return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip)
def get_username_attempt_cache_key(username):
""" get the cache key by username """
return "{0}:failed:username:{1}".format(CACHE_PREFIX, username)
def get_ip_blocked_cache_key(ip):
""" get the cache key by ip """
return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip)
def get_username_blocked_cache_key(username):
""" get the cache key by username """
return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username)
def increment_key(key):
""" given a key increment the value """
# TODO make this one transaction, not two different ones.
new_value = redis_server.incr(key, 1)
redis_server.expire(key, COOLOFF_TIME)
return new_value
def get_user_attempts(request):
"""Returns number of access attempts for this ip, username
"""
ip = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
# get by IP
ip_count = redis_server.get(get_ip_attempt_cache_key(ip))
if not ip_count:
ip_count = 0
# get by username
username_count = redis_server.get(get_username_attempt_cache_key(username))
if not username_count:
username_count = 0
# return the larger of the two.
return max(ip_count, username_count)
def block_ip(ip):
""" given the ip, block it"""
key = get_ip_blocked_cache_key(ip)
redis_server.set(key, 'blocked', COOLOFF_TIME)
def block_username(username):
""" given the username block it. """
key = get_username_blocked_cache_key(username)
redis_server.set(key, 'blocked', COOLOFF_TIME)
def record_failed_attempt(ip, username):
""" record the failed login attempt, if over limit return False,
if not over limit return True """
# increment the failed count, and get current number
ip_count = increment_key(get_ip_attempt_cache_key(ip))
user_count = increment_key(get_username_attempt_cache_key(username))
# if either are over the limit, add to block
if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT:
block_ip(ip)
block_username(username)
return False
return True
def reset_failed_attempts(ip=None, username=None):
""" reset the failed attempts for these ip's and usernames
TODO: run all commands in one redis transaction
"""
if ip:
redis_server.delete(get_ip_attempt_cache_key(ip))
redis_server.delete(get_ip_blocked_cache_key(ip))
if username:
redis_server.delete(get_username_attempt_cache_key(username))
redis_server.delete(get_username_blocked_cache_key(username))
def lockout_response(request):
""" if we are locked out, here is the response """
if LOCKOUT_TEMPLATE:
context = {
'cooloff_time': COOLOFF_TIME,
'failure_limit': FAILURE_LIMIT,
}
return render_to_response(LOCKOUT_TEMPLATE, context,
context_instance=RequestContext(request))
LOCKOUT_URL = get_lockout_url()
if LOCKOUT_URL:
return HttpResponseRedirect(LOCKOUT_URL)
if COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.")
else:
return HttpResponse("Account locked: too many login attempts. "
"Contact an admin to unlock your account.")
def is_already_locked(request):
""" Is this IP/username already locked? """
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
# ip blocked?
ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address))
if not ip_blocked:
ip_blocked = False
else:
# short circuit no need to check username if ip is already blocked.
return True
# username blocked?
user_blocked = redis_server.get(get_username_blocked_cache_key(username))
if not user_blocked:
user_blocked = False
return ip_blocked or user_blocked
def check_request(request, login_unsuccessful):
""" check the request, and process results"""
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
if not login_unsuccessful:
# user logged in -- forget the failed attempts
reset_failed_attempts(ip=ip_address, username=username)
return True
else:
# add a failed attempt for this user
return record_failed_attempt(ip_address, username)