diff --git a/defender/admin.py b/defender/admin.py index dcb9e71..625c6be 100644 --- a/defender/admin.py +++ b/defender/admin.py @@ -3,6 +3,7 @@ from .models import AccessAttempt class AccessAttemptAdmin(admin.ModelAdmin): + """ Access attempt admin config """ list_display = ( 'attempt_time', 'ip_address', diff --git a/defender/config.py b/defender/config.py index 9a43799..a612d03 100644 --- a/defender/config.py +++ b/defender/config.py @@ -7,6 +7,7 @@ def get_setting(variable, default=None): provided default """ return getattr(settings, variable, default) + # redis server host DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL') @@ -29,7 +30,8 @@ DISABLE_IP_LOCKOUT = get_setting('DEFENDER_DISABLE_IP_LOCKOUT', False) # If this is True, usernames will not get locked when # there are too many login attempts. -DISABLE_USERNAME_LOCKOUT = get_setting('DEFENDER_DISABLE_USERNAME_LOCKOUT', False) +DISABLE_USERNAME_LOCKOUT = get_setting( + 'DEFENDER_DISABLE_USERNAME_LOCKOUT', False) # use a specific username field to retrieve from login POST data USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username') diff --git a/defender/connection.py b/defender/connection.py index 163eae1..94b24e0 100644 --- a/defender/connection.py +++ b/defender/connection.py @@ -14,7 +14,8 @@ urlparse.uses_netloc.append("redis") MOCKED_REDIS = mockredis.mock_strict_redis_client() -INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache settings.' +INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache' \ + ' settings.' def get_redis_connection(): @@ -25,7 +26,8 @@ def get_redis_connection(): try: cache = caches[config.DEFENDER_REDIS_NAME] except InvalidCacheBackendError: - raise KeyError(INVALID_CACHE_ERROR_MSG.format(config.DEFENDER_REDIS_NAME)) + raise KeyError(INVALID_CACHE_ERROR_MSG.format( + config.DEFENDER_REDIS_NAME)) # every redis backend implement it own way to get the low level client try: # redis_cache.RedisCache case (django-redis-cache package) diff --git a/defender/exampleapp/settings.py b/defender/exampleapp/settings.py index d331af7..c2d7a95 100644 --- a/defender/exampleapp/settings.py +++ b/defender/exampleapp/settings.py @@ -1,4 +1,6 @@ import os +from celery import Celery + PROJECT_DIR = lambda base: os.path.abspath( os.path.join(os.path.dirname(__file__), base).replace('\\', '/')) @@ -64,10 +66,6 @@ CELERY_ALWAYS_EAGER = True BROKER_BACKEND = 'memory' BROKER_URL = 'memory://' -import os - -from celery import Celery - # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.exampleapp.settings') diff --git a/defender/management/commands/cleanup_django_defender.py b/defender/management/commands/cleanup_django_defender.py index e5a10c8..d0c506c 100644 --- a/defender/management/commands/cleanup_django_defender.py +++ b/defender/management/commands/cleanup_django_defender.py @@ -9,6 +9,7 @@ from ... import config class Command(BaseCommand): + """ clean up management command """ help = "Cleans up django-defender AccessAttempt table" def handle(self, **options): diff --git a/defender/middleware.py b/defender/middleware.py index a5646ee..4eec1e0 100644 --- a/defender/middleware.py +++ b/defender/middleware.py @@ -5,6 +5,7 @@ from .decorators import watch_login class FailedLoginMiddleware(object): + """ Failed login middleware """ patched = False def __init__(self, *args, **kwargs): diff --git a/defender/migrations/0001_initial.py b/defender/migrations/0001_initial.py index f6b02c5..915bcc3 100644 --- a/defender/migrations/0001_initial.py +++ b/defender/migrations/0001_initial.py @@ -5,6 +5,7 @@ from django.db import models, migrations class Migration(migrations.Migration): + """ Initial migrations """ dependencies = [ ] diff --git a/defender/models.py b/defender/models.py index f721264..868f77e 100644 --- a/defender/models.py +++ b/defender/models.py @@ -6,6 +6,7 @@ from django.utils.encoding import python_2_unicode_compatible @python_2_unicode_compatible class AccessAttempt(models.Model): + """ Access Attempt log """ user_agent = models.CharField( max_length=255, ) diff --git a/defender/south_migrations/0001_initial.py b/defender/south_migrations/0001_initial.py index 15f2bb6..1d17ee1 100644 --- a/defender/south_migrations/0001_initial.py +++ b/defender/south_migrations/0001_initial.py @@ -6,9 +6,10 @@ from django.db import models class Migration(SchemaMigration): + """Initial Migration for Defender""" def forwards(self, orm): - # Adding model 'AccessAttempt' + """ Adding model 'AccessAttempt' """ db.create_table(u'defender_accessattempt', ( (u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)), ('user_agent', self.gf('django.db.models.fields.CharField')(max_length=255)), @@ -41,4 +42,4 @@ class Migration(SchemaMigration): } } - complete_apps = ['defender'] \ No newline at end of file + complete_apps = ['defender'] diff --git a/defender/test_settings.py b/defender/test_settings.py index 1746936..a17411a 100644 --- a/defender/test_settings.py +++ b/defender/test_settings.py @@ -1,4 +1,5 @@ import os +from celery import Celery DATABASES = { 'default': { @@ -62,10 +63,6 @@ CELERY_ALWAYS_EAGER = True BROKER_BACKEND = 'memory' BROKER_URL = 'memory://' -import os - -from celery import Celery - # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.test_settings') diff --git a/defender/tests.py b/defender/tests.py index 8d32722..43eb4fb 100644 --- a/defender/tests.py +++ b/defender/tests.py @@ -290,7 +290,7 @@ class AccessAttemptTest(DefenderTestCase): self.test_valid_login() @patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/') - def test_failed_login_redirect_to_URL(self): + def test_failed_login_redirect_to_url(self): """ Test to make sure that after lockout we send to the correct redirect URL """ @@ -311,7 +311,7 @@ class AccessAttemptTest(DefenderTestCase): self.assertEqual(response['Location'], 'http://localhost/othe/login/') @patch('defender.config.LOCKOUT_URL', '/o/login/') - def test_failed_login_redirect_to_URL_local(self): + def test_failed_login_redirect_to_url_local(self): """ Test to make sure that after lockout we send to the correct redirect URL """ @@ -361,6 +361,7 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.COOLOFF_TIME', 0) def test_failed_login_no_cooloff(self): + """ failed login no cooloff """ for i in range(0, config.FAILURE_LIMIT): response = self._login() # Check if we are in the same login page @@ -376,7 +377,7 @@ class AccessAttemptTest(DefenderTestCase): self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE) def test_login_attempt_model(self): - """ test the login model""" + """ test the login model """ response = self._login() self.assertContains(response, LOGIN_FORM_KEY) @@ -458,15 +459,18 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.DEFENDER_REDIS_NAME', 'default') def test_get_redis_connection_django_conf(self): + """ get the redis connection """ redis_client = get_redis_connection() self.assertIsInstance(redis_client, Redis) @patch('defender.config.DEFENDER_REDIS_NAME', 'bad-key') def test_get_redis_connection_django_conf_wrong_key(self): + """ see if we get the correct error """ error_msg = 'The cache bad-key was not found on the django cache settings.' self.assertRaisesMessage(KeyError, error_msg, get_redis_connection) def test_get_ip_address_from_request(self): + """ get ip from request, make sure it is correct """ req = HttpRequest() req.META['REMOTE_ADDR'] = '1.2.3.4' ip = utils.get_ip_address_from_request(req) @@ -494,6 +498,7 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.BEHIND_REVERSE_PROXY', True) @patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_PROXIED') def test_get_ip_reverse_proxy_custom_header(self): + """ make sure the ip is correct behind reverse proxy """ req = HttpRequest() req.META['HTTP_X_PROXIED'] = '1.2.3.4' self.assertEqual(utils.get_ip(req), '1.2.3.4') @@ -509,6 +514,7 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.BEHIND_REVERSE_PROXY', True) @patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_REAL_IP') def test_get_user_attempts(self): + """ Get the user attempts make sure they are correct """ ip_attempts = random.randint(3, 12) username_attempts = random.randint(3, 12) for i in range(0, ip_attempts): @@ -574,7 +580,7 @@ class AccessAttemptTest(DefenderTestCase): self.assertContains(response, self.LOCKED_MESSAGE) def test_get_view(self): - """ Check that the decorator doesn't tamper with GET requests""" + """ Check that the decorator doesn't tamper with GET requests """ for i in range(0, config.FAILURE_LIMIT): response = self.client.get(ADMIN_LOGIN_URL) # Check if we are in the same login page @@ -584,7 +590,7 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.USE_CELERY', True) def test_use_celery(self): - """ Check that use celery works""" + """ Check that use celery works """ self.assertEqual(AccessAttempt.objects.count(), 0) @@ -598,12 +604,14 @@ class AccessAttemptTest(DefenderTestCase): response = self._login() self.assertContains(response, self.LOCKED_MESSAGE) - self.assertEqual(AccessAttempt.objects.count(), config.FAILURE_LIMIT + 1) + self.assertEqual(AccessAttempt.objects.count(), + config.FAILURE_LIMIT + 1) self.assertIsNotNone(str(AccessAttempt.objects.all()[0])) @patch('defender.config.LOCKOUT_BY_IP_USERNAME', True) def test_lockout_by_ip_and_username(self): - """Check that lockout still works when locking out by IP and Username combined""" + """ Check that lockout still works when locking out by + IP and Username combined """ username = 'testy' @@ -621,11 +629,13 @@ class AccessAttemptTest(DefenderTestCase): response = self.client.get(ADMIN_LOGIN_URL) self.assertContains(response, LOGIN_FORM_KEY) - # We shouldn't get a lockout message when attempting to use a different username + # We shouldn't get a lockout message when attempting to use a + # different username response = self._login() self.assertContains(response, LOGIN_FORM_KEY) - # We shouldn't get a lockout message when attempting to use a different ip address + # We shouldn't get a lockout message when attempting to use a + # different ip address ip = '74.125.239.60' response = self._login(username=VALID_USERNAME, remote_addr=ip) # Check if we are in the same login page @@ -633,7 +643,7 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.DISABLE_IP_LOCKOUT', True) def test_disable_ip_lockout(self): - """Check that lockout still works when we disable IP Lock out""" + """ Check that lockout still works when we disable IP Lock out """ username = 'testy' @@ -663,11 +673,13 @@ class AccessAttemptTest(DefenderTestCase): response = self.client.get(ADMIN_LOGIN_URL) self.assertContains(response, LOGIN_FORM_KEY) - # We shouldn't get a lockout message when attempting to use a different username + # We shouldn't get a lockout message when attempting to use a + # different username response = self._login() self.assertContains(response, LOGIN_FORM_KEY) - # We shouldn't get a lockout message when attempting to use a different ip address + # We shouldn't get a lockout message when attempting to use a + # different ip address second_ip = '74.125.239.99' response = self._login(username=VALID_USERNAME, remote_addr=second_ip) # Check if we are in the same login page @@ -686,7 +698,7 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.DISABLE_USERNAME_LOCKOUT', True) def test_disable_username_lockout(self): - """Check lockouting still works when we disable username lockout""" + """ Check lockouting still works when we disable username lockout """ username = 'testy' @@ -714,8 +726,8 @@ class AccessAttemptTest(DefenderTestCase): response = self.client.get(ADMIN_LOGIN_URL) self.assertContains(response, LOGIN_FORM_KEY) - # We shouldn't get a lockout message when attempting to use a different ip address - # to be sure that username is not blocked. + # We shouldn't get a lockout message when attempting to use a + # different ip address to be sure that username is not blocked. second_ip = '74.125.127.2' response = self._login(username=username, remote_addr=second_ip) # Check if we are in the same login page @@ -736,13 +748,14 @@ class AccessAttemptTest(DefenderTestCase): @patch('defender.config.FAILURE_LIMIT', 3) def test_login_blocked_for_non_standard_login_views_without_msg(self): """ - Check that a view wich returns the expected status code is causing + Check that a view wich returns the expected status code is causing the user to be locked out when we do not expect a specific message to be returned. """ @watch_login(status_code=401) def fake_api_401_login_view_without_msg(request): + """ Fake the api login with 401 """ return HttpResponse(status=401) request_factory = RequestFactory() @@ -772,6 +785,7 @@ class AccessAttemptTest(DefenderTestCase): """ @watch_login(status_code=401, msg='Invalid credentials') def fake_api_401_login_view_without_msg(request): + """ Fake the api login with 401 """ return HttpResponse('Sorry, Invalid credentials', status=401) @@ -802,6 +816,7 @@ class AccessAttemptTest(DefenderTestCase): """ @watch_login(status_code=401, msg='Invalid credentials') def fake_api_401_login_view_without_msg(request): + """ Fake the api login with 401 """ return HttpResponse('Ups, wrong credentials', status=401) @@ -820,38 +835,44 @@ class AccessAttemptTest(DefenderTestCase): class DefenderTestCaseTest(DefenderTestCase): - """Make sure that we're cleaning the cache between tests""" + """ Make sure that we're cleaning the cache between tests """ key = 'test_key' def test_first_incr(self): + """ first increment """ utils.REDIS_SERVER.incr(self.key) result = int(utils.REDIS_SERVER.get(self.key)) self.assertEqual(result, 1) def test_second_incr(self): + """ second increment """ utils.REDIS_SERVER.incr(self.key) result = int(utils.REDIS_SERVER.get(self.key)) self.assertEqual(result, 1) class DefenderTransactionTestCaseTest(DefenderTransactionTestCase): - """Make sure that we're cleaning the cache between tests""" + """ Make sure that we're cleaning the cache between tests """ key = 'test_key' def test_first_incr(self): + """ first increment """ utils.REDIS_SERVER.incr(self.key) result = int(utils.REDIS_SERVER.get(self.key)) self.assertEqual(result, 1) def test_second_incr(self): + """ second increment """ utils.REDIS_SERVER.incr(self.key) result = int(utils.REDIS_SERVER.get(self.key)) self.assertEqual(result, 1) class TestUtils(DefenderTestCase): + """ Unit tests for util methods """ def test_username_blocking(self): + """ test username blocking """ username = 'foo' self.assertFalse(utils.is_user_already_locked(username)) utils.block_username(username) @@ -860,6 +881,7 @@ class TestUtils(DefenderTestCase): self.assertFalse(utils.is_user_already_locked(username)) def test_ip_address_blocking(self): + """ ip address blocking """ ip = '1.2.3.4' self.assertFalse(utils.is_source_ip_already_locked(ip)) utils.block_ip(ip) diff --git a/defender/travis_settings.py b/defender/travis_settings.py index fed0b95..f089cb4 100644 --- a/defender/travis_settings.py +++ b/defender/travis_settings.py @@ -1,4 +1,6 @@ import os +from celery import Celery + DATABASES = { 'default': { @@ -68,10 +70,6 @@ CELERY_ALWAYS_EAGER = True BROKER_BACKEND = 'memory' BROKER_URL = 'memory://' -import os - -from celery import Celery - # set the default Django settings module for the 'celery' program. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.travis_settings') diff --git a/defender/utils.py b/defender/utils.py index 8b510bd..9da01af 100644 --- a/defender/utils.py +++ b/defender/utils.py @@ -299,7 +299,8 @@ def is_source_ip_already_locked(ip_address): def is_already_locked(request, get_username=get_username_from_request): - """Parse the username & IP from the request, and see if it's already locked.""" + """Parse the username & IP from the request, and see if it's + already locked.""" user_blocked = is_user_already_locked(get_username(request)) ip_blocked = is_source_ip_already_locked(get_ip(request)) @@ -310,7 +311,8 @@ def is_already_locked(request, get_username=get_username_from_request): return ip_blocked or user_blocked -def check_request(request, login_unsuccessful, get_username=get_username_from_request): +def check_request(request, login_unsuccessful, + get_username=get_username_from_request): """ check the request, and process results""" ip_address = get_ip(request) username = get_username(request) @@ -324,7 +326,8 @@ def check_request(request, login_unsuccessful, get_username=get_username_from_re return record_failed_attempt(ip_address, username) -def add_login_attempt_to_db(request, login_valid, get_username=get_username_from_request): +def add_login_attempt_to_db(request, login_valid, + get_username=get_username_from_request): """ Create a record for the login attempt If using celery call celery task, if not, call the method normally """