mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
refactor: handle Retry-After in middleware with opt-in setting
This commit is contained in:
parent
ef91ba9b61
commit
a9e9704318
6 changed files with 99 additions and 52 deletions
|
|
@ -137,6 +137,11 @@ settings.AXES_CLIENT_STR_CALLABLE = getattr(settings, "AXES_CLIENT_STR_CALLABLE"
|
|||
# set the HTTP response code given by too many requests
|
||||
settings.AXES_HTTP_RESPONSE_CODE = getattr(settings, "AXES_HTTP_RESPONSE_CODE", 429)
|
||||
|
||||
# if True, set Retry-After header for lockout responses with cool off configured
|
||||
settings.AXES_ENABLE_RETRY_AFTER_HEADER = getattr(
|
||||
settings, "AXES_ENABLE_RETRY_AFTER_HEADER", False
|
||||
)
|
||||
|
||||
# If True, a failed login attempt during lockout will reset the cool off period
|
||||
settings.AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT = getattr(
|
||||
settings, "AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT", True
|
||||
|
|
|
|||
|
|
@ -461,12 +461,6 @@ def get_lockout_message() -> str:
|
|||
return settings.AXES_PERMALOCK_MESSAGE
|
||||
|
||||
|
||||
def _set_retry_after_header(response: HttpResponse, request: HttpRequest) -> None:
|
||||
cool_off = get_cool_off(request)
|
||||
if cool_off is not None:
|
||||
response["Retry-After"] = str(int(cool_off.total_seconds()))
|
||||
|
||||
|
||||
def get_lockout_response(
|
||||
request: HttpRequest,
|
||||
original_response: Optional[HttpResponse] = None,
|
||||
|
|
@ -519,15 +513,10 @@ def get_lockout_response(
|
|||
json_response["Access-Control-Allow-Headers"] = (
|
||||
"Origin, Content-Type, Accept, Authorization, x-requested-with"
|
||||
)
|
||||
_set_retry_after_header(json_response, request)
|
||||
return json_response
|
||||
|
||||
if settings.AXES_LOCKOUT_TEMPLATE:
|
||||
response = render(
|
||||
request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status
|
||||
)
|
||||
_set_retry_after_header(response, request)
|
||||
return response
|
||||
return render(request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status)
|
||||
|
||||
if settings.AXES_LOCKOUT_URL:
|
||||
lockout_url = settings.AXES_LOCKOUT_URL
|
||||
|
|
@ -535,9 +524,7 @@ def get_lockout_response(
|
|||
url = f"{lockout_url}?{query_string}"
|
||||
return redirect(url)
|
||||
|
||||
response = HttpResponse(get_lockout_message(), status=status)
|
||||
_set_retry_after_header(response, request)
|
||||
return response
|
||||
return HttpResponse(get_lockout_message(), status=status)
|
||||
|
||||
|
||||
def is_ip_address_in_whitelist(ip_address: str) -> bool:
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from asgiref.sync import iscoroutinefunction, markcoroutinefunction, sync_to_asy
|
|||
from django.conf import settings
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
|
||||
from axes.helpers import get_lockout_response
|
||||
from axes.helpers import get_cool_off, get_lockout_response
|
||||
|
||||
|
||||
class AxesMiddleware:
|
||||
|
|
@ -39,6 +39,22 @@ class AxesMiddleware:
|
|||
if iscoroutinefunction(self.get_response):
|
||||
markcoroutinefunction(self)
|
||||
|
||||
@staticmethod
|
||||
def _set_retry_after_header(
|
||||
response: HttpResponse, request: HttpRequest
|
||||
) -> HttpResponse:
|
||||
if not settings.AXES_ENABLE_RETRY_AFTER_HEADER:
|
||||
return response
|
||||
|
||||
if settings.AXES_LOCKOUT_CALLABLE or settings.AXES_LOCKOUT_URL:
|
||||
return response
|
||||
|
||||
cool_off = get_cool_off(request)
|
||||
if cool_off is not None:
|
||||
response["Retry-After"] = str(int(cool_off.total_seconds()))
|
||||
|
||||
return response
|
||||
|
||||
def __call__(self, request: HttpRequest) -> HttpResponse:
|
||||
# Exit out to async mode, if needed
|
||||
if iscoroutinefunction(self):
|
||||
|
|
@ -49,6 +65,7 @@ class AxesMiddleware:
|
|||
if getattr(request, "axes_locked_out", None):
|
||||
credentials = getattr(request, "axes_credentials", None)
|
||||
response = get_lockout_response(request, response, credentials) # type: ignore
|
||||
response = self._set_retry_after_header(response, request)
|
||||
|
||||
return response
|
||||
|
||||
|
|
@ -60,6 +77,9 @@ class AxesMiddleware:
|
|||
credentials = getattr(request, "axes_credentials", None)
|
||||
response = await sync_to_async(
|
||||
get_lockout_response, thread_sensitive=True
|
||||
)(request, credentials) # type: ignore
|
||||
)(
|
||||
request, response, credentials
|
||||
) # type: ignore
|
||||
response = self._set_retry_after_header(response, request)
|
||||
|
||||
return response
|
||||
|
|
|
|||
|
|
@ -81,17 +81,18 @@ The following ``settings.py`` options are available for customizing Axes behavio
|
|||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_HTTP_RESPONSE_CODE | 429 | Sets the http response code returned when ``AXES_FAILURE_LIMIT`` is reached. For example: ``AXES_HTTP_RESPONSE_CODE = 403`` |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_ENABLE_RETRY_AFTER_HEADER | False | If ``True``, ``AxesMiddleware`` sets the ``Retry-After`` HTTP header on lockout responses when ``AXES_COOLOFF_TIME`` is configured. Set to ``False`` to disable this header. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_RESET_COOL_OFF_ON_FAILURE_DURING_LOCKOUT | True | If ``True``, a failed login attempt during lockout will reset the cool off period. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
| AXES_LOCKOUT_PARAMETERS | ["ip_address"] | A list of parameters that Axes uses to lock out users. It can also be callable, which takes an http request or AccesAttempt object and credentials and returns a list of parameters. Each parameter can be a string (a single parameter) or a list of strings (a combined parameter). For example, if you configure ``AXES_LOCKOUT_PARAMETERS = ["ip_address", ["username", "user_agent"]]``, axes will block clients by ip and/or username and user agent combination. See :ref:`customizing-lockout-parameters` for more details. |
|
||||
+------------------------------------------------------+----------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|
||||
|
||||
.. note::
|
||||
When ``AXES_COOLOFF_TIME`` is configured, lockout responses automatically include a
|
||||
``Retry-After`` HTTP header (`RFC 7231 <https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3>`_)
|
||||
with the cool-off duration in seconds. This applies to JSON, template-rendered, and
|
||||
plain-text lockout responses, but not to redirects (``AXES_LOCKOUT_URL``) or custom
|
||||
callables (``AXES_LOCKOUT_CALLABLE``).
|
||||
If ``AXES_ENABLE_RETRY_AFTER_HEADER`` is enabled and ``AXES_COOLOFF_TIME`` is configured,
|
||||
``AxesMiddleware`` adds a ``Retry-After`` HTTP header (`RFC 7231 <https://datatracker.ietf.org/doc/html/rfc7231#section-7.1.3>`_)
|
||||
with the cool-off duration in seconds. This header is not added for redirects
|
||||
(``AXES_LOCKOUT_URL``) or custom lockout responses (``AXES_LOCKOUT_CALLABLE``).
|
||||
|
||||
The configuration option precedences for the access attempt monitoring are:
|
||||
|
||||
|
|
|
|||
|
|
@ -946,35 +946,6 @@ class LockoutResponseTestCase(AxesTestCase):
|
|||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(type(response), HttpResponse)
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2)
|
||||
def test_get_lockout_response_retry_after_header(self):
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response["Retry-After"], "7200")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=None)
|
||||
def test_get_lockout_response_retry_after_no_cooloff(self):
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2)
|
||||
def test_get_lockout_response_retry_after_json(self):
|
||||
self.request.META["HTTP_X_REQUESTED_WITH"] = "XMLHttpRequest"
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response["Retry-After"], "7200")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2, AXES_LOCKOUT_TEMPLATE="example.html")
|
||||
@patch("axes.helpers.render")
|
||||
def test_get_lockout_response_retry_after_template(self, mock_render):
|
||||
mock_render.return_value = HttpResponse(status=429)
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertEqual(response["Retry-After"], "7200")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=2, AXES_LOCKOUT_URL="https://example.com")
|
||||
def test_get_lockout_response_retry_after_redirect_absent(self):
|
||||
response = get_lockout_response(request=self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
|
||||
def mock_get_cool_off_str(req):
|
||||
return timedelta(seconds=30)
|
||||
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
from datetime import timedelta
|
||||
|
||||
from django.conf import settings
|
||||
from django.http import HttpResponse, HttpRequest
|
||||
from django.test import override_settings
|
||||
|
|
@ -10,6 +12,10 @@ def get_username(request, credentials: dict) -> str:
|
|||
return credentials.get(settings.AXES_USERNAME_FORM_FIELD)
|
||||
|
||||
|
||||
def get_custom_lockout_response(request, original_response, credentials):
|
||||
return HttpResponse(status=429)
|
||||
|
||||
|
||||
class MiddlewareTestCase(AxesTestCase):
|
||||
STATUS_SUCCESS = 200
|
||||
STATUS_LOCKOUT = 429
|
||||
|
|
@ -33,11 +39,68 @@ class MiddlewareTestCase(AxesTestCase):
|
|||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertEqual(response.status_code, self.STATUS_LOCKOUT)
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_ENABLE_RETRY_AFTER_HEADER=True,
|
||||
)
|
||||
def test_lockout_response_sets_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertEqual(response["Retry-After"], "120")
|
||||
|
||||
@override_settings(AXES_COOLOFF_TIME=None)
|
||||
def test_lockout_response_without_cooloff_does_not_set_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_ENABLE_RETRY_AFTER_HEADER=False,
|
||||
)
|
||||
def test_lockout_response_respects_retry_after_toggle(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_LOCKOUT_URL="https://example.com",
|
||||
)
|
||||
def test_lockout_redirect_response_does_not_set_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=120),
|
||||
AXES_LOCKOUT_CALLABLE="tests.test_middleware.get_custom_lockout_response",
|
||||
)
|
||||
def test_lockout_callable_response_does_not_set_retry_after_header(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
return HttpResponse()
|
||||
|
||||
response = AxesMiddleware(get_response)(self.request)
|
||||
self.assertFalse(response.has_header("Retry-After"))
|
||||
|
||||
@override_settings(AXES_USERNAME_CALLABLE="tests.test_middleware.get_username")
|
||||
def test_lockout_response_with_axes_callable_username(self):
|
||||
def get_response(request):
|
||||
request.axes_locked_out = True
|
||||
request.axes_credentials = {settings.AXES_USERNAME_FORM_FIELD: 'username'}
|
||||
request.axes_credentials = {settings.AXES_USERNAME_FORM_FIELD: "username"}
|
||||
|
||||
return HttpResponse()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue