diff --git a/axes/attempts.py b/axes/attempts.py index a47eb60..9ab40be 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -1,5 +1,5 @@ from logging import getLogger -from typing import List +from typing import List, Optional from django.db.models import QuerySet from django.utils.timezone import datetime, now @@ -11,7 +11,7 @@ from axes.models import AccessAttempt log = getLogger(__name__) -def get_cool_off_threshold(attempt_time: datetime = None) -> datetime: +def get_cool_off_threshold(attempt_time: Optional[datetime] = None) -> datetime: """ Get threshold for fetching access attempts from the database. """ @@ -27,7 +27,7 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime: return attempt_time - cool_off -def filter_user_attempts(request, credentials: dict = None) -> List[QuerySet]: +def filter_user_attempts(request, credentials: Optional[dict] = None) -> List[QuerySet]: """ Return a list querysets of AccessAttempts that match the given request and credentials. """ @@ -44,7 +44,7 @@ def filter_user_attempts(request, credentials: dict = None) -> List[QuerySet]: return attempts_list -def get_user_attempts(request, credentials: dict = None) -> List[QuerySet]: +def get_user_attempts(request, credentials: Optional[dict] = None) -> List[QuerySet]: """ Get list of querysets with valid user attempts that match the given request and credentials. """ @@ -62,7 +62,7 @@ def get_user_attempts(request, credentials: dict = None) -> List[QuerySet]: return [attempts.filter(attempt_time__gte=threshold) for attempts in attempts_list] -def clean_expired_user_attempts(attempt_time: datetime = None) -> int: +def clean_expired_user_attempts(attempt_time: Optional[datetime] = None) -> int: """ Clean expired user attempts from the database. """ @@ -83,7 +83,7 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int: return count -def reset_user_attempts(request, credentials: dict = None) -> int: +def reset_user_attempts(request, credentials: Optional[dict] = None) -> int: """ Reset all user attempts that match the given request and credentials. """ diff --git a/axes/backends.py b/axes/backends.py index c337f89..30535f3 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -1,3 +1,4 @@ +from typing import Optional from django.conf import settings from django.contrib.auth.backends import ModelBackend @@ -22,7 +23,7 @@ class AxesBackend(ModelBackend): @toggleable def authenticate( - self, request, username: str = None, password: str = None, **kwargs: dict + self, request, username: Optional[str] = None, password: Optional[str] = None, **kwargs: dict ): """ Checks user lockout status and raises an exception if user is not allowed to log in. diff --git a/axes/handlers/base.py b/axes/handlers/base.py index 59b715d..f2643c4 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -1,5 +1,6 @@ import re from abc import ABC, abstractmethod +from typing import Optional from django.urls import reverse from django.urls.exceptions import NoReverseMatch @@ -41,7 +42,7 @@ class AbstractAxesHandler(ABC): raise NotImplementedError("user_logged_out should be implemented") @abstractmethod - def get_failures(self, request, credentials: dict = None) -> int: + def get_failures(self, request, credentials: Optional[dict] = None) -> int: """ Checks the number of failures associated to the given request and credentials. @@ -65,7 +66,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument .. note:: This is a virtual class and **can not be used without specialization**. """ - def is_allowed(self, request, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: Optional[dict] = None) -> bool: """ Checks if the user is allowed to access or use given functionality such as a login view or authentication. @@ -94,7 +95,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument return True - def is_blacklisted(self, request, credentials: dict = None) -> bool: + def is_blacklisted(self, request, credentials: Optional[dict] = None) -> bool: """ Checks if the request or given credentials are blacklisted from access. """ @@ -104,7 +105,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument return False - def is_whitelisted(self, request, credentials: dict = None) -> bool: + def is_whitelisted(self, request, credentials: Optional[dict] = None) -> bool: """ Checks if the request or given credentials are whitelisted for access. """ @@ -120,7 +121,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument return False - def is_locked(self, request, credentials: dict = None) -> bool: + def is_locked(self, request, credentials: Optional[dict] = None) -> bool: """ Checks if the request or given credentials are locked. """ @@ -149,8 +150,8 @@ class AxesBaseHandler: # pylint: disable=unused-argument def reset_attempts( self, *, - ip_address: str = None, - username: str = None, + ip_address: Optional[str] = None, + username: Optional[str] = None, ip_or_username: bool = False, ) -> int: """ @@ -163,7 +164,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument """ return 0 - def reset_logs(self, *, age_days: int = None) -> int: + def reset_logs(self, *, age_days: Optional[int] = None) -> int: """ Resets access logs that are older than given number of days. @@ -174,7 +175,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument """ return 0 - def reset_failure_logs(self, *, age_days: int = None) -> int: + def reset_failure_logs(self, *, age_days: Optional[int] = None) -> int: """ Resets access failure logs that are older than given number of days. @@ -186,7 +187,7 @@ class AxesBaseHandler: # pylint: disable=unused-argument return 0 - def remove_out_of_limit_failure_logs(self, *, username: str, limit: int = None) -> int: + def remove_out_of_limit_failure_logs(self, *, username: str, limit: Optional[int] = None) -> int: """Remove access failure logs that are over AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT for user username. @@ -212,5 +213,5 @@ class AxesHandler(AbstractAxesHandler, AxesBaseHandler): def user_logged_out(self, sender, request, user, **kwargs): pass - def get_failures(self, request, credentials: dict = None) -> int: + def get_failures(self, request, credentials: Optional[dict] = None) -> int: return 0 diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index 8bddae1..423c592 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -1,4 +1,5 @@ from logging import getLogger +from typing import Optional from axes.conf import settings from axes.handlers.base import AxesBaseHandler, AbstractAxesHandler @@ -56,7 +57,7 @@ class AxesCacheHandler(AbstractAxesHandler, AxesBaseHandler): return count - def get_failures(self, request, credentials: dict = None) -> int: + def get_failures(self, request, credentials: Optional[dict] = None) -> int: cache_keys = get_client_cache_key(request, credentials) failure_count = max( self.cache.get(cache_key, default=0) for cache_key in cache_keys diff --git a/axes/handlers/database.py b/axes/handlers/database.py index 176c898..5ac0843 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -1,4 +1,5 @@ from logging import getLogger +from typing import Optional from django.db import transaction from django.db.models import F, Sum, Value, Q @@ -36,8 +37,8 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): def reset_attempts( self, *, - ip_address: str = None, - username: str = None, + ip_address: Optional[str] = None, + username: Optional[str] = None, ip_or_username: bool = False, ) -> int: attempts = AccessAttempt.objects.all() @@ -55,7 +56,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): return count - def reset_logs(self, *, age_days: int = None) -> int: + def reset_logs(self, *, age_days: Optional[int] = None) -> int: if age_days is None: count, _ = AccessLog.objects.all().delete() log.info("AXES: Reset all %d access logs from database.", count) @@ -70,7 +71,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): return count - def reset_failure_logs(self, *, age_days: int = None) -> int: + def reset_failure_logs(self, *, age_days: Optional[int] = None) -> int: if age_days is None: count, _ = AccessFailureLog.objects.all().delete() log.info("AXES: Reset all %d access failure logs from database.", count) @@ -89,7 +90,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): self, *, username: str, - limit: int = settings.AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT) -> int: + limit: Optional[int] = settings.AXES_ACCESS_FAILURE_LOG_PER_USER_LIMIT) -> int: count = 0 failures = AccessFailureLog.objects.filter(username=username) out_of_limit_failures_logs = failures.count() - limit @@ -99,7 +100,7 @@ class AxesDatabaseHandler(AbstractAxesHandler, AxesBaseHandler): count += 1 return count - def get_failures(self, request, credentials: dict = None) -> int: + def get_failures(self, request, credentials: Optional[dict] = None) -> int: attempts_list = get_user_attempts(request, credentials) attempt_count = max( ( diff --git a/axes/handlers/dummy.py b/axes/handlers/dummy.py index 0e40ace..b8c3bbb 100644 --- a/axes/handlers/dummy.py +++ b/axes/handlers/dummy.py @@ -1,4 +1,5 @@ from axes.handlers.base import AxesBaseHandler, AbstractAxesHandler +from typing import Optional class AxesDummyHandler(AbstractAxesHandler, AxesBaseHandler): @@ -6,7 +7,7 @@ class AxesDummyHandler(AbstractAxesHandler, AxesBaseHandler): Signal handler implementation that does nothing and can be used to disable signal processing. """ - def is_allowed(self, request, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: Optional[dict] = None) -> bool: return True def user_login_failed(self, sender, credentials: dict, request=None, **kwargs): @@ -18,5 +19,5 @@ class AxesDummyHandler(AbstractAxesHandler, AxesBaseHandler): def user_logged_out(self, sender, request, user, **kwargs): pass - def get_failures(self, request, credentials: dict = None) -> int: + def get_failures(self, request, credentials: Optional[dict] = None) -> int: return 0 diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py index 8c0a3f3..1adebe1 100644 --- a/axes/handlers/proxy.py +++ b/axes/handlers/proxy.py @@ -1,4 +1,5 @@ from logging import getLogger +from typing import Optional from django.utils.module_loading import import_string from django.utils.timezone import now @@ -46,8 +47,8 @@ class AxesProxyHandler(AbstractAxesHandler, AxesBaseHandler): def reset_attempts( cls, *, - ip_address: str = None, - username: str = None, + ip_address: Optional[str] = None, + username: Optional[str] = None, ip_or_username: bool = False, ) -> int: return cls.get_implementation().reset_attempts( @@ -55,15 +56,15 @@ class AxesProxyHandler(AbstractAxesHandler, AxesBaseHandler): ) @classmethod - def reset_logs(cls, *, age_days: int = None) -> int: + def reset_logs(cls, *, age_days: Optional[int] = None) -> int: return cls.get_implementation().reset_logs(age_days=age_days) @classmethod - def reset_failure_logs(cls, *, age_days: int = None) -> int: + def reset_failure_logs(cls, *, age_days: Optional[int] = None) -> int: return cls.get_implementation().reset_failure_logs(age_days=age_days) @classmethod - def remove_out_of_limit_failure_logs(cls, *, username: str, limit: int = None) -> int: + def remove_out_of_limit_failure_logs(cls, *, username: str, limit: Optional[int] = None) -> int: return cls.get_implementation().remove_out_of_limit_failure_logs(username=username) @staticmethod @@ -90,17 +91,17 @@ class AxesProxyHandler(AbstractAxesHandler, AxesBaseHandler): request.axes_credentials = None @classmethod - def is_locked(cls, request, credentials: dict = None) -> bool: + def is_locked(cls, request, credentials: Optional[dict] = None) -> bool: cls.update_request(request) return cls.get_implementation().is_locked(request, credentials) @classmethod - def is_allowed(cls, request, credentials: dict = None) -> bool: + def is_allowed(cls, request, credentials: Optional[dict] = None) -> bool: cls.update_request(request) return cls.get_implementation().is_allowed(request, credentials) @classmethod - def get_failures(cls, request, credentials: dict = None) -> int: + def get_failures(cls, request, credentials: Optional[dict] = None) -> int: cls.update_request(request) return cls.get_implementation().get_failures(request, credentials) diff --git a/axes/handlers/test.py b/axes/handlers/test.py index c345104..bea66b3 100644 --- a/axes/handlers/test.py +++ b/axes/handlers/test.py @@ -1,4 +1,5 @@ from axes.handlers.base import AxesHandler +from typing import Optional class AxesTestHandler(AxesHandler): # pylint: disable=unused-argument @@ -9,17 +10,17 @@ class AxesTestHandler(AxesHandler): # pylint: disable=unused-argument def reset_attempts( self, *, - ip_address: str = None, - username: str = None, + ip_address: Optional[str] = None, + username: Optional[str] = None, ip_or_username: bool = False, ) -> int: return 0 - def reset_logs(self, *, age_days: int = None) -> int: + def reset_logs(self, *, age_days: Optional[int] = None) -> int: return 0 - def is_allowed(self, request, credentials: dict = None) -> bool: + def is_allowed(self, request, credentials: Optional[dict] = None) -> bool: return True - def get_failures(self, request, credentials: dict = None) -> int: + def get_failures(self, request, credentials: Optional[dict] = None) -> int: return 0 diff --git a/axes/helpers.py b/axes/helpers.py index 3061614..18a3aa8 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -90,7 +90,7 @@ def get_cool_off_iso8601(delta: timedelta) -> str: return f"P{days_str}" -def get_credentials(username: str = None, **kwargs) -> dict: +def get_credentials(username: Optional[str] = None, **kwargs) -> dict: """ Calculate credentials for Axes to use internally from given username and kwargs. @@ -103,7 +103,7 @@ def get_credentials(username: str = None, **kwargs) -> dict: return credentials -def get_client_username(request, credentials: dict = None) -> str: +def get_client_username(request, credentials: Optional[dict] = None) -> str: """ Resolve client username from the given request or credentials if supplied. @@ -218,7 +218,7 @@ def make_cache_key_list(filter_kwargs_list): def get_client_cache_key( - request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None + request_or_attempt: Union[HttpRequest, AccessBase], credentials: Optional[dict] = None ) -> str: """ Build cache key name from request or AccessAttempt object. @@ -360,7 +360,7 @@ def get_lockout_message() -> str: return settings.AXES_PERMALOCK_MESSAGE -def get_lockout_response(request, credentials: dict = None) -> HttpResponse: +def get_lockout_response(request, credentials: Optional[dict] = None) -> HttpResponse: if settings.AXES_LOCKOUT_CALLABLE: if callable(settings.AXES_LOCKOUT_CALLABLE): return settings.AXES_LOCKOUT_CALLABLE( # pylint: disable=not-callable @@ -475,7 +475,7 @@ def is_client_method_whitelisted(request) -> bool: return False -def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool: +def is_user_attempt_whitelisted(request, credentials: Optional[dict] = None) -> bool: """ Check if the given request or credentials refer to a whitelisted username. diff --git a/axes/utils.py b/axes/utils.py index 2cdd9d7..824fc9c 100644 --- a/axes/utils.py +++ b/axes/utils.py @@ -17,7 +17,7 @@ from axes.helpers import get_client_ip_address log = getLogger(__name__) -def reset(ip: str = None, username: str = None, ip_or_username=False) -> int: +def reset(ip: Optional[str] = None, username: Optional[str] = None, ip_or_username=False) -> int: """ Reset records that match IP or username, and return the count of removed attempts.