diff --git a/axes/handlers/base.py b/axes/handlers/base.py index cdc8932..a8acbcd 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -23,6 +23,24 @@ class AxesHandler: # pylint: disable=unused-argument .. note:: This is a virtual class and **can not be used without specialization**. """ + def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int: + """ + Resets access attempts that match the given IP address or username. + + :raises NotImplementedError: if the handler does not support resetting attempts. + """ + + raise NotImplementedError('Reset for access attempts is not supported on this backend') + + def reset_logs(self, *, age_days: int = None) -> int: + """ + Resets access logs that are older than given number of days. + + :raises NotImplementedError: if the handler does not support resetting logs. + """ + + raise NotImplementedError('Reset for access logs is not supported on this backend') + def is_allowed(self, request, credentials: dict = None) -> bool: """ Checks if the user is allowed to access or use given functionality such as a login view or authentication. diff --git a/axes/handlers/database.py b/axes/handlers/database.py index f114f1f..c81de4c 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -2,6 +2,7 @@ from logging import getLogger from django.db.models import Max, Value from django.db.models.functions import Concat +from django.utils import timezone from axes.attempts import ( clean_expired_user_attempts, @@ -30,6 +31,30 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals Signal handler implementation that records user login attempts to database and locks users out if necessary. """ + def reset_attempts(self, *, ip_address: str = None, username: str = None) -> int: + attempts = AccessAttempt.objects.all() + + if ip_address: + attempts = attempts.filter(ip_address=ip_address) + if username: + attempts = attempts.filter(username=username) + + count, _ = attempts.delete() + log.info('AXES: Reset %d access attempts from database.', count) + + return count + + def reset_logs(self, *, age_days: int = None) -> int: + if age_days is None: + count, _ = AccessLog.objects.all().delete() + log.info('AXES: Reset all %d access logs from database.', count) + else: + limit = timezone.now() - timezone.timedelta(days=age_days) + count, _ = AccessLog.objects.filter(attempt_time__lte=limit).delete() + log.info('AXES: Reset %d access logs older than %d days from database.', count, age_days) + + return count + def get_failures(self, request, credentials: dict = None) -> int: attempts = get_user_attempts(request, credentials) return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0 diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py index 2077508..7e3926e 100644 --- a/axes/handlers/proxy.py +++ b/axes/handlers/proxy.py @@ -42,6 +42,14 @@ class AxesProxyHandler(AxesHandler): cls.implementation = import_string(settings.AXES_HANDLER)() return cls.implementation + @classmethod + def reset_attempts(cls, *, ip_address: str = None, username: str = None) -> int: + return cls.get_implementation().reset_attempts(ip_address=ip_address, username=username) + + @classmethod + def reset_logs(cls, *, age_days: int = None) -> int: + return cls.get_implementation().reset_logs(age_days=age_days) + @staticmethod def update_request(request): """ diff --git a/axes/management/commands/axes_reset_logs.py b/axes/management/commands/axes_reset_logs.py index 66c9af0..8a197c8 100644 --- a/axes/management/commands/axes_reset_logs.py +++ b/axes/management/commands/axes_reset_logs.py @@ -1,7 +1,6 @@ from django.core.management.base import BaseCommand -from django.utils import timezone -from axes.models import AccessLog +from axes.handlers.proxy import AxesProxyHandler class Command(BaseCommand): @@ -16,9 +15,7 @@ class Command(BaseCommand): ) def handle(self, *args, **options): - limit = timezone.now().date() - timezone.timedelta(days=options['age']) - count, _ = AccessLog.objects.filter(attempt_time__lte=limit).delete() - + count = AxesProxyHandler.reset_logs(age_days=options['age']) if count: self.stdout.write(f'{count} logs removed.') else: diff --git a/axes/tests/base.py b/axes/tests/base.py index de9752a..69ca054 100644 --- a/axes/tests/base.py +++ b/axes/tests/base.py @@ -20,7 +20,7 @@ from axes.helpers import ( get_credentials, get_failure_limit, ) -from axes.models import AccessAttempt +from axes.models import AccessAttempt, AccessLog def custom_failure_limit(request, credentials): @@ -91,14 +91,18 @@ class AxesTestCase(TestCase): 'user_agent': self.user_agent, 'ip_address': self.ip_address, 'username': self.username, - 'failures_since_start': 1, } defaults.update(kwargs) return defaults def create_attempt(self, **kwargs): - return AccessAttempt.objects.create(**self.get_kwargs_with_defaults(**kwargs)) + kwargs = self.get_kwargs_with_defaults(**kwargs) + kwargs.setdefault('failures_since_start', 1) + return AccessAttempt.objects.create(**kwargs) + + def create_log(self, **kwargs): + return AccessLog.objects.create(**self.get_kwargs_with_defaults(**kwargs)) def reset(self, ip=None, username=None): return reset(ip, username) diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index 1ef2318..56bcf9d 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -2,16 +2,26 @@ from unittest.mock import MagicMock, patch from django.test import override_settings from django.urls import reverse +from django.utils import timezone from django.utils.timezone import timedelta from axes.conf import settings from axes.handlers.proxy import AxesProxyHandler -from axes.tests.base import AxesTestCase from axes.helpers import get_client_str +from axes.models import AccessAttempt, AccessLog +from axes.tests.base import AxesTestCase @override_settings(AXES_HANDLER='axes.handlers.base.AxesHandler') class AxesHandlerTestCase(AxesTestCase): + def test_base_handler_reset_attempts_raises(self): + with self.assertRaises(NotImplementedError): + AxesProxyHandler.reset_attempts() + + def test_base_handler_reset_logs_raises(self): + with self.assertRaises(NotImplementedError): + AxesProxyHandler.reset_logs() + def test_base_handler_raises_on_undefined_is_allowed_to_authenticate(self): with self.assertRaises(NotImplementedError): AxesProxyHandler.is_allowed(self.request, {}) @@ -124,6 +134,27 @@ class AxesHandlerBaseTestCase(AxesTestCase): AXES_RESET_ON_SUCCESS=True, ) class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): + def test_handler_reset_attempts(self): + self.create_attempt() + self.assertEqual(1, AxesProxyHandler.reset_attempts()) + self.assertFalse(AccessAttempt.objects.count()) + + def test_handler_reset_logs(self): + self.create_log() + self.assertEqual(1, AxesProxyHandler.reset_logs()) + self.assertFalse(AccessLog.objects.count()) + + def test_handler_reset_logs_older_than_42_days(self): + self.create_log() + + then = timezone.now() - timezone.timedelta(days=90) + with patch('django.utils.timezone.now', return_value=then): + self.create_log() + + self.assertEqual(AccessLog.objects.count(), 2) + self.assertEqual(1, AxesProxyHandler.reset_logs(age_days=42)) + self.assertEqual(AccessLog.objects.count(), 1) + @override_settings(AXES_RESET_ON_SUCCESS=True) def test_handler(self): self.check_handler() diff --git a/axes/utils.py b/axes/utils.py index 67bf815..65590fd 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -7,7 +7,7 @@ and offers a backwards compatible import path. from logging import getLogger -from axes.models import AccessAttempt +from axes.handlers.proxy import AxesProxyHandler log = getLogger(__name__) @@ -19,14 +19,4 @@ def reset(ip: str = None, username: str = None) -> int: This utility method is meant to be used from the CLI or via Python API. """ - attempts = AccessAttempt.objects.all() - - if ip: - attempts = attempts.filter(ip_address=ip) - if username: - attempts = attempts.filter(username=username) - - count, _ = attempts.delete() - log.info('AXES: Reset %s access attempts from database.', count) - - return count + return AxesProxyHandler.reset_attempts(ip_address=ip, username=username)