Cleanup the code to remove lint warnings (#87)

* Cleanup the code to remove lint warnings

Signed-off-by: Ken Cochrane <kencochrane@gmail.com>

* Fixed typo

Signed-off-by: Ken Cochrane <kencochrane@gmail.com>
This commit is contained in:
Ken Cochrane 2017-06-28 17:09:44 -04:00 committed by GitHub
parent b985d17beb
commit 4d9adc35c2
13 changed files with 66 additions and 38 deletions

View file

@ -3,6 +3,7 @@ from .models import AccessAttempt
class AccessAttemptAdmin(admin.ModelAdmin):
""" Access attempt admin config """
list_display = (
'attempt_time',
'ip_address',

View file

@ -7,6 +7,7 @@ def get_setting(variable, default=None):
provided default """
return getattr(settings, variable, default)
# redis server host
DEFENDER_REDIS_URL = get_setting('DEFENDER_REDIS_URL')
@ -29,7 +30,8 @@ DISABLE_IP_LOCKOUT = get_setting('DEFENDER_DISABLE_IP_LOCKOUT', False)
# If this is True, usernames will not get locked when
# there are too many login attempts.
DISABLE_USERNAME_LOCKOUT = get_setting('DEFENDER_DISABLE_USERNAME_LOCKOUT', False)
DISABLE_USERNAME_LOCKOUT = get_setting(
'DEFENDER_DISABLE_USERNAME_LOCKOUT', False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = get_setting('DEFENDER_USERNAME_FORM_FIELD', 'username')

View file

@ -14,7 +14,8 @@ urlparse.uses_netloc.append("redis")
MOCKED_REDIS = mockredis.mock_strict_redis_client()
INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache settings.'
INVALID_CACHE_ERROR_MSG = 'The cache {} was not found on the django cache' \
' settings.'
def get_redis_connection():
@ -25,7 +26,8 @@ def get_redis_connection():
try:
cache = caches[config.DEFENDER_REDIS_NAME]
except InvalidCacheBackendError:
raise KeyError(INVALID_CACHE_ERROR_MSG.format(config.DEFENDER_REDIS_NAME))
raise KeyError(INVALID_CACHE_ERROR_MSG.format(
config.DEFENDER_REDIS_NAME))
# every redis backend implement it own way to get the low level client
try:
# redis_cache.RedisCache case (django-redis-cache package)

View file

@ -1,4 +1,6 @@
import os
from celery import Celery
PROJECT_DIR = lambda base: os.path.abspath(
os.path.join(os.path.dirname(__file__), base).replace('\\', '/'))
@ -64,10 +66,6 @@ CELERY_ALWAYS_EAGER = True
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.exampleapp.settings')

View file

@ -9,6 +9,7 @@ from ... import config
class Command(BaseCommand):
""" clean up management command """
help = "Cleans up django-defender AccessAttempt table"
def handle(self, **options):

View file

@ -5,6 +5,7 @@ from .decorators import watch_login
class FailedLoginMiddleware(object):
""" Failed login middleware """
patched = False
def __init__(self, *args, **kwargs):

View file

@ -5,6 +5,7 @@ from django.db import models, migrations
class Migration(migrations.Migration):
""" Initial migrations """
dependencies = [
]

View file

@ -6,6 +6,7 @@ from django.utils.encoding import python_2_unicode_compatible
@python_2_unicode_compatible
class AccessAttempt(models.Model):
""" Access Attempt log """
user_agent = models.CharField(
max_length=255,
)

View file

@ -6,9 +6,10 @@ from django.db import models
class Migration(SchemaMigration):
"""Initial Migration for Defender"""
def forwards(self, orm):
# Adding model 'AccessAttempt'
""" Adding model 'AccessAttempt' """
db.create_table(u'defender_accessattempt', (
(u'id', self.gf('django.db.models.fields.AutoField')(primary_key=True)),
('user_agent', self.gf('django.db.models.fields.CharField')(max_length=255)),
@ -41,4 +42,4 @@ class Migration(SchemaMigration):
}
}
complete_apps = ['defender']
complete_apps = ['defender']

View file

@ -1,4 +1,5 @@
import os
from celery import Celery
DATABASES = {
'default': {
@ -62,10 +63,6 @@ CELERY_ALWAYS_EAGER = True
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.test_settings')

View file

@ -290,7 +290,7 @@ class AccessAttemptTest(DefenderTestCase):
self.test_valid_login()
@patch('defender.config.LOCKOUT_URL', 'http://localhost/othe/login/')
def test_failed_login_redirect_to_URL(self):
def test_failed_login_redirect_to_url(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
@ -311,7 +311,7 @@ class AccessAttemptTest(DefenderTestCase):
self.assertEqual(response['Location'], 'http://localhost/othe/login/')
@patch('defender.config.LOCKOUT_URL', '/o/login/')
def test_failed_login_redirect_to_URL_local(self):
def test_failed_login_redirect_to_url_local(self):
""" Test to make sure that after lockout we send to the correct
redirect URL """
@ -361,6 +361,7 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.COOLOFF_TIME', 0)
def test_failed_login_no_cooloff(self):
""" failed login no cooloff """
for i in range(0, config.FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
@ -376,7 +377,7 @@ class AccessAttemptTest(DefenderTestCase):
self.assertContains(response, self.PERMANENT_LOCKED_MESSAGE)
def test_login_attempt_model(self):
""" test the login model"""
""" test the login model """
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
@ -458,15 +459,18 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.DEFENDER_REDIS_NAME', 'default')
def test_get_redis_connection_django_conf(self):
""" get the redis connection """
redis_client = get_redis_connection()
self.assertIsInstance(redis_client, Redis)
@patch('defender.config.DEFENDER_REDIS_NAME', 'bad-key')
def test_get_redis_connection_django_conf_wrong_key(self):
""" see if we get the correct error """
error_msg = 'The cache bad-key was not found on the django cache settings.'
self.assertRaisesMessage(KeyError, error_msg, get_redis_connection)
def test_get_ip_address_from_request(self):
""" get ip from request, make sure it is correct """
req = HttpRequest()
req.META['REMOTE_ADDR'] = '1.2.3.4'
ip = utils.get_ip_address_from_request(req)
@ -494,6 +498,7 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_PROXIED')
def test_get_ip_reverse_proxy_custom_header(self):
""" make sure the ip is correct behind reverse proxy """
req = HttpRequest()
req.META['HTTP_X_PROXIED'] = '1.2.3.4'
self.assertEqual(utils.get_ip(req), '1.2.3.4')
@ -509,6 +514,7 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
@patch('defender.config.REVERSE_PROXY_HEADER', 'HTTP_X_REAL_IP')
def test_get_user_attempts(self):
""" Get the user attempts make sure they are correct """
ip_attempts = random.randint(3, 12)
username_attempts = random.randint(3, 12)
for i in range(0, ip_attempts):
@ -574,7 +580,7 @@ class AccessAttemptTest(DefenderTestCase):
self.assertContains(response, self.LOCKED_MESSAGE)
def test_get_view(self):
""" Check that the decorator doesn't tamper with GET requests"""
""" Check that the decorator doesn't tamper with GET requests """
for i in range(0, config.FAILURE_LIMIT):
response = self.client.get(ADMIN_LOGIN_URL)
# Check if we are in the same login page
@ -584,7 +590,7 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.USE_CELERY', True)
def test_use_celery(self):
""" Check that use celery works"""
""" Check that use celery works """
self.assertEqual(AccessAttempt.objects.count(), 0)
@ -598,12 +604,14 @@ class AccessAttemptTest(DefenderTestCase):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE)
self.assertEqual(AccessAttempt.objects.count(), config.FAILURE_LIMIT + 1)
self.assertEqual(AccessAttempt.objects.count(),
config.FAILURE_LIMIT + 1)
self.assertIsNotNone(str(AccessAttempt.objects.all()[0]))
@patch('defender.config.LOCKOUT_BY_IP_USERNAME', True)
def test_lockout_by_ip_and_username(self):
"""Check that lockout still works when locking out by IP and Username combined"""
""" Check that lockout still works when locking out by
IP and Username combined """
username = 'testy'
@ -621,11 +629,13 @@ class AccessAttemptTest(DefenderTestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a different username
# We shouldn't get a lockout message when attempting to use a
# different username
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a different ip address
# We shouldn't get a lockout message when attempting to use a
# different ip address
ip = '74.125.239.60'
response = self._login(username=VALID_USERNAME, remote_addr=ip)
# Check if we are in the same login page
@ -633,7 +643,7 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.DISABLE_IP_LOCKOUT', True)
def test_disable_ip_lockout(self):
"""Check that lockout still works when we disable IP Lock out"""
""" Check that lockout still works when we disable IP Lock out """
username = 'testy'
@ -663,11 +673,13 @@ class AccessAttemptTest(DefenderTestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a different username
# We shouldn't get a lockout message when attempting to use a
# different username
response = self._login()
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a different ip address
# We shouldn't get a lockout message when attempting to use a
# different ip address
second_ip = '74.125.239.99'
response = self._login(username=VALID_USERNAME, remote_addr=second_ip)
# Check if we are in the same login page
@ -686,7 +698,7 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.DISABLE_USERNAME_LOCKOUT', True)
def test_disable_username_lockout(self):
"""Check lockouting still works when we disable username lockout"""
""" Check lockouting still works when we disable username lockout """
username = 'testy'
@ -714,8 +726,8 @@ class AccessAttemptTest(DefenderTestCase):
response = self.client.get(ADMIN_LOGIN_URL)
self.assertContains(response, LOGIN_FORM_KEY)
# We shouldn't get a lockout message when attempting to use a different ip address
# to be sure that username is not blocked.
# We shouldn't get a lockout message when attempting to use a
# different ip address to be sure that username is not blocked.
second_ip = '74.125.127.2'
response = self._login(username=username, remote_addr=second_ip)
# Check if we are in the same login page
@ -736,13 +748,14 @@ class AccessAttemptTest(DefenderTestCase):
@patch('defender.config.FAILURE_LIMIT', 3)
def test_login_blocked_for_non_standard_login_views_without_msg(self):
"""
Check that a view wich returns the expected status code is causing
Check that a view wich returns the expected status code is causing
the user to be locked out when we do not expect a specific message
to be returned.
"""
@watch_login(status_code=401)
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse(status=401)
request_factory = RequestFactory()
@ -772,6 +785,7 @@ class AccessAttemptTest(DefenderTestCase):
"""
@watch_login(status_code=401, msg='Invalid credentials')
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse('Sorry, Invalid credentials',
status=401)
@ -802,6 +816,7 @@ class AccessAttemptTest(DefenderTestCase):
"""
@watch_login(status_code=401, msg='Invalid credentials')
def fake_api_401_login_view_without_msg(request):
""" Fake the api login with 401 """
return HttpResponse('Ups, wrong credentials',
status=401)
@ -820,38 +835,44 @@ class AccessAttemptTest(DefenderTestCase):
class DefenderTestCaseTest(DefenderTestCase):
"""Make sure that we're cleaning the cache between tests"""
""" Make sure that we're cleaning the cache between tests """
key = 'test_key'
def test_first_incr(self):
""" first increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
def test_second_incr(self):
""" second increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
"""Make sure that we're cleaning the cache between tests"""
""" Make sure that we're cleaning the cache between tests """
key = 'test_key'
def test_first_incr(self):
""" first increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
def test_second_incr(self):
""" second increment """
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
class TestUtils(DefenderTestCase):
""" Unit tests for util methods """
def test_username_blocking(self):
""" test username blocking """
username = 'foo'
self.assertFalse(utils.is_user_already_locked(username))
utils.block_username(username)
@ -860,6 +881,7 @@ class TestUtils(DefenderTestCase):
self.assertFalse(utils.is_user_already_locked(username))
def test_ip_address_blocking(self):
""" ip address blocking """
ip = '1.2.3.4'
self.assertFalse(utils.is_source_ip_already_locked(ip))
utils.block_ip(ip)

View file

@ -1,4 +1,6 @@
import os
from celery import Celery
DATABASES = {
'default': {
@ -68,10 +70,6 @@ CELERY_ALWAYS_EAGER = True
BROKER_BACKEND = 'memory'
BROKER_URL = 'memory://'
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'defender.travis_settings')

View file

@ -299,7 +299,8 @@ def is_source_ip_already_locked(ip_address):
def is_already_locked(request, get_username=get_username_from_request):
"""Parse the username & IP from the request, and see if it's already locked."""
"""Parse the username & IP from the request, and see if it's
already locked."""
user_blocked = is_user_already_locked(get_username(request))
ip_blocked = is_source_ip_already_locked(get_ip(request))
@ -310,7 +311,8 @@ def is_already_locked(request, get_username=get_username_from_request):
return ip_blocked or user_blocked
def check_request(request, login_unsuccessful, get_username=get_username_from_request):
def check_request(request, login_unsuccessful,
get_username=get_username_from_request):
""" check the request, and process results"""
ip_address = get_ip(request)
username = get_username(request)
@ -324,7 +326,8 @@ def check_request(request, login_unsuccessful, get_username=get_username_from_re
return record_failed_attempt(ip_address, username)
def add_login_attempt_to_db(request, login_valid, get_username=get_username_from_request):
def add_login_attempt_to_db(request, login_valid,
get_username=get_username_from_request):
""" Create a record for the login attempt If using celery call celery
task, if not, call the method normally """