mirror of
https://github.com/jazzband/django-defender.git
synced 2026-03-16 22:10:32 +00:00
316 lines
10 KiB
Python
316 lines
10 KiB
Python
import logging
|
|
import socket
|
|
|
|
import redis
|
|
from django.conf import settings
|
|
from django.http import HttpResponse
|
|
from django.http import HttpResponseRedirect
|
|
from django.shortcuts import render_to_response
|
|
from django.template import RequestContext
|
|
from django.utils.translation import ugettext_lazy
|
|
|
|
from .models import AccessAttempt
|
|
|
|
# Connection parameters for the Redis instance used by this module. They are
# read with plain attribute access, i.e. no default is supplied here.
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_DB = settings.REDIS_DB

# see if the user has overridden the failure limit (default: 3)
FAILURE_LIMIT = getattr(settings, 'DEFENDER_LOGIN_FAILURE_LIMIT', 3)

# NOTE(review): not referenced by any code visible in this file section.
USE_USER_AGENT = getattr(settings, 'DEFENDER_USE_USER_AGENT', False)

# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = getattr(settings,
                              'DEFENDER_USERNAME_FORM_FIELD',
                              'username')

# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = getattr(settings,
                               'DEFENDER_BEHIND_REVERSE_PROXY',
                               False)
# the prefix for these keys in your cache.
CACHE_PREFIX = getattr(settings,
                       'DEFENDER_CACHE_PREFIX',
                       'defender')

# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = getattr(settings,
                               'DEFENDER_REVERSE_PROXY_HEADER',
                               'HTTP_X_FORWARDED_FOR')

# how long to wait before the bad login attempt gets forgotten. in seconds.
# This value is also used as the lifetime of the "blocked" keys.
COOLOFF_TIME = getattr(settings, 'DEFENDER_COOLOFF_TIME', 300)  # seconds

# Optional template rendered by lockout_response(); None disables it.
LOCKOUT_TEMPLATE = getattr(settings, 'DEFENDER_LOCKOUT_TEMPLATE', None)

# NOTE(review): not referenced by any code visible in this file section.
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
                              "Note that both fields are case-sensitive.")

# Module-wide Redis client shared by every helper below.
redis_server = redis.StrictRedis(
    host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)

log = logging.getLogger(__name__)
|
|
|
|
|
|
def is_valid_ip(ip_address):
    """ Check validity of an IPv4 address string.

    Surrounding whitespace is ignored. Validation is delegated to
    ``socket.inet_aton``, so whatever that call accepts is considered valid.

    Returns True when the value parses, False for anything that does not
    (including non-string input such as ``None``).
    """
    try:
        socket.inet_aton(ip_address.strip())
    except (socket.error, AttributeError, TypeError, ValueError):
        # socket.error: the string is not a legal address
        # AttributeError / TypeError: the value is not a text string
        # ValueError: the string cannot be passed to the C call at all
        # Deliberately NOT a bare ``except`` so KeyboardInterrupt and
        # SystemExit keep propagating.
        return False
    return True
|
|
|
|
|
|
def _is_private_ip(ip):
    """ Return True if the (already stripped) dotted-quad string ``ip`` is in
    a loopback or RFC 1918 private range. """
    if ip.startswith(('10.', '127.', '192.168.')):
        return True
    if ip.startswith('172.'):
        # Only 172.16.0.0 - 172.31.255.255 is private; the rest of 172/8 is
        # publicly routable.
        try:
            second_octet = int(ip.split('.')[1])
        except (IndexError, ValueError):
            return False
        return 16 <= second_octet <= 31
    return False


def get_ip_address_from_request(request):
    """ Makes the best attempt to get the client's real IP or return
    the loopback.

    Lookup order:

    1. the first public, valid entry of ``HTTP_X_FORWARDED_FOR``
       (comma-separated list);
    2. ``HTTP_X_REAL_IP`` if it is public and valid;
    3. ``REMOTE_ADDR`` if it is valid (private addresses are accepted here);
    4. ``'127.0.0.1'`` as the last resort.

    NOTE(review): the two forwarding headers are taken from ``request.META``
    as-is; whether they can be trusted depends on the deployment - confirm
    that a proxy in front of the app overwrites them.
    """
    ip_address = ''

    # A single value and a comma-separated list are handled by the same loop.
    # Each candidate is stripped *before* the private-range check so that
    # leading whitespace cannot make a private address look public.
    forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '') or ''
    for candidate in forwarded_for.split(','):
        candidate = candidate.strip()
        if not candidate or _is_private_ip(candidate):
            continue
        if is_valid_ip(candidate):
            ip_address = candidate
            break

    if not ip_address:
        real_ip = (request.META.get('HTTP_X_REAL_IP', '') or '').strip()
        if real_ip and not _is_private_ip(real_ip) and is_valid_ip(real_ip):
            ip_address = real_ip

    if not ip_address:
        # The socket peer address is accepted whether private or public.
        remote_addr = (request.META.get('REMOTE_ADDR', '') or '').strip()
        if remote_addr and is_valid_ip(remote_addr):
            ip_address = remote_addr

    return ip_address or '127.0.0.1'
|
|
|
|
|
|
def get_ip(request):
    """ Return the client ip address for ``request``.

    Behind a reverse proxy the first comma-separated entry of the configured
    proxy header is used, falling back to ``REMOTE_ADDR`` when that entry is
    empty. Otherwise the lookup is delegated to
    ``get_ip_address_from_request``.
    """
    if BEHIND_REVERSE_PROXY:
        header_value = request.META.get(REVERSE_PROXY_HEADER, '')
        first_entry = header_value.split(",", 1)[0].strip()
        if first_entry == '':
            return request.META.get('REMOTE_ADDR', '')
        return first_entry

    return get_ip_address_from_request(request)
|
|
|
|
|
|
def get_lockout_url():
    """ Return ``settings.DEFENDER_LOCKOUT_URL``, or None when it is unset.

    The setting is read on every call rather than cached at import time.
    """
    lockout_url = getattr(settings, 'DEFENDER_LOCKOUT_URL', None)
    return lockout_url
|
|
|
|
|
|
def get_ip_attempt_cache_key(ip):
    """ Build the key under which failed attempts for ``ip`` are counted """
    key_template = "{0}:failed:ip:{1}"
    return key_template.format(CACHE_PREFIX, ip)
|
|
|
|
|
|
def get_username_attempt_cache_key(username):
    """ Build the key under which failed attempts for ``username`` are
    counted """
    key_template = "{0}:failed:username:{1}"
    return key_template.format(CACHE_PREFIX, username)
|
|
|
|
|
|
def get_ip_blocked_cache_key(ip):
    """ Build the key that marks ``ip`` as blocked """
    key_template = "{0}:blocked:ip:{1}"
    return key_template.format(CACHE_PREFIX, ip)
|
|
|
|
|
|
def get_username_blocked_cache_key(username):
    """ Build the key that marks ``username`` as blocked """
    key_template = "{0}:blocked:username:{1}"
    return key_template.format(CACHE_PREFIX, username)
|
|
|
|
|
|
def increment_key(key):
    """ Increment the counter stored at ``key`` and refresh its expiry.

    Both commands are queued on one pipeline and executed together, so the
    counter can no longer be left behind without an expiry when the process
    dies between the two calls.

    When ``COOLOFF_TIME`` is falsy no expiry is set at all; sending an
    expiry of 0 would delete the counter right after incrementing it.

    Returns the counter value after the increment.
    """
    pipe = redis_server.pipeline()
    pipe.incr(key, 1)
    if COOLOFF_TIME:
        pipe.expire(key, COOLOFF_TIME)
    # execute() returns one result per queued command; INCR is the first.
    new_value = pipe.execute()[0]
    return new_value
|
|
|
|
|
|
def get_user_attempts(request):
    """Returns number of access attempts for this ip, username

    The ip comes from ``get_ip`` and the username from the login POST data.
    The result is the larger of the two counters, as an ``int``; a counter
    that does not exist counts as 0.
    """
    ip = get_ip(request)

    username = request.POST.get(USERNAME_FORM_FIELD, None)

    # Redis hands the stored counters back as text/bytes (or None when the
    # key is missing). Convert to int before comparing, otherwise max() would
    # compare strings lexicographically or mix text with the integer 0.
    ip_count = int(redis_server.get(get_ip_attempt_cache_key(ip)) or 0)

    username_count = int(
        redis_server.get(get_username_attempt_cache_key(username)) or 0)

    # return the larger of the two.
    return max(ip_count, username_count)
|
|
|
|
|
|
def block_ip(ip):
    """ Mark ``ip`` as blocked.

    The block expires after ``COOLOFF_TIME`` seconds. When ``COOLOFF_TIME``
    is falsy the block is stored without an expiry, matching the
    "Contact an admin to unlock your account" branch of ``lockout_response``.

    An empty ``ip`` is ignored so that requests without a resolvable address
    do not end up sharing one block key.
    """
    if not ip:
        return

    key = get_ip_blocked_cache_key(ip)
    if COOLOFF_TIME:
        # third positional argument of set() is the expiry in seconds
        redis_server.set(key, 'blocked', COOLOFF_TIME)
    else:
        redis_server.set(key, 'blocked')
|
|
|
|
|
|
def block_username(username):
    """ Mark ``username`` as blocked.

    The block expires after ``COOLOFF_TIME`` seconds. When ``COOLOFF_TIME``
    is falsy the block is stored without an expiry, matching the
    "Contact an admin to unlock your account" branch of ``lockout_response``.

    A missing or empty ``username`` is ignored: blocking it would create the
    shared key ``...:blocked:username:None``, which every request lacking a
    username field would then hit.
    """
    if not username:
        return

    key = get_username_blocked_cache_key(username)
    if COOLOFF_TIME:
        # third positional argument of set() is the expiry in seconds
        redis_server.set(key, 'blocked', COOLOFF_TIME)
    else:
        redis_server.set(key, 'blocked')
|
|
|
|
|
|
def record_failed_attempt(ip, username):
    """ Record a failed login attempt for ``ip`` and ``username``.

    The per-ip counter is always incremented. The per-username counter is
    only used when a username was actually submitted; otherwise all
    username-less requests would share (and be blocked through) one counter
    keyed on ``None``.

    Returns False when either counter is now above ``FAILURE_LIMIT`` (the ip,
    and the username if present, get blocked), True otherwise.
    """
    # increment the failed count, and get current number
    ip_count = increment_key(get_ip_attempt_cache_key(ip))
    user_count = 0
    if username:
        user_count = increment_key(get_username_attempt_cache_key(username))

    # if either are over the limit, add to block
    if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT:
        block_ip(ip)
        if username:
            block_username(username)
        return False
    return True
|
|
|
|
|
|
def reset_failed_attempts(ip=None, username=None):
    """ Reset the failed attempts for this ip and/or username.

    Removes both the attempt counter and the blocked marker for every
    argument that is given. All keys are removed with a single multi-key
    DEL command instead of up to four separate round trips.
    """
    keys = []
    if ip:
        keys.append(get_ip_attempt_cache_key(ip))
        keys.append(get_ip_blocked_cache_key(ip))
    if username:
        keys.append(get_username_attempt_cache_key(username))
        keys.append(get_username_blocked_cache_key(username))

    # DEL needs at least one key, so skip the call when nothing was passed.
    if keys:
        redis_server.delete(*keys)
|
|
|
|
|
|
def lockout_response(request):
    """ Build the response sent to a client that is locked out.

    Preference order: the configured lockout template, then a redirect to
    the configured lockout url, then a plain-text message whose wording
    depends on whether a cool-off time is configured.
    """
    if LOCKOUT_TEMPLATE:
        template_context = {
            'cooloff_time': COOLOFF_TIME,
            'failure_limit': FAILURE_LIMIT,
        }
        request_context = RequestContext(request)
        return render_to_response(LOCKOUT_TEMPLATE, template_context,
                                  context_instance=request_context)

    redirect_target = get_lockout_url()
    if redirect_target:
        return HttpResponseRedirect(redirect_target)

    if COOLOFF_TIME:
        message = ("Account locked: too many login attempts. "
                   "Please try again later.")
    else:
        message = ("Account locked: too many login attempts. "
                   "Contact an admin to unlock your account.")
    return HttpResponse(message)
|
|
|
|
|
|
def is_already_locked(request):
    """ Is this IP/username already locked?

    Returns a truthy value (the stored marker) when either the client ip or
    the submitted username is blocked, False otherwise.

    The username check is skipped when the request carries no username
    (e.g. a GET of the login page); otherwise such requests would all be
    checked against the shared key built from ``None``.
    """
    ip_address = get_ip(request)
    username = request.POST.get(USERNAME_FORM_FIELD, None)

    # ip blocked?
    ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address))
    if not ip_blocked:
        ip_blocked = False

    # username blocked?
    user_blocked = False
    if username:
        user_blocked = redis_server.get(
            get_username_blocked_cache_key(username))
        if not user_blocked:
            user_blocked = False

    return ip_blocked or user_blocked
|
|
|
|
|
|
def check_request(request, login_unsuccessful):
    """ Update the attempt bookkeeping for a finished login request.

    A failed login is recorded and the outcome of ``record_failed_attempt``
    is returned. A successful login clears the stored attempts for the ip
    and username and returns True.
    """
    client_ip = get_ip(request)
    submitted_username = request.POST.get(USERNAME_FORM_FIELD, None)

    if not login_unsuccessful:
        # user logged in -- forget the failed attempts
        reset_failed_attempts(ip=client_ip, username=submitted_username)
        return True

    # add a failed attempt for this user
    return record_failed_attempt(client_ip, submitted_username)
|
|
|
|
|
|
def watch_login(func):
    """
    Used to decorate the django.contrib.admin.site.login method.

    The returned wrapper short-circuits with the lockout response when the
    ip/username is already blocked, otherwise calls ``func``. For POST
    requests it then stores an ``AccessAttempt`` row and updates the failure
    counters, returning the lockout response instead of ``func``'s response
    once the limit has been exceeded.
    """

    def decorated_login(request, *args, **kwargs):
        # if the request is currently under lockout, do not proceed to the
        # login function, go directly to lockout url, do not pass go, do not
        # collect messages about this login attempt
        if is_already_locked(request):
            return lockout_response(request)

        # call the login function
        response = func(request, *args, **kwargs)

        # NOTE: this check relies on the wrapper keeping the name
        # 'decorated_login'; wrapping with functools.wraps would copy
        # func.__name__ onto the wrapper and defeat it.
        if func.__name__ == 'decorated_login':
            # if we're dealing with this function itself, don't bother checking
            # for invalid login attempts. I suppose there's a bunch of
            # recursion going on here that used to cause one failed login
            # attempt to generate 10+ failed access attempt records (with 3
            # failed attempts each supposedly)
            return response

        if request.method == 'POST':
            # see if the login was successful: a response without a
            # 'location' header whose status is not 302 counts as a failure,
            # i.e. a redirect is taken to mean the login succeeded.
            login_unsuccessful = (
                response and
                not response.has_header('location') and
                response.status_code != 302
            )

            # persist every POSTed attempt, successful or not
            AccessAttempt.objects.create(
                user_agent=request.META.get('HTTP_USER_AGENT',
                                            '<unknown>')[:255],
                ip_address=get_ip(request),
                username=request.POST.get(USERNAME_FORM_FIELD, None),
                http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
                path_info=request.META.get('PATH_INFO', '<unknown>'),
                login_valid=not login_unsuccessful,
            )
            # check_request() is truthy while the client is still allowed
            if check_request(request, login_unsuccessful):
                return response

            return lockout_response(request)

        # non-POST requests pass through untouched
        return response

    return decorated_login
|