mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
parent
23c2ef6433
commit
d8c6632384
7 changed files with 94 additions and 21 deletions
|
|
@ -23,6 +23,24 @@ class AxesHandler: # pylint: disable=unused-argument
|
|||
.. note:: This is a virtual class and **can not be used without specialization**.
|
||||
"""
|
||||
|
||||
def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int:
|
||||
"""
|
||||
Resets access attempts that match the given IP address or username.
|
||||
|
||||
:raises NotImplementedError: if the handler does not support resetting attempts.
|
||||
"""
|
||||
|
||||
raise NotImplementedError('Reset for access attempts is not supported on this backend')
|
||||
|
||||
def reset_logs(self, *, age_days: int = None) -> int:
|
||||
"""
|
||||
Resets access logs that are older than given number of days.
|
||||
|
||||
:raises NotImplementedError: if the handler does not support resetting logs.
|
||||
"""
|
||||
|
||||
raise NotImplementedError('Reset for access logs is not supported on this backend')
|
||||
|
||||
def is_allowed(self, request, credentials: dict = None) -> bool:
|
||||
"""
|
||||
Checks if the user is allowed to access or use given functionality such as a login view or authentication.
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ from logging import getLogger
|
|||
|
||||
from django.db.models import Max, Value
|
||||
from django.db.models.functions import Concat
|
||||
from django.utils import timezone
|
||||
|
||||
from axes.attempts import (
|
||||
clean_expired_user_attempts,
|
||||
|
|
@ -30,6 +31,30 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
|
|||
Signal handler implementation that records user login attempts to database and locks users out if necessary.
|
||||
"""
|
||||
|
||||
def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int:
|
||||
attempts = AccessAttempt.objects.all()
|
||||
|
||||
if ip_address:
|
||||
attempts = attempts.filter(ip_address=ip_address)
|
||||
if username:
|
||||
attempts = attempts.filter(username=username)
|
||||
|
||||
count, _ = attempts.delete()
|
||||
log.info('AXES: Reset %d access attempts from database.', count)
|
||||
|
||||
return count
|
||||
|
||||
def reset_logs(self, *, age_days: int = None) -> int:
|
||||
if age_days is None:
|
||||
count, _ = AccessLog.objects.all().delete()
|
||||
log.info('AXES: Reset all %d access logs from database.', count)
|
||||
else:
|
||||
limit = timezone.now() - timezone.timedelta(days=age_days)
|
||||
count, _ = AccessLog.objects.filter(attempt_time__lte=limit).delete()
|
||||
log.info('AXES: Reset %d access logs older than %d days from database.', count, age_days)
|
||||
|
||||
return count
|
||||
|
||||
def get_failures(self, request, credentials: dict = None) -> int:
|
||||
attempts = get_user_attempts(request, credentials)
|
||||
return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0
|
||||
|
|
|
|||
|
|
@ -42,6 +42,14 @@ class AxesProxyHandler(AxesHandler):
|
|||
cls.implementation = import_string(settings.AXES_HANDLER)()
|
||||
return cls.implementation
|
||||
|
||||
@classmethod
|
||||
def reset_attempts(cls, *, ip_address: str = None, username: str = None) -> int:
|
||||
return cls.get_implementation().reset_attempts(ip_address=ip_address, username=username)
|
||||
|
||||
@classmethod
|
||||
def reset_logs(cls, *, age_days: int = None) -> int:
|
||||
return cls.get_implementation().reset_logs(age_days=age_days)
|
||||
|
||||
@staticmethod
|
||||
def update_request(request):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
from django.core.management.base import BaseCommand
|
||||
from django.utils import timezone
|
||||
|
||||
from axes.models import AccessLog
|
||||
from axes.handlers.proxy import AxesProxyHandler
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
|
|
@ -16,9 +15,7 @@ class Command(BaseCommand):
|
|||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
limit = timezone.now().date() - timezone.timedelta(days=options['age'])
|
||||
count, _ = AccessLog.objects.filter(attempt_time__lte=limit).delete()
|
||||
|
||||
count = AxesProxyHandler.reset_logs(age_days=options['age'])
|
||||
if count:
|
||||
self.stdout.write(f'{count} logs removed.')
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@ from axes.helpers import (
|
|||
get_credentials,
|
||||
get_failure_limit,
|
||||
)
|
||||
from axes.models import AccessAttempt
|
||||
from axes.models import AccessAttempt, AccessLog
|
||||
|
||||
|
||||
def custom_failure_limit(request, credentials):
|
||||
|
|
@ -91,14 +91,18 @@ class AxesTestCase(TestCase):
|
|||
'user_agent': self.user_agent,
|
||||
'ip_address': self.ip_address,
|
||||
'username': self.username,
|
||||
'failures_since_start': 1,
|
||||
}
|
||||
|
||||
defaults.update(kwargs)
|
||||
return defaults
|
||||
|
||||
def create_attempt(self, **kwargs):
|
||||
return AccessAttempt.objects.create(**self.get_kwargs_with_defaults(**kwargs))
|
||||
kwargs = self.get_kwargs_with_defaults(**kwargs)
|
||||
kwargs.setdefault('failures_since_start', 1)
|
||||
return AccessAttempt.objects.create(**kwargs)
|
||||
|
||||
def create_log(self, **kwargs):
|
||||
return AccessLog.objects.create(**self.get_kwargs_with_defaults(**kwargs))
|
||||
|
||||
def reset(self, ip=None, username=None):
|
||||
return reset(ip, username)
|
||||
|
|
|
|||
|
|
@ -2,16 +2,26 @@ from unittest.mock import MagicMock, patch
|
|||
|
||||
from django.test import override_settings
|
||||
from django.urls import reverse
|
||||
from django.utils import timezone
|
||||
from django.utils.timezone import timedelta
|
||||
|
||||
from axes.conf import settings
|
||||
from axes.handlers.proxy import AxesProxyHandler
|
||||
from axes.tests.base import AxesTestCase
|
||||
from axes.helpers import get_client_str
|
||||
from axes.models import AccessAttempt, AccessLog
|
||||
from axes.tests.base import AxesTestCase
|
||||
|
||||
|
||||
@override_settings(AXES_HANDLER='axes.handlers.base.AxesHandler')
|
||||
class AxesHandlerTestCase(AxesTestCase):
|
||||
def test_base_handler_reset_attempts_raises(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
AxesProxyHandler.reset_attempts()
|
||||
|
||||
def test_base_handler_reset_logs_raises(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
AxesProxyHandler.reset_logs()
|
||||
|
||||
def test_base_handler_raises_on_undefined_is_allowed_to_authenticate(self):
|
||||
with self.assertRaises(NotImplementedError):
|
||||
AxesProxyHandler.is_allowed(self.request, {})
|
||||
|
|
@ -124,6 +134,27 @@ class AxesHandlerBaseTestCase(AxesTestCase):
|
|||
AXES_RESET_ON_SUCCESS=True,
|
||||
)
|
||||
class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
|
||||
def test_handler_reset_attempts(self):
|
||||
self.create_attempt()
|
||||
self.assertEqual(1, AxesProxyHandler.reset_attempts())
|
||||
self.assertFalse(AccessAttempt.objects.count())
|
||||
|
||||
def test_handler_reset_logs(self):
|
||||
self.create_log()
|
||||
self.assertEqual(1, AxesProxyHandler.reset_logs())
|
||||
self.assertFalse(AccessLog.objects.count())
|
||||
|
||||
def test_handler_reset_logs_older_than_42_days(self):
|
||||
self.create_log()
|
||||
|
||||
then = timezone.now() - timezone.timedelta(days=90)
|
||||
with patch('django.utils.timezone.now', return_value=then):
|
||||
self.create_log()
|
||||
|
||||
self.assertEqual(AccessLog.objects.count(), 2)
|
||||
self.assertEqual(1, AxesProxyHandler.reset_logs(age_days=42))
|
||||
self.assertEqual(AccessLog.objects.count(), 1)
|
||||
|
||||
@override_settings(AXES_RESET_ON_SUCCESS=True)
|
||||
def test_handler(self):
|
||||
self.check_handler()
|
||||
|
|
|
|||
|
|
@ -7,7 +7,7 @@ and offers a backwards compatible import path.
|
|||
|
||||
from logging import getLogger
|
||||
|
||||
from axes.models import AccessAttempt
|
||||
from axes.handlers.proxy import AxesProxyHandler
|
||||
|
||||
log = getLogger(__name__)
|
||||
|
||||
|
|
@ -19,14 +19,4 @@ def reset(ip: str = None, username: str = None) -> int:
|
|||
This utility method is meant to be used from the CLI or via Python API.
|
||||
"""
|
||||
|
||||
attempts = AccessAttempt.objects.all()
|
||||
|
||||
if ip:
|
||||
attempts = attempts.filter(ip_address=ip)
|
||||
if username:
|
||||
attempts = attempts.filter(username=username)
|
||||
|
||||
count, _ = attempts.delete()
|
||||
log.info('AXES: Reset %s access attempts from database.', count)
|
||||
|
||||
return count
|
||||
return AxesProxyHandler.reset_attempts(ip_address=ip, username=username)
|
||||
|
|
|
|||
Loading…
Reference in a new issue