utils: add username kwarg for providing username directly rather than via callback arg (#107)

This commit is contained in:
William Boman 2018-02-01 12:27:38 +01:00 committed by Ken Cochrane
parent cde53c5315
commit 2b6374f1da
2 changed files with 28 additions and 8 deletions

View file

@ -912,3 +912,21 @@ class TestUtils(DefenderTestCase):
self.assertTrue(utils.is_source_ip_already_locked(ip))
utils.unblock_ip(ip)
self.assertFalse(utils.is_source_ip_already_locked(ip))
def test_username_argument_precedence(self):
""" test that the optional username argument has highest precedence when provided """
request_factory = RequestFactory()
request = request_factory.get(ADMIN_LOGIN_URL)
request.user = AnonymousUser()
request.session = SessionStore()
username = 'johndoe'
utils.block_username(request.user.username)
self.assertFalse(utils.is_already_locked(request, username=username))
utils.check_request(request, True, username=username)
self.assertEqual(utils.get_user_attempts(request, username=username), 1)
utils.add_login_attempt_to_db(request, True, username=username)
self.assertEqual(AccessAttempt.objects.filter(username=username).count(), 1)

View file

@ -135,12 +135,12 @@ def get_username_from_request(request):
return None
def get_user_attempts(request, get_username=get_username_from_request):
def get_user_attempts(request, get_username=get_username_from_request, username=None):
""" Returns number of access attempts for this ip, username
"""
ip_address = get_ip(request)
username = lower_username(get_username(request))
username = lower_username(username or get_username(request))
# get by IP
ip_count = REDIS_SERVER.get(get_ip_attempt_cache_key(ip_address))
@ -310,10 +310,10 @@ def is_source_ip_already_locked(ip_address):
return REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))
def is_already_locked(request, get_username=get_username_from_request):
def is_already_locked(request, get_username=get_username_from_request, username=None):
"""Parse the username & IP from the request, and see if it's
already locked."""
user_blocked = is_user_already_locked(get_username(request))
user_blocked = is_user_already_locked(username or get_username(request))
ip_blocked = is_source_ip_already_locked(get_ip(request))
if config.LOCKOUT_BY_IP_USERNAME:
@ -324,10 +324,11 @@ def is_already_locked(request, get_username=get_username_from_request):
def check_request(request, login_unsuccessful,
get_username=get_username_from_request):
get_username=get_username_from_request,
username=None):
""" check the request, and process results"""
ip_address = get_ip(request)
username = get_username(request)
username = username or get_username(request)
if not login_unsuccessful:
# user logged in -- forget the failed attempts
@ -339,7 +340,8 @@ def check_request(request, login_unsuccessful,
def add_login_attempt_to_db(request, login_valid,
get_username=get_username_from_request):
get_username=get_username_from_request,
username=None):
""" Create a record for the login attempt If using celery call celery
task, if not, call the method normally """
@ -347,7 +349,7 @@ def add_login_attempt_to_db(request, login_valid,
# If we don't want to store in the database, then don't proceed.
return
username = get_username(request)
username = username or get_username(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip_address = get_ip(request)