diff --git a/defender/tasks.py b/defender/tasks.py index db3498a..b7d9ff9 100644 --- a/defender/tasks.py +++ b/defender/tasks.py @@ -1,15 +1,14 @@ -from . import config from .data import store_login_attempt # not sure how to get this to look better. ideally we want to dynamically # apply the celery decorator based on the USE_CELERY setting. -if config.USE_CELERY: - from celery import shared_task +from celery import shared_task - @shared_task() - def add_login_attempt_task(user_agent, ip_address, username, - http_accept, path_info, login_valid): - """ Create a record for the login attempt """ - store_login_attempt(user_agent, ip_address, username, - http_accept, path_info, login_valid) + +@shared_task() +def add_login_attempt_task(user_agent, ip_address, username, + http_accept, path_info, login_valid): + """ Create a record for the login attempt """ + store_login_attempt(user_agent, ip_address, username, + http_accept, path_info, login_valid) diff --git a/defender/tests.py b/defender/tests.py index 9699dbb..c911e1e 100644 --- a/defender/tests.py +++ b/defender/tests.py @@ -686,3 +686,21 @@ class DefenderTransactionTestCaseTest(DefenderTransactionTestCase): utils.REDIS_SERVER.incr(self.key) result = int(utils.REDIS_SERVER.get(self.key)) self.assertEqual(result, 1) + + +class TestUtils(DefenderTestCase): + def test_username_blocking(self): + username = 'foo' + self.assertFalse(utils.is_user_already_locked(username)) + utils.block_username(username) + self.assertTrue(utils.is_user_already_locked(username)) + utils.unblock_username(username) + self.assertFalse(utils.is_user_already_locked(username)) + + def test_ip_address_blocking(self): + ip = '1.2.3.4' + self.assertFalse(utils.is_source_ip_already_locked(ip)) + utils.block_ip(ip) + self.assertTrue(utils.is_source_ip_already_locked(ip)) + utils.unblock_ip(ip) + self.assertFalse(utils.is_source_ip_already_locked(ip)) diff --git a/defender/utils.py b/defender/utils.py index 6c056b5..0b1b1d5 100644 --- a/defender/utils.py +++ b/defender/utils.py @@ -265,25 +265,30 @@ def lockout_response(request): "Contact an admin to unlock your account.") +def is_user_already_locked(username): + """Is this username already locked?""" + if username is None: + return False + return REDIS_SERVER.get(get_username_blocked_cache_key(username)) + + +def is_source_ip_already_locked(ip_address): + """Is this IP already locked?""" + if ip_address is None: + return False + if config.DISABLE_IP_LOCKOUT: + return False + return REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address)) + + def is_already_locked(request): - """ Is this IP/username already locked? """ - - if not config.DISABLE_IP_LOCKOUT: - # ip blocked? - ip_address = get_ip(request) - ip_blocked = REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address)) - else: - # we disabled ip lockout, so it will never be blocked. - ip_blocked = False - - # username blocked? - username = request.POST.get(config.USERNAME_FORM_FIELD, None) - user_blocked = REDIS_SERVER.get(get_username_blocked_cache_key(username)) + """Parse the username & IP from the request, and see if it's already locked.""" + user_blocked = is_user_already_locked( + request.POST.get(config.USERNAME_FORM_FIELD, None)) + ip_blocked = is_source_ip_already_locked(get_ip(request)) if config.LOCKOUT_BY_IP_USERNAME: - LOG.info("Block by ip & username") - # if both this IP and this username are present the request is - # blocked + # if both this IP and this username are present the request is blocked return ip_blocked and user_blocked return ip_blocked or user_blocked