Fix watch_login with custom username (#228)

Previously using of custom `get_username` function with disabled IP
lockout caused unhandled exception
Exception("Invalid state requested")
This commit is contained in:
Roman Gorbil 2023-11-09 19:41:49 +07:00 committed by GitHub
parent ba548fa9c3
commit 1e0aa91952
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 44 additions and 8 deletions

View file

@ -18,8 +18,10 @@ def watch_login(status_code=302, msg="", get_username=utils.get_username_from_re
# if the request is currently under lockout, do not proceed to the
# login function, go directly to lockout url, do not pass go,
# do not collect messages about this login attempt
if utils.is_already_locked(request):
return utils.lockout_response(request)
username = get_username(request)
if utils.is_already_locked(request, username=username):
return utils.lockout_response(request, username=username)
# call the login function
response = func(request, *args, **kwargs)
@ -44,13 +46,13 @@ def watch_login(status_code=302, msg="", get_username=utils.get_username_from_re
# ideally make this background task, but to keep simple,
# keeping it inline for now.
utils.add_login_attempt_to_db(
request, not login_unsuccessful, get_username
request, not login_unsuccessful, username=username
)
if utils.check_request(request, login_unsuccessful, get_username):
if utils.check_request(request, login_unsuccessful, username=username):
return response
return utils.lockout_response(request)
return utils.lockout_response(request, username=username)
return response

View file

@ -865,6 +865,41 @@ class AccessAttemptTest(DefenderTestCase):
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
@patch("defender.config.USERNAME_FAILURE_LIMIT", 3)
@patch("defender.config.DISABLE_IP_LOCKOUT", True)
def test_login_blocked_for_non_standard_login_views_different_username(self):
"""
Check that a view with custom username blocked correctly
"""
@watch_login(status_code=401, get_username=lambda request: request.POST.get("email"))
def fake_api_401_login_different_username(request):
""" Fake the api login with 401 """
return HttpResponse("Invalid", status=401)
wrong_email = "email@localhost"
request_factory = RequestFactory()
request = request_factory.post("api/login", data={"email": wrong_email})
request.user = AnonymousUser()
request.session = SessionStore()
for _ in range(3):
fake_api_401_login_different_username(request)
data_out = utils.get_blocked_usernames()
self.assertEqual(data_out, [])
fake_api_401_login_different_username(request)
data_out = utils.get_blocked_usernames()
self.assertEqual(data_out, [wrong_email])
# Ensure that `watch_login` correctly extract username from request
# during `is_already_locked` check and don't cause 500 errors
status_code = fake_api_401_login_different_username(request)
self.assertNotEqual(status_code, 500)
@patch("defender.config.ATTEMPT_COOLOFF_TIME", "a")
def test_bad_attempt_cooloff_configuration(self):
self.assertRaises(Exception)

View file

@ -63,7 +63,7 @@ def strip_port_number(ip_address_string):
"""
If it's not a valid IP address, we prefer to return
the string as-is instead of returning a potentially
the string as-is instead of returning a potentially
corrupted string:
"""
if is_valid_ip(ip_address):
@ -360,10 +360,9 @@ def reset_failed_attempts(ip_address=None, username=None):
pipe.execute()
def lockout_response(request):
def lockout_response(request, username):
""" if we are locked out, here is the response """
ip_address = get_ip(request)
username = get_username_from_request(request)
if config.LOCKOUT_TEMPLATE:
cooloff_time = get_lockout_cooloff_time(ip_address=ip_address, username=username)
context = {