PEP8 formatting (#147)

Run black with Python 2.7 as target version
to unify the code styling and make it more
linter and style guide compliant
This commit is contained in:
Aleksi Häkli 2019-11-15 20:22:14 +02:00 committed by GitHub
parent afa47bcbf0
commit a1d526f318
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
24 changed files with 636 additions and 572 deletions

View file

@ -4,29 +4,26 @@ from .models import AccessAttempt
class AccessAttemptAdmin(admin.ModelAdmin):
""" Access attempt admin config """
list_display = (
'attempt_time',
'ip_address',
'user_agent',
'username',
'path_info',
'login_valid',
"attempt_time",
"ip_address",
"user_agent",
"username",
"path_info",
"login_valid",
)
search_fields = [
'ip_address',
'username',
"ip_address",
"username",
]
date_hierarchy = 'attempt_time'
date_hierarchy = "attempt_time"
fieldsets = (
(None, {
'fields': ('path_info', 'login_valid')
}),
('Meta Data', {
'fields': ('user_agent', 'ip_address')
})
(None, {"fields": ("path_info", "login_valid")}),
("Meta Data", {"fields": ("user_agent", "ip_address")}),
)

View file

@ -9,76 +9,78 @@ def get_setting(variable, default=None):
# redis server host
DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL')
DEFENDER_REDIS_URL = get_setting("DEFENDER_REDIS_URL")
# reuse declared cache from django settings
DEFENDER_REDIS_NAME = get_setting('DEFENDER_REDIS_NAME')
DEFENDER_REDIS_NAME = get_setting("DEFENDER_REDIS_NAME")
MOCK_REDIS = get_setting('DEFENDER_MOCK_REDIS', False)
MOCK_REDIS = get_setting("DEFENDER_MOCK_REDIS", False)
# see if the user has overridden the failure limit
FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT', 3)
USERNAME_FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT_USERNAME', FAILURE_LIMIT)
IP_FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT_IP', FAILURE_LIMIT)
FAILURE_LIMIT = get_setting("DEFENDER_LOGIN_FAILURE_LIMIT", 3)
USERNAME_FAILURE_LIMIT = get_setting(
"DEFENDER_LOGIN_FAILURE_LIMIT_USERNAME", FAILURE_LIMIT
)
IP_FAILURE_LIMIT = get_setting("DEFENDER_LOGIN_FAILURE_LIMIT_IP", FAILURE_LIMIT)
# If this is True, the lockout checks to evaluate if the IP failure limit and
# the username failure limit has been reached before issuing the lockout.
LOCKOUT_BY_IP_USERNAME = get_setting(
'DEFENDER_LOCK_OUT_BY_IP_AND_USERNAME', False)
LOCKOUT_BY_IP_USERNAME = get_setting("DEFENDER_LOCK_OUT_BY_IP_AND_USERNAME", False)
# if this is True, The users IP address will not get locked when
# there are too many login attempts.
DISABLE_IP_LOCKOUT = get_setting('DEFENDER_DISABLE_IP_LOCKOUT', False)
DISABLE_IP_LOCKOUT = get_setting("DEFENDER_DISABLE_IP_LOCKOUT", False)
# If this is True, usernames will not get locked when
# there are too many login attempts.
DISABLE_USERNAME_LOCKOUT = get_setting(
'DEFENDER_DISABLE_USERNAME_LOCKOUT', False)
DISABLE_USERNAME_LOCKOUT = get_setting("DEFENDER_DISABLE_USERNAME_LOCKOUT", False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')
USERNAME_FORM_FIELD = get_setting("DEFENDER_USERNAME_FORM_FIELD", "username")
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = get_setting('DEFENDER_BEHIND_REVERSE_PROXY', False)
BEHIND_REVERSE_PROXY = get_setting("DEFENDER_BEHIND_REVERSE_PROXY", False)
# the prefix for these keys in your cache.
CACHE_PREFIX = get_setting('DEFENDER_CACHE_PREFIX', 'defender')
CACHE_PREFIX = get_setting("DEFENDER_CACHE_PREFIX", "defender")
# if the django app is behind a reverse proxy, look for the
# ip address using this HTTP header value
REVERSE_PROXY_HEADER = get_setting('DEFENDER_REVERSE_PROXY_HEADER',
'HTTP_X_FORWARDED_FOR')
REVERSE_PROXY_HEADER = get_setting(
"DEFENDER_REVERSE_PROXY_HEADER", "HTTP_X_FORWARDED_FOR"
)
try:
# how long to wait before the bad login attempt gets forgotten. in seconds.
COOLOFF_TIME = int(get_setting('DEFENDER_COOLOFF_TIME', 300)) # seconds
COOLOFF_TIME = int(get_setting("DEFENDER_COOLOFF_TIME", 300)) # seconds
except ValueError: # pragma: no cover
raise Exception(
'DEFENDER_COOLOFF_TIME needs to be an integer') # pragma: no cover
raise Exception("DEFENDER_COOLOFF_TIME needs to be an integer") # pragma: no cover
LOCKOUT_TEMPLATE = get_setting('DEFENDER_LOCKOUT_TEMPLATE')
LOCKOUT_TEMPLATE = get_setting("DEFENDER_LOCKOUT_TEMPLATE")
ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. "
"Note that both fields are case-sensitive.")
ERROR_MESSAGE = ugettext_lazy(
"Please enter a correct username and password. "
"Note that both fields are case-sensitive."
)
LOCKOUT_URL = get_setting('DEFENDER_LOCKOUT_URL')
LOCKOUT_URL = get_setting("DEFENDER_LOCKOUT_URL")
USE_CELERY = get_setting('DEFENDER_USE_CELERY', False)
USE_CELERY = get_setting("DEFENDER_USE_CELERY", False)
STORE_ACCESS_ATTEMPTS = get_setting('DEFENDER_STORE_ACCESS_ATTEMPTS', True)
STORE_ACCESS_ATTEMPTS = get_setting("DEFENDER_STORE_ACCESS_ATTEMPTS", True)
# Used by the management command to decide how long to keep access attempt
# recods. Number is # of hours.
try:
ACCESS_ATTEMPT_EXPIRATION = int(get_setting(
'DEFENDER_ACCESS_ATTEMPT_EXPIRATION', 24))
ACCESS_ATTEMPT_EXPIRATION = int(
get_setting("DEFENDER_ACCESS_ATTEMPT_EXPIRATION", 24)
)
except ValueError: # pragma: no cover
raise Exception(
'DEFENDER_ACCESS_ATTEMPT_EXPIRATION'
' needs to be an integer') # pragma: no cover
"DEFENDER_ACCESS_ATTEMPT_EXPIRATION" " needs to be an integer"
) # pragma: no cover
GET_USERNAME_FROM_REQUEST_PATH = get_setting(
'DEFENDER_GET_USERNAME_FROM_REQUEST_PATH',
'defender.utils.username_from_request'
"DEFENDER_GET_USERNAME_FROM_REQUEST_PATH", "defender.utils.username_from_request"
)

View file

@ -2,6 +2,7 @@ from django.core.cache import caches
from django.core.cache.backends.base import InvalidCacheBackendError
import redis
try:
import urlparse
except ImportError: # pragma: no cover
@ -12,21 +13,20 @@ from . import config
# Register database schemes in URLs.
urlparse.uses_netloc.append("redis")
INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache' \
' settings.'
INVALID_CACHE_ERROR_MSG = "The cache {} was not found on the django cache" " settings."
def get_redis_connection():
""" Get the redis connection if not using mock """
if config.MOCK_REDIS: # pragma: no cover
import mockredis
return mockredis.mock_strict_redis_client() # pragma: no cover
elif config.DEFENDER_REDIS_NAME: # pragma: no cover
try:
cache = caches[config.DEFENDER_REDIS_NAME]
except InvalidCacheBackendError:
raise KeyError(INVALID_CACHE_ERROR_MSG.format(
config.DEFENDER_REDIS_NAME))
raise KeyError(INVALID_CACHE_ERROR_MSG.format(config.DEFENDER_REDIS_NAME))
# every redis backend implement it own way to get the low level client
try:
# redis_cache.RedisCache case (django-redis-cache package)
@ -37,12 +37,12 @@ def get_redis_connection():
else: # pragma: no cover
redis_config = parse_redis_url(config.DEFENDER_REDIS_URL)
return redis.StrictRedis(
host=redis_config.get('HOST'),
port=redis_config.get('PORT'),
db=redis_config.get('DB'),
password=redis_config.get('PASSWORD'),
ssl=redis_config.get('SSL'))
host=redis_config.get("HOST"),
port=redis_config.get("PORT"),
db=redis_config.get("DB"),
password=redis_config.get("PASSWORD"),
ssl=redis_config.get("SSL"),
)
def parse_redis_url(url):
@ -54,7 +54,7 @@ def parse_redis_url(url):
"PASSWORD": None,
"HOST": "localhost",
"PORT": 6379,
"SSL": False
"SSL": False,
}
if not url:
@ -63,7 +63,7 @@ def parse_redis_url(url):
url = urlparse.urlparse(url)
# Remove query strings.
path = url.path[1:]
path = path.split('?', 2)[0]
path = path.split("?", 2)[0]
if path:
redis_config.update({"DB": int(path)})
@ -73,7 +73,7 @@ def parse_redis_url(url):
redis_config.update({"HOST": url.hostname})
if url.port:
redis_config.update({"PORT": int(url.port)})
if url.scheme in ['https', 'rediss']:
if url.scheme in ["https", "rediss"]:
redis_config.update({"SSL": True})
return redis_config

View file

@ -1,8 +1,9 @@
from .models import AccessAttempt
def store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid):
def store_login_attempt(
user_agent, ip_address, username, http_accept, path_info, login_valid
):
""" Store the login attempt to the db. """
AccessAttempt.objects.create(
user_agent=user_agent,

View file

@ -3,8 +3,7 @@ from . import utils
import functools
def watch_login(status_code=302, msg='',
get_username=utils.get_username_from_request):
def watch_login(status_code=302, msg="", get_username=utils.get_username_from_request):
"""
Used to decorate the django.contrib.admin.site.login method or
any other function you want to protect by brute forcing.
@ -12,6 +11,7 @@ def watch_login(status_code=302, msg='',
indicate a failure and/or a string that will be checked within the
response body.
"""
def decorated_login(func):
@functools.wraps(func)
def wrapper(request, *args, **kwargs):
@ -24,29 +24,30 @@ def watch_login(status_code=302, msg='',
# call the login function
response = func(request, *args, **kwargs)
if request.method == 'POST':
if request.method == "POST":
# see if the login was successful
if status_code == 302: # standard Django login view
login_unsuccessful = (
response and
not response.has_header('location') and
response.status_code != status_code
response
and not response.has_header("location")
and response.status_code != status_code
)
else:
# If msg is not passed the last condition will be evaluated
# always to True so the first 2 will decide the result.
login_unsuccessful = (
response and response.status_code == status_code
and msg in response.content.decode('utf-8')
response
and response.status_code == status_code
and msg in response.content.decode("utf-8")
)
# ideally make this background task, but to keep simple,
# keeping it inline for now.
utils.add_login_attempt_to_db(request, not login_unsuccessful,
get_username)
utils.add_login_attempt_to_db(
request, not login_unsuccessful, get_username
)
if utils.check_request(request, login_unsuccessful,
get_username):
if utils.check_request(request, login_unsuccessful, get_username):
return response
return utils.lockout_response(request)
@ -54,4 +55,5 @@ def watch_login(status_code=302, msg='',
return response
return wrapper
return decorated_login

View file

@ -2,22 +2,21 @@ import os
from celery import Celery
PROJECT_DIR = lambda base: os.path.abspath(
os.path.join(os.path.dirname(__file__), base).replace('\\', '/'))
MEDIA_ROOT = PROJECT_DIR(os.path.join('media'))
MEDIA_URL = '/media/'
STATIC_ROOT = PROJECT_DIR(os.path.join('static'))
STATIC_URL = '/static/'
STATICFILES_DIRS = (
PROJECT_DIR(os.path.join('media', 'static')),
os.path.join(os.path.dirname(__file__), base).replace("\\", "/")
)
MEDIA_ROOT = PROJECT_DIR(os.path.join("media"))
MEDIA_URL = "/media/"
STATIC_ROOT = PROJECT_DIR(os.path.join("static"))
STATIC_URL = "/static/"
STATICFILES_DIRS = (PROJECT_DIR(os.path.join("media", "static")),)
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': PROJECT_DIR('defender.sb'),
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": PROJECT_DIR("defender.sb"),
}
}
@ -25,35 +24,35 @@ DATABASES = {
SITE_ID = 1
MIDDLEWARE = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'defender.middleware.FailedLoginMiddleware',
"django.middleware.common.CommonMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"defender.middleware.FailedLoginMiddleware",
)
ROOT_URLCONF = 'defender.exampleapp.urls'
ROOT_URLCONF = "defender.exampleapp.urls"
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.admin',
'django.contrib.staticfiles',
'defender',
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.sites",
"django.contrib.messages",
"django.contrib.admin",
"django.contrib.staticfiles",
"defender",
]
# List of finder classes that know how to find static files in
# various locations.
STATICFILES_FINDERS = (
'django.contrib.staticfiles.finders.FileSystemFinder',
'django.contrib.staticfiles.finders.AppDirectoriesFinder',
"django.contrib.staticfiles.finders.FileSystemFinder",
"django.contrib.staticfiles.finders.AppDirectoriesFinder",
)
SECRET_KEY = os.environ.get('SECRET_KEY', 'too-secret-for-test')
SECRET_KEY = os.environ.get("SECRET_KEY", "too-secret-for-test")
LOGIN_REDIRECT_URL = '/admin'
LOGIN_REDIRECT_URL = "/admin"
DEFENDER_LOGIN_FAILURE_LIMIT = 1
DEFENDER_COOLOFF_TIME = 60
@ -62,22 +61,22 @@ DEFENDER_REDIS_URL = "redis://localhost:6379/1"
DEFENDER_MOCK_REDIS = False
# Let's use custom function and strip username string from request.
DEFENDER_GET_USERNAME_FROM_REQUEST_PATH = (
'defender.exampleapp.utils.strip_username_from_request'
"defender.exampleapp.utils.strip_username_from_request"
)
# Celery settings:
CELERY_ALWAYS_EAGER = True
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'
BROKER_BACKEND = "memory"
BROKER_URL = "memory://"
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.exampleapp.settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "defender.exampleapp.settings")
app = Celery('defender')
app = Celery("defender")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.config_from_object("django.conf:settings")
app.autodiscover_tasks(lambda: INSTALLED_APPS)
DEBUG = True

View file

@ -7,9 +7,9 @@ from django.conf.urls.static import static
admin.autodiscover()
urlpatterns = patterns(
'',
(r'^admin/', include(admin.site.urls)),
(r'^admin/defender/', include('defender.urls')),
"",
(r"^admin/", include(admin.site.urls)),
(r"^admin/defender/", include("defender.urls")),
)

View file

@ -10,6 +10,7 @@ from ... import config
class Command(BaseCommand):
""" clean up management command """
help = "Cleans up django-defender AccessAttempt table"
def handle(self, **options):
@ -31,5 +32,6 @@ class Command(BaseCommand):
print(
"Finished. Removed {0} AccessAttempt entries.".format(
attempts_to_clean_count)
attempts_to_clean_count
)
)

View file

@ -10,6 +10,7 @@ from .decorators import watch_login
class FailedLoginMiddleware(MIDDLEWARE_BASE_CLASS):
""" Failed login middleware """
patched = False
def __init__(self, *args, **kwargs):
@ -22,6 +23,7 @@ class FailedLoginMiddleware(MIDDLEWARE_BASE_CLASS):
# `LoginView` class-based view
try:
from django.contrib.auth.views import LoginView
our_decorator = watch_login()
watch_login_method = method_decorator(our_decorator)
LoginView.dispatch = watch_login_method(LoginView.dispatch)

View file

@ -7,25 +7,36 @@ from django.db import models, migrations
class Migration(migrations.Migration):
""" Initial migrations """
dependencies = [
]
dependencies = []
operations = [
migrations.CreateModel(
name='AccessAttempt',
name="AccessAttempt",
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('user_agent', models.CharField(max_length=255)),
('ip_address', models.GenericIPAddressField(null=True, verbose_name='IP Address')),
('username', models.CharField(max_length=255, null=True)),
('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')),
('path_info', models.CharField(max_length=255, verbose_name='Path')),
('attempt_time', models.DateTimeField(auto_now_add=True)),
('login_valid', models.BooleanField(default=False)),
(
"id",
models.AutoField(
verbose_name="ID",
serialize=False,
auto_created=True,
primary_key=True,
),
),
("user_agent", models.CharField(max_length=255)),
(
"ip_address",
models.GenericIPAddressField(null=True, verbose_name="IP Address"),
),
("username", models.CharField(max_length=255, null=True)),
(
"http_accept",
models.CharField(max_length=1025, verbose_name="HTTP Accept"),
),
("path_info", models.CharField(max_length=255, verbose_name="Path")),
("attempt_time", models.DateTimeField(auto_now_add=True)),
("login_valid", models.BooleanField(default=False)),
],
options={
'ordering': ['-attempt_time'],
},
options={"ordering": ["-attempt_time"],},
bases=(models.Model,),
),
]

View file

@ -7,37 +7,20 @@ from django.utils.encoding import python_2_unicode_compatible
@python_2_unicode_compatible
class AccessAttempt(models.Model):
""" Access Attempt log """
user_agent = models.CharField(
max_length=255,
)
ip_address = models.GenericIPAddressField(
verbose_name='IP Address',
null=True,
)
username = models.CharField(
max_length=255,
null=True,
)
http_accept = models.CharField(
verbose_name='HTTP Accept',
max_length=1025,
)
path_info = models.CharField(
verbose_name='Path',
max_length=255,
)
attempt_time = models.DateTimeField(
auto_now_add=True,
)
login_valid = models.BooleanField(
default=False,
)
user_agent = models.CharField(max_length=255,)
ip_address = models.GenericIPAddressField(verbose_name="IP Address", null=True,)
username = models.CharField(max_length=255, null=True,)
http_accept = models.CharField(verbose_name="HTTP Accept", max_length=1025,)
path_info = models.CharField(verbose_name="Path", max_length=255,)
attempt_time = models.DateTimeField(auto_now_add=True,)
login_valid = models.BooleanField(default=False,)
class Meta:
ordering = ['-attempt_time']
ordering = ["-attempt_time"]
def __str__(self):
""" unicode value for this model """
return "{0} @ {1} | {2}".format(self.username,
self.attempt_time,
self.login_valid)
return "{0} @ {1} | {2}".format(
self.username, self.attempt_time, self.login_valid
)

View file

@ -1,9 +1,9 @@
from django.dispatch import Signal
username_block = Signal(providing_args=['username'])
username_unblock = Signal(providing_args=['username'])
ip_block = Signal(providing_args=['ip_address'])
ip_unblock = Signal(providing_args=['ip_address'])
username_block = Signal(providing_args=["username"])
username_unblock = Signal(providing_args=["username"])
ip_block = Signal(providing_args=["ip_address"])
ip_unblock = Signal(providing_args=["ip_address"])
class BlockSignal:
@ -11,6 +11,7 @@ class BlockSignal:
Providing a sender is mandatory when sending signals, hence
this empty sender class.
"""
pass

View file

@ -10,36 +10,92 @@ class Migration(SchemaMigration):
def forwards(self, orm):
""" Adding model 'AccessAttempt' """
db.create_table(u'defender_accessattempt', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('user_agent', self.gf('django.db.models.fields.CharField')(max_length=255)),
('ip_address', self.gf('django.db.models.fields.GenericIPAddressField')(max_length=39, null=True)),
('username', self.gf('django.db.models.fields.CharField')(max_length=255, null=True)),
('http_accept', self.gf('django.db.models.fields.CharField')(max_length=1025)),
('path_info', self.gf('django.db.models.fields.CharField')(max_length=255)),
('attempt_time', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True)),
('login_valid', self.gf('django.db.models.fields.BooleanField')(default=False)),
))
db.send_create_signal(u'defender', ['AccessAttempt'])
db.create_table(
"defender_accessattempt",
(
("id", self.gf("django.db.models.fields.AutoField")(primary_key=True)),
(
"user_agent",
self.gf("django.db.models.fields.CharField")(max_length=255),
),
(
"ip_address",
self.gf("django.db.models.fields.GenericIPAddressField")(
max_length=39, null=True
),
),
(
"username",
self.gf("django.db.models.fields.CharField")(
max_length=255, null=True
),
),
(
"http_accept",
self.gf("django.db.models.fields.CharField")(max_length=1025),
),
(
"path_info",
self.gf("django.db.models.fields.CharField")(max_length=255),
),
(
"attempt_time",
self.gf("django.db.models.fields.DateTimeField")(
auto_now_add=True, blank=True
),
),
(
"login_valid",
self.gf("django.db.models.fields.BooleanField")(default=False),
),
),
)
db.send_create_signal("defender", ["AccessAttempt"])
def backwards(self, orm):
# Deleting model 'AccessAttempt'
db.delete_table(u'defender_accessattempt')
db.delete_table("defender_accessattempt")
models = {
u'defender.accessattempt': {
'Meta': {'ordering': "[u'-attempt_time']", 'object_name': 'AccessAttempt'},
'attempt_time': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}),
'http_accept': ('django.db.models.fields.CharField', [], {'max_length': '1025'}),
u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}),
'ip_address': ('django.db.models.fields.GenericIPAddressField', [], {'max_length': '39', 'null': 'True'}),
'login_valid': ('django.db.models.fields.BooleanField', [], {'default': 'False'}),
'path_info': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'user_agent': ('django.db.models.fields.CharField', [], {'max_length': '255'}),
'username': ('django.db.models.fields.CharField', [], {'max_length': '255', 'null': 'True'})
"defender.accessattempt": {
"Meta": {"ordering": "[u'-attempt_time']", "object_name": "AccessAttempt"},
"attempt_time": (
"django.db.models.fields.DateTimeField",
[],
{"auto_now_add": "True", "blank": "True"},
),
"http_accept": (
"django.db.models.fields.CharField",
[],
{"max_length": "1025"},
),
"id": ("django.db.models.fields.AutoField", [], {"primary_key": "True"}),
"ip_address": (
"django.db.models.fields.GenericIPAddressField",
[],
{"max_length": "39", "null": "True"},
),
"login_valid": (
"django.db.models.fields.BooleanField",
[],
{"default": "False"},
),
"path_info": (
"django.db.models.fields.CharField",
[],
{"max_length": "255"},
),
"user_agent": (
"django.db.models.fields.CharField",
[],
{"max_length": "255"},
),
"username": (
"django.db.models.fields.CharField",
[],
{"max_length": "255", "null": "True"},
),
}
}
complete_apps = ['defender']
complete_apps = ["defender"]

View file

@ -7,8 +7,10 @@ from celery import shared_task
@shared_task()
def add_login_attempt_task(user_agent, ip_address, username,
http_accept, path_info, login_valid):
def add_login_attempt_task(
user_agent, ip_address, username, http_accept, path_info, login_valid
):
""" Create a record for the login attempt """
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)
store_login_attempt(
user_agent, ip_address, username, http_accept, path_info, login_valid
)

View file

@ -14,9 +14,11 @@ class DefenderTestCaseMixin(object):
class DefenderTransactionTestCase(DefenderTestCaseMixin, TransactionTestCase):
"""Helper TransactionTestCase that cleans the cache after each test"""
pass
class DefenderTestCase(DefenderTestCaseMixin, TestCase):
"""Helper TestCase that cleans the cache after each test"""
pass

View file

@ -1,56 +1,51 @@
import os
from celery import Celery
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
}
DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:",}}
SITE_ID = 1
MIDDLEWARE = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'defender.middleware.FailedLoginMiddleware',
"django.middleware.common.CommonMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"defender.middleware.FailedLoginMiddleware",
)
ROOT_URLCONF = 'defender.test_urls'
ROOT_URLCONF = "defender.test_urls"
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.admin',
'defender',
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.sites",
"django.contrib.messages",
"django.contrib.admin",
"defender",
]
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.contrib.auth.context_processors.auth',
'django.template.context_processors.debug',
'django.template.context_processors.i18n',
'django.template.context_processors.media',
'django.template.context_processors.static',
'django.template.context_processors.tz',
'django.contrib.messages.context_processors.messages',
"BACKEND": "django.template.backends.django.DjangoTemplates",
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.contrib.auth.context_processors.auth",
"django.template.context_processors.debug",
"django.template.context_processors.i18n",
"django.template.context_processors.media",
"django.template.context_processors.static",
"django.template.context_processors.tz",
"django.contrib.messages.context_processors.messages",
],
},
},
]
SECRET_KEY = os.environ.get('SECRET_KEY', 'too-secret-for-test')
SECRET_KEY = os.environ.get("SECRET_KEY", "too-secret-for-test")
LOGIN_REDIRECT_URL = '/admin'
LOGIN_REDIRECT_URL = "/admin"
DEFENDER_LOGIN_FAILURE_LIMIT = 10
DEFENDER_COOLOFF_TIME = 2
@ -60,15 +55,15 @@ DEFENDER_MOCK_REDIS = True
# celery settings
CELERY_ALWAYS_EAGER = True
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'
BROKER_BACKEND = "memory"
BROKER_URL = "memory://"
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.test_settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "defender.test_settings")
app = Celery('defender')
app = Celery("defender")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.config_from_object("django.conf:settings")
app.autodiscover_tasks(lambda: INSTALLED_APPS)

View file

@ -3,6 +3,4 @@ from django.contrib import admin
from .urls import urlpatterns as original_urlpatterns
urlpatterns = [
url(r'^admin/', admin.site.urls),
] + original_urlpatterns
urlpatterns = [url(r"^admin/", admin.site.urls),] + original_urlpatterns

View file

@ -16,6 +16,7 @@ from django.contrib.sessions.backends.db import SessionStore
from django.http import HttpRequest, HttpResponse
from django.test.client import RequestFactory
from redis.client import Redis
try:
from django.urls import reverse
except ImportError:
@ -35,30 +36,36 @@ from .models import AccessAttempt
from .test import DefenderTestCase, DefenderTransactionTestCase
LOGIN_FORM_KEY = '<form action="/admin/login/" method="post" id="login-form">'
ADMIN_LOGIN_URL = reverse('admin:login')
ADMIN_LOGIN_URL = reverse("admin:login")
DJANGO_VERSION = StrictVersion(get_version())
VALID_USERNAME = VALID_PASSWORD = 'valid'
UPPER_USERNAME = 'VALID'
VALID_USERNAME = VALID_PASSWORD = "valid"
UPPER_USERNAME = "VALID"
class AccessAttemptTest(DefenderTestCase):
""" Test case using custom settings for testing
"""
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
LOCKED_MESSAGE = "Account locked: too many login attempts."
PERMANENT_LOCKED_MESSAGE = (
LOCKED_MESSAGE + ' Contact an admin to unlock your account.'
LOCKED_MESSAGE + " Contact an admin to unlock your account."
)
def _get_random_str(self):
""" Returns a random str """
chars = string.ascii_uppercase + string.digits
return ''.join(random.choice(chars) for _ in range(20))
return "".join(random.choice(chars) for _ in range(20))
def _login(self, username=None, password=None, user_agent='test-browser',
remote_addr='127.0.0.1'):
def _login(
self,
username=None,
password=None,
user_agent="test-browser",
remote_addr="127.0.0.1",
):
""" Login a user. If the username or password is not provided
it will use a random string instead. Use the VALID_USERNAME and
VALID_PASSWORD to make a valid login.
@ -69,11 +76,12 @@ class AccessAttemptTest(DefenderTestCase):
if password is None:
password = self._get_random_str()
response = self.client.post(ADMIN_LOGIN_URL, {
'username': username,
'password': password,
LOGIN_FORM_KEY: 1,
}, HTTP_USER_AGENT=user_agent, REMOTE_ADDR=remote_addr)
response = self.client.post(
ADMIN_LOGIN_URL,
{"username": username, "password": password, LOGIN_FORM_KEY: 1,},
HTTP_USER_AGENT=user_agent,
REMOTE_ADDR=remote_addr,
)
return response
@ -81,16 +89,14 @@ class AccessAttemptTest(DefenderTestCase):
""" Create a valid user for login
"""
self.user = User.objects.create_superuser(
username=VALID_USERNAME,
email='test@example.com',
password=VALID_PASSWORD,
username=VALID_USERNAME, email="test@example.com", password=VALID_PASSWORD,
)
def test_data_integrity_of_get_blocked_ips(self):
""" Test whether data retrieved from redis via
get_blocked_ips() is the same as the data saved
"""
data_in = ['127.0.0.1', '4.2.2.1']
data_in = ["127.0.0.1", "4.2.2.1"]
for ip in data_in:
utils.block_ip(ip)
data_out = utils.get_blocked_ips()
@ -105,7 +111,7 @@ class AccessAttemptTest(DefenderTestCase):
""" Test whether data retrieved from redis via
get_blocked_usernames() is the same as the data saved
"""
data_in = ['foo', 'bar']
data_in = ["foo", "bar"]
for username in data_in:
utils.block_username(username)
data_out = utils.get_blocked_usernames()
@ -164,7 +170,7 @@ class AccessAttemptTest(DefenderTestCase):
one more time than failure limit
"""
for i in range(0, config.FAILURE_LIMIT):
ip = '74.125.239.{0}.'.format(i)
ip = "74.125.239.{0}.".format(i)
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -178,13 +184,13 @@ class AccessAttemptTest(DefenderTestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, self.LOCKED_MESSAGE)
@patch('defender.config.USERNAME_FAILURE_LIMIT', 3)
@patch("defender.config.USERNAME_FAILURE_LIMIT", 3)
def test_username_failure_limit(self):
""" Tests that the username failure limit setting is
respected when trying to login one more time than failure limit
"""
for i in range(0, config.USERNAME_FAILURE_LIMIT):
ip = '74.125.239.{0}.'.format(i)
ip = "74.125.239.{0}.".format(i)
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -198,13 +204,13 @@ class AccessAttemptTest(DefenderTestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
@patch('defender.config.IP_FAILURE_LIMIT', 3)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_ip_failure_limit(self):
""" Tests that the IP failure limit setting is
respected when trying to login one more time than failure limit
"""
for i in range(0, config.IP_FAILURE_LIMIT):
username = 'john-doe__%d' % i
username = "john-doe__%d" % i
response = self._login(username=username)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -221,8 +227,7 @@ class AccessAttemptTest(DefenderTestCase):
def test_valid_login(self):
""" Tests a valid login for a real username
"""
response = self._login(username=VALID_USERNAME,
password=VALID_PASSWORD)
response = self._login(username=VALID_USERNAME, password=VALID_PASSWORD)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
def test_reset_after_valid_login(self):
@ -245,7 +250,7 @@ class AccessAttemptTest(DefenderTestCase):
self._login(username=VALID_USERNAME)
# try to login with a different user
response = self._login(username='myuser')
response = self._login(username="myuser")
self.assertContains(response, self.LOCKED_MESSAGE)
def test_blocked_username_cannot_login(self):
@ -253,11 +258,11 @@ class AccessAttemptTest(DefenderTestCase):
another ip
"""
for i in range(0, config.FAILURE_LIMIT + 1):
ip = '74.125.239.{0}.'.format(i)
ip = "74.125.239.{0}.".format(i)
self._login(username=VALID_USERNAME, remote_addr=ip)
# try to login with a different ip
response = self._login(username=VALID_USERNAME, remote_addr='8.8.8.8')
response = self._login(username=VALID_USERNAME, remote_addr="8.8.8.8")
self.assertContains(response, self.LOCKED_MESSAGE)
def test_blocked_username_uppercase_saved_lower(self):
@ -266,7 +271,7 @@ class AccessAttemptTest(DefenderTestCase):
within the cache.
"""
for i in range(0, config.FAILURE_LIMIT + 2):
ip = '74.125.239.{0}.'.format(i)
ip = "74.125.239.{0}.".format(i)
self._login(username=UPPER_USERNAME, remote_addr=ip)
self.assertNotIn(UPPER_USERNAME, utils.get_blocked_usernames())
@ -278,7 +283,7 @@ class AccessAttemptTest(DefenderTestCase):
"""
for username in ["", None]:
for i in range(0, config.FAILURE_LIMIT + 2):
ip = '74.125.239.{0}.'.format(i)
ip = "74.125.239.{0}.".format(i)
self._login(username=username, remote_addr=ip)
self.assertNotIn(username, utils.get_blocked_usernames())
@ -311,15 +316,14 @@ class AccessAttemptTest(DefenderTestCase):
def test_long_user_agent_valid(self):
""" Tests if can handle a long user agent
"""
long_user_agent = 'ie6' * 1024
long_user_agent = "ie6" * 1024
response = self._login(
username=VALID_USERNAME, password=VALID_PASSWORD,
user_agent=long_user_agent
username=VALID_USERNAME, password=VALID_PASSWORD, user_agent=long_user_agent
)
self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302)
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_FORWARDED_FOR')
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_FORWARDED_FOR")
def test_get_ip_reverse_proxy(self):
""" Tests if can handle a long user agent
"""
@ -328,16 +332,16 @@ class AccessAttemptTest(DefenderTestCase):
request.user = AnonymousUser()
request.session = SessionStore()
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
self.assertEqual(utils.get_ip(request), '192.168.24.24')
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
self.assertEqual(utils.get_ip(request), "192.168.24.24")
request_factory = RequestFactory()
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
request.META['REMOTE_ADDR'] = '24.24.24.24'
self.assertEqual(utils.get_ip(request), '24.24.24.24')
request.META["REMOTE_ADDR"] = "24.24.24.24"
self.assertEqual(utils.get_ip(request), "24.24.24.24")
def test_get_ip(self):
""" Tests if can handle a long user agent
@ -347,12 +351,12 @@ class AccessAttemptTest(DefenderTestCase):
request.user = AnonymousUser()
request.session = SessionStore()
self.assertEqual(utils.get_ip(request), '127.0.0.1')
self.assertEqual(utils.get_ip(request), "127.0.0.1")
def test_long_user_agent_not_valid(self):
""" Tests if can handle a long user agent with failure
"""
long_user_agent = 'ie6' * 1024
long_user_agent = "ie6" * 1024
for i in range(0, config.FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)
@ -365,12 +369,12 @@ class AccessAttemptTest(DefenderTestCase):
self.test_failure_limit_by_ip_once()
# Reset the ip so we can try again
utils.reset_failed_attempts(ip_address='127.0.0.1')
utils.reset_failed_attempts(ip_address="127.0.0.1")
# Make a login attempt again
self.test_valid_login()
@patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/')
@patch("defender.config.LOCKOUT_URL", "http://localhost/othe/login/")
def test_failed_login_redirect_to_url(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
@ -384,14 +388,14 @@ class AccessAttemptTest(DefenderTestCase):
# But we should get one now, check redirect make sure it is valid.
response = self._login()
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], 'http://localhost/othe/login/')
self.assertEqual(response["Location"], "http://localhost/othe/login/")
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], 'http://localhost/othe/login/')
self.assertEqual(response["Location"], "http://localhost/othe/login/")
@patch('defender.config.LOCKOUT_URL', '/o/login/')
@patch("defender.config.LOCKOUT_URL", "/o/login/")
def test_failed_login_redirect_to_url_local(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
@ -404,22 +408,22 @@ class AccessAttemptTest(DefenderTestCase):
# RFC 7231 allows relative URIs in Location header.
# Django from version 1.9 is support this:
# https://docs.djangoproject.com/en/1.9/releases/1.9/#http-redirects-no-longer-forced-to-absolute-uris
lockout_url = 'http://testserver/o/login/'
if DJANGO_VERSION >= StrictVersion('1.9'):
lockout_url = '/o/login/'
lockout_url = "http://testserver/o/login/"
if DJANGO_VERSION >= StrictVersion("1.9"):
lockout_url = "/o/login/"
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now, check redirect make sure it is valid.
response = self._login()
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], lockout_url)
self.assertEqual(response["Location"], lockout_url)
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 302)
self.assertEqual(response['Location'], lockout_url)
self.assertEqual(response["Location"], lockout_url)
@patch('defender.config.LOCKOUT_TEMPLATE', 'defender/lockout.html')
@patch("defender.config.LOCKOUT_TEMPLATE", "defender/lockout.html")
def test_failed_login_redirect_to_template(self):
""" Test to make sure that after lockout we send to the correct
template """
@ -433,14 +437,14 @@ class AccessAttemptTest(DefenderTestCase):
# But we should get one now, check template make sure it is valid.
response = self._login()
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'defender/lockout.html')
self.assertTemplateUsed(response, "defender/lockout.html")
# doing a get should also get locked out message
response = self.client.get(ADMIN_LOGIN_URL)
self.assertEqual(response.status_code, 200)
self.assertTemplateUsed(response, 'defender/lockout.html')
self.assertTemplateUsed(response, "defender/lockout.html")
@patch('defender.config.COOLOFF_TIME', 0)
@patch("defender.config.COOLOFF_TIME", 0)
def test_failed_login_no_cooloff(self):
""" failed login no cooloff """
for i in range(0, config.FAILURE_LIMIT):
@ -467,166 +471,159 @@ class AccessAttemptTest(DefenderTestCase):
def test_is_valid_ip(self):
""" Test the is_valid_ip() method """
self.assertEqual(utils.is_valid_ip('192.168.0.1'), True)
self.assertEqual(utils.is_valid_ip('130.80.100.24'), True)
self.assertEqual(utils.is_valid_ip('8.8.8.8'), True)
self.assertEqual(utils.is_valid_ip('127.0.0.1'), True)
self.assertEqual(utils.is_valid_ip('fish'), False)
self.assertEqual(utils.is_valid_ip("192.168.0.1"), True)
self.assertEqual(utils.is_valid_ip("130.80.100.24"), True)
self.assertEqual(utils.is_valid_ip("8.8.8.8"), True)
self.assertEqual(utils.is_valid_ip("127.0.0.1"), True)
self.assertEqual(utils.is_valid_ip("fish"), False)
self.assertEqual(utils.is_valid_ip(None), False)
self.assertEqual(utils.is_valid_ip(''), False)
self.assertEqual(utils.is_valid_ip('0x41.0x41.0x41.0x41'), False)
self.assertEqual(utils.is_valid_ip('192.168.100.34.y'), False)
self.assertEqual(utils.is_valid_ip(""), False)
self.assertEqual(utils.is_valid_ip("0x41.0x41.0x41.0x41"), False)
self.assertEqual(utils.is_valid_ip("192.168.100.34.y"), False)
self.assertEqual(
utils.is_valid_ip('2001:0db8:85a3:0000:0000:8a2e:0370:7334'), True)
self.assertEqual(
utils.is_valid_ip('2001:db8:85a3:0:0:8a2e:370:7334'), True)
self.assertEqual(
utils.is_valid_ip('2001:db8:85a3::8a2e:370:7334'), True)
self.assertEqual(
utils.is_valid_ip('::ffff:192.0.2.128'), True)
self.assertEqual(
utils.is_valid_ip('::ffff:8.8.8.8'), True)
utils.is_valid_ip("2001:0db8:85a3:0000:0000:8a2e:0370:7334"), True
)
self.assertEqual(utils.is_valid_ip("2001:db8:85a3:0:0:8a2e:370:7334"), True)
self.assertEqual(utils.is_valid_ip("2001:db8:85a3::8a2e:370:7334"), True)
self.assertEqual(utils.is_valid_ip("::ffff:192.0.2.128"), True)
self.assertEqual(utils.is_valid_ip("::ffff:8.8.8.8"), True)
def test_parse_redis_url(self):
""" test the parse_redis_url method """
# full regular
conf = parse_redis_url("redis://user:password@localhost2:1234/2")
self.assertEqual(conf.get('HOST'), 'localhost2')
self.assertEqual(conf.get('DB'), 2)
self.assertEqual(conf.get('PASSWORD'), 'password')
self.assertEqual(conf.get('PORT'), 1234)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# full non local
conf = parse_redis_url("redis://user:pass@www.localhost.com:1234/2")
self.assertEqual(conf.get('HOST'), 'www.localhost.com')
self.assertEqual(conf.get('DB'), 2)
self.assertEqual(conf.get('PASSWORD'), 'pass')
self.assertEqual(conf.get('PORT'), 1234)
self.assertEqual(conf.get("HOST"), "www.localhost.com")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "pass")
self.assertEqual(conf.get("PORT"), 1234)
# no user name
conf = parse_redis_url("redis://password@localhost2:1234/2")
self.assertEqual(conf.get('HOST'), 'localhost2')
self.assertEqual(conf.get('DB'), 2)
self.assertEqual(conf.get('PASSWORD'), None)
self.assertEqual(conf.get('PORT'), 1234)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 1234)
# no user name 2 with colon
conf = parse_redis_url("redis://:password@localhost2:1234/2")
self.assertEqual(conf.get('HOST'), 'localhost2')
self.assertEqual(conf.get('DB'), 2)
self.assertEqual(conf.get('PASSWORD'), 'password')
self.assertEqual(conf.get('PORT'), 1234)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# Empty
conf = parse_redis_url(None)
self.assertEqual(conf.get('HOST'), 'localhost')
self.assertEqual(conf.get('DB'), 0)
self.assertEqual(conf.get('PASSWORD'), None)
self.assertEqual(conf.get('PORT'), 6379)
self.assertEqual(conf.get("HOST"), "localhost")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 6379)
# no db
conf = parse_redis_url("redis://:password@localhost2:1234")
self.assertEqual(conf.get('HOST'), 'localhost2')
self.assertEqual(conf.get('DB'), 0)
self.assertEqual(conf.get('PASSWORD'), 'password')
self.assertEqual(conf.get('PORT'), 1234)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# no password
conf = parse_redis_url("redis://localhost2:1234/0")
self.assertEqual(conf.get('HOST'), 'localhost2')
self.assertEqual(conf.get('DB'), 0)
self.assertEqual(conf.get('PASSWORD'), None)
self.assertEqual(conf.get('PORT'), 1234)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 1234)
@patch('defender.config.DEFENDER_REDIS_NAME', 'default')
@patch("defender.config.DEFENDER_REDIS_NAME", "default")
def test_get_redis_connection_django_conf(self):
""" get the redis connection """
redis_client = get_redis_connection()
self.assertIsInstance(redis_client, Redis)
@patch('defender.config.DEFENDER_REDIS_NAME', 'bad-key')
@patch("defender.config.DEFENDER_REDIS_NAME", "bad-key")
def test_get_redis_connection_django_conf_wrong_key(self):
""" see if we get the correct error """
error_msg = ('The cache bad-key was not found on the django '
'cache settings.')
error_msg = "The cache bad-key was not found on the django " "cache settings."
self.assertRaisesMessage(KeyError, error_msg, get_redis_connection)
def test_get_ip_address_from_request(self):
""" get ip from request, make sure it is correct """
req = HttpRequest()
req.META['REMOTE_ADDR'] = '1.2.3.4'
req.META["REMOTE_ADDR"] = "1.2.3.4"
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, '1.2.3.4')
self.assertEqual(ip, "1.2.3.4")
req = HttpRequest()
req.META['REMOTE_ADDR'] = '1.2.3.4 '
req.META["REMOTE_ADDR"] = "1.2.3.4 "
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, '1.2.3.4')
self.assertEqual(ip, "1.2.3.4")
req = HttpRequest()
req.META['REMOTE_ADDR'] = '192.168.100.34.y'
req.META["REMOTE_ADDR"] = "192.168.100.34.y"
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, '127.0.0.1')
self.assertEqual(ip, "127.0.0.1")
req = HttpRequest()
req.META['REMOTE_ADDR'] = 'cat'
req.META["REMOTE_ADDR"] = "cat"
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, '127.0.0.1')
self.assertEqual(ip, "127.0.0.1")
req = HttpRequest()
ip = utils.get_ip_address_from_request(req)
self.assertEqual(ip, '127.0.0.1')
self.assertEqual(ip, "127.0.0.1")
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_PROXIED')
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_PROXIED")
def test_get_ip_reverse_proxy_custom_header(self):
""" make sure the ip is correct behind reverse proxy """
req = HttpRequest()
req.META['HTTP_X_PROXIED'] = '1.2.3.4'
self.assertEqual(utils.get_ip(req), '1.2.3.4')
req.META["HTTP_X_PROXIED"] = "1.2.3.4"
self.assertEqual(utils.get_ip(req), "1.2.3.4")
req = HttpRequest()
req.META['HTTP_X_PROXIED'] = '1.2.3.4, 5.6.7.8, 127.0.0.1'
self.assertEqual(utils.get_ip(req), '1.2.3.4')
req.META["HTTP_X_PROXIED"] = "1.2.3.4, 5.6.7.8, 127.0.0.1"
self.assertEqual(utils.get_ip(req), "1.2.3.4")
req = HttpRequest()
req.META['REMOTE_ADDR'] = '1.2.3.4'
self.assertEqual(utils.get_ip(req), '1.2.3.4')
req.META["REMOTE_ADDR"] = "1.2.3.4"
self.assertEqual(utils.get_ip(req), "1.2.3.4")
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_REAL_IP')
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_REAL_IP")
def test_get_user_attempts(self):
""" Get the user attempts make sure they are correct """
ip_attempts = random.randint(3, 12)
username_attempts = random.randint(3, 12)
for i in range(0, ip_attempts):
utils.increment_key(utils.get_ip_attempt_cache_key('1.2.3.4'))
utils.increment_key(utils.get_ip_attempt_cache_key("1.2.3.4"))
for i in range(0, username_attempts):
utils.increment_key(utils.get_username_attempt_cache_key('foobar'))
utils.increment_key(utils.get_username_attempt_cache_key("foobar"))
req = HttpRequest()
req.POST['username'] = 'foobar'
req.META['HTTP_X_REAL_IP'] = '1.2.3.4'
req.POST["username"] = "foobar"
req.META["HTTP_X_REAL_IP"] = "1.2.3.4"
self.assertEqual(
utils.get_user_attempts(req), max(ip_attempts, username_attempts)
)
req = HttpRequest()
req.POST['username'] = 'foobar'
req.META['HTTP_X_REAL_IP'] = '5.6.7.8'
self.assertEqual(
utils.get_user_attempts(req), username_attempts
)
req.POST["username"] = "foobar"
req.META["HTTP_X_REAL_IP"] = "5.6.7.8"
self.assertEqual(utils.get_user_attempts(req), username_attempts)
req = HttpRequest()
req.POST['username'] = 'barfoo'
req.META['HTTP_X_REAL_IP'] = '1.2.3.4'
self.assertEqual(
utils.get_user_attempts(req), ip_attempts
)
req.POST["username"] = "barfoo"
req.META["HTTP_X_REAL_IP"] = "1.2.3.4"
self.assertEqual(utils.get_user_attempts(req), ip_attempts)
def test_admin(self):
""" test the admin pages for this app """
from .admin import AccessAttemptAdmin
AccessAttemptAdmin
def test_unblock_view_user_with_plus(self):
@ -636,16 +633,18 @@ class AccessAttemptTest(DefenderTestCase):
Regression test for #GH76.
"""
reverse('defender_unblock_username_view',
kwargs={'username': 'user+test@test.tld'})
reverse(
"defender_unblock_username_view", kwargs={"username": "user+test@test.tld"}
)
def test_unblock_view_user_with_special_symbols(self):
"""
There is an available admin view for unblocking a user
with a exclamation mark sign in the username.
"""
reverse('defender_unblock_username_view',
kwargs={'username': 'user!test@test.tld'})
reverse(
"defender_unblock_username_view", kwargs={"username": "user!test@test.tld"}
)
def test_decorator_middleware(self):
# because watch_login is called twice in this test (once by the
@ -678,7 +677,7 @@ class AccessAttemptTest(DefenderTestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertNotContains(response, self.LOCKED_MESSAGE)
@patch('defender.config.USE_CELERY', True)
@patch("defender.config.USE_CELERY", True)
def test_use_celery(self):
""" Check that use celery works """
@ -694,16 +693,15 @@ class AccessAttemptTest(DefenderTestCase):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
self.assertEqual(AccessAttempt.objects.count(),
config.FAILURE_LIMIT + 1)
self.assertEqual(AccessAttempt.objects.count(), config.FAILURE_LIMIT + 1)
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
@patch('defender.config.LOCKOUT_BY_IP_USERNAME', True)
@patch("defender.config.LOCKOUT_BY_IP_USERNAME", True)
def test_lockout_by_ip_and_username(self):
""" Check that lockout still works when locking out by
IP and Username combined """
username = 'testy'
username = "testy"
for i in range(0, config.FAILURE_LIMIT):
response = self._login(username=username)
@ -726,23 +724,23 @@ class AccessAttemptTest(DefenderTestCase):
# We shouldn't get a lockout message when attempting to use a
# different ip address
ip = '74.125.239.60'
ip = "74.125.239.60"
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@patch('defender.config.DISABLE_IP_LOCKOUT', True)
@patch("defender.config.DISABLE_IP_LOCKOUT", True)
def test_disable_ip_lockout(self):
""" Check that lockout still works when we disable IP Lock out """
username = 'testy'
username = "testy"
# try logging in with the same IP, but different username
# we shouldn't be blocked.
# same IP different, usernames
ip = '74.125.239.60'
ip = "74.125.239.60"
for i in range(0, config.FAILURE_LIMIT + 10):
login_username = u"{0}{1}".format(username, i)
login_username = "{0}{1}".format(username, i)
response = self._login(username=login_username, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -770,7 +768,7 @@ class AccessAttemptTest(DefenderTestCase):
# We shouldn't get a lockout message when attempting to use a
# different ip address
second_ip = '74.125.239.99'
second_ip = "74.125.239.99"
response = self._login(username=VALID_USERNAME, remote_addr=second_ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -786,22 +784,22 @@ class AccessAttemptTest(DefenderTestCase):
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
@patch('defender.config.DISABLE_USERNAME_LOCKOUT', True)
@patch("defender.config.DISABLE_USERNAME_LOCKOUT", True)
def test_disable_username_lockout(self):
""" Check lockouting still works when we disable username lockout """
username = 'testy'
username = "testy"
# try logging in with the same username, but different IPs.
# we shouldn't be locked.
for i in range(0, config.FAILURE_LIMIT + 10):
ip = '74.125.126.{0}'.format(i)
ip = "74.125.126.{0}".format(i)
response = self._login(username=username, remote_addr=ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
# same ip and same username
ip = '74.125.127.1'
ip = "74.125.127.1"
for i in range(0, config.FAILURE_LIMIT):
response = self._login(username=username, remote_addr=ip)
# Check if we are in the same login page
@ -818,7 +816,7 @@ class AccessAttemptTest(DefenderTestCase):
# We shouldn't get a lockout message when attempting to use a
# different ip address to be sure that username is not blocked.
second_ip = '74.125.127.2'
second_ip = "74.125.127.2"
response = self._login(username=username, remote_addr=second_ip)
# Check if we are in the same login page
self.assertContains(response, LOGIN_FORM_KEY)
@ -834,8 +832,8 @@ class AccessAttemptTest(DefenderTestCase):
data_out = utils.get_blocked_usernames()
self.assertEqual(data_out, [])
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.IP_FAILURE_LIMIT', 3)
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_login_blocked_for_non_standard_login_views_without_msg(self):
"""
Check that a view wich returns the expected status code is causing
@ -849,11 +847,11 @@ class AccessAttemptTest(DefenderTestCase):
return HttpResponse(status=401)
request_factory = RequestFactory()
request = request_factory.post('api/login')
request = request_factory.post("api/login")
request.user = AnonymousUser()
request.session = SessionStore()
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
for _ in range(3):
fake_api_401_login_view_without_msg(request)
@ -864,27 +862,27 @@ class AccessAttemptTest(DefenderTestCase):
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, ['192.168.24.24'])
self.assertEqual(data_out, ["192.168.24.24"])
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.IP_FAILURE_LIMIT', 3)
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_login_blocked_for_non_standard_login_views_with_msg(self):
"""
Check that a view wich returns the expected status code and the
expected message is causing the IP to be locked out.
"""
@watch_login(status_code=401, msg='Invalid credentials')
@watch_login(status_code=401, msg="Invalid credentials")
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse('Sorry, Invalid credentials',
status=401)
return HttpResponse("Sorry, Invalid credentials", status=401)
request_factory = RequestFactory()
request = request_factory.post('api/login')
request = request_factory.post("api/login")
request.user = AnonymousUser()
request.session = SessionStore()
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
for _ in range(3):
fake_api_401_login_view_without_msg(request)
@ -895,27 +893,27 @@ class AccessAttemptTest(DefenderTestCase):
fake_api_401_login_view_without_msg(request)
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, ['192.168.24.24'])
self.assertEqual(data_out, ["192.168.24.24"])
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.IP_FAILURE_LIMIT', 3)
@patch("defender.config.BEHIND_REVERSE_PROXY", True)
@patch("defender.config.IP_FAILURE_LIMIT", 3)
def test_login_non_blocked_for_non_standard_login_views_different_msg(self):
"""
Check that a view wich returns the expected status code but not the
expected message is not causing the IP to be locked out.
"""
@watch_login(status_code=401, msg='Invalid credentials')
@watch_login(status_code=401, msg="Invalid credentials")
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse('Ups, wrong credentials',
status=401)
return HttpResponse("Ups, wrong credentials", status=401)
request_factory = RequestFactory()
request = request_factory.post('api/login')
request = request_factory.post("api/login")
request.user = AnonymousUser()
request.session = SessionStore()
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24"
for _ in range(4):
fake_api_401_login_view_without_msg(request)
@ -935,17 +933,17 @@ class SignalTest(DefenderTestCase):
self.blocked_ip = ip_address
ip_block_signal.connect(handler)
utils.block_ip('8.8.8.8')
self.assertEqual(self.blocked_ip, '8.8.8.8')
utils.block_ip("8.8.8.8")
self.assertEqual(self.blocked_ip, "8.8.8.8")
def test_should_send_signal_when_unblocking_ip(self):
self.blocked_ip = ('8.8.8.8')
self.blocked_ip = "8.8.8.8"
def handler(sender, ip_address, **kwargs):
self.blocked_ip = None
ip_unblock_signal.connect(handler)
utils.unblock_ip('8.8.8.8')
utils.unblock_ip("8.8.8.8")
self.assertIsNone(self.blocked_ip)
def test_should_not_send_signal_when_ip_already_blocked(self):
@ -956,10 +954,10 @@ class SignalTest(DefenderTestCase):
ip_block_signal.connect(handler)
key = utils.get_ip_blocked_cache_key('8.8.8.8')
utils.REDIS_SERVER.set(key, 'blocked')
key = utils.get_ip_blocked_cache_key("8.8.8.8")
utils.REDIS_SERVER.set(key, "blocked")
utils.block_ip('8.8.8.8')
utils.block_ip("8.8.8.8")
self.assertIsNone(self.blocked_ip)
def test_should_send_signal_when_blocking_username(self):
@ -969,17 +967,17 @@ class SignalTest(DefenderTestCase):
self.blocked_username = username
username_block_signal.connect(handler)
utils.block_username('richard_hendricks')
self.assertEqual(self.blocked_username, 'richard_hendricks')
utils.block_username("richard_hendricks")
self.assertEqual(self.blocked_username, "richard_hendricks")
def test_should_send_signal_when_unblocking_username(self):
self.blocked_username = 'richard_hendricks'
self.blocked_username = "richard_hendricks"
def handler(sender, username, **kwargs):
self.blocked_username = None
username_unblock_signal.connect(handler)
utils.unblock_username('richard_hendricks')
utils.unblock_username("richard_hendricks")
self.assertIsNone(self.blocked_username)
def test_should_not_send_signal_when_username_already_blocked(self):
@ -990,16 +988,17 @@ class SignalTest(DefenderTestCase):
username_block_signal.connect(handler)
key = utils.get_username_blocked_cache_key('richard hendricks')
utils.REDIS_SERVER.set(key, 'blocked')
key = utils.get_username_blocked_cache_key("richard hendricks")
utils.REDIS_SERVER.set(key, "blocked")
utils.block_ip('richard hendricks')
utils.block_ip("richard hendricks")
self.assertIsNone(self.blocked_username)
class DefenderTestCaseTest(DefenderTestCase):
""" Make sure that we're cleaning the cache between tests """
key = 'test_key'
key = "test_key"
def test_first_incr(self):
""" first increment """
@ -1016,7 +1015,8 @@ class DefenderTestCaseTest(DefenderTestCase):
class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
""" Make sure that we're cleaning the cache between tests """
key = 'test_key'
key = "test_key"
def test_first_incr(self):
""" first increment """
@ -1036,7 +1036,7 @@ class TestUtils(DefenderTestCase):
def test_username_blocking(self):
""" test username blocking """
username = 'foo'
username = "foo"
self.assertFalse(utils.is_user_already_locked(username))
utils.block_username(username)
self.assertTrue(utils.is_user_already_locked(username))
@ -1045,7 +1045,7 @@ class TestUtils(DefenderTestCase):
def test_ip_address_blocking(self):
""" ip address blocking """
ip = '1.2.3.4'
ip = "1.2.3.4"
self.assertFalse(utils.is_source_ip_already_locked(ip))
utils.block_ip(ip)
self.assertTrue(utils.is_source_ip_already_locked(ip))
@ -1059,18 +1059,14 @@ class TestUtils(DefenderTestCase):
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
username = 'johndoe'
username = "johndoe"
utils.block_username(request.user.username)
self.assertFalse(utils.is_already_locked(request, username=username))
utils.check_request(request, True, username=username)
self.assertEqual(
utils.get_user_attempts(request, username=username), 1
)
self.assertEqual(utils.get_user_attempts(request, username=username), 1)
utils.add_login_attempt_to_db(request, True, username=username)
self.assertEqual(
AccessAttempt.objects.filter(username=username).count(), 1
)
self.assertEqual(AccessAttempt.objects.filter(username=username).count(), 1)

View file

@ -2,63 +2,55 @@ import os
from celery import Celery
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
}
DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:",}}
CACHES = {
'default': {
'BACKEND': 'redis_cache.RedisCache',
'LOCATION': 'localhost:6379',
}
"default": {"BACKEND": "redis_cache.RedisCache", "LOCATION": "localhost:6379",}
}
SITE_ID = 1
MIDDLEWARE = (
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'defender.middleware.FailedLoginMiddleware',
"django.middleware.common.CommonMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"defender.middleware.FailedLoginMiddleware",
)
ROOT_URLCONF = 'defender.test_urls'
ROOT_URLCONF = "defender.test_urls"
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.admin',
'defender',
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.sites",
"django.contrib.messages",
"django.contrib.admin",
"defender",
]
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.contrib.auth.context_processors.auth',
'django.template.context_processors.debug',
'django.template.context_processors.i18n',
'django.template.context_processors.media',
'django.template.context_processors.static',
'django.template.context_processors.tz',
'django.contrib.messages.context_processors.messages',
"BACKEND": "django.template.backends.django.DjangoTemplates",
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.contrib.auth.context_processors.auth",
"django.template.context_processors.debug",
"django.template.context_processors.i18n",
"django.template.context_processors.media",
"django.template.context_processors.static",
"django.template.context_processors.tz",
"django.contrib.messages.context_processors.messages",
],
},
},
]
SECRET_KEY = os.environ.get('SECRET_KEY', 'too-secret-for-test')
SECRET_KEY = os.environ.get("SECRET_KEY", "too-secret-for-test")
LOGIN_REDIRECT_URL = '/admin'
LOGIN_REDIRECT_URL = "/admin"
DEFENDER_LOGIN_FAILURE_LIMIT = 10
DEFENDER_COOLOFF_TIME = 2
@ -68,15 +60,15 @@ DEFENDER_MOCK_REDIS = False
# Celery settings:
CELERY_ALWAYS_EAGER = True
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'
BROKER_BACKEND = "memory"
BROKER_URL = "memory://"
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.travis_settings')
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "defender.travis_settings")
app = Celery('defender')
app = Celery("defender")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.config_from_object("django.conf:settings")
app.autodiscover_tasks(lambda: INSTALLED_APPS)

View file

@ -2,11 +2,15 @@ from django.conf.urls import url
from .views import block_view, unblock_ip_view, unblock_username_view
urlpatterns = [
url(r'^blocks/$', block_view,
name="defender_blocks_view"),
url(r'^blocks/ip/(?P<ip_address>[A-Za-z0-9-._]+)/unblock$', unblock_ip_view,
name="defender_unblock_ip_view"),
url(r'^blocks/username/(?P<username>[\w]+[^\/]*)/unblock$',
url(r"^blocks/$", block_view, name="defender_blocks_view"),
url(
r"^blocks/ip/(?P<ip_address>[A-Za-z0-9-._]+)/unblock$",
unblock_ip_view,
name="defender_unblock_ip_view",
),
url(
r"^blocks/username/(?P<username>[\w]+[^\/]*)/unblock$",
unblock_username_view,
name="defender_unblock_username_view"),
name="defender_unblock_username_view",
),
]

View file

@ -10,8 +10,12 @@ from django.utils.module_loading import import_string
from .connection import get_redis_connection
from . import config
from .data import store_login_attempt
from .signals import (send_username_block_signal, send_ip_block_signal,
send_username_unblock_signal, send_ip_unblock_signal)
from .signals import (
send_username_block_signal,
send_ip_block_signal,
send_username_unblock_signal,
send_ip_unblock_signal,
)
REDIS_SERVER = get_redis_connection()
@ -33,18 +37,18 @@ def is_valid_ip(ip_address):
def get_ip_address_from_request(request):
""" Makes the best attempt to get the client's real IP or return
the loopback """
remote_addr = request.META.get('REMOTE_ADDR', '')
remote_addr = request.META.get("REMOTE_ADDR", "")
if remote_addr and is_valid_ip(remote_addr):
return remote_addr.strip()
return '127.0.0.1'
return "127.0.0.1"
def get_ip(request):
""" get the ip address from the request """
if config.BEHIND_REVERSE_PROXY:
ip_address = request.META.get(config.REVERSE_PROXY_HEADER, '')
ip_address = request.META.get(config.REVERSE_PROXY_HEADER, "")
ip_address = ip_address.split(",", 1)[0].strip()
if ip_address == '':
if ip_address == "":
ip_address = get_ip_address_from_request(request)
else:
ip_address = get_ip_address_from_request(request)
@ -68,8 +72,9 @@ def get_ip_attempt_cache_key(ip_address):
def get_username_attempt_cache_key(username):
""" get the cache key by username """
return "{0}:failed:username:{1}".format(config.CACHE_PREFIX,
lower_username(username))
return "{0}:failed:username:{1}".format(
config.CACHE_PREFIX, lower_username(username)
)
def get_ip_blocked_cache_key(ip_address):
@ -79,8 +84,9 @@ def get_ip_blocked_cache_key(ip_address):
def get_username_blocked_cache_key(username):
""" get the cache key by username """
return "{0}:blocked:username:{1}".format(config.CACHE_PREFIX,
lower_username(username))
return "{0}:blocked:username:{1}".format(
config.CACHE_PREFIX, lower_username(username)
)
def strip_keys(key_list):
@ -105,8 +111,7 @@ def get_blocked_ips():
# There are no blocked IP's since we disabled them.
return []
key = get_ip_blocked_cache_key("*")
key_list = [redis_key.decode('utf-8')
for redis_key in REDIS_SERVER.keys(key)]
key_list = [redis_key.decode("utf-8") for redis_key in REDIS_SERVER.keys(key)]
return strip_keys(key_list)
@ -116,8 +121,7 @@ def get_blocked_usernames():
# There are no blocked usernames since we disabled them.
return []
key = get_username_blocked_cache_key("*")
key_list = [redis_key.decode('utf-8')
for redis_key in REDIS_SERVER.keys(key)]
key_list = [redis_key.decode("utf-8") for redis_key in REDIS_SERVER.keys(key)]
return strip_keys(key_list)
@ -138,9 +142,7 @@ def username_from_request(request):
return None
get_username_from_request = import_string(
config.GET_USERNAME_FROM_REQUEST_PATH
)
get_username_from_request = import_string(config.GET_USERNAME_FROM_REQUEST_PATH)
def get_user_attempts(request, get_username=get_username_from_request, username=None):
@ -177,9 +179,9 @@ def block_ip(ip_address):
already_blocked = is_source_ip_already_locked(ip_address)
key = get_ip_blocked_cache_key(ip_address)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
REDIS_SERVER.set(key, "blocked", config.COOLOFF_TIME)
else:
REDIS_SERVER.set(key, 'blocked')
REDIS_SERVER.set(key, "blocked")
if not already_blocked:
send_ip_block_signal(ip_address)
@ -195,9 +197,9 @@ def block_username(username):
already_blocked = is_user_already_locked(username)
key = get_username_blocked_cache_key(username)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
REDIS_SERVER.set(key, "blocked", config.COOLOFF_TIME)
else:
REDIS_SERVER.set(key, 'blocked')
REDIS_SERVER.set(key, "blocked")
if not already_blocked:
send_username_block_signal(username)
@ -291,9 +293,9 @@ def lockout_response(request):
""" if we are locked out, here is the response """
if config.LOCKOUT_TEMPLATE:
context = {
'cooloff_time_seconds': config.COOLOFF_TIME,
'cooloff_time_minutes': config.COOLOFF_TIME / 60,
'failure_limit': config.FAILURE_LIMIT,
"cooloff_time_seconds": config.COOLOFF_TIME,
"cooloff_time_minutes": config.COOLOFF_TIME / 60,
"failure_limit": config.FAILURE_LIMIT,
}
return render(request, config.LOCKOUT_TEMPLATE, context)
@ -301,11 +303,14 @@ def lockout_response(request):
return HttpResponseRedirect(config.LOCKOUT_URL)
if config.COOLOFF_TIME:
return HttpResponse("Account locked: too many login attempts. "
"Please try again later.")
return HttpResponse(
"Account locked: too many login attempts. " "Please try again later."
)
else:
return HttpResponse("Account locked: too many login attempts. "
"Contact an admin to unlock your account.")
return HttpResponse(
"Account locked: too many login attempts. "
"Contact an admin to unlock your account."
)
def is_user_already_locked(username):
@ -339,9 +344,9 @@ def is_already_locked(request, get_username=get_username_from_request, username=
return ip_blocked or user_blocked
def check_request(request, login_unsuccessful,
get_username=get_username_from_request,
username=None):
def check_request(
request, login_unsuccessful, get_username=get_username_from_request, username=None
):
""" check the request, and process results"""
ip_address = get_ip(request)
username = username or get_username(request)
@ -355,9 +360,9 @@ def check_request(request, login_unsuccessful,
return record_failed_attempt(ip_address, username)
def add_login_attempt_to_db(request, login_valid,
get_username=get_username_from_request,
username=None):
def add_login_attempt_to_db(
request, login_valid, get_username=get_username_from_request, username=None
):
""" Create a record for the login attempt If using celery call celery
task, if not, call the method normally """
@ -367,15 +372,18 @@ def add_login_attempt_to_db(request, login_valid,
username = username or get_username(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
user_agent = request.META.get("HTTP_USER_AGENT", "<unknown>")[:255]
ip_address = get_ip(request)
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')
path_info = request.META.get('PATH_INFO', '<unknown>')
http_accept = request.META.get("HTTP_ACCEPT", "<unknown>")
path_info = request.META.get("PATH_INFO", "<unknown>")
if config.USE_CELERY:
from .tasks import add_login_attempt_task
add_login_attempt_task.delay(user_agent, ip_address, username,
http_accept, path_info, login_valid)
add_login_attempt_task.delay(
user_agent, ip_address, username, http_accept, path_info, login_valid
)
else:
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)
store_login_attempt(
user_agent, ip_address, username, http_accept, path_info, login_valid
)

View file

@ -1,13 +1,13 @@
from django.shortcuts import render
from django.http import HttpResponseRedirect
from django.contrib.admin.views.decorators import staff_member_required
try:
from django.urls import reverse
except ImportError:
from django.core.urlresolvers import reverse
from .utils import (
get_blocked_ips, get_blocked_usernames, unblock_ip, unblock_username)
from .utils import get_blocked_ips, get_blocked_usernames, unblock_ip, unblock_username
@staff_member_required
@ -16,15 +16,17 @@ def block_view(request):
blocked_ip_list = get_blocked_ips()
blocked_username_list = get_blocked_usernames()
context = {'blocked_ip_list': blocked_ip_list,
'blocked_username_list': blocked_username_list}
return render(request, 'defender/admin/blocks.html', context)
context = {
"blocked_ip_list": blocked_ip_list,
"blocked_username_list": blocked_username_list,
}
return render(request, "defender/admin/blocks.html", context)
@staff_member_required
def unblock_ip_view(request, ip_address):
""" upblock the given ip """
if request.method == 'POST':
if request.method == "POST":
unblock_ip(ip_address)
return HttpResponseRedirect(reverse("defender_blocks_view"))
@ -32,6 +34,6 @@ def unblock_ip_view(request, ip_address):
@staff_member_required
def unblock_username_view(request, username):
""" unblockt he given username """
if request.method == 'POST':
if request.method == "POST":
unblock_username(username)
return HttpResponseRedirect(reverse("defender_blocks_view"))

View file

@ -33,8 +33,7 @@ master_doc = "index"
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named "sphinx.ext.*") or your custom
# ones.
extensions = [
]
extensions = []
# Add any paths that contain templates here, relative to this directory.
templates_path = ["_templates"]

100
setup.py
View file

@ -9,16 +9,18 @@ except ImportError:
from distutils.core import setup
version = '0.6.2'
version = "0.6.2"
def get_packages(package):
"""
Return root package and all sub-packages.
"""
return [dirpath
for dirpath, dirnames, filenames in os.walk(package)
if os.path.exists(os.path.join(dirpath, '__init__.py'))]
return [
dirpath
for dirpath, dirnames, filenames in os.walk(package)
if os.path.exists(os.path.join(dirpath, "__init__.py"))
]
def get_package_data(package):
@ -26,50 +28,58 @@ def get_package_data(package):
Return all files under the root package, that are not in a
package themselves.
"""
walk = [(dirpath.replace(package + os.sep, '', 1), filenames)
for dirpath, dirnames, filenames in os.walk(package)
if not os.path.exists(os.path.join(dirpath, '__init__.py'))]
walk = [
(dirpath.replace(package + os.sep, "", 1), filenames)
for dirpath, dirnames, filenames in os.walk(package)
if not os.path.exists(os.path.join(dirpath, "__init__.py"))
]
filepaths = []
for base, filenames in walk:
filepaths.extend([os.path.join(base, filename)
for filename in filenames])
filepaths.extend([os.path.join(base, filename) for filename in filenames])
return {package: filepaths}
setup(name='django-defender',
version=version,
description="redis based Django app that locks out users after too "
"many failed login attempts.",
long_description="redis based Django app based on speed, that locks out"
"users after too many failed login attempts.",
classifiers=[
'Development Status :: 5 - Production/Stable',
'Framework :: Django',
'Intended Audience :: Developers',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Programming Language :: Python',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: Implementation :: PyPy',
'Programming Language :: Python :: Implementation :: CPython',
'Topic :: Internet :: WWW/HTTP :: Dynamic Content',
'Topic :: Security',
'Topic :: Software Development :: Libraries',
'Topic :: Software Development :: Libraries :: Python Modules', ],
keywords='django, cache, security, authentication, throttle, login',
author='Ken Cochrane',
url='https://github.com/kencochrane/django-defender',
author_email='kencochrane@gmail.com',
license='Apache 2',
include_package_data=True,
packages=get_packages('defender'),
package_data=get_package_data('defender'),
install_requires=['Django>=1.8,<2.3', 'redis>=2.10.3,<3.3'],
tests_require=['mock', 'mockredispy>=2.9.0.11,<3.0', 'coverage',
'celery', 'django-redis-cache'],
)
setup(
name="django-defender",
version=version,
description="redis based Django app that locks out users after too "
"many failed login attempts.",
long_description="redis based Django app based on speed, that locks out"
"users after too many failed login attempts.",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Framework :: Django",
"Intended Audience :: Developers",
"License :: OSI Approved :: Apache Software License",
"Operating System :: OS Independent",
"Programming Language :: Python",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3.5",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: Implementation :: PyPy",
"Programming Language :: Python :: Implementation :: CPython",
"Topic :: Internet :: WWW/HTTP :: Dynamic Content",
"Topic :: Security",
"Topic :: Software Development :: Libraries",
"Topic :: Software Development :: Libraries :: Python Modules",
],
keywords="django, cache, security, authentication, throttle, login",
author="Ken Cochrane",
url="https://github.com/kencochrane/django-defender",
author_email="kencochrane@gmail.com",
license="Apache 2",
include_package_data=True,
packages=get_packages("defender"),
package_data=get_package_data("defender"),
install_requires=["Django>=1.8,<2.3", "redis>=2.10.3,<3.3"],
tests_require=[
"mock",
"mockredispy>=2.9.0.11,<3.0",
"coverage",
"celery",
"django-redis-cache",
],
)