django-defender/defender/utils.py
Alex White 9209f0579f Fix DataError on login
A watched login failure causes a 500 saving a 256 character long username into the login attempts.  Conditionally slice it to fit AccessAttempt
2016-05-11 14:07:50 -07:00

333 lines
10 KiB
Python

import logging
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.core.validators import validate_ipv46_address
from django.core.exceptions import ValidationError
from .connection import get_redis_connection
from . import config
from .data import store_login_attempt
REDIS_SERVER = get_redis_connection()
LOG = logging.getLogger(__name__)
def is_valid_ip(ip_address):
""" Check Validity of an IP address """
if not ip_address:
return False
ip_address = ip_address.strip()
try:
validate_ipv46_address(ip_address)
return True
except ValidationError:
return False
def get_ip_address_from_request(request):
""" Makes the best attempt to get the client's real IP or return
the loopback """
remote_addr = request.META.get('REMOTE_ADDR', '')
if remote_addr and is_valid_ip(remote_addr):
return remote_addr.strip()
return '127.0.0.1'
def get_ip(request):
""" get the ip address from the request """
if config.BEHIND_REVERSE_PROXY:
ip_address = request.META.get(config.REVERSE_PROXY_HEADER, '')
ip_address = ip_address.split(",", 1)[0].strip()
if ip_address == '':
ip_address = get_ip_address_from_request(request)
else:
ip_address = get_ip_address_from_request(request)
return ip_address
def get_ip_attempt_cache_key(ip_address):
""" get the cache key by ip """
return "{0}:failed:ip:{1}".format(config.CACHE_PREFIX, ip_address)
def get_username_attempt_cache_key(username):
""" get the cache key by username """
return "{0}:failed:username:{1}".format(config.CACHE_PREFIX, username)
def get_ip_blocked_cache_key(ip_address):
""" get the cache key by ip """
return "{0}:blocked:ip:{1}".format(config.CACHE_PREFIX, ip_address)
def get_username_blocked_cache_key(username):
""" get the cache key by username """
return "{0}:blocked:username:{1}".format(config.CACHE_PREFIX, username)
def strip_keys(key_list):
""" Given a list of keys, remove the prefix and remove just
the data we care about.
for example:
['defender:blocked:ip:ken', 'defender:blocked:ip:joffrey']
would result in:
['ken', 'joffrey']
"""
return [key.split(":")[-1] for key in key_list]
def get_blocked_ips():
""" get a list of blocked ips from redis """
if config.DISABLE_IP_LOCKOUT:
# There are no blocked IP's since we disabled them.
return []
key = get_ip_blocked_cache_key("*")
key_list = [redis_key.decode('utf-8')
for redis_key in REDIS_SERVER.keys(key)]
return strip_keys(key_list)
def get_blocked_usernames():
""" get a list of blocked usernames from redis """
key = get_username_blocked_cache_key("*")
key_list = [redis_key.decode('utf-8')
for redis_key in REDIS_SERVER.keys(key)]
return strip_keys(key_list)
def increment_key(key):
""" given a key increment the value """
pipe = REDIS_SERVER.pipeline()
pipe.incr(key, 1)
if config.COOLOFF_TIME:
pipe.expire(key, config.COOLOFF_TIME)
new_value = pipe.execute()[0]
return new_value
def get_user_attempts(request):
""" Returns number of access attempts for this ip, username
"""
ip_address = get_ip(request)
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
# get by IP
ip_count = REDIS_SERVER.get(get_ip_attempt_cache_key(ip_address))
if not ip_count:
ip_count = 0
ip_count = int(ip_count)
# get by username
username_count = REDIS_SERVER.get(get_username_attempt_cache_key(username))
if not username_count:
username_count = 0
username_count = int(username_count)
# return the larger of the two.
return max(ip_count, username_count)
def block_ip(ip_address):
""" given the ip, block it """
if not ip_address:
# no reason to continue when there is no ip
return
if config.DISABLE_IP_LOCKOUT:
# no need to block, we disabled it.
return
key = get_ip_blocked_cache_key(ip_address)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
else:
REDIS_SERVER.set(key, 'blocked')
def block_username(username):
""" given the username block it. """
if not username:
# no reason to continue when there is no username
return
key = get_username_blocked_cache_key(username)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
else:
REDIS_SERVER.set(key, 'blocked')
def record_failed_attempt(ip_address, username):
""" record the failed login attempt, if over limit return False,
if not over limit return True """
# increment the failed count, and get current number
ip_block = False
if not config.DISABLE_IP_LOCKOUT:
# we only want to increment the IP if this is disabled.
ip_count = increment_key(get_ip_attempt_cache_key(ip_address))
# if over the limit, add to block
if ip_count > config.FAILURE_LIMIT:
block_ip(ip_address)
ip_block = True
user_block = False
user_count = increment_key(get_username_attempt_cache_key(username))
# if over the limit, add to block
if user_count > config.FAILURE_LIMIT:
block_username(username)
user_block = True
# if we have this turned on, then there is no reason to look at ip_block
# we will just look at user_block, and short circut the result since
# we don't need to continue.
if config.DISABLE_IP_LOCKOUT:
# if user_block is True, it means it was blocked
# we need to return False
return not user_block
# we want to make sure both the IP and user is blocked before we
# return False
# this is mostly used when a lot of your users are using proxies,
# and you don't want one user to block everyone on that one IP.
if config.LOCKOUT_BY_IP_USERNAME:
# both ip_block and user_block need to be True in order
# to return a False.
return not (ip_block and user_block)
# if any blocks return False, no blocks. return True
return not (ip_block or user_block)
def unblock_ip(ip_address, pipe=None):
""" unblock the given IP """
do_commit = False
if not pipe:
pipe = REDIS_SERVER.pipeline()
do_commit = True
if ip_address:
pipe.delete(get_ip_attempt_cache_key(ip_address))
pipe.delete(get_ip_blocked_cache_key(ip_address))
if do_commit:
pipe.execute()
def unblock_username(username, pipe=None):
""" unblock the given Username """
do_commit = False
if not pipe:
pipe = REDIS_SERVER.pipeline()
do_commit = True
if username:
pipe.delete(get_username_attempt_cache_key(username))
pipe.delete(get_username_blocked_cache_key(username))
if do_commit:
pipe.execute()
def reset_failed_attempts(ip_address=None, username=None):
""" reset the failed attempts for these ip's and usernames
"""
pipe = REDIS_SERVER.pipeline()
unblock_ip(ip_address, pipe=pipe)
unblock_username(username, pipe=pipe)
pipe.execute()
def lockout_response(request):
""" if we are locked out, here is the response """
if config.LOCKOUT_TEMPLATE:
context = {
'cooloff_time_seconds': config.COOLOFF_TIME,
'cooloff_time_minutes': config.COOLOFF_TIME / 60,
'failure_limit': config.FAILURE_LIMIT,
}
return render(request, config.LOCKOUT_TEMPLATE, context)
if config.LOCKOUT_URL:
return HttpResponseRedirect(config.LOCKOUT_URL)
if config.COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.")
else:
return HttpResponse("Account locked: too many login attempts. "
"Contact an admin to unlock your account.")
def is_user_already_locked(username):
"""Is this username already locked?"""
if username is None:
return False
return REDIS_SERVER.get(get_username_blocked_cache_key(username))
def is_source_ip_already_locked(ip_address):
"""Is this IP already locked?"""
if ip_address is None:
return False
if config.DISABLE_IP_LOCKOUT:
return False
return REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))
def is_already_locked(request):
"""Parse the username & IP from the request, and see if it's already locked."""
user_blocked = is_user_already_locked(
request.POST.get(config.USERNAME_FORM_FIELD, None))
ip_blocked = is_source_ip_already_locked(get_ip(request))
if config.LOCKOUT_BY_IP_USERNAME:
# if both this IP and this username are present the request is blocked
return ip_blocked and user_blocked
return ip_blocked or user_blocked
def check_request(request, login_unsuccessful):
""" check the request, and process results"""
ip_address = get_ip(request)
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
if not login_unsuccessful:
# user logged in -- forget the failed attempts
reset_failed_attempts(ip_address=ip_address, username=username)
return True
else:
# add a failed attempt for this user
return record_failed_attempt(ip_address, username)
def add_login_attempt_to_db(request, login_valid):
""" Create a record for the login attempt If using celery call celery
task, if not, call the method normally """
if not config.STORE_ACCESS_ATTEMPTS:
# If we don't want to store in the database, then don't proceed.
return
if config.USERNAME_FORM_FIELD in request.POST:
username = request.POST[config.USERNAME_FORM_FIELD][:255]
else:
username = None
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip_address = get_ip(request)
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')
path_info = request.META.get('PATH_INFO', '<unknown>')
if config.USE_CELERY:
from .tasks import add_login_attempt_task
add_login_attempt_task.delay(user_agent, ip_address, username,
http_accept, path_info, login_valid)
else:
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)