Adjust commentation and log messages

This commit is contained in:
Aleksi Häkli 2021-06-29 17:05:43 +03:00
parent f079c48bb1
commit ac86d4b213
3 changed files with 34 additions and 28 deletions

View file

@ -82,7 +82,7 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler):
username = get_client_username(request, credentials)
if settings.AXES_ONLY_USER_FAILURES and username is None:
log.warning(
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created."
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
)
return

View file

@ -86,11 +86,11 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
self, sender, credentials: dict, request=None, **kwargs
): # pylint: disable=too-many-locals
"""
When user login fails, save AccessAttempt record in database and lock user out if necessary.
:raises AxesSignalPermissionDenied: if user should be locked out.
When user login fails, save AccessAttempt record in database, mark request with lockout attribute and emit lockout signal.
"""
log.info("AXES: User login failed, running database handler for failure.")
if request is None:
log.error(
"AXES: AxesDatabaseHandler.user_login_failed does not function without a request."
@ -108,7 +108,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
request.axes_path_info,
)
# This replaces null byte chars that crash saving failures, meaning an attacker doesn't get locked out.
# This replaces null byte chars that crash saving failures.
get_data = get_query_str(request.GET).replace("\0", "0x00")
post_data = get_query_str(request.POST).replace("\0", "0x00")
@ -116,16 +116,12 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
log.info("AXES: Login failed from whitelisted client %s.", client_str)
return
# 2. database query: Calculate the current maximum failure number from the existing attempts
failures_since_start = 1 + self.get_failures(request, credentials)
# 2. database query: Get or create access record with the new failure data
if settings.AXES_ONLY_USER_FAILURES and username is None:
log.warning(
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created."
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
)
else:
separator = "\n---------\n"
# 3. database query: Insert or update access records with the new failure data
attempt, created = AccessAttempt.objects.get_or_create(
username=username,
ip_address=request.axes_ip_address,
@ -136,29 +132,26 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
"http_accept": request.axes_http_accept,
"path_info": request.axes_path_info,
"failures_since_start": 1,
"attempt_time": request.axes_attempt_time
}
"attempt_time": request.axes_attempt_time,
},
)
# Update failed attempt information but do not touch the username, IP address, or user agent fields,
# because attackers can request the site with multiple different configurations
# in order to bypass the defense mechanisms that are used by the site.
# Record failed attempt with all the relevant information.
# Filtering based on username, IP address and user agent handled elsewhere,
# and this handler just records the available information for further use.
if created:
log.warning(
"AXES: New login failure by %s. Creating new record in the database.",
"AXES: New login failure by %s. Created new record in the database.",
client_str,
)
# 3. database query if there were previous attempts in the database
# Update failed attempt information but do not touch the username, IP address, or user agent fields,
# because attackers can request the site with multiple different configurations
# in order to bypass the defense mechanisms that are used by the site.
else:
log.warning(
"AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.",
client_str,
attempt.failures_since_start + 1,
get_failure_limit(request, credentials),
)
separator = "\n---------\n"
attempt.get_data = Concat("get_data", Value(separator + get_data))
attempt.post_data = Concat("post_data", Value(separator + post_data))
attempt.http_accept = request.axes_http_accept
@ -167,6 +160,16 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
attempt.attempt_time = request.axes_attempt_time
attempt.save()
log.warning(
"AXES: Repeated login failure by %s. Count = %d of %d. Updated existing record in the database.",
client_str,
attempt.failures_since_start,
get_failure_limit(request, credentials),
)
# 3. or 4. database query: Calculate the current maximum failure number from the existing attempts
failures_since_start = self.get_failures(request, credentials)
if (
settings.AXES_LOCK_OUT_AT_FAILURE
and failures_since_start >= get_failure_limit(request, credentials)
@ -176,7 +179,6 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler):
)
request.axes_locked_out = True
user_locked_out.send(
"axes",
request=request,

View file

@ -209,7 +209,11 @@ class ResetAttemptsTestCase(AxesHandlerBaseTestCase):
AXES_COOLOFF_TIME=timedelta(seconds=2),
AXES_RESET_ON_SUCCESS=True,
)
@mark.xfail(python_implementation() == "PyPy", reason="PyPy implementation is flaky for this test", strict=False)
@mark.xfail(
python_implementation() == "PyPy",
reason="PyPy implementation is flaky for this test",
strict=False,
)
class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
def test_handler_reset_attempts(self):
self.create_attempt()
@ -274,7 +278,7 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
attempt = AccessAttempt.objects.all()
self.assertEqual(0, AccessAttempt.objects.count())
log.warning.assert_called_with(
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created."
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
)
def test_user_login_failed_with_none_username(self):
@ -462,7 +466,7 @@ class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase):
AxesProxyHandler.user_login_failed(sender, credentials, self.request)
self.assertFalse(cache_set.called)
log.warning.assert_called_with(
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enable, New record won't be created."
"AXES: Username is None and AXES_ONLY_USER_FAILURES is enabled, new record will NOT be created."
)
@patch.object(cache, "set")