diff --git a/defender/tests.py b/defender/tests.py index a3f5a8b..6b9b9cd 100644 --- a/defender/tests.py +++ b/defender/tests.py @@ -912,3 +912,21 @@ class TestUtils(DefenderTestCase): self.assertTrue(utils.is_source_ip_already_locked(ip)) utils.unblock_ip(ip) self.assertFalse(utils.is_source_ip_already_locked(ip)) + + def test_username_argument_precedence(self): + """ test that the optional username argument has highest precedence when provided """ + request_factory = RequestFactory() + request = request_factory.get(ADMIN_LOGIN_URL) + request.user = AnonymousUser() + request.session = SessionStore() + username = 'johndoe' + + utils.block_username(request.user.username) + + self.assertFalse(utils.is_already_locked(request, username=username)) + + utils.check_request(request, True, username=username) + self.assertEqual(utils.get_user_attempts(request, username=username), 1) + + utils.add_login_attempt_to_db(request, True, username=username) + self.assertEqual(AccessAttempt.objects.filter(username=username).count(), 1) diff --git a/defender/utils.py b/defender/utils.py index 36e6676..d0aeddd 100644 --- a/defender/utils.py +++ b/defender/utils.py @@ -135,12 +135,12 @@ def get_username_from_request(request): return None -def get_user_attempts(request, get_username=get_username_from_request): +def get_user_attempts(request, get_username=get_username_from_request, username=None): """ Returns number of access attempts for this ip, username """ ip_address = get_ip(request) - username = lower_username(get_username(request)) + username = lower_username(username or get_username(request)) # get by IP ip_count = REDIS_SERVER.get(get_ip_attempt_cache_key(ip_address)) @@ -310,10 +310,10 @@ def is_source_ip_already_locked(ip_address): return REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address)) -def is_already_locked(request, get_username=get_username_from_request): +def is_already_locked(request, get_username=get_username_from_request, username=None): """Parse the username & IP from the request, and see if it's already locked.""" - user_blocked = is_user_already_locked(get_username(request)) + user_blocked = is_user_already_locked(username or get_username(request)) ip_blocked = is_source_ip_already_locked(get_ip(request)) if config.LOCKOUT_BY_IP_USERNAME: @@ -324,10 +324,11 @@ def is_already_locked(request, get_username=get_username_from_request): def check_request(request, login_unsuccessful, - get_username=get_username_from_request): + get_username=get_username_from_request, + username=None): """ check the request, and process results""" ip_address = get_ip(request) - username = get_username(request) + username = username or get_username(request) if not login_unsuccessful: # user logged in -- forget the failed attempts @@ -339,7 +340,8 @@ def check_request(request, login_unsuccessful, def add_login_attempt_to_db(request, login_valid, - get_username=get_username_from_request): + get_username=get_username_from_request, + username=None): """ Create a record for the login attempt If using celery call celery task, if not, call the method normally """ @@ -347,7 +349,7 @@ def add_login_attempt_to_db(request, login_valid, # If we don't want to store in the database, then don't proceed. return - username = get_username(request) + username = username or get_username(request) user_agent = request.META.get('HTTP_USER_AGENT', '')[:255] ip_address = get_ip(request)