"""Helpers for tracking failed login attempts in redis and locking out
offending IP addresses / usernames."""
import logging
import socket

import redis
from django.conf import settings
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy

from .models import AccessAttempt


def get_setting(variable, default=None):
    """ get the 'variable' from settings if not there use the
    provided default """
    return getattr(settings, variable, default)


# redis server host
REDIS_HOST = get_setting('REDIS_HOST')

# redis server port
REDIS_PORT = get_setting('REDIS_PORT')

# redis server password
REDIS_PASSWORD = get_setting('REDIS_PASSWORD')

# redis db
REDIS_DB = get_setting('REDIS_DB')

# see if the user has overridden the failure limit
FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT', 3)

USE_USER_AGENT = get_setting('DEFENDER_USE_USER_AGENT', False)

# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')

# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = get_setting('DEFENDER_BEHIND_REVERSE_PROXY', False)

# the prefix for these keys in your cache.
CACHE_PREFIX = get_setting('DEFENDER_CACHE_PREFIX', 'defender')

# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = get_setting('DEFENDER_REVERSE_PROXY_HEADER',
                                   'HTTP_X_FORWARDED_FOR')

# how long to wait before the bad login attempt gets forgotten. in seconds.
# A falsy value means counters and blocks never expire on their own.
COOLOFF_TIME = get_setting('DEFENDER_COOLOFF_TIME', 300)  # seconds

LOCKOUT_TEMPLATE = get_setting('DEFENDER_LOCKOUT_TEMPLATE')

ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
                              "Note that both fields are case-sensitive.")

LOCKOUT_URL = get_setting('DEFENDER_LOCKOUT_URL')

redis_server = redis.StrictRedis(
    host=REDIS_HOST,
    port=REDIS_PORT,
    db=REDIS_DB,
    password=REDIS_PASSWORD)

log = logging.getLogger(__name__)


def is_valid_ip(ip_address):
    """ Check Validity of an IP address

    Returns True when ``socket.inet_aton`` accepts the stripped value,
    False otherwise (including when the value is not a string).
    """
    try:
        socket.inet_aton(ip_address.strip())
    except (socket.error, TypeError, ValueError, AttributeError):
        # socket.error: not a parsable address; the others cover
        # non-string / un-encodable input, which is equally "not valid".
        return False
    return True


def get_ip_address_from_request(request):
    """ Makes the best attempt to get the client's real IP or return
    the loopback

    Lookup order: first public, valid entry of HTTP_X_FORWARDED_FOR, then
    HTTP_X_REAL_IP (public and valid), then REMOTE_ADDR (valid, private
    allowed), and finally '127.0.0.1'.
    """
    # NOTE: these are plain string prefixes, so they are broader than the
    # actual private ranges (e.g. '172.' matches every 172.x.x.x address).
    # Kept as-is so existing deployments see the same classification.
    PRIVATE_IPS_PREFIX = ('10.', '172.', '192.', '127.')

    def is_public(candidate):
        # candidate must already be stripped so that leading whitespace
        # cannot hide a private prefix from the startswith() check.
        return (not candidate.startswith(PRIVATE_IPS_PREFIX) and
                is_valid_ip(candidate))

    ip_address = ''

    # The header may hold a single address or a comma separated chain;
    # splitting handles both (an empty header yields [''], which is invalid).
    x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '')
    for candidate in x_forwarded_for.split(','):
        candidate = candidate.strip()
        if is_public(candidate):
            ip_address = candidate
            break

    if not ip_address:
        x_real_ip = request.META.get('HTTP_X_REAL_IP', '').strip()
        if x_real_ip and is_public(x_real_ip):
            ip_address = x_real_ip

    if not ip_address:
        # REMOTE_ADDR is accepted whether private or public, as long as
        # it is a valid address.
        remote_addr = request.META.get('REMOTE_ADDR', '').strip()
        if remote_addr and is_valid_ip(remote_addr):
            ip_address = remote_addr

    if not ip_address:
        ip_address = '127.0.0.1'

    return ip_address


def get_ip(request):
    """ get the ip address from the request

    When BEHIND_REVERSE_PROXY is set, the first entry of the
    REVERSE_PROXY_HEADER value is used, falling back to REMOTE_ADDR when
    that header is empty or missing.
    """
    if not BEHIND_REVERSE_PROXY:
        return get_ip_address_from_request(request)

    ip = request.META.get(REVERSE_PROXY_HEADER, '')
    ip = ip.split(",", 1)[0].strip()
    if ip == '':
        ip = request.META.get('REMOTE_ADDR', '')
    return ip


def get_ip_attempt_cache_key(ip):
    """ get the failed-attempt counter cache key by ip """
    return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip)


def get_username_attempt_cache_key(username):
    """ get the failed-attempt counter cache key by username """
    return "{0}:failed:username:{1}".format(CACHE_PREFIX, username)


def get_ip_blocked_cache_key(ip):
    """ get the blocked-marker cache key by ip """
    return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip)


def get_username_blocked_cache_key(username):
    """ get the blocked-marker cache key by username """
    return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username)


def increment_key(key):
    """ given a key increment the value and refresh its expiry

    Both commands are sent in a single pipeline so the counter cannot be
    left behind without an expiry. Returns the new counter value.
    """
    pipe = redis_server.pipeline()
    pipe.incr(key, 1)
    if COOLOFF_TIME:
        # no cool-off configured means the counter should never expire
        pipe.expire(key, COOLOFF_TIME)
    return pipe.execute()[0]


def get_user_attempts(request):
    """ Returns number of access attempts for this ip, username

    The result is the larger of the per-ip and per-username counters,
    as an int (0 when neither counter exists).
    """
    ip = get_ip(request)
    username = request.POST.get(USERNAME_FORM_FIELD, None)

    # redis hands counters back as strings/bytes (or None when missing),
    # so convert before comparing numerically.
    ip_count = int(redis_server.get(get_ip_attempt_cache_key(ip)) or 0)
    username_count = int(
        redis_server.get(get_username_attempt_cache_key(username)) or 0)

    # return the larger of the two.
    return max(ip_count, username_count)


def _set_blocked(key):
    """ store the 'blocked' marker under key, expiring after COOLOFF_TIME
    when one is configured """
    if COOLOFF_TIME:
        redis_server.set(key, 'blocked', COOLOFF_TIME)
    else:
        # no cool-off: the block stays until it is explicitly reset
        redis_server.set(key, 'blocked')


def block_ip(ip):
    """ given the ip, block it """
    _set_blocked(get_ip_blocked_cache_key(ip))


def block_username(username):
    """ given the username block it. """
    _set_blocked(get_username_blocked_cache_key(username))


def record_failed_attempt(ip, username):
    """ record the failed login attempt, if over limit return False,
    if not over limit return True """
    # increment the failed count, and get current number
    ip_count = increment_key(get_ip_attempt_cache_key(ip))
    user_count = increment_key(get_username_attempt_cache_key(username))

    # if either are over the limit, add to block
    if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT:
        block_ip(ip)
        block_username(username)
        return False
    return True


def reset_failed_attempts(ip=None, username=None):
    """ reset the failed attempts for these ip's and usernames

    Removes both the attempt counter and the blocked marker for every
    identifier given; all deletes are sent in one pipeline.
    """
    pipe = redis_server.pipeline()
    if ip:
        pipe.delete(get_ip_attempt_cache_key(ip))
        pipe.delete(get_ip_blocked_cache_key(ip))
    if username:
        pipe.delete(get_username_attempt_cache_key(username))
        pipe.delete(get_username_blocked_cache_key(username))
    pipe.execute()


def lockout_response(request):
    """ if we are locked out, here is the response

    Preference order: LOCKOUT_TEMPLATE, then a redirect to LOCKOUT_URL,
    then a plain text message that depends on whether a cool-off is set.
    """
    if LOCKOUT_TEMPLATE:
        context = {
            'cooloff_time': COOLOFF_TIME,
            'failure_limit': FAILURE_LIMIT,
        }
        return render_to_response(LOCKOUT_TEMPLATE, context,
                                  context_instance=RequestContext(request))

    if LOCKOUT_URL:
        return HttpResponseRedirect(LOCKOUT_URL)

    if COOLOFF_TIME:
        return HttpResponse("Account locked: too many login attempts.  "
                            "Please try again later.")
    else:
        return HttpResponse("Account locked: too many login attempts.  "
                            "Contact an admin to unlock your account.")


def is_already_locked(request):
    """ Is this IP/username already locked?

    Returns True when a blocked marker exists for either the request's
    ip or its posted username, False otherwise.
    """
    ip_address = get_ip(request)
    username = request.POST.get(USERNAME_FORM_FIELD, None)

    # short circuit no need to check username if ip is already blocked.
    if redis_server.get(get_ip_blocked_cache_key(ip_address)):
        return True

    # username blocked?
    return bool(redis_server.get(get_username_blocked_cache_key(username)))


def check_request(request, login_unsuccessful):
    """ check the request, and process results

    On a successful login the failure state is cleared and True is
    returned; on a failed login the attempt is recorded and the result of
    record_failed_attempt (False once over the limit) is returned.
    """
    ip_address = get_ip(request)
    username = request.POST.get(USERNAME_FORM_FIELD, None)

    if login_unsuccessful:
        # add a failed attempt for this user
        return record_failed_attempt(ip_address, username)

    # user logged in -- forget the failed attempts
    reset_failed_attempts(ip=ip_address, username=username)
    return True


def add_login_attempt(request, login_valid):
    """ Create a record for the login attempt """
    AccessAttempt.objects.create(
        # truncated to 255 characters before being stored
        user_agent=request.META.get('HTTP_USER_AGENT', '<unknown>')[:255],
        ip_address=get_ip(request),
        username=request.POST.get(USERNAME_FORM_FIELD, None),
        http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
        path_info=request.META.get('PATH_INFO', '<unknown>'),
        login_valid=login_valid,
    )