mirror of
https://github.com/jazzband/django-axes.git
synced 2026-04-29 11:14:45 +00:00
add missing tests; refactor reset_request()
This commit is contained in:
parent
576e666c54
commit
71ee9239d9
3 changed files with 195 additions and 10 deletions
|
|
@ -1,12 +1,13 @@
|
|||
from unittest.mock import patch
|
||||
|
||||
from django.http import HttpRequest
|
||||
from django.test import override_settings
|
||||
from django.utils.timezone import now
|
||||
|
||||
from axes.attempts import get_cool_off_threshold
|
||||
from axes.models import AccessAttempt
|
||||
from axes.tests.base import AxesTestCase
|
||||
from axes.utils import reset
|
||||
from axes.utils import reset, reset_request
|
||||
|
||||
|
||||
class GetCoolOffThresholdTestCase(AxesTestCase):
|
||||
|
|
@ -44,3 +45,111 @@ class ResetTestCase(AxesTestCase):
|
|||
self.create_attempt(username=self.username)
|
||||
reset(username=self.username)
|
||||
self.assertFalse(AccessAttempt.objects.count())
|
||||
|
||||
|
||||
class ResetResponseTestCase(AxesTestCase):
|
||||
USERNAME_1 = "foo_username"
|
||||
USERNAME_2 = "bar_username"
|
||||
IP_1 = "127.1.0.1"
|
||||
IP_2 = "127.1.0.2"
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.create_attempt()
|
||||
self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_1)
|
||||
self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_2)
|
||||
self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_1)
|
||||
self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_2)
|
||||
self.request = HttpRequest()
|
||||
|
||||
def test_reset(self):
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 5)
|
||||
|
||||
def test_reset_ip(self):
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
def test_reset_username(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 5)
|
||||
|
||||
def test_reset_ip_username(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_ONLY_USER_FAILURES=True)
|
||||
def test_reset_user_failures(self):
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 5)
|
||||
|
||||
@override_settings(AXES_ONLY_USER_FAILURES=True)
|
||||
def test_reset_ip_user_failures(self):
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 5)
|
||||
|
||||
@override_settings(AXES_ONLY_USER_FAILURES=True)
|
||||
def test_reset_username_user_failures(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_ONLY_USER_FAILURES=True)
|
||||
def test_reset_ip_username_user_failures(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
|
||||
def test_reset_user_or_ip(self):
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 5)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
|
||||
def test_reset_ip_user_or_ip(self):
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
|
||||
def test_reset_username_user_or_ip(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_USER_OR_IP=True)
|
||||
def test_reset_ip_username_user_or_ip(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 2)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
|
||||
def test_reset_user_and_ip(self):
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 5)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
|
||||
def test_reset_ip_user_and_ip(self):
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
|
||||
def test_reset_username_user_and_ip(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
||||
@override_settings(AXES_LOCK_OUT_BY_USER_OR_AND=True)
|
||||
def test_reset_ip_username_user_and_ip(self):
|
||||
self.request.GET["username"] = self.USERNAME_1
|
||||
self.request.META["REMOTE_ADDR"] = self.IP_1
|
||||
reset_request(self.request)
|
||||
self.assertEquals(AccessAttempt.objects.count(), 3)
|
||||
|
|
|
|||
|
|
@ -129,6 +129,77 @@ class AxesHandlerBaseTestCase(AxesTestCase):
|
|||
)
|
||||
|
||||
|
||||
@override_settings(AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler")
|
||||
class ResetAttemptsTestCase(AxesHandlerBaseTestCase):
|
||||
""" Resetting attempts is currently implemented only for database handler """
|
||||
|
||||
USERNAME_1 = "foo_username"
|
||||
USERNAME_2 = "bar_username"
|
||||
IP_1 = "127.1.0.1"
|
||||
IP_2 = "127.1.0.2"
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.create_attempt()
|
||||
self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_1)
|
||||
self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_2)
|
||||
self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_1)
|
||||
self.create_attempt(username=self.USERNAME_2, ip_address=self.IP_2)
|
||||
|
||||
def test_handler_reset_attempts(self):
|
||||
self.assertEqual(5, AxesProxyHandler.reset_attempts())
|
||||
self.assertFalse(AccessAttempt.objects.count())
|
||||
|
||||
def test_handler_reset_attempts_username(self):
|
||||
self.assertEqual(2, AxesProxyHandler.reset_attempts(username=self.USERNAME_1))
|
||||
self.assertEqual(AccessAttempt.objects.count(), 3)
|
||||
self.assertEqual(
|
||||
AccessAttempt.objects.filter(ip_address=self.USERNAME_1).count(), 0
|
||||
)
|
||||
|
||||
def test_handler_reset_attempts_ip(self):
|
||||
self.assertEqual(2, AxesProxyHandler.reset_attempts(ip_address=self.IP_1))
|
||||
self.assertEqual(AccessAttempt.objects.count(), 3)
|
||||
self.assertEqual(AccessAttempt.objects.filter(ip_address=self.IP_1).count(), 0)
|
||||
|
||||
def test_handler_reset_attempts_ip_and_username(self):
|
||||
self.assertEqual(
|
||||
1,
|
||||
AxesProxyHandler.reset_attempts(
|
||||
ip_address=self.IP_1, username=self.USERNAME_1
|
||||
),
|
||||
)
|
||||
self.assertEqual(AccessAttempt.objects.count(), 4)
|
||||
self.assertEqual(AccessAttempt.objects.filter(ip_address=self.IP_1).count(), 1)
|
||||
|
||||
self.create_attempt(username=self.USERNAME_1, ip_address=self.IP_1)
|
||||
self.assertEqual(
|
||||
1,
|
||||
AxesProxyHandler.reset_attempts(
|
||||
ip_address=self.IP_1, username=self.USERNAME_2
|
||||
),
|
||||
)
|
||||
self.assertEqual(
|
||||
1,
|
||||
AxesProxyHandler.reset_attempts(
|
||||
ip_address=self.IP_2, username=self.USERNAME_1
|
||||
),
|
||||
)
|
||||
|
||||
def test_handler_reset_attempts_ip_or_username(self):
|
||||
self.assertEqual(
|
||||
3,
|
||||
AxesProxyHandler.reset_attempts(
|
||||
ip_address=self.IP_1, username=self.USERNAME_1, ip_or_username=True
|
||||
),
|
||||
)
|
||||
self.assertEqual(AccessAttempt.objects.count(), 2)
|
||||
self.assertEqual(AccessAttempt.objects.filter(ip_address=self.IP_1).count(), 0)
|
||||
self.assertEqual(
|
||||
AccessAttempt.objects.filter(ip_address=self.USERNAME_1).count(), 0
|
||||
)
|
||||
|
||||
|
||||
@override_settings(
|
||||
AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
|
||||
AXES_COOLOFF_TIME=timedelta(seconds=2),
|
||||
|
|
|
|||
|
|
@ -7,6 +7,8 @@ and offers a backwards compatible import path.
|
|||
|
||||
from logging import getLogger
|
||||
|
||||
from django.http import HttpRequest
|
||||
|
||||
from axes.conf import settings
|
||||
from axes.handlers.proxy import AxesProxyHandler
|
||||
from axes.helpers import get_client_ip_address
|
||||
|
|
@ -26,7 +28,7 @@ def reset(ip: str = None, username: str = None, ip_or_username=False) -> int:
|
|||
)
|
||||
|
||||
|
||||
def reset_request(request) -> int:
|
||||
def reset_request(request: HttpRequest) -> int:
|
||||
"""
|
||||
Reset records that match IP or username, and return the count of removed attempts.
|
||||
|
||||
|
|
@ -37,16 +39,19 @@ def reset_request(request) -> int:
|
|||
ip = get_client_ip_address(request)
|
||||
username = request.GET.get("username", None)
|
||||
|
||||
ip_or_username = False
|
||||
ip_or_username = settings.AXES_LOCK_OUT_BY_USER_OR_IP
|
||||
if settings.AXES_ONLY_USER_FAILURES:
|
||||
ip = None
|
||||
else:
|
||||
if settings.AXES_LOCK_OUT_BY_USER_OR_IP:
|
||||
ip_or_username = True
|
||||
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
|
||||
ip_or_username = False
|
||||
else:
|
||||
username = None
|
||||
elif not (
|
||||
settings.AXES_LOCK_OUT_BY_USER_OR_IP
|
||||
or settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP
|
||||
):
|
||||
username = None
|
||||
|
||||
if not ip and not username:
|
||||
return (
|
||||
0
|
||||
) # We don't want to reset everything, if there is some wrong request parameter
|
||||
|
||||
# if settings.AXES_USE_USER_AGENT:
|
||||
# TODO: reset based on user_agent?
|
||||
|
|
|
|||
Loading…
Reference in a new issue