diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index b5bf6fc..dd266fb 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -82,7 +82,7 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler): username = get_client_username(request, credentials) if settings.AXES_ONLY_USER_FAILURES and username is None: log.warning( - "AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created." + "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created." ) return diff --git a/axes/handlers/database.py b/axes/handlers/database.py index d3ea5b7..912449d 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -86,11 +86,11 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): self, sender, credentials: dict, request=None, **kwargs ): # pylint: disable=too-many-locals """ - When user login fails, save AccessAttempt record in database and lock user out if necessary. - - :raises AxesSignalPermissionDenied: if user should be locked out. + When user login fails, save AccessAttempt record in database, mark request with lockout attribute and emit lockout signal. """ + log.info("AXES: User login failed, running database handler for failure.") + if request is None: log.error( "AXES: AxesDatabaseHandler.user_login_failed does not function without a request." @@ -108,7 +108,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): request.axes_path_info, ) - # This replaces null byte chars that crash saving failures, meaning an attacker doesn't get locked out. + # This replaces null byte chars that crash saving failures. get_data = get_query_str(request.GET).replace("\0", "0x00") post_data = get_query_str(request.POST).replace("\0", "0x00") @@ -116,16 +116,12 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): log.info("AXES: Login failed from whitelisted client %s.", client_str) return - # 2. database query: Calculate the current maximum failure number from the existing attempts - failures_since_start = 1 + self.get_failures(request, credentials) - + # 2. database query: Get or create access record with the new failure data if settings.AXES_ONLY_USER_FAILURES and username is None: log.warning( - "AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created." + "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created." ) else: - separator = "\n---------\n" - # 3. database query: Insert or update access records with the new failure data attempt, created = AccessAttempt.objects.get_or_create( username=username, ip_address=request.axes_ip_address, @@ -136,29 +132,26 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): "http_accept": request.axes_http_accept, "path_info": request.axes_path_info, "failures_since_start": 1, - "attempt_time": request.axes_attempt_time - } + "attempt_time": request.axes_attempt_time, + }, ) - # Update failed attempt information but do not touch the username, IP address, or user agent fields, - # because attackers can request the site with multiple different configurations - # in order to bypass the defense mechanisms that are used by the site. - # Record failed attempt with all the relevant information. # Filtering based on username, IP address and user agent handled elsewhere, # and this handler just records the available information for further use. if created: log.warning( - "AXES: New login failure by %s. Creating new record in the database.", + "AXES: New login failure by %s. Created new record in the database.", client_str, ) + + # 3. database query if there were previous attempts in the database + # Update failed attempt information but do not touch the username, IP address, or user agent fields, + # because attackers can request the site with multiple different configurations + # in order to bypass the defense mechanisms that are used by the site. else: - log.warning( - "AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.", - client_str, - attempt.failures_since_start + 1, - get_failure_limit(request, credentials), - ) + separator = "\n---------\n" + attempt.get_data = Concat("get_data", Value(separator + get_data)) attempt.post_data = Concat("post_data", Value(separator + post_data)) attempt.http_accept = request.axes_http_accept @@ -167,6 +160,16 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): attempt.attempt_time = request.axes_attempt_time attempt.save() + log.warning( + "AXES: Repeated login failure by %s. Count = %d of %d. Updated existing record in the database.", + client_str, + attempt.failures_since_start, + get_failure_limit(request, credentials), + ) + + # 3. or 4. database query: Calculate the current maximum failure number from the existing attempts + failures_since_start = self.get_failures(request, credentials) + if ( settings.AXES_LOCK_OUT_AT_FAILURE and failures_since_start >= get_failure_limit(request, credentials) @@ -176,7 +179,6 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): ) request.axes_locked_out = True - user_locked_out.send( "axes", request=request, diff --git a/tests/test_handlers.py b/tests/test_handlers.py index 46aff04..b2f557d 100644 --- a/tests/test_handlers.py +++ b/tests/test_handlers.py @@ -209,7 +209,11 @@ class ResetAttemptsTestCase(AxesHandlerBaseTestCase): AXES_COOLOFF_TIME=timedelta(seconds=2), AXES_RESET_ON_SUCCESS=True, ) -@mark.xfail(python_implementation() == "PyPy", reason="PyPy implementation is flaky for this test", strict=False) +@mark.xfail( + python_implementation() == "PyPy", + reason="PyPy implementation is flaky for this test", + strict=False, +) class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): def test_handler_reset_attempts(self): self.create_attempt() @@ -274,7 +278,7 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): attempt = AccessAttempt.objects.all() self.assertEqual(0, AccessAttempt.objects.count()) log.warning.assert_called_with( - "AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created." + "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created." ) def test_user_login_failed_with_none_username(self): @@ -462,7 +466,7 @@ class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase): AxesProxyHandler.user_login_failed(sender, credentials, self.request) self.assertFalse(cache_set.called) log.warning.assert_called_with( - "AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created." + "AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created." ) @patch.object(cache, "set")