diff --git a/defender/admin.py b/defender/admin.py index 625c6be..8238f78 100644 --- a/defender/admin.py +++ b/defender/admin.py @@ -4,29 +4,26 @@ from .models import AccessAttempt class AccessAttemptAdmin(admin.ModelAdmin): """ Access attempt admin config """ + list_display = ( - 'attempt_time', - 'ip_address', - 'user_agent', - 'username', - 'path_info', - 'login_valid', + "attempt_time", + "ip_address", + "user_agent", + "username", + "path_info", + "login_valid", ) search_fields = [ - 'ip_address', - 'username', + "ip_address", + "username", ] - date_hierarchy = 'attempt_time' + date_hierarchy = "attempt_time" fieldsets = ( - (None, { - 'fields': ('path_info', 'login_valid') - }), - ('Meta Data', { - 'fields': ('user_agent', 'ip_address') - }) + (None, {"fields": ("path_info", "login_valid")}), + ("Meta Data", {"fields": ("user_agent", "ip_address")}), ) diff --git a/defender/config.py b/defender/config.py index a61a88a..18afde4 100644 --- a/defender/config.py +++ b/defender/config.py @@ -9,76 +9,78 @@ def get_setting(variable, default=None): # redis server host -DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL') +DEFENDER_REDIS_URL = get_setting("DEFENDER_REDIS_URL") # reuse declared cache from django settings -DEFENDER_REDIS_NAME = get_setting('DEFENDER_REDIS_NAME') +DEFENDER_REDIS_NAME = get_setting("DEFENDER_REDIS_NAME") -MOCK_REDIS = get_setting('DEFENDER_MOCK_REDIS', False) +MOCK_REDIS = get_setting("DEFENDER_MOCK_REDIS", False) # see if the user has overridden the failure limit -FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT', 3) -USERNAME_FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT_USERNAME', FAILURE_LIMIT) -IP_FAILURE_LIMIT = get_setting('DEFENDER_LOGIN_FAILURE_LIMIT_IP', FAILURE_LIMIT) +FAILURE_LIMIT = get_setting("DEFENDER_LOGIN_FAILURE_LIMIT", 3) +USERNAME_FAILURE_LIMIT = get_setting( + "DEFENDER_LOGIN_FAILURE_LIMIT_USERNAME", FAILURE_LIMIT +) +IP_FAILURE_LIMIT = get_setting("DEFENDER_LOGIN_FAILURE_LIMIT_IP", FAILURE_LIMIT) # If this is True, the lockout checks to evaluate if the IP failure limit and # the username failure limit has been reached before issuing the lockout. -LOCKOUT_BY_IP_USERNAME = get_setting( - 'DEFENDER_LOCK_OUT_BY_IP_AND_USERNAME', False) +LOCKOUT_BY_IP_USERNAME = get_setting("DEFENDER_LOCK_OUT_BY_IP_AND_USERNAME", False) # if this is True, The users IP address will not get locked when # there are too many login attempts. -DISABLE_IP_LOCKOUT = get_setting('DEFENDER_DISABLE_IP_LOCKOUT', False) +DISABLE_IP_LOCKOUT = get_setting("DEFENDER_DISABLE_IP_LOCKOUT", False) # If this is True, usernames will not get locked when # there are too many login attempts. -DISABLE_USERNAME_LOCKOUT = get_setting( - 'DEFENDER_DISABLE_USERNAME_LOCKOUT', False) +DISABLE_USERNAME_LOCKOUT = get_setting("DEFENDER_DISABLE_USERNAME_LOCKOUT", False) # use a specific username field to retrieve from login POST data -USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username') +USERNAME_FORM_FIELD = get_setting("DEFENDER_USERNAME_FORM_FIELD", "username") # see if the django app is sitting behind a reverse proxy -BEHIND_REVERSE_PROXY = get_setting('DEFENDER_BEHIND_REVERSE_PROXY', False) +BEHIND_REVERSE_PROXY = get_setting("DEFENDER_BEHIND_REVERSE_PROXY", False) # the prefix for these keys in your cache. -CACHE_PREFIX = get_setting('DEFENDER_CACHE_PREFIX', 'defender') +CACHE_PREFIX = get_setting("DEFENDER_CACHE_PREFIX", "defender") # if the django app is behind a reverse proxy, look for the # ip address using this HTTP header value -REVERSE_PROXY_HEADER = get_setting('DEFENDER_REVERSE_PROXY_HEADER', - 'HTTP_X_FORWARDED_FOR') +REVERSE_PROXY_HEADER = get_setting( + "DEFENDER_REVERSE_PROXY_HEADER", "HTTP_X_FORWARDED_FOR" +) try: # how long to wait before the bad login attempt gets forgotten. in seconds. - COOLOFF_TIME = int(get_setting('DEFENDER_COOLOFF_TIME', 300)) # seconds + COOLOFF_TIME = int(get_setting("DEFENDER_COOLOFF_TIME", 300)) # seconds except ValueError: # pragma: no cover - raise Exception( - 'DEFENDER_COOLOFF_TIME needs to be an integer') # pragma: no cover + raise Exception("DEFENDER_COOLOFF_TIME needs to be an integer") # pragma: no cover -LOCKOUT_TEMPLATE = get_setting('DEFENDER_LOCKOUT_TEMPLATE') +LOCKOUT_TEMPLATE = get_setting("DEFENDER_LOCKOUT_TEMPLATE") -ERROR_MESSAGE = ugettext_lazy("Please enter a correct username and password. " - "Note that both fields are case-sensitive.") +ERROR_MESSAGE = ugettext_lazy( + "Please enter a correct username and password. " + "Note that both fields are case-sensitive." +) -LOCKOUT_URL = get_setting('DEFENDER_LOCKOUT_URL') +LOCKOUT_URL = get_setting("DEFENDER_LOCKOUT_URL") -USE_CELERY = get_setting('DEFENDER_USE_CELERY', False) +USE_CELERY = get_setting("DEFENDER_USE_CELERY", False) -STORE_ACCESS_ATTEMPTS = get_setting('DEFENDER_STORE_ACCESS_ATTEMPTS', True) +STORE_ACCESS_ATTEMPTS = get_setting("DEFENDER_STORE_ACCESS_ATTEMPTS", True) # Used by the management command to decide how long to keep access attempt # recods. Number is # of hours. try: - ACCESS_ATTEMPT_EXPIRATION = int(get_setting( - 'DEFENDER_ACCESS_ATTEMPT_EXPIRATION', 24)) + ACCESS_ATTEMPT_EXPIRATION = int( + get_setting("DEFENDER_ACCESS_ATTEMPT_EXPIRATION", 24) + ) except ValueError: # pragma: no cover raise Exception( - 'DEFENDER_ACCESS_ATTEMPT_EXPIRATION' - ' needs to be an integer') # pragma: no cover + "DEFENDER_ACCESS_ATTEMPT_EXPIRATION" " needs to be an integer" + ) # pragma: no cover GET_USERNAME_FROM_REQUEST_PATH = get_setting( - 'DEFENDER_GET_USERNAME_FROM_REQUEST_PATH', - 'defender.utils.username_from_request' + "DEFENDER_GET_USERNAME_FROM_REQUEST_PATH", "defender.utils.username_from_request" ) diff --git a/defender/connection.py b/defender/connection.py index 23e4a7e..172c6fe 100644 --- a/defender/connection.py +++ b/defender/connection.py @@ -2,6 +2,7 @@ from django.core.cache import caches from django.core.cache.backends.base import InvalidCacheBackendError import redis + try: import urlparse except ImportError: # pragma: no cover @@ -12,21 +13,20 @@ from . import config # Register database schemes in URLs. urlparse.uses_netloc.append("redis") -INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache' \ - ' settings.' +INVALID_CACHE_ERROR_MSG = "The cache {} was not found on the django cache" " settings." def get_redis_connection(): """ Get the redis connection if not using mock """ if config.MOCK_REDIS: # pragma: no cover import mockredis + return mockredis.mock_strict_redis_client() # pragma: no cover elif config.DEFENDER_REDIS_NAME: # pragma: no cover try: cache = caches[config.DEFENDER_REDIS_NAME] except InvalidCacheBackendError: - raise KeyError(INVALID_CACHE_ERROR_MSG.format( - config.DEFENDER_REDIS_NAME)) + raise KeyError(INVALID_CACHE_ERROR_MSG.format(config.DEFENDER_REDIS_NAME)) # every redis backend implement it own way to get the low level client try: # redis_cache.RedisCache case (django-redis-cache package) @@ -37,12 +37,12 @@ def get_redis_connection(): else: # pragma: no cover redis_config = parse_redis_url(config.DEFENDER_REDIS_URL) return redis.StrictRedis( - host=redis_config.get('HOST'), - port=redis_config.get('PORT'), - db=redis_config.get('DB'), - password=redis_config.get('PASSWORD'), - ssl=redis_config.get('SSL')) - + host=redis_config.get("HOST"), + port=redis_config.get("PORT"), + db=redis_config.get("DB"), + password=redis_config.get("PASSWORD"), + ssl=redis_config.get("SSL"), + ) def parse_redis_url(url): @@ -54,7 +54,7 @@ def parse_redis_url(url): "PASSWORD": None, "HOST": "localhost", "PORT": 6379, - "SSL": False + "SSL": False, } if not url: @@ -63,7 +63,7 @@ def parse_redis_url(url): url = urlparse.urlparse(url) # Remove query strings. path = url.path[1:] - path = path.split('?', 2)[0] + path = path.split("?", 2)[0] if path: redis_config.update({"DB": int(path)}) @@ -73,7 +73,7 @@ def parse_redis_url(url): redis_config.update({"HOST": url.hostname}) if url.port: redis_config.update({"PORT": int(url.port)}) - if url.scheme in ['https', 'rediss']: + if url.scheme in ["https", "rediss"]: redis_config.update({"SSL": True}) return redis_config diff --git a/defender/data.py b/defender/data.py index 8febfb9..31e64db 100644 --- a/defender/data.py +++ b/defender/data.py @@ -1,8 +1,9 @@ from .models import AccessAttempt -def store_login_attempt(user_agent, ip_address, username, - http_accept, path_info, login_valid): +def store_login_attempt( + user_agent, ip_address, username, http_accept, path_info, login_valid +): """ Store the login attempt to the db. """ AccessAttempt.objects.create( user_agent=user_agent, diff --git a/defender/decorators.py b/defender/decorators.py index bb4b004..3612f2a 100644 --- a/defender/decorators.py +++ b/defender/decorators.py @@ -3,8 +3,7 @@ from . import utils import functools -def watch_login(status_code=302, msg='', - get_username=utils.get_username_from_request): +def watch_login(status_code=302, msg="", get_username=utils.get_username_from_request): """ Used to decorate the django.contrib.admin.site.login method or any other function you want to protect by brute forcing. @@ -12,6 +11,7 @@ def watch_login(status_code=302, msg='', indicate a failure and/or a string that will be checked within the response body. """ + def decorated_login(func): @functools.wraps(func) def wrapper(request, *args, **kwargs): @@ -24,29 +24,30 @@ def watch_login(status_code=302, msg='', # call the login function response = func(request, *args, **kwargs) - if request.method == 'POST': + if request.method == "POST": # see if the login was successful if status_code == 302: # standard Django login view login_unsuccessful = ( - response and - not response.has_header('location') and - response.status_code != status_code + response + and not response.has_header("location") + and response.status_code != status_code ) else: # If msg is not passed the last condition will be evaluated # always to True so the first 2 will decide the result. login_unsuccessful = ( - response and response.status_code == status_code - and msg in response.content.decode('utf-8') + response + and response.status_code == status_code + and msg in response.content.decode("utf-8") ) # ideally make this background task, but to keep simple, # keeping it inline for now. - utils.add_login_attempt_to_db(request, not login_unsuccessful, - get_username) + utils.add_login_attempt_to_db( + request, not login_unsuccessful, get_username + ) - if utils.check_request(request, login_unsuccessful, - get_username): + if utils.check_request(request, login_unsuccessful, get_username): return response return utils.lockout_response(request) @@ -54,4 +55,5 @@ def watch_login(status_code=302, msg='', return response return wrapper + return decorated_login diff --git a/defender/exampleapp/settings.py b/defender/exampleapp/settings.py index 4889801..9e97954 100644 --- a/defender/exampleapp/settings.py +++ b/defender/exampleapp/settings.py @@ -2,22 +2,21 @@ import os from celery import Celery PROJECT_DIR = lambda base: os.path.abspath( - os.path.join(os.path.dirname(__file__), base).replace('\\', '/')) - - -MEDIA_ROOT = PROJECT_DIR(os.path.join('media')) -MEDIA_URL = '/media/' -STATIC_ROOT = PROJECT_DIR(os.path.join('static')) -STATIC_URL = '/static/' - -STATICFILES_DIRS = ( - PROJECT_DIR(os.path.join('media', 'static')), + os.path.join(os.path.dirname(__file__), base).replace("\\", "/") ) + +MEDIA_ROOT = PROJECT_DIR(os.path.join("media")) +MEDIA_URL = "/media/" +STATIC_ROOT = PROJECT_DIR(os.path.join("static")) +STATIC_URL = "/static/" + +STATICFILES_DIRS = (PROJECT_DIR(os.path.join("media", "static")),) + DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': PROJECT_DIR('defender.sb'), + "default": { + "ENGINE": "django.db.backends.sqlite3", + "NAME": PROJECT_DIR("defender.sb"), } } @@ -25,35 +24,35 @@ DATABASES = { SITE_ID = 1 MIDDLEWARE = ( - 'django.middleware.common.CommonMiddleware', - 'django.contrib.sessions.middleware.SessionMiddleware', - 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'defender.middleware.FailedLoginMiddleware', + "django.middleware.common.CommonMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "defender.middleware.FailedLoginMiddleware", ) -ROOT_URLCONF = 'defender.exampleapp.urls' +ROOT_URLCONF = "defender.exampleapp.urls" INSTALLED_APPS = [ - 'django.contrib.auth', - 'django.contrib.contenttypes', - 'django.contrib.sessions', - 'django.contrib.sites', - 'django.contrib.messages', - 'django.contrib.admin', - 'django.contrib.staticfiles', - 'defender', + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.sites", + "django.contrib.messages", + "django.contrib.admin", + "django.contrib.staticfiles", + "defender", ] # List of finder classes that know how to find static files in # various locations. STATICFILES_FINDERS = ( - 'django.contrib.staticfiles.finders.FileSystemFinder', - 'django.contrib.staticfiles.finders.AppDirectoriesFinder', + "django.contrib.staticfiles.finders.FileSystemFinder", + "django.contrib.staticfiles.finders.AppDirectoriesFinder", ) -SECRET_KEY = os.environ.get('SECRET_KEY', 'too-secret-for-test') +SECRET_KEY = os.environ.get("SECRET_KEY", "too-secret-for-test") -LOGIN_REDIRECT_URL = '/admin' +LOGIN_REDIRECT_URL = "/admin" DEFENDER_LOGIN_FAILURE_LIMIT = 1 DEFENDER_COOLOFF_TIME = 60 @@ -62,22 +61,22 @@ DEFENDER_REDIS_URL = "redis://localhost:6379/1" DEFENDER_MOCK_REDIS = False # Let's use custom function and strip username string from request. DEFENDER_GET_USERNAME_FROM_REQUEST_PATH = ( - 'defender.exampleapp.utils.strip_username_from_request' + "defender.exampleapp.utils.strip_username_from_request" ) # Celery settings: CELERY_ALWAYS_EAGER = True -BROKER_BACKEND = 'memory' -BROKER_URL = 'memory://' +BROKER_BACKEND = "memory" +BROKER_URL = "memory://" # set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.exampleapp.settings') +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "defender.exampleapp.settings") -app = Celery('defender') +app = Celery("defender") # Using a string here means the worker will not have to # pickle the object when using Windows. -app.config_from_object('django.conf:settings') +app.config_from_object("django.conf:settings") app.autodiscover_tasks(lambda: INSTALLED_APPS) DEBUG = True diff --git a/defender/exampleapp/urls.py b/defender/exampleapp/urls.py index c0d8596..1f367b6 100644 --- a/defender/exampleapp/urls.py +++ b/defender/exampleapp/urls.py @@ -7,9 +7,9 @@ from django.conf.urls.static import static admin.autodiscover() urlpatterns = patterns( - '', - (r'^admin/', include(admin.site.urls)), - (r'^admin/defender/', include('defender.urls')), + "", + (r"^admin/", include(admin.site.urls)), + (r"^admin/defender/", include("defender.urls")), ) diff --git a/defender/management/commands/cleanup_django_defender.py b/defender/management/commands/cleanup_django_defender.py index d0c506c..12af5d2 100644 --- a/defender/management/commands/cleanup_django_defender.py +++ b/defender/management/commands/cleanup_django_defender.py @@ -10,6 +10,7 @@ from ... import config class Command(BaseCommand): """ clean up management command """ + help = "Cleans up django-defender AccessAttempt table" def handle(self, **options): @@ -31,5 +32,6 @@ class Command(BaseCommand): print( "Finished. Removed {0} AccessAttempt entries.".format( - attempts_to_clean_count) + attempts_to_clean_count + ) ) diff --git a/defender/middleware.py b/defender/middleware.py index 45a6fa7..f6ef56b 100644 --- a/defender/middleware.py +++ b/defender/middleware.py @@ -10,6 +10,7 @@ from .decorators import watch_login class FailedLoginMiddleware(MIDDLEWARE_BASE_CLASS): """ Failed login middleware """ + patched = False def __init__(self, *args, **kwargs): @@ -22,6 +23,7 @@ class FailedLoginMiddleware(MIDDLEWARE_BASE_CLASS): # `LoginView` class-based view try: from django.contrib.auth.views import LoginView + our_decorator = watch_login() watch_login_method = method_decorator(our_decorator) LoginView.dispatch = watch_login_method(LoginView.dispatch) diff --git a/defender/migrations/0001_initial.py b/defender/migrations/0001_initial.py index 915bcc3..195a541 100644 --- a/defender/migrations/0001_initial.py +++ b/defender/migrations/0001_initial.py @@ -7,25 +7,36 @@ from django.db import models, migrations class Migration(migrations.Migration): """ Initial migrations """ - dependencies = [ - ] + dependencies = [] operations = [ migrations.CreateModel( - name='AccessAttempt', + name="AccessAttempt", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('user_agent', models.CharField(max_length=255)), - ('ip_address', models.GenericIPAddressField(null=True, verbose_name='IP Address')), - ('username', models.CharField(max_length=255, null=True)), - ('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')), - ('path_info', models.CharField(max_length=255, verbose_name='Path')), - ('attempt_time', models.DateTimeField(auto_now_add=True)), - ('login_valid', models.BooleanField(default=False)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("user_agent", models.CharField(max_length=255)), + ( + "ip_address", + models.GenericIPAddressField(null=True, verbose_name="IP Address"), + ), + ("username", models.CharField(max_length=255, null=True)), + ( + "http_accept", + models.CharField(max_length=1025, verbose_name="HTTP Accept"), + ), + ("path_info", models.CharField(max_length=255, verbose_name="Path")), + ("attempt_time", models.DateTimeField(auto_now_add=True)), + ("login_valid", models.BooleanField(default=False)), ], - options={ - 'ordering': ['-attempt_time'], - }, + options={"ordering": ["-attempt_time"],}, bases=(models.Model,), ), ] diff --git a/defender/models.py b/defender/models.py index 868f77e..82450f4 100644 --- a/defender/models.py +++ b/defender/models.py @@ -7,37 +7,20 @@ from django.utils.encoding import python_2_unicode_compatible @python_2_unicode_compatible class AccessAttempt(models.Model): """ Access Attempt log """ - user_agent = models.CharField( - max_length=255, - ) - ip_address = models.GenericIPAddressField( - verbose_name='IP Address', - null=True, - ) - username = models.CharField( - max_length=255, - null=True, - ) - http_accept = models.CharField( - verbose_name='HTTP Accept', - max_length=1025, - ) - path_info = models.CharField( - verbose_name='Path', - max_length=255, - ) - attempt_time = models.DateTimeField( - auto_now_add=True, - ) - login_valid = models.BooleanField( - default=False, - ) + + user_agent = models.CharField(max_length=255,) + ip_address = models.GenericIPAddressField(verbose_name="IP Address", null=True,) + username = models.CharField(max_length=255, null=True,) + http_accept = models.CharField(verbose_name="HTTP Accept", max_length=1025,) + path_info = models.CharField(verbose_name="Path", max_length=255,) + attempt_time = models.DateTimeField(auto_now_add=True,) + login_valid = models.BooleanField(default=False,) class Meta: - ordering = ['-attempt_time'] + ordering = ["-attempt_time"] def __str__(self): """ unicode value for this model """ - return "{0} @ {1} | {2}".format(self.username, - self.attempt_time, - self.login_valid) + return "{0} @ {1} | {2}".format( + self.username, self.attempt_time, self.login_valid + ) diff --git a/defender/signals.py b/defender/signals.py index c9cd9c1..a0674b2 100644 --- a/defender/signals.py +++ b/defender/signals.py @@ -1,9 +1,9 @@ from django.dispatch import Signal -username_block = Signal(providing_args=['username']) -username_unblock = Signal(providing_args=['username']) -ip_block = Signal(providing_args=['ip_address']) -ip_unblock = Signal(providing_args=['ip_address']) +username_block = Signal(providing_args=["username"]) +username_unblock = Signal(providing_args=["username"]) +ip_block = Signal(providing_args=["ip_address"]) +ip_unblock = Signal(providing_args=["ip_address"]) class BlockSignal: @@ -11,6 +11,7 @@ class BlockSignal: Providing a sender is mandatory when sending signals, hence this empty sender class. """ + pass diff --git a/defender/south_migrations/0001_initial.py b/defender/south_migrations/0001_initial.py index 1d17ee1..c60bad3 100644 --- a/defender/south_migrations/0001_initial.py +++ b/defender/south_migrations/0001_initial.py @@ -10,36 +10,92 @@ class Migration(SchemaMigration): def forwards(self, orm): """ Adding model 'AccessAttempt' """ - db.create_table(u'defender_accessattempt', ( - (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), - ('user_agent', self.gf('django.db.models.fields.CharField')(max_length=255)), - ('ip_address', self.gf('django.db.models.fields.GenericIPAddressField')(max_length=39, null=True)), - ('username', self.gf('django.db.models.fields.CharField')(max_length=255, null=True)), - ('http_accept', self.gf('django.db.models.fields.CharField')(max_length=1025)), - ('path_info', self.gf('django.db.models.fields.CharField')(max_length=255)), - ('attempt_time', self.gf('django.db.models.fields.DateTimeField')(auto_now_add=True, blank=True)), - ('login_valid', self.gf('django.db.models.fields.BooleanField')(default=False)), - )) - db.send_create_signal(u'defender', ['AccessAttempt']) - + db.create_table( + "defender_accessattempt", + ( + ("id", self.gf("django.db.models.fields.AutoField")(primary_key=True)), + ( + "user_agent", + self.gf("django.db.models.fields.CharField")(max_length=255), + ), + ( + "ip_address", + self.gf("django.db.models.fields.GenericIPAddressField")( + max_length=39, null=True + ), + ), + ( + "username", + self.gf("django.db.models.fields.CharField")( + max_length=255, null=True + ), + ), + ( + "http_accept", + self.gf("django.db.models.fields.CharField")(max_length=1025), + ), + ( + "path_info", + self.gf("django.db.models.fields.CharField")(max_length=255), + ), + ( + "attempt_time", + self.gf("django.db.models.fields.DateTimeField")( + auto_now_add=True, blank=True + ), + ), + ( + "login_valid", + self.gf("django.db.models.fields.BooleanField")(default=False), + ), + ), + ) + db.send_create_signal("defender", ["AccessAttempt"]) def backwards(self, orm): # Deleting model 'AccessAttempt' - db.delete_table(u'defender_accessattempt') - + db.delete_table("defender_accessattempt") models = { - u'defender.accessattempt': { - 'Meta': {'ordering': "[u'-attempt_time']", 'object_name': 'AccessAttempt'}, - 'attempt_time': ('django.db.models.fields.DateTimeField', [], {'auto_now_add': 'True', 'blank': 'True'}), - 'http_accept': ('django.db.models.fields.CharField', [], {'max_length': '1025'}), - u'id': ('django.db.models.fields.AutoField', [], {'primary_key': 'True'}), - 'ip_address': ('django.db.models.fields.GenericIPAddressField', [], {'max_length': '39', 'null': 'True'}), - 'login_valid': ('django.db.models.fields.BooleanField', [], {'default': 'False'}), - 'path_info': ('django.db.models.fields.CharField', [], {'max_length': '255'}), - 'user_agent': ('django.db.models.fields.CharField', [], {'max_length': '255'}), - 'username': ('django.db.models.fields.CharField', [], {'max_length': '255', 'null': 'True'}) + "defender.accessattempt": { + "Meta": {"ordering": "[u'-attempt_time']", "object_name": "AccessAttempt"}, + "attempt_time": ( + "django.db.models.fields.DateTimeField", + [], + {"auto_now_add": "True", "blank": "True"}, + ), + "http_accept": ( + "django.db.models.fields.CharField", + [], + {"max_length": "1025"}, + ), + "id": ("django.db.models.fields.AutoField", [], {"primary_key": "True"}), + "ip_address": ( + "django.db.models.fields.GenericIPAddressField", + [], + {"max_length": "39", "null": "True"}, + ), + "login_valid": ( + "django.db.models.fields.BooleanField", + [], + {"default": "False"}, + ), + "path_info": ( + "django.db.models.fields.CharField", + [], + {"max_length": "255"}, + ), + "user_agent": ( + "django.db.models.fields.CharField", + [], + {"max_length": "255"}, + ), + "username": ( + "django.db.models.fields.CharField", + [], + {"max_length": "255", "null": "True"}, + ), } } - complete_apps = ['defender'] + complete_apps = ["defender"] diff --git a/defender/tasks.py b/defender/tasks.py index b7d9ff9..41c416b 100644 --- a/defender/tasks.py +++ b/defender/tasks.py @@ -7,8 +7,10 @@ from celery import shared_task @shared_task() -def add_login_attempt_task(user_agent, ip_address, username, - http_accept, path_info, login_valid): +def add_login_attempt_task( + user_agent, ip_address, username, http_accept, path_info, login_valid +): """ Create a record for the login attempt """ - store_login_attempt(user_agent, ip_address, username, - http_accept, path_info, login_valid) + store_login_attempt( + user_agent, ip_address, username, http_accept, path_info, login_valid + ) diff --git a/defender/test.py b/defender/test.py index c72689d..b02f209 100644 --- a/defender/test.py +++ b/defender/test.py @@ -14,9 +14,11 @@ class DefenderTestCaseMixin(object): class DefenderTransactionTestCase(DefenderTestCaseMixin, TransactionTestCase): """Helper TransactionTestCase that cleans the cache after each test""" + pass class DefenderTestCase(DefenderTestCaseMixin, TestCase): """Helper TestCase that cleans the cache after each test""" + pass diff --git a/defender/test_settings.py b/defender/test_settings.py index 7e450a1..3371b10 100644 --- a/defender/test_settings.py +++ b/defender/test_settings.py @@ -1,56 +1,51 @@ import os from celery import Celery -DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - } -} +DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:",}} SITE_ID = 1 MIDDLEWARE = ( - 'django.middleware.common.CommonMiddleware', - 'django.contrib.sessions.middleware.SessionMiddleware', - 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'defender.middleware.FailedLoginMiddleware', + "django.middleware.common.CommonMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "defender.middleware.FailedLoginMiddleware", ) -ROOT_URLCONF = 'defender.test_urls' +ROOT_URLCONF = "defender.test_urls" INSTALLED_APPS = [ - 'django.contrib.auth', - 'django.contrib.contenttypes', - 'django.contrib.sessions', - 'django.contrib.sites', - 'django.contrib.messages', - 'django.contrib.admin', - 'defender', + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.sites", + "django.contrib.messages", + "django.contrib.admin", + "defender", ] TEMPLATES = [ { - 'BACKEND': 'django.template.backends.django.DjangoTemplates', - 'APP_DIRS': True, - 'OPTIONS': { - 'context_processors': [ - 'django.contrib.auth.context_processors.auth', - 'django.template.context_processors.debug', - 'django.template.context_processors.i18n', - 'django.template.context_processors.media', - 'django.template.context_processors.static', - 'django.template.context_processors.tz', - 'django.contrib.messages.context_processors.messages', + "BACKEND": "django.template.backends.django.DjangoTemplates", + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.contrib.auth.context_processors.auth", + "django.template.context_processors.debug", + "django.template.context_processors.i18n", + "django.template.context_processors.media", + "django.template.context_processors.static", + "django.template.context_processors.tz", + "django.contrib.messages.context_processors.messages", ], }, }, ] -SECRET_KEY = os.environ.get('SECRET_KEY', 'too-secret-for-test') +SECRET_KEY = os.environ.get("SECRET_KEY", "too-secret-for-test") -LOGIN_REDIRECT_URL = '/admin' +LOGIN_REDIRECT_URL = "/admin" DEFENDER_LOGIN_FAILURE_LIMIT = 10 DEFENDER_COOLOFF_TIME = 2 @@ -60,15 +55,15 @@ DEFENDER_MOCK_REDIS = True # celery settings CELERY_ALWAYS_EAGER = True -BROKER_BACKEND = 'memory' -BROKER_URL = 'memory://' +BROKER_BACKEND = "memory" +BROKER_URL = "memory://" # set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.test_settings') +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "defender.test_settings") -app = Celery('defender') +app = Celery("defender") # Using a string here means the worker will not have to # pickle the object when using Windows. -app.config_from_object('django.conf:settings') +app.config_from_object("django.conf:settings") app.autodiscover_tasks(lambda: INSTALLED_APPS) diff --git a/defender/test_urls.py b/defender/test_urls.py index 3d65a26..1776d41 100644 --- a/defender/test_urls.py +++ b/defender/test_urls.py @@ -3,6 +3,4 @@ from django.contrib import admin from .urls import urlpatterns as original_urlpatterns -urlpatterns = [ - url(r'^admin/', admin.site.urls), -] + original_urlpatterns +urlpatterns = [url(r"^admin/", admin.site.urls),] + original_urlpatterns diff --git a/defender/tests.py b/defender/tests.py index 99e41fc..9609f86 100644 --- a/defender/tests.py +++ b/defender/tests.py @@ -16,6 +16,7 @@ from django.contrib.sessions.backends.db import SessionStore from django.http import HttpRequest, HttpResponse from django.test.client import RequestFactory from redis.client import Redis + try: from django.urls import reverse except ImportError: @@ -35,30 +36,36 @@ from .models import AccessAttempt from .test import DefenderTestCase, DefenderTransactionTestCase LOGIN_FORM_KEY = '
' -ADMIN_LOGIN_URL = reverse('admin:login') +ADMIN_LOGIN_URL = reverse("admin:login") DJANGO_VERSION = StrictVersion(get_version()) -VALID_USERNAME = VALID_PASSWORD = 'valid' -UPPER_USERNAME = 'VALID' +VALID_USERNAME = VALID_PASSWORD = "valid" +UPPER_USERNAME = "VALID" class AccessAttemptTest(DefenderTestCase): """ Test case using custom settings for testing """ - LOCKED_MESSAGE = 'Account locked: too many login attempts.' + + LOCKED_MESSAGE = "Account locked: too many login attempts." PERMANENT_LOCKED_MESSAGE = ( - LOCKED_MESSAGE + ' Contact an admin to unlock your account.' + LOCKED_MESSAGE + " Contact an admin to unlock your account." ) def _get_random_str(self): """ Returns a random str """ chars = string.ascii_uppercase + string.digits - return ''.join(random.choice(chars) for _ in range(20)) + return "".join(random.choice(chars) for _ in range(20)) - def _login(self, username=None, password=None, user_agent='test-browser', - remote_addr='127.0.0.1'): + def _login( + self, + username=None, + password=None, + user_agent="test-browser", + remote_addr="127.0.0.1", + ): """ Login a user. If the username or password is not provided it will use a random string instead. Use the VALID_USERNAME and VALID_PASSWORD to make a valid login. @@ -69,11 +76,12 @@ class AccessAttemptTest(DefenderTestCase): if password is None: password = self._get_random_str() - response = self.client.post(ADMIN_LOGIN_URL, { - 'username': username, - 'password': password, - LOGIN_FORM_KEY: 1, - }, HTTP_USER_AGENT=user_agent, REMOTE_ADDR=remote_addr) + response = self.client.post( + ADMIN_LOGIN_URL, + {"username": username, "password": password, LOGIN_FORM_KEY: 1,}, + HTTP_USER_AGENT=user_agent, + REMOTE_ADDR=remote_addr, + ) return response @@ -81,16 +89,14 @@ class AccessAttemptTest(DefenderTestCase): """ Create a valid user for login """ self.user = User.objects.create_superuser( - username=VALID_USERNAME, - email='test@example.com', - password=VALID_PASSWORD, + username=VALID_USERNAME, email="test@example.com", password=VALID_PASSWORD, ) def test_data_integrity_of_get_blocked_ips(self): """ Test whether data retrieved from redis via get_blocked_ips() is the same as the data saved """ - data_in = ['127.0.0.1', '4.2.2.1'] + data_in = ["127.0.0.1", "4.2.2.1"] for ip in data_in: utils.block_ip(ip) data_out = utils.get_blocked_ips() @@ -105,7 +111,7 @@ class AccessAttemptTest(DefenderTestCase): """ Test whether data retrieved from redis via get_blocked_usernames() is the same as the data saved """ - data_in = ['foo', 'bar'] + data_in = ["foo", "bar"] for username in data_in: utils.block_username(username) data_out = utils.get_blocked_usernames() @@ -164,7 +170,7 @@ class AccessAttemptTest(DefenderTestCase): one more time than failure limit """ for i in range(0, config.FAILURE_LIMIT): - ip = '74.125.239.{0}.'.format(i) + ip = "74.125.239.{0}.".format(i) response = self._login(username=VALID_USERNAME, remote_addr=ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) @@ -178,13 +184,13 @@ class AccessAttemptTest(DefenderTestCase): response = self.client.get(ADMIN_LOGIN_URL) self.assertContains(response, self.LOCKED_MESSAGE) - @patch('defender.config.USERNAME_FAILURE_LIMIT', 3) + @patch("defender.config.USERNAME_FAILURE_LIMIT", 3) def test_username_failure_limit(self): """ Tests that the username failure limit setting is respected when trying to login one more time than failure limit """ for i in range(0, config.USERNAME_FAILURE_LIMIT): - ip = '74.125.239.{0}.'.format(i) + ip = "74.125.239.{0}.".format(i) response = self._login(username=VALID_USERNAME, remote_addr=ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) @@ -198,13 +204,13 @@ class AccessAttemptTest(DefenderTestCase): response = self.client.get(ADMIN_LOGIN_URL) self.assertContains(response, LOGIN_FORM_KEY) - @patch('defender.config.IP_FAILURE_LIMIT', 3) + @patch("defender.config.IP_FAILURE_LIMIT", 3) def test_ip_failure_limit(self): """ Tests that the IP failure limit setting is respected when trying to login one more time than failure limit """ for i in range(0, config.IP_FAILURE_LIMIT): - username = 'john-doe__%d' % i + username = "john-doe__%d" % i response = self._login(username=username) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) @@ -221,8 +227,7 @@ class AccessAttemptTest(DefenderTestCase): def test_valid_login(self): """ Tests a valid login for a real username """ - response = self._login(username=VALID_USERNAME, - password=VALID_PASSWORD) + response = self._login(username=VALID_USERNAME, password=VALID_PASSWORD) self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302) def test_reset_after_valid_login(self): @@ -245,7 +250,7 @@ class AccessAttemptTest(DefenderTestCase): self._login(username=VALID_USERNAME) # try to login with a different user - response = self._login(username='myuser') + response = self._login(username="myuser") self.assertContains(response, self.LOCKED_MESSAGE) def test_blocked_username_cannot_login(self): @@ -253,11 +258,11 @@ class AccessAttemptTest(DefenderTestCase): another ip """ for i in range(0, config.FAILURE_LIMIT + 1): - ip = '74.125.239.{0}.'.format(i) + ip = "74.125.239.{0}.".format(i) self._login(username=VALID_USERNAME, remote_addr=ip) # try to login with a different ip - response = self._login(username=VALID_USERNAME, remote_addr='8.8.8.8') + response = self._login(username=VALID_USERNAME, remote_addr="8.8.8.8") self.assertContains(response, self.LOCKED_MESSAGE) def test_blocked_username_uppercase_saved_lower(self): @@ -266,7 +271,7 @@ class AccessAttemptTest(DefenderTestCase): within the cache. """ for i in range(0, config.FAILURE_LIMIT + 2): - ip = '74.125.239.{0}.'.format(i) + ip = "74.125.239.{0}.".format(i) self._login(username=UPPER_USERNAME, remote_addr=ip) self.assertNotIn(UPPER_USERNAME, utils.get_blocked_usernames()) @@ -278,7 +283,7 @@ class AccessAttemptTest(DefenderTestCase): """ for username in ["", None]: for i in range(0, config.FAILURE_LIMIT + 2): - ip = '74.125.239.{0}.'.format(i) + ip = "74.125.239.{0}.".format(i) self._login(username=username, remote_addr=ip) self.assertNotIn(username, utils.get_blocked_usernames()) @@ -311,15 +316,14 @@ class AccessAttemptTest(DefenderTestCase): def test_long_user_agent_valid(self): """ Tests if can handle a long user agent """ - long_user_agent = 'ie6' * 1024 + long_user_agent = "ie6" * 1024 response = self._login( - username=VALID_USERNAME, password=VALID_PASSWORD, - user_agent=long_user_agent + username=VALID_USERNAME, password=VALID_PASSWORD, user_agent=long_user_agent ) self.assertNotContains(response, LOGIN_FORM_KEY, status_code=302) - @patch('defender.config.BEHIND_REVERSE_PROXY', True) - @patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_FORWARDED_FOR') + @patch("defender.config.BEHIND_REVERSE_PROXY", True) + @patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_FORWARDED_FOR") def test_get_ip_reverse_proxy(self): """ Tests if can handle a long user agent """ @@ -328,16 +332,16 @@ class AccessAttemptTest(DefenderTestCase): request.user = AnonymousUser() request.session = SessionStore() - request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' - self.assertEqual(utils.get_ip(request), '192.168.24.24') + request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24" + self.assertEqual(utils.get_ip(request), "192.168.24.24") request_factory = RequestFactory() request = request_factory.get(ADMIN_LOGIN_URL) request.user = AnonymousUser() request.session = SessionStore() - request.META['REMOTE_ADDR'] = '24.24.24.24' - self.assertEqual(utils.get_ip(request), '24.24.24.24') + request.META["REMOTE_ADDR"] = "24.24.24.24" + self.assertEqual(utils.get_ip(request), "24.24.24.24") def test_get_ip(self): """ Tests if can handle a long user agent @@ -347,12 +351,12 @@ class AccessAttemptTest(DefenderTestCase): request.user = AnonymousUser() request.session = SessionStore() - self.assertEqual(utils.get_ip(request), '127.0.0.1') + self.assertEqual(utils.get_ip(request), "127.0.0.1") def test_long_user_agent_not_valid(self): """ Tests if can handle a long user agent with failure """ - long_user_agent = 'ie6' * 1024 + long_user_agent = "ie6" * 1024 for i in range(0, config.FAILURE_LIMIT + 1): response = self._login(user_agent=long_user_agent) @@ -365,12 +369,12 @@ class AccessAttemptTest(DefenderTestCase): self.test_failure_limit_by_ip_once() # Reset the ip so we can try again - utils.reset_failed_attempts(ip_address='127.0.0.1') + utils.reset_failed_attempts(ip_address="127.0.0.1") # Make a login attempt again self.test_valid_login() - @patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/') + @patch("defender.config.LOCKOUT_URL", "http://localhost/othe/login/") def test_failed_login_redirect_to_url(self): """ Test to make sure that after lockout we send to the correct redirect URL """ @@ -384,14 +388,14 @@ class AccessAttemptTest(DefenderTestCase): # But we should get one now, check redirect make sure it is valid. response = self._login() self.assertEqual(response.status_code, 302) - self.assertEqual(response['Location'], 'http://localhost/othe/login/') + self.assertEqual(response["Location"], "http://localhost/othe/login/") # doing a get should also get locked out message response = self.client.get(ADMIN_LOGIN_URL) self.assertEqual(response.status_code, 302) - self.assertEqual(response['Location'], 'http://localhost/othe/login/') + self.assertEqual(response["Location"], "http://localhost/othe/login/") - @patch('defender.config.LOCKOUT_URL', '/o/login/') + @patch("defender.config.LOCKOUT_URL", "/o/login/") def test_failed_login_redirect_to_url_local(self): """ Test to make sure that after lockout we send to the correct redirect URL """ @@ -404,22 +408,22 @@ class AccessAttemptTest(DefenderTestCase): # RFC 7231 allows relative URIs in Location header. # Django from version 1.9 is support this: # https://docs.djangoproject.com/en/1.9/releases/1.9/#http-redirects-no-longer-forced-to-absolute-uris - lockout_url = 'http://testserver/o/login/' - if DJANGO_VERSION >= StrictVersion('1.9'): - lockout_url = '/o/login/' + lockout_url = "http://testserver/o/login/" + if DJANGO_VERSION >= StrictVersion("1.9"): + lockout_url = "/o/login/" # So, we shouldn't have gotten a lock-out yet. # But we should get one now, check redirect make sure it is valid. response = self._login() self.assertEqual(response.status_code, 302) - self.assertEqual(response['Location'], lockout_url) + self.assertEqual(response["Location"], lockout_url) # doing a get should also get locked out message response = self.client.get(ADMIN_LOGIN_URL) self.assertEqual(response.status_code, 302) - self.assertEqual(response['Location'], lockout_url) + self.assertEqual(response["Location"], lockout_url) - @patch('defender.config.LOCKOUT_TEMPLATE', 'defender/lockout.html') + @patch("defender.config.LOCKOUT_TEMPLATE", "defender/lockout.html") def test_failed_login_redirect_to_template(self): """ Test to make sure that after lockout we send to the correct template """ @@ -433,14 +437,14 @@ class AccessAttemptTest(DefenderTestCase): # But we should get one now, check template make sure it is valid. response = self._login() self.assertEqual(response.status_code, 200) - self.assertTemplateUsed(response, 'defender/lockout.html') + self.assertTemplateUsed(response, "defender/lockout.html") # doing a get should also get locked out message response = self.client.get(ADMIN_LOGIN_URL) self.assertEqual(response.status_code, 200) - self.assertTemplateUsed(response, 'defender/lockout.html') + self.assertTemplateUsed(response, "defender/lockout.html") - @patch('defender.config.COOLOFF_TIME', 0) + @patch("defender.config.COOLOFF_TIME", 0) def test_failed_login_no_cooloff(self): """ failed login no cooloff """ for i in range(0, config.FAILURE_LIMIT): @@ -467,166 +471,159 @@ class AccessAttemptTest(DefenderTestCase): def test_is_valid_ip(self): """ Test the is_valid_ip() method """ - self.assertEqual(utils.is_valid_ip('192.168.0.1'), True) - self.assertEqual(utils.is_valid_ip('130.80.100.24'), True) - self.assertEqual(utils.is_valid_ip('8.8.8.8'), True) - self.assertEqual(utils.is_valid_ip('127.0.0.1'), True) - self.assertEqual(utils.is_valid_ip('fish'), False) + self.assertEqual(utils.is_valid_ip("192.168.0.1"), True) + self.assertEqual(utils.is_valid_ip("130.80.100.24"), True) + self.assertEqual(utils.is_valid_ip("8.8.8.8"), True) + self.assertEqual(utils.is_valid_ip("127.0.0.1"), True) + self.assertEqual(utils.is_valid_ip("fish"), False) self.assertEqual(utils.is_valid_ip(None), False) - self.assertEqual(utils.is_valid_ip(''), False) - self.assertEqual(utils.is_valid_ip('0x41.0x41.0x41.0x41'), False) - self.assertEqual(utils.is_valid_ip('192.168.100.34.y'), False) + self.assertEqual(utils.is_valid_ip(""), False) + self.assertEqual(utils.is_valid_ip("0x41.0x41.0x41.0x41"), False) + self.assertEqual(utils.is_valid_ip("192.168.100.34.y"), False) self.assertEqual( - utils.is_valid_ip('2001:0db8:85a3:0000:0000:8a2e:0370:7334'), True) - self.assertEqual( - utils.is_valid_ip('2001:db8:85a3:0:0:8a2e:370:7334'), True) - self.assertEqual( - utils.is_valid_ip('2001:db8:85a3::8a2e:370:7334'), True) - self.assertEqual( - utils.is_valid_ip('::ffff:192.0.2.128'), True) - self.assertEqual( - utils.is_valid_ip('::ffff:8.8.8.8'), True) + utils.is_valid_ip("2001:0db8:85a3:0000:0000:8a2e:0370:7334"), True + ) + self.assertEqual(utils.is_valid_ip("2001:db8:85a3:0:0:8a2e:370:7334"), True) + self.assertEqual(utils.is_valid_ip("2001:db8:85a3::8a2e:370:7334"), True) + self.assertEqual(utils.is_valid_ip("::ffff:192.0.2.128"), True) + self.assertEqual(utils.is_valid_ip("::ffff:8.8.8.8"), True) def test_parse_redis_url(self): """ test the parse_redis_url method """ # full regular conf = parse_redis_url("redis://user:password@localhost2:1234/2") - self.assertEqual(conf.get('HOST'), 'localhost2') - self.assertEqual(conf.get('DB'), 2) - self.assertEqual(conf.get('PASSWORD'), 'password') - self.assertEqual(conf.get('PORT'), 1234) + self.assertEqual(conf.get("HOST"), "localhost2") + self.assertEqual(conf.get("DB"), 2) + self.assertEqual(conf.get("PASSWORD"), "password") + self.assertEqual(conf.get("PORT"), 1234) # full non local conf = parse_redis_url("redis://user:pass@www.localhost.com:1234/2") - self.assertEqual(conf.get('HOST'), 'www.localhost.com') - self.assertEqual(conf.get('DB'), 2) - self.assertEqual(conf.get('PASSWORD'), 'pass') - self.assertEqual(conf.get('PORT'), 1234) + self.assertEqual(conf.get("HOST"), "www.localhost.com") + self.assertEqual(conf.get("DB"), 2) + self.assertEqual(conf.get("PASSWORD"), "pass") + self.assertEqual(conf.get("PORT"), 1234) # no user name conf = parse_redis_url("redis://password@localhost2:1234/2") - self.assertEqual(conf.get('HOST'), 'localhost2') - self.assertEqual(conf.get('DB'), 2) - self.assertEqual(conf.get('PASSWORD'), None) - self.assertEqual(conf.get('PORT'), 1234) + self.assertEqual(conf.get("HOST"), "localhost2") + self.assertEqual(conf.get("DB"), 2) + self.assertEqual(conf.get("PASSWORD"), None) + self.assertEqual(conf.get("PORT"), 1234) # no user name 2 with colon conf = parse_redis_url("redis://:password@localhost2:1234/2") - self.assertEqual(conf.get('HOST'), 'localhost2') - self.assertEqual(conf.get('DB'), 2) - self.assertEqual(conf.get('PASSWORD'), 'password') - self.assertEqual(conf.get('PORT'), 1234) + self.assertEqual(conf.get("HOST"), "localhost2") + self.assertEqual(conf.get("DB"), 2) + self.assertEqual(conf.get("PASSWORD"), "password") + self.assertEqual(conf.get("PORT"), 1234) # Empty conf = parse_redis_url(None) - self.assertEqual(conf.get('HOST'), 'localhost') - self.assertEqual(conf.get('DB'), 0) - self.assertEqual(conf.get('PASSWORD'), None) - self.assertEqual(conf.get('PORT'), 6379) + self.assertEqual(conf.get("HOST"), "localhost") + self.assertEqual(conf.get("DB"), 0) + self.assertEqual(conf.get("PASSWORD"), None) + self.assertEqual(conf.get("PORT"), 6379) # no db conf = parse_redis_url("redis://:password@localhost2:1234") - self.assertEqual(conf.get('HOST'), 'localhost2') - self.assertEqual(conf.get('DB'), 0) - self.assertEqual(conf.get('PASSWORD'), 'password') - self.assertEqual(conf.get('PORT'), 1234) + self.assertEqual(conf.get("HOST"), "localhost2") + self.assertEqual(conf.get("DB"), 0) + self.assertEqual(conf.get("PASSWORD"), "password") + self.assertEqual(conf.get("PORT"), 1234) # no password conf = parse_redis_url("redis://localhost2:1234/0") - self.assertEqual(conf.get('HOST'), 'localhost2') - self.assertEqual(conf.get('DB'), 0) - self.assertEqual(conf.get('PASSWORD'), None) - self.assertEqual(conf.get('PORT'), 1234) + self.assertEqual(conf.get("HOST"), "localhost2") + self.assertEqual(conf.get("DB"), 0) + self.assertEqual(conf.get("PASSWORD"), None) + self.assertEqual(conf.get("PORT"), 1234) - @patch('defender.config.DEFENDER_REDIS_NAME', 'default') + @patch("defender.config.DEFENDER_REDIS_NAME", "default") def test_get_redis_connection_django_conf(self): """ get the redis connection """ redis_client = get_redis_connection() self.assertIsInstance(redis_client, Redis) - @patch('defender.config.DEFENDER_REDIS_NAME', 'bad-key') + @patch("defender.config.DEFENDER_REDIS_NAME", "bad-key") def test_get_redis_connection_django_conf_wrong_key(self): """ see if we get the correct error """ - error_msg = ('The cache bad-key was not found on the django ' - 'cache settings.') + error_msg = "The cache bad-key was not found on the django " "cache settings." self.assertRaisesMessage(KeyError, error_msg, get_redis_connection) def test_get_ip_address_from_request(self): """ get ip from request, make sure it is correct """ req = HttpRequest() - req.META['REMOTE_ADDR'] = '1.2.3.4' + req.META["REMOTE_ADDR"] = "1.2.3.4" ip = utils.get_ip_address_from_request(req) - self.assertEqual(ip, '1.2.3.4') + self.assertEqual(ip, "1.2.3.4") req = HttpRequest() - req.META['REMOTE_ADDR'] = '1.2.3.4 ' + req.META["REMOTE_ADDR"] = "1.2.3.4 " ip = utils.get_ip_address_from_request(req) - self.assertEqual(ip, '1.2.3.4') + self.assertEqual(ip, "1.2.3.4") req = HttpRequest() - req.META['REMOTE_ADDR'] = '192.168.100.34.y' + req.META["REMOTE_ADDR"] = "192.168.100.34.y" ip = utils.get_ip_address_from_request(req) - self.assertEqual(ip, '127.0.0.1') + self.assertEqual(ip, "127.0.0.1") req = HttpRequest() - req.META['REMOTE_ADDR'] = 'cat' + req.META["REMOTE_ADDR"] = "cat" ip = utils.get_ip_address_from_request(req) - self.assertEqual(ip, '127.0.0.1') + self.assertEqual(ip, "127.0.0.1") req = HttpRequest() ip = utils.get_ip_address_from_request(req) - self.assertEqual(ip, '127.0.0.1') + self.assertEqual(ip, "127.0.0.1") - @patch('defender.config.BEHIND_REVERSE_PROXY', True) - @patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_PROXIED') + @patch("defender.config.BEHIND_REVERSE_PROXY", True) + @patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_PROXIED") def test_get_ip_reverse_proxy_custom_header(self): """ make sure the ip is correct behind reverse proxy """ req = HttpRequest() - req.META['HTTP_X_PROXIED'] = '1.2.3.4' - self.assertEqual(utils.get_ip(req), '1.2.3.4') + req.META["HTTP_X_PROXIED"] = "1.2.3.4" + self.assertEqual(utils.get_ip(req), "1.2.3.4") req = HttpRequest() - req.META['HTTP_X_PROXIED'] = '1.2.3.4, 5.6.7.8, 127.0.0.1' - self.assertEqual(utils.get_ip(req), '1.2.3.4') + req.META["HTTP_X_PROXIED"] = "1.2.3.4, 5.6.7.8, 127.0.0.1" + self.assertEqual(utils.get_ip(req), "1.2.3.4") req = HttpRequest() - req.META['REMOTE_ADDR'] = '1.2.3.4' - self.assertEqual(utils.get_ip(req), '1.2.3.4') + req.META["REMOTE_ADDR"] = "1.2.3.4" + self.assertEqual(utils.get_ip(req), "1.2.3.4") - @patch('defender.config.BEHIND_REVERSE_PROXY', True) - @patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_REAL_IP') + @patch("defender.config.BEHIND_REVERSE_PROXY", True) + @patch("defender.config.REVERSE_PROXY_HEADER", "HTTP_X_REAL_IP") def test_get_user_attempts(self): """ Get the user attempts make sure they are correct """ ip_attempts = random.randint(3, 12) username_attempts = random.randint(3, 12) for i in range(0, ip_attempts): - utils.increment_key(utils.get_ip_attempt_cache_key('1.2.3.4')) + utils.increment_key(utils.get_ip_attempt_cache_key("1.2.3.4")) for i in range(0, username_attempts): - utils.increment_key(utils.get_username_attempt_cache_key('foobar')) + utils.increment_key(utils.get_username_attempt_cache_key("foobar")) req = HttpRequest() - req.POST['username'] = 'foobar' - req.META['HTTP_X_REAL_IP'] = '1.2.3.4' + req.POST["username"] = "foobar" + req.META["HTTP_X_REAL_IP"] = "1.2.3.4" self.assertEqual( utils.get_user_attempts(req), max(ip_attempts, username_attempts) ) req = HttpRequest() - req.POST['username'] = 'foobar' - req.META['HTTP_X_REAL_IP'] = '5.6.7.8' - self.assertEqual( - utils.get_user_attempts(req), username_attempts - ) + req.POST["username"] = "foobar" + req.META["HTTP_X_REAL_IP"] = "5.6.7.8" + self.assertEqual(utils.get_user_attempts(req), username_attempts) req = HttpRequest() - req.POST['username'] = 'barfoo' - req.META['HTTP_X_REAL_IP'] = '1.2.3.4' - self.assertEqual( - utils.get_user_attempts(req), ip_attempts - ) + req.POST["username"] = "barfoo" + req.META["HTTP_X_REAL_IP"] = "1.2.3.4" + self.assertEqual(utils.get_user_attempts(req), ip_attempts) def test_admin(self): """ test the admin pages for this app """ from .admin import AccessAttemptAdmin + AccessAttemptAdmin def test_unblock_view_user_with_plus(self): @@ -636,16 +633,18 @@ class AccessAttemptTest(DefenderTestCase): Regression test for #GH76. """ - reverse('defender_unblock_username_view', - kwargs={'username': 'user+test@test.tld'}) + reverse( + "defender_unblock_username_view", kwargs={"username": "user+test@test.tld"} + ) def test_unblock_view_user_with_special_symbols(self): """ There is an available admin view for unblocking a user with a exclamation mark sign in the username. """ - reverse('defender_unblock_username_view', - kwargs={'username': 'user!test@test.tld'}) + reverse( + "defender_unblock_username_view", kwargs={"username": "user!test@test.tld"} + ) def test_decorator_middleware(self): # because watch_login is called twice in this test (once by the @@ -678,7 +677,7 @@ class AccessAttemptTest(DefenderTestCase): response = self.client.get(ADMIN_LOGIN_URL) self.assertNotContains(response, self.LOCKED_MESSAGE) - @patch('defender.config.USE_CELERY', True) + @patch("defender.config.USE_CELERY", True) def test_use_celery(self): """ Check that use celery works """ @@ -694,16 +693,15 @@ class AccessAttemptTest(DefenderTestCase): response = self._login() self.assertContains(response, self.LOCKED_MESSAGE) - self.assertEqual(AccessAttempt.objects.count(), - config.FAILURE_LIMIT + 1) + self.assertEqual(AccessAttempt.objects.count(), config.FAILURE_LIMIT + 1) self.assertIsNotNone(str(AccessAttempt.objects.all()[0])) - @patch('defender.config.LOCKOUT_BY_IP_USERNAME', True) + @patch("defender.config.LOCKOUT_BY_IP_USERNAME", True) def test_lockout_by_ip_and_username(self): """ Check that lockout still works when locking out by IP and Username combined """ - username = 'testy' + username = "testy" for i in range(0, config.FAILURE_LIMIT): response = self._login(username=username) @@ -726,23 +724,23 @@ class AccessAttemptTest(DefenderTestCase): # We shouldn't get a lockout message when attempting to use a # different ip address - ip = '74.125.239.60' + ip = "74.125.239.60" response = self._login(username=VALID_USERNAME, remote_addr=ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) - @patch('defender.config.DISABLE_IP_LOCKOUT', True) + @patch("defender.config.DISABLE_IP_LOCKOUT", True) def test_disable_ip_lockout(self): """ Check that lockout still works when we disable IP Lock out """ - username = 'testy' + username = "testy" # try logging in with the same IP, but different username # we shouldn't be blocked. # same IP different, usernames - ip = '74.125.239.60' + ip = "74.125.239.60" for i in range(0, config.FAILURE_LIMIT + 10): - login_username = u"{0}{1}".format(username, i) + login_username = "{0}{1}".format(username, i) response = self._login(username=login_username, remote_addr=ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) @@ -770,7 +768,7 @@ class AccessAttemptTest(DefenderTestCase): # We shouldn't get a lockout message when attempting to use a # different ip address - second_ip = '74.125.239.99' + second_ip = "74.125.239.99" response = self._login(username=VALID_USERNAME, remote_addr=second_ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) @@ -786,22 +784,22 @@ class AccessAttemptTest(DefenderTestCase): data_out = utils.get_blocked_ips() self.assertEqual(data_out, []) - @patch('defender.config.DISABLE_USERNAME_LOCKOUT', True) + @patch("defender.config.DISABLE_USERNAME_LOCKOUT", True) def test_disable_username_lockout(self): """ Check lockouting still works when we disable username lockout """ - username = 'testy' + username = "testy" # try logging in with the same username, but different IPs. # we shouldn't be locked. for i in range(0, config.FAILURE_LIMIT + 10): - ip = '74.125.126.{0}'.format(i) + ip = "74.125.126.{0}".format(i) response = self._login(username=username, remote_addr=ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) # same ip and same username - ip = '74.125.127.1' + ip = "74.125.127.1" for i in range(0, config.FAILURE_LIMIT): response = self._login(username=username, remote_addr=ip) # Check if we are in the same login page @@ -818,7 +816,7 @@ class AccessAttemptTest(DefenderTestCase): # We shouldn't get a lockout message when attempting to use a # different ip address to be sure that username is not blocked. - second_ip = '74.125.127.2' + second_ip = "74.125.127.2" response = self._login(username=username, remote_addr=second_ip) # Check if we are in the same login page self.assertContains(response, LOGIN_FORM_KEY) @@ -834,8 +832,8 @@ class AccessAttemptTest(DefenderTestCase): data_out = utils.get_blocked_usernames() self.assertEqual(data_out, []) - @patch('defender.config.BEHIND_REVERSE_PROXY', True) - @patch('defender.config.IP_FAILURE_LIMIT', 3) + @patch("defender.config.BEHIND_REVERSE_PROXY", True) + @patch("defender.config.IP_FAILURE_LIMIT", 3) def test_login_blocked_for_non_standard_login_views_without_msg(self): """ Check that a view wich returns the expected status code is causing @@ -849,11 +847,11 @@ class AccessAttemptTest(DefenderTestCase): return HttpResponse(status=401) request_factory = RequestFactory() - request = request_factory.post('api/login') + request = request_factory.post("api/login") request.user = AnonymousUser() request.session = SessionStore() - request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' + request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24" for _ in range(3): fake_api_401_login_view_without_msg(request) @@ -864,27 +862,27 @@ class AccessAttemptTest(DefenderTestCase): fake_api_401_login_view_without_msg(request) data_out = utils.get_blocked_ips() - self.assertEqual(data_out, ['192.168.24.24']) + self.assertEqual(data_out, ["192.168.24.24"]) - @patch('defender.config.BEHIND_REVERSE_PROXY', True) - @patch('defender.config.IP_FAILURE_LIMIT', 3) + @patch("defender.config.BEHIND_REVERSE_PROXY", True) + @patch("defender.config.IP_FAILURE_LIMIT", 3) def test_login_blocked_for_non_standard_login_views_with_msg(self): """ Check that a view wich returns the expected status code and the expected message is causing the IP to be locked out. """ - @watch_login(status_code=401, msg='Invalid credentials') + + @watch_login(status_code=401, msg="Invalid credentials") def fake_api_401_login_view_without_msg(request): """ Fake the api login with 401 """ - return HttpResponse('Sorry, Invalid credentials', - status=401) + return HttpResponse("Sorry, Invalid credentials", status=401) request_factory = RequestFactory() - request = request_factory.post('api/login') + request = request_factory.post("api/login") request.user = AnonymousUser() request.session = SessionStore() - request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' + request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24" for _ in range(3): fake_api_401_login_view_without_msg(request) @@ -895,27 +893,27 @@ class AccessAttemptTest(DefenderTestCase): fake_api_401_login_view_without_msg(request) data_out = utils.get_blocked_ips() - self.assertEqual(data_out, ['192.168.24.24']) + self.assertEqual(data_out, ["192.168.24.24"]) - @patch('defender.config.BEHIND_REVERSE_PROXY', True) - @patch('defender.config.IP_FAILURE_LIMIT', 3) + @patch("defender.config.BEHIND_REVERSE_PROXY", True) + @patch("defender.config.IP_FAILURE_LIMIT", 3) def test_login_non_blocked_for_non_standard_login_views_different_msg(self): """ Check that a view wich returns the expected status code but not the expected message is not causing the IP to be locked out. """ - @watch_login(status_code=401, msg='Invalid credentials') + + @watch_login(status_code=401, msg="Invalid credentials") def fake_api_401_login_view_without_msg(request): """ Fake the api login with 401 """ - return HttpResponse('Ups, wrong credentials', - status=401) + return HttpResponse("Ups, wrong credentials", status=401) request_factory = RequestFactory() - request = request_factory.post('api/login') + request = request_factory.post("api/login") request.user = AnonymousUser() request.session = SessionStore() - request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24' + request.META["HTTP_X_FORWARDED_FOR"] = "192.168.24.24" for _ in range(4): fake_api_401_login_view_without_msg(request) @@ -935,17 +933,17 @@ class SignalTest(DefenderTestCase): self.blocked_ip = ip_address ip_block_signal.connect(handler) - utils.block_ip('8.8.8.8') - self.assertEqual(self.blocked_ip, '8.8.8.8') + utils.block_ip("8.8.8.8") + self.assertEqual(self.blocked_ip, "8.8.8.8") def test_should_send_signal_when_unblocking_ip(self): - self.blocked_ip = ('8.8.8.8') + self.blocked_ip = "8.8.8.8" def handler(sender, ip_address, **kwargs): self.blocked_ip = None ip_unblock_signal.connect(handler) - utils.unblock_ip('8.8.8.8') + utils.unblock_ip("8.8.8.8") self.assertIsNone(self.blocked_ip) def test_should_not_send_signal_when_ip_already_blocked(self): @@ -956,10 +954,10 @@ class SignalTest(DefenderTestCase): ip_block_signal.connect(handler) - key = utils.get_ip_blocked_cache_key('8.8.8.8') - utils.REDIS_SERVER.set(key, 'blocked') + key = utils.get_ip_blocked_cache_key("8.8.8.8") + utils.REDIS_SERVER.set(key, "blocked") - utils.block_ip('8.8.8.8') + utils.block_ip("8.8.8.8") self.assertIsNone(self.blocked_ip) def test_should_send_signal_when_blocking_username(self): @@ -969,17 +967,17 @@ class SignalTest(DefenderTestCase): self.blocked_username = username username_block_signal.connect(handler) - utils.block_username('richard_hendricks') - self.assertEqual(self.blocked_username, 'richard_hendricks') + utils.block_username("richard_hendricks") + self.assertEqual(self.blocked_username, "richard_hendricks") def test_should_send_signal_when_unblocking_username(self): - self.blocked_username = 'richard_hendricks' + self.blocked_username = "richard_hendricks" def handler(sender, username, **kwargs): self.blocked_username = None username_unblock_signal.connect(handler) - utils.unblock_username('richard_hendricks') + utils.unblock_username("richard_hendricks") self.assertIsNone(self.blocked_username) def test_should_not_send_signal_when_username_already_blocked(self): @@ -990,16 +988,17 @@ class SignalTest(DefenderTestCase): username_block_signal.connect(handler) - key = utils.get_username_blocked_cache_key('richard hendricks') - utils.REDIS_SERVER.set(key, 'blocked') + key = utils.get_username_blocked_cache_key("richard hendricks") + utils.REDIS_SERVER.set(key, "blocked") - utils.block_ip('richard hendricks') + utils.block_ip("richard hendricks") self.assertIsNone(self.blocked_username) class DefenderTestCaseTest(DefenderTestCase): """ Make sure that we're cleaning the cache between tests """ - key = 'test_key' + + key = "test_key" def test_first_incr(self): """ first increment """ @@ -1016,7 +1015,8 @@ class DefenderTestCaseTest(DefenderTestCase): class DefenderTransactionTestCaseTest(DefenderTransactionTestCase): """ Make sure that we're cleaning the cache between tests """ - key = 'test_key' + + key = "test_key" def test_first_incr(self): """ first increment """ @@ -1036,7 +1036,7 @@ class TestUtils(DefenderTestCase): def test_username_blocking(self): """ test username blocking """ - username = 'foo' + username = "foo" self.assertFalse(utils.is_user_already_locked(username)) utils.block_username(username) self.assertTrue(utils.is_user_already_locked(username)) @@ -1045,7 +1045,7 @@ class TestUtils(DefenderTestCase): def test_ip_address_blocking(self): """ ip address blocking """ - ip = '1.2.3.4' + ip = "1.2.3.4" self.assertFalse(utils.is_source_ip_already_locked(ip)) utils.block_ip(ip) self.assertTrue(utils.is_source_ip_already_locked(ip)) @@ -1059,18 +1059,14 @@ class TestUtils(DefenderTestCase): request = request_factory.get(ADMIN_LOGIN_URL) request.user = AnonymousUser() request.session = SessionStore() - username = 'johndoe' + username = "johndoe" utils.block_username(request.user.username) self.assertFalse(utils.is_already_locked(request, username=username)) utils.check_request(request, True, username=username) - self.assertEqual( - utils.get_user_attempts(request, username=username), 1 - ) + self.assertEqual(utils.get_user_attempts(request, username=username), 1) utils.add_login_attempt_to_db(request, True, username=username) - self.assertEqual( - AccessAttempt.objects.filter(username=username).count(), 1 - ) + self.assertEqual(AccessAttempt.objects.filter(username=username).count(), 1) diff --git a/defender/travis_settings.py b/defender/travis_settings.py index b5f40e1..868c963 100644 --- a/defender/travis_settings.py +++ b/defender/travis_settings.py @@ -2,63 +2,55 @@ import os from celery import Celery -DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - } -} +DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:",}} CACHES = { - 'default': { - 'BACKEND': 'redis_cache.RedisCache', - 'LOCATION': 'localhost:6379', - } + "default": {"BACKEND": "redis_cache.RedisCache", "LOCATION": "localhost:6379",} } SITE_ID = 1 MIDDLEWARE = ( - 'django.middleware.common.CommonMiddleware', - 'django.contrib.sessions.middleware.SessionMiddleware', - 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'django.contrib.messages.middleware.MessageMiddleware', - 'defender.middleware.FailedLoginMiddleware', + "django.middleware.common.CommonMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "defender.middleware.FailedLoginMiddleware", ) -ROOT_URLCONF = 'defender.test_urls' +ROOT_URLCONF = "defender.test_urls" INSTALLED_APPS = [ - 'django.contrib.auth', - 'django.contrib.contenttypes', - 'django.contrib.sessions', - 'django.contrib.sites', - 'django.contrib.messages', - 'django.contrib.admin', - 'defender', + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.sites", + "django.contrib.messages", + "django.contrib.admin", + "defender", ] TEMPLATES = [ { - 'BACKEND': 'django.template.backends.django.DjangoTemplates', - 'APP_DIRS': True, - 'OPTIONS': { - 'context_processors': [ - 'django.contrib.auth.context_processors.auth', - 'django.template.context_processors.debug', - 'django.template.context_processors.i18n', - 'django.template.context_processors.media', - 'django.template.context_processors.static', - 'django.template.context_processors.tz', - 'django.contrib.messages.context_processors.messages', + "BACKEND": "django.template.backends.django.DjangoTemplates", + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.contrib.auth.context_processors.auth", + "django.template.context_processors.debug", + "django.template.context_processors.i18n", + "django.template.context_processors.media", + "django.template.context_processors.static", + "django.template.context_processors.tz", + "django.contrib.messages.context_processors.messages", ], }, }, ] -SECRET_KEY = os.environ.get('SECRET_KEY', 'too-secret-for-test') +SECRET_KEY = os.environ.get("SECRET_KEY", "too-secret-for-test") -LOGIN_REDIRECT_URL = '/admin' +LOGIN_REDIRECT_URL = "/admin" DEFENDER_LOGIN_FAILURE_LIMIT = 10 DEFENDER_COOLOFF_TIME = 2 @@ -68,15 +60,15 @@ DEFENDER_MOCK_REDIS = False # Celery settings: CELERY_ALWAYS_EAGER = True -BROKER_BACKEND = 'memory' -BROKER_URL = 'memory://' +BROKER_BACKEND = "memory" +BROKER_URL = "memory://" # set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.travis_settings') +os.environ.setdefault("DJANGO_SETTINGS_MODULE", "defender.travis_settings") -app = Celery('defender') +app = Celery("defender") # Using a string here means the worker will not have to # pickle the object when using Windows. -app.config_from_object('django.conf:settings') +app.config_from_object("django.conf:settings") app.autodiscover_tasks(lambda: INSTALLED_APPS) diff --git a/defender/urls.py b/defender/urls.py index bfce50a..174d5e8 100644 --- a/defender/urls.py +++ b/defender/urls.py @@ -2,11 +2,15 @@ from django.conf.urls import url from .views import block_view, unblock_ip_view, unblock_username_view urlpatterns = [ - url(r'^blocks/$', block_view, - name="defender_blocks_view"), - url(r'^blocks/ip/(?P[A-Za-z0-9-._]+)/unblock$', unblock_ip_view, - name="defender_unblock_ip_view"), - url(r'^blocks/username/(?P[\w]+[^\/]*)/unblock$', + url(r"^blocks/$", block_view, name="defender_blocks_view"), + url( + r"^blocks/ip/(?P[A-Za-z0-9-._]+)/unblock$", + unblock_ip_view, + name="defender_unblock_ip_view", + ), + url( + r"^blocks/username/(?P[\w]+[^\/]*)/unblock$", unblock_username_view, - name="defender_unblock_username_view"), + name="defender_unblock_username_view", + ), ] diff --git a/defender/utils.py b/defender/utils.py index 06949da..3c195c7 100644 --- a/defender/utils.py +++ b/defender/utils.py @@ -10,8 +10,12 @@ from django.utils.module_loading import import_string from .connection import get_redis_connection from . import config from .data import store_login_attempt -from .signals import (send_username_block_signal, send_ip_block_signal, - send_username_unblock_signal, send_ip_unblock_signal) +from .signals import ( + send_username_block_signal, + send_ip_block_signal, + send_username_unblock_signal, + send_ip_unblock_signal, +) REDIS_SERVER = get_redis_connection() @@ -33,18 +37,18 @@ def is_valid_ip(ip_address): def get_ip_address_from_request(request): """ Makes the best attempt to get the client's real IP or return the loopback """ - remote_addr = request.META.get('REMOTE_ADDR', '') + remote_addr = request.META.get("REMOTE_ADDR", "") if remote_addr and is_valid_ip(remote_addr): return remote_addr.strip() - return '127.0.0.1' + return "127.0.0.1" def get_ip(request): """ get the ip address from the request """ if config.BEHIND_REVERSE_PROXY: - ip_address = request.META.get(config.REVERSE_PROXY_HEADER, '') + ip_address = request.META.get(config.REVERSE_PROXY_HEADER, "") ip_address = ip_address.split(",", 1)[0].strip() - if ip_address == '': + if ip_address == "": ip_address = get_ip_address_from_request(request) else: ip_address = get_ip_address_from_request(request) @@ -68,8 +72,9 @@ def get_ip_attempt_cache_key(ip_address): def get_username_attempt_cache_key(username): """ get the cache key by username """ - return "{0}:failed:username:{1}".format(config.CACHE_PREFIX, - lower_username(username)) + return "{0}:failed:username:{1}".format( + config.CACHE_PREFIX, lower_username(username) + ) def get_ip_blocked_cache_key(ip_address): @@ -79,8 +84,9 @@ def get_ip_blocked_cache_key(ip_address): def get_username_blocked_cache_key(username): """ get the cache key by username """ - return "{0}:blocked:username:{1}".format(config.CACHE_PREFIX, - lower_username(username)) + return "{0}:blocked:username:{1}".format( + config.CACHE_PREFIX, lower_username(username) + ) def strip_keys(key_list): @@ -105,8 +111,7 @@ def get_blocked_ips(): # There are no blocked IP's since we disabled them. return [] key = get_ip_blocked_cache_key("*") - key_list = [redis_key.decode('utf-8') - for redis_key in REDIS_SERVER.keys(key)] + key_list = [redis_key.decode("utf-8") for redis_key in REDIS_SERVER.keys(key)] return strip_keys(key_list) @@ -116,8 +121,7 @@ def get_blocked_usernames(): # There are no blocked usernames since we disabled them. return [] key = get_username_blocked_cache_key("*") - key_list = [redis_key.decode('utf-8') - for redis_key in REDIS_SERVER.keys(key)] + key_list = [redis_key.decode("utf-8") for redis_key in REDIS_SERVER.keys(key)] return strip_keys(key_list) @@ -138,9 +142,7 @@ def username_from_request(request): return None -get_username_from_request = import_string( - config.GET_USERNAME_FROM_REQUEST_PATH -) +get_username_from_request = import_string(config.GET_USERNAME_FROM_REQUEST_PATH) def get_user_attempts(request, get_username=get_username_from_request, username=None): @@ -177,9 +179,9 @@ def block_ip(ip_address): already_blocked = is_source_ip_already_locked(ip_address) key = get_ip_blocked_cache_key(ip_address) if config.COOLOFF_TIME: - REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME) + REDIS_SERVER.set(key, "blocked", config.COOLOFF_TIME) else: - REDIS_SERVER.set(key, 'blocked') + REDIS_SERVER.set(key, "blocked") if not already_blocked: send_ip_block_signal(ip_address) @@ -195,9 +197,9 @@ def block_username(username): already_blocked = is_user_already_locked(username) key = get_username_blocked_cache_key(username) if config.COOLOFF_TIME: - REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME) + REDIS_SERVER.set(key, "blocked", config.COOLOFF_TIME) else: - REDIS_SERVER.set(key, 'blocked') + REDIS_SERVER.set(key, "blocked") if not already_blocked: send_username_block_signal(username) @@ -291,9 +293,9 @@ def lockout_response(request): """ if we are locked out, here is the response """ if config.LOCKOUT_TEMPLATE: context = { - 'cooloff_time_seconds': config.COOLOFF_TIME, - 'cooloff_time_minutes': config.COOLOFF_TIME / 60, - 'failure_limit': config.FAILURE_LIMIT, + "cooloff_time_seconds": config.COOLOFF_TIME, + "cooloff_time_minutes": config.COOLOFF_TIME / 60, + "failure_limit": config.FAILURE_LIMIT, } return render(request, config.LOCKOUT_TEMPLATE, context) @@ -301,11 +303,14 @@ def lockout_response(request): return HttpResponseRedirect(config.LOCKOUT_URL) if config.COOLOFF_TIME: - return HttpResponse("Account locked: too many login attempts. " - "Please try again later.") + return HttpResponse( + "Account locked: too many login attempts. " "Please try again later." + ) else: - return HttpResponse("Account locked: too many login attempts. " - "Contact an admin to unlock your account.") + return HttpResponse( + "Account locked: too many login attempts. " + "Contact an admin to unlock your account." + ) def is_user_already_locked(username): @@ -339,9 +344,9 @@ def is_already_locked(request, get_username=get_username_from_request, username= return ip_blocked or user_blocked -def check_request(request, login_unsuccessful, - get_username=get_username_from_request, - username=None): +def check_request( + request, login_unsuccessful, get_username=get_username_from_request, username=None +): """ check the request, and process results""" ip_address = get_ip(request) username = username or get_username(request) @@ -355,9 +360,9 @@ def check_request(request, login_unsuccessful, return record_failed_attempt(ip_address, username) -def add_login_attempt_to_db(request, login_valid, - get_username=get_username_from_request, - username=None): +def add_login_attempt_to_db( + request, login_valid, get_username=get_username_from_request, username=None +): """ Create a record for the login attempt If using celery call celery task, if not, call the method normally """ @@ -367,15 +372,18 @@ def add_login_attempt_to_db(request, login_valid, username = username or get_username(request) - user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] + user_agent = request.META.get("HTTP_USER_AGENT", "")[:255] ip_address = get_ip(request) - http_accept = request.META.get('HTTP_ACCEPT', '') - path_info = request.META.get('PATH_INFO', '') + http_accept = request.META.get("HTTP_ACCEPT", "") + path_info = request.META.get("PATH_INFO", "") if config.USE_CELERY: from .tasks import add_login_attempt_task - add_login_attempt_task.delay(user_agent, ip_address, username, - http_accept, path_info, login_valid) + + add_login_attempt_task.delay( + user_agent, ip_address, username, http_accept, path_info, login_valid + ) else: - store_login_attempt(user_agent, ip_address, username, - http_accept, path_info, login_valid) + store_login_attempt( + user_agent, ip_address, username, http_accept, path_info, login_valid + ) diff --git a/defender/views.py b/defender/views.py index 3c3e4af..52585d8 100644 --- a/defender/views.py +++ b/defender/views.py @@ -1,13 +1,13 @@ from django.shortcuts import render from django.http import HttpResponseRedirect from django.contrib.admin.views.decorators import staff_member_required + try: from django.urls import reverse except ImportError: from django.core.urlresolvers import reverse -from .utils import ( - get_blocked_ips, get_blocked_usernames, unblock_ip, unblock_username) +from .utils import get_blocked_ips, get_blocked_usernames, unblock_ip, unblock_username @staff_member_required @@ -16,15 +16,17 @@ def block_view(request): blocked_ip_list = get_blocked_ips() blocked_username_list = get_blocked_usernames() - context = {'blocked_ip_list': blocked_ip_list, - 'blocked_username_list': blocked_username_list} - return render(request, 'defender/admin/blocks.html', context) + context = { + "blocked_ip_list": blocked_ip_list, + "blocked_username_list": blocked_username_list, + } + return render(request, "defender/admin/blocks.html", context) @staff_member_required def unblock_ip_view(request, ip_address): """ upblock the given ip """ - if request.method == 'POST': + if request.method == "POST": unblock_ip(ip_address) return HttpResponseRedirect(reverse("defender_blocks_view")) @@ -32,6 +34,6 @@ def unblock_ip_view(request, ip_address): @staff_member_required def unblock_username_view(request, username): """ unblockt he given username """ - if request.method == 'POST': + if request.method == "POST": unblock_username(username) return HttpResponseRedirect(reverse("defender_blocks_view")) diff --git a/docs/conf.py b/docs/conf.py index a72ac95..519b1b5 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -33,8 +33,7 @@ master_doc = "index" # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named "sphinx.ext.*") or your custom # ones. -extensions = [ -] +extensions = [] # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] diff --git a/setup.py b/setup.py index ec6c60d..cc0b50e 100644 --- a/setup.py +++ b/setup.py @@ -9,16 +9,18 @@ except ImportError: from distutils.core import setup -version = '0.6.2' +version = "0.6.2" def get_packages(package): """ Return root package and all sub-packages. """ - return [dirpath - for dirpath, dirnames, filenames in os.walk(package) - if os.path.exists(os.path.join(dirpath, '__init__.py'))] + return [ + dirpath + for dirpath, dirnames, filenames in os.walk(package) + if os.path.exists(os.path.join(dirpath, "__init__.py")) + ] def get_package_data(package): @@ -26,50 +28,58 @@ def get_package_data(package): Return all files under the root package, that are not in a package themselves. """ - walk = [(dirpath.replace(package + os.sep, '', 1), filenames) - for dirpath, dirnames, filenames in os.walk(package) - if not os.path.exists(os.path.join(dirpath, '__init__.py'))] + walk = [ + (dirpath.replace(package + os.sep, "", 1), filenames) + for dirpath, dirnames, filenames in os.walk(package) + if not os.path.exists(os.path.join(dirpath, "__init__.py")) + ] filepaths = [] for base, filenames in walk: - filepaths.extend([os.path.join(base, filename) - for filename in filenames]) + filepaths.extend([os.path.join(base, filename) for filename in filenames]) return {package: filepaths} -setup(name='django-defender', - version=version, - description="redis based Django app that locks out users after too " - "many failed login attempts.", - long_description="redis based Django app based on speed, that locks out" - "users after too many failed login attempts.", - classifiers=[ - 'Development Status :: 5 - Production/Stable', - 'Framework :: Django', - 'Intended Audience :: Developers', - 'License :: OSI Approved :: Apache Software License', - 'Operating System :: OS Independent', - 'Programming Language :: Python', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.5', - 'Programming Language :: Python :: 3.6', - 'Programming Language :: Python :: 3.7', - 'Programming Language :: Python :: 3.8', - 'Programming Language :: Python :: Implementation :: PyPy', - 'Programming Language :: Python :: Implementation :: CPython', - 'Topic :: Internet :: WWW/HTTP :: Dynamic Content', - 'Topic :: Security', - 'Topic :: Software Development :: Libraries', - 'Topic :: Software Development :: Libraries :: Python Modules', ], - keywords='django, cache, security, authentication, throttle, login', - author='Ken Cochrane', - url='https://github.com/kencochrane/django-defender', - author_email='kencochrane@gmail.com', - license='Apache 2', - include_package_data=True, - packages=get_packages('defender'), - package_data=get_package_data('defender'), - install_requires=['Django>=1.8,<2.3', 'redis>=2.10.3,<3.3'], - tests_require=['mock', 'mockredispy>=2.9.0.11,<3.0', 'coverage', - 'celery', 'django-redis-cache'], - ) +setup( + name="django-defender", + version=version, + description="redis based Django app that locks out users after too " + "many failed login attempts.", + long_description="redis based Django app based on speed, that locks out" + "users after too many failed login attempts.", + classifiers=[ + "Development Status :: 5 - Production/Stable", + "Framework :: Django", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Operating System :: OS Independent", + "Programming Language :: Python", + "Programming Language :: Python :: 2.7", + "Programming Language :: Python :: 3.5", + "Programming Language :: Python :: 3.6", + "Programming Language :: Python :: 3.7", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: Implementation :: PyPy", + "Programming Language :: Python :: Implementation :: CPython", + "Topic :: Internet :: WWW/HTTP :: Dynamic Content", + "Topic :: Security", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], + keywords="django, cache, security, authentication, throttle, login", + author="Ken Cochrane", + url="https://github.com/kencochrane/django-defender", + author_email="kencochrane@gmail.com", + license="Apache 2", + include_package_data=True, + packages=get_packages("defender"), + package_data=get_package_data("defender"), + install_requires=["Django>=1.8,<2.3", "redis>=2.10.3,<3.3"], + tests_require=[ + "mock", + "mockredispy>=2.9.0.11,<3.0", + "coverage", + "celery", + "django-redis-cache", + ], +)