initial checkin

This commit is contained in:
Ken Cochrane 2014-09-23 17:31:17 -07:00
parent f361ec3e48
commit 859bc4a25d
8 changed files with 552 additions and 1 deletions

3
CHANGES Normal file
View file

@ -0,0 +1,3 @@
0.1
===
Initial Version

125
README.md
View file

@ -1,4 +1,127 @@
django-defender
===============
A simple django reusable app that blocks people from brute forcing login attemps
A simple django reusable app that blocks people from brute forcing login
attempts. The goal is to make this as fast as possible, so that we do not
slow down the login attempts.
We will use a cache so that it doesn't have to hit the database in order to
check the database on each login attempt. The first version will be based on
Redis, but the goal is to make this configurable so that people can use what
they want for a backend, so it is configurable.
Version 0.1 will be very limited in features, it will only do a few things, but
the goal is to do those things very well, and have full unit tests with docs.
Goals for 0.1
=============
- Log all login attempts to the database
- support for reverse proxies with different headers for IP addresses
- rate limit based on:
- username
- ip address
- use redis for the blacklist
- configuration
- redis server
- host
- port
- database
- password
- key_prefix
- block length
- number of incorrect attempts before block
- 100% code coverage
- full documentation
- admin pages
- list of blocked usernames and ip's
- ability to unblock people
- list of recent login attempts
- search by username for recent login attempts
Long term goals
===============
- pluggable backends, so people can use something other then redis.
- email users when their account is blocked
- add a whitelist for username and ip's that we will never block (admin's, etc)
- add a permanent black list
- ip address
- scan for known proxy ip's and don't block requests coming from those
(improve the chances that a good IP is blocked)
- add management command to prune old (configurable) login attempts.
Why not django-axes
===================
django-axes is great but it puts everything in the database, and this causes
a bottle neck when you have a lot of data. It slows down the auth requests by
as much as 200-300ms. This might not be much for some sites, but for others it
is too long.
This started out as a fork of django-axes, and is using as much of their code
as possible, and removing the parts not needed, and speeding up the lookups
to improve the login.
requirements
============
- django >= 1.6 (may work is previous versions, but not officially supported)
- redis
How it works
============
1. When someone tries to login, we first check to see if they are currently
blocked. We check the username they are trying to use, as well as the IP
address. If they are blocked, goto step 10. If not blocked go to step 2.
2. They are not blocked, so we check to see if the login was valid. If valid
go to step 20. If not valid go to step 3.
3. Login attempt wasn't valid. Add their username and IP address for this
attempt to the cache. If this brings them over the limit, add them to the
blocked list, and then goto step 10. If not over the limit goto step 4.
4. login was invalid, but not over the limit. Send them back to the login screen
to try again.
10. User is blocked: Send them to the blocked page, telling them they are
blocked, and give an estimate on when they will be unblocked.
20. Login is valid. Reset any failed login attempts, and forward to their
destination.
Cache backend:
==============
IP_attempts (count, TTL)
username_attempts (count, TTL)
ip_blocks (list) # how to expire when in a list?
username_blocks (list) # how to expire item in the list?
prefix:failed:ip:[ip] (count, expires)
prefix:failed:username:[username] (count, expires)
prefix:blocked:ip:[ip] (true, TTL)
prefix:blocked:username:[username] (true, TTL)
# example of how to do rate limiting by IP
# assuming it is 10 requests being the limit
# this assumes there is a DECAY of DECAY_TIME
# to remove invalid logins after a set number of time
# For every incorrect login, we reset the block time.
FUNCTION LIMIT_API_CALL(ip)
current = LLEN(ip)
IF current > 10 THEN
ERROR "too many requests per second"
ELSE
MULTI
RPUSH(ip, ip)
EXPIRE(ip, DECAY_TIME)
EXEC
END

0
defender/__init__.py Normal file
View file

44
defender/admin.py Normal file
View file

@ -0,0 +1,44 @@
from django.contrib import admin
from .models import AccessAttempt
class AccessAttemptAdmin(admin.ModelAdmin):
list_display = (
'attempt_time',
'ip_address',
'user_agent',
'username',
'path_info',
'login_valid',
)
list_filter = [
'attempt_time',
'ip_address',
'username',
'path_info',
]
search_fields = [
'ip_address',
'username',
'user_agent',
'path_info',
]
date_hierarchy = 'attempt_time'
fieldsets = (
(None, {
'fields': ('path_info', 'login_valid')
}),
('Form Data', {
'fields': ('get_data', 'post_data')
}),
('Meta Data', {
'fields': ('user_agent', 'ip_address', 'http_accept')
})
)
admin.site.register(AccessAttempt, AccessAttemptAdmin)

312
defender/decorators.py Normal file
View file

@ -0,0 +1,312 @@
import logging
import socket
import redis
from django.conf import settings
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django.template import RequestContext
from django.utils.translation import ugettext_lazy
from .models import AccessAttempt
REDIS_HOST = settings.REDIS_HOST
REDIS_PORT = settings.REDIS_PORT
REDIS_PASSWORD = settings.REDIS_PASSWORD
REDIS_DB = settings.REDIS_DB
# see if the user has overridden the failure limit
FAILURE_LIMIT = getattr(settings, 'DEFENDER_LOGIN_FAILURE_LIMIT', 3)
USE_USER_AGENT = getattr(settings, 'DEFENDER_USE_USER_AGENT', False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = getattr(settings,
'DEFENDER_USERNAME_FORM_FIELD',
'username')
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = getattr(settings,
'DEFENDER_BEHIND_REVERSE_PROXY',
False)
# the prefix for these keys in your cache.
CACHE_PREFIX = getattr(settings,
'DEFENDER_CACHE_PREFIX',
False)
# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = getattr(settings,
'DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')
# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = getattr(settings, 'DEFENDER_COOLOFF_TIME', 300) # seconds
LOCKOUT_TEMPLATE = getattr(settings, 'DEFENDER_LOCKOUT_TEMPLATE', None)
VERBOSE = getattr(settings, 'DEFENDER_VERBOSE', True)
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")
redis_server = redis.StrictRedis(
host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB, password=REDIS_PASSWORD)
log = logging.getLogger(__name__)
def is_valid_ip(ip_address):
""" Check Validity of an IP address """
valid = True
try:
socket.inet_aton(ip_address.strip())
except:
valid = False
return valid
def get_ip_address_from_request(request):
""" Makes the best attempt to get the client's real IP or return
the loopback """
PRIVATE_IPS_PREFIX = ('10.', '172.', '192.', '127.')
ip_address = ''
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '')
if x_forwarded_for and ',' not in x_forwarded_for:
if not x_forwarded_for.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
x_forwarded_for):
ip_address = x_forwarded_for.strip()
else:
ips = [ip.strip() for ip in x_forwarded_for.split(',')]
for ip in ips:
if ip.startswith(PRIVATE_IPS_PREFIX):
continue
elif not is_valid_ip(ip):
continue
else:
ip_address = ip
break
if not ip_address:
x_real_ip = request.META.get('HTTP_X_REAL_IP', '')
if x_real_ip:
if not x_real_ip.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
x_real_ip):
ip_address = x_real_ip.strip()
if not ip_address:
remote_addr = request.META.get('REMOTE_ADDR', '')
if remote_addr:
if not remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
remote_addr):
ip_address = remote_addr.strip()
if remote_addr.startswith(PRIVATE_IPS_PREFIX) and is_valid_ip(
remote_addr):
ip_address = remote_addr.strip()
if not ip_address:
ip_address = '127.0.0.1'
return ip_address
def get_ip(request):
""" get the ip address from the request """
if not BEHIND_REVERSE_PROXY:
ip = get_ip_address_from_request(request)
else:
ip = request.META.get(REVERSE_PROXY_HEADER, '')
ip = ip.split(",", 1)[0].strip()
if ip == '':
ip = request.META.get('REMOTE_ADDR', '')
return ip
def get_lockout_url():
""" get the lockout url from the settings """
return getattr(settings, 'DEFENDER_LOCKOUT_URL', None)
def get_ip_attempt_cache_key(ip):
""" get the cache key by ip """
return "{0}:failed:ip:{1}".format(CACHE_PREFIX, ip)
def get_username_attempt_cache_key(username):
""" get the cache key by username """
return "{0}:failed:username:{1}".format(CACHE_PREFIX, username)
def get_ip_blocked_cache_key(ip):
""" get the cache key by ip """
return "{0}:blocked:ip:{1}".format(CACHE_PREFIX, ip)
def get_username_blocked_cache_key(username):
""" get the cache key by username """
return "{0}:blocked:username:{1}".format(CACHE_PREFIX, username)
def increment_key(key):
""" given a key increment the value """
# TODO make this one transaction, not two different ones.
new_value = redis_server.incr(key, 1)
redis_server.expire(key, COOLOFF_TIME)
return new_value
def get_user_attempts(request):
"""Returns number of access attempts for this ip, username
"""
ip = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
# get by IP
ip_count = redis_server.get(get_ip_attempt_cache_key(ip))
if not ip_count:
ip_count = 0
# get by username
username_count = redis_server.get(get_username_attempt_cache_key(username))
if not username_count:
username_count = 0
# return the larger of the two.
return max(ip_count, username_count)
def block_ip(ip):
""" given the ip, block it"""
key = get_ip_blocked_cache_key(ip)
redis_server.set(key, COOLOFF_TIME)
def block_username(username):
""" given the username block it. """
key = get_username_blocked_cache_key(username)
redis_server.set(key, COOLOFF_TIME)
def record_failed_attempt(ip, username):
""" record the failed login attempt """
# increment the failed count, and get current number
ip_count = increment_key(get_ip_attempt_cache_key(ip))
user_count = increment_key(get_username_attempt_cache_key(username))
# if either are over the limit, add to block
if ip_count > FAILURE_LIMIT or user_count > FAILURE_LIMIT:
block_ip(ip)
block_username(username)
return False
return True
def reset_failed_attempts(ip, username):
""" reset the failed attempts for these ip's and usernames """
redis_server.delete(get_ip_attempt_cache_key(ip))
redis_server.delete(get_username_attempt_cache_key(username))
redis_server.delete(get_username_blocked_cache_key(username))
redis_server.delete(get_ip_blocked_cache_key(ip))
def lockout_response(request):
""" if we are locked out, here is the response """
if LOCKOUT_TEMPLATE:
context = {
'cooloff_time': COOLOFF_TIME,
'failure_limit': FAILURE_LIMIT,
}
return render_to_response(LOCKOUT_TEMPLATE, context,
context_instance=RequestContext(request))
LOCKOUT_URL = get_lockout_url()
if LOCKOUT_URL:
return HttpResponseRedirect(LOCKOUT_URL)
if COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.")
else:
return HttpResponse("Account locked: too many login attempts. "
"Contact an admin to unlock your account.")
def is_already_locked(request):
""" Is this IP/username already locked? """
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
# ip blocked?
ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address))
if not ip_blocked:
ip_blocked = False
# username blocked?
user_blocked = redis_server.get(get_username_blocked_cache_key(username))
if not user_blocked:
user_blocked = False
return ip_blocked or user_blocked
def check_request(request, login_unsuccessful):
""" check the request, and process results"""
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
result = True
if login_unsuccessful:
# add a failed attempt for this user
result = record_failed_attempt(ip_address, username)
else:
# user logged in -- forget the failed attempts
reset_failed_attempts(ip_address, username)
return result
def watch_login(func):
"""
Used to decorate the django.contrib.admin.site.login method.
"""
def decorated_login(request, *args, **kwargs):
# if the request is currently under lockout, do not proceed to the
# login function, go directly to lockout url, do not pass go, do not
# collect messages about this login attempt
if is_already_locked(request):
return lockout_response(request)
# call the login function
response = func(request, *args, **kwargs)
if func.__name__ == 'decorated_login':
# if we're dealing with this function itself, don't bother checking
# for invalid login attempts. I suppose there's a bunch of
# recursion going on here that used to cause one failed login
# attempt to generate 10+ failed access attempt records (with 3
# failed attempts each supposedly)
return response
if request.method == 'POST':
# see if the login was successful
login_unsuccessful = (
response and
not response.has_header('location') and
response.status_code != 302
)
AccessAttempt.objects.create(
user_agent=request.META.get('HTTP_USER_AGENT',
'<unknown>')[:255],
ip_address=get_ip(request),
username=request.POST.get(USERNAME_FORM_FIELD, None),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
login_valid=not login_unsuccessful,
)
if check_request(request, login_unsuccessful):
return response
return lockout_response(request)
return response
return decorated_login

33
defender/models.py Normal file
View file

@ -0,0 +1,33 @@
from django.db import models
class AccessAttempt(models.Model):
user_agent = models.CharField(
max_length=255,
)
ip_address = models.GenericIPAddressField(
verbose_name='IP Address',
null=True,
)
username = models.CharField(
max_length=255,
null=True,
)
http_accept = models.CharField(
verbose_name='HTTP Accept',
max_length=1025,
)
path_info = models.CharField(
verbose_name='Path',
max_length=255,
)
attempt_time = models.DateTimeField(
auto_now_add=True,
)
login_valid = models.BooleanField(
default=False,
)
class Meta:
abstract = True
ordering = ['-attempt_time']

1
defender/views.py Normal file
View file

@ -0,0 +1 @@
# no views required.

35
setup.py Normal file
View file

@ -0,0 +1,35 @@
#!/usr/bin/env python
try:
from setuptools import setup
except ImportError:
from distutils.core import setup
version = '0.1'
setup(name='django-defender',
version=version,
description="redis based Django app that locks out users after too "
"many failed login attempts.",
long_description=open('README.md').read(),
classifiers=[
'Development Status :: 4 - Beta',
'Framework :: Django',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache 2 License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Security',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules', ],
keywords='django, cache, security, authentication',
author='Ken Cochrane',
url='https://github.com/kencochrane/django-defender',
author_email='kencochrane@gmail.com',
license='Apache 2',
packages=['defender'],
install_requires=['django==1.6.7', 'redis==2.10.3', 'hiredis==0.1.4', ],
)