diff --git a/defender/decorators.py b/defender/decorators.py index 3612f2a..b11b80e 100644 --- a/defender/decorators.py +++ b/defender/decorators.py @@ -18,8 +18,10 @@ def watch_login(status_code=302, msg="", get_username=utils.get_username_from_re # if the request is currently under lockout, do not proceed to the # login function, go directly to lockout url, do not pass go, # do not collect messages about this login attempt - if utils.is_already_locked(request): - return utils.lockout_response(request) + username = get_username(request) + + if utils.is_already_locked(request, username=username): + return utils.lockout_response(request, username=username) # call the login function response = func(request, *args, **kwargs) @@ -44,13 +46,13 @@ def watch_login(status_code=302, msg="", get_username=utils.get_username_from_re # ideally make this background task, but to keep simple, # keeping it inline for now. utils.add_login_attempt_to_db( - request, not login_unsuccessful, get_username + request, not login_unsuccessful, username=username ) - if utils.check_request(request, login_unsuccessful, get_username): + if utils.check_request(request, login_unsuccessful, username=username): return response - return utils.lockout_response(request) + return utils.lockout_response(request, username=username) return response diff --git a/defender/tests.py b/defender/tests.py index 0c14e00..5cee9a3 100644 --- a/defender/tests.py +++ b/defender/tests.py @@ -865,6 +865,41 @@ class AccessAttemptTest(DefenderTestCase): data_out = utils.get_blocked_ips() self.assertEqual(data_out, []) + @patch("defender.config.USERNAME_FAILURE_LIMIT", 3) + @patch("defender.config.DISABLE_IP_LOCKOUT", True) + def test_login_blocked_for_non_standard_login_views_different_username(self): + """ + Check that a view with custom username blocked correctly + """ + + @watch_login(status_code=401, get_username=lambda request: request.POST.get("email")) + def fake_api_401_login_different_username(request): + """ Fake the api login with 401 """ + return HttpResponse("Invalid", status=401) + + wrong_email = "email@localhost" + + request_factory = RequestFactory() + request = request_factory.post("api/login", data={"email": wrong_email}) + request.user = AnonymousUser() + request.session = SessionStore() + + for _ in range(3): + fake_api_401_login_different_username(request) + + data_out = utils.get_blocked_usernames() + self.assertEqual(data_out, []) + + fake_api_401_login_different_username(request) + + data_out = utils.get_blocked_usernames() + self.assertEqual(data_out, [wrong_email]) + + # Ensure that `watch_login` correctly extract username from request + # during `is_already_locked` check and don't cause 500 errors + status_code = fake_api_401_login_different_username(request) + self.assertNotEqual(status_code, 500) + @patch("defender.config.ATTEMPT_COOLOFF_TIME", "a") def test_bad_attempt_cooloff_configuration(self): self.assertRaises(Exception) diff --git a/defender/utils.py b/defender/utils.py index fb35fd0..6b63fa7 100644 --- a/defender/utils.py +++ b/defender/utils.py @@ -63,7 +63,7 @@ def strip_port_number(ip_address_string): """ If it's not a valid IP address, we prefer to return - the string as-is instead of returning a potentially + the string as-is instead of returning a potentially corrupted string: """ if is_valid_ip(ip_address): @@ -360,10 +360,9 @@ def reset_failed_attempts(ip_address=None, username=None): pipe.execute() -def lockout_response(request): +def lockout_response(request, username): """ if we are locked out, here is the response """ ip_address = get_ip(request) - username = get_username_from_request(request) if config.LOCKOUT_TEMPLATE: cooloff_time = get_lockout_cooloff_time(ip_address=ip_address, username=username) context = {