diff --git a/.prospector.yaml b/.prospector.yaml index 303a4c7..9020238 100644 --- a/.prospector.yaml +++ b/.prospector.yaml @@ -4,4 +4,4 @@ ignore-paths: pep8: options: - max-line-length: 119 + max-line-length: 142 diff --git a/axes/__init__.py b/axes/__init__.py index 2d46b79..a3beb38 100644 --- a/axes/__init__.py +++ b/axes/__init__.py @@ -1,5 +1,5 @@ from pkg_resources import get_distribution -default_app_config = 'axes.apps.AppConfig' +default_app_config = "axes.apps.AppConfig" -__version__ = get_distribution('django-axes').version +__version__ = get_distribution("django-axes").version diff --git a/axes/admin.py b/axes/admin.py index e8f0209..b4ba2ca 100644 --- a/axes/admin.py +++ b/axes/admin.py @@ -7,50 +7,36 @@ from axes.models import AccessAttempt, AccessLog @admin.register(AccessAttempt) class AccessAttemptAdmin(admin.ModelAdmin): list_display = ( - 'attempt_time', - 'ip_address', - 'user_agent', - 'username', - 'path_info', - 'failures_since_start', + "attempt_time", + "ip_address", + "user_agent", + "username", + "path_info", + "failures_since_start", ) - list_filter = [ - 'attempt_time', - 'path_info', - ] + list_filter = ["attempt_time", "path_info"] - search_fields = [ - 'ip_address', - 'username', - 'user_agent', - 'path_info', - ] + search_fields = ["ip_address", "username", "user_agent", "path_info"] - date_hierarchy = 'attempt_time' + date_hierarchy = "attempt_time" fieldsets = ( - (None, { - 'fields': ('path_info', 'failures_since_start') - }), - (_('Form Data'), { - 'fields': ('get_data', 'post_data') - }), - (_('Meta Data'), { - 'fields': ('user_agent', 'ip_address', 'http_accept') - }) + (None, {"fields": ("path_info", "failures_since_start")}), + (_("Form Data"), {"fields": ("get_data", "post_data")}), + (_("Meta Data"), {"fields": ("user_agent", "ip_address", "http_accept")}), ) readonly_fields = [ - 'user_agent', - 'ip_address', - 'username', - 'http_accept', - 'path_info', - 'attempt_time', - 'get_data', - 'post_data', - 'failures_since_start' + "user_agent", + "ip_address", + "username", + "http_accept", + "path_info", + "attempt_time", + "get_data", + "post_data", + "failures_since_start", ] def has_add_permission(self, request): @@ -60,46 +46,33 @@ class AccessAttemptAdmin(admin.ModelAdmin): @admin.register(AccessLog) class AccessLogAdmin(admin.ModelAdmin): list_display = ( - 'attempt_time', - 'logout_time', - 'ip_address', - 'username', - 'user_agent', - 'path_info', + "attempt_time", + "logout_time", + "ip_address", + "username", + "user_agent", + "path_info", ) - list_filter = [ - 'attempt_time', - 'logout_time', - 'path_info', - ] + list_filter = ["attempt_time", "logout_time", "path_info"] - search_fields = [ - 'ip_address', - 'user_agent', - 'username', - 'path_info', - ] + search_fields = ["ip_address", "user_agent", "username", "path_info"] - date_hierarchy = 'attempt_time' + date_hierarchy = "attempt_time" fieldsets = ( - (None, { - 'fields': ('path_info',) - }), - (_('Meta Data'), { - 'fields': ('user_agent', 'ip_address', 'http_accept') - }) + (None, {"fields": ("path_info",)}), + (_("Meta Data"), {"fields": ("user_agent", "ip_address", "http_accept")}), ) readonly_fields = [ - 'user_agent', - 'ip_address', - 'username', - 'http_accept', - 'path_info', - 'attempt_time', - 'logout_time' + "user_agent", + "ip_address", + "username", + "http_accept", + "path_info", + "attempt_time", + "logout_time", ] def has_add_permission(self, request): diff --git a/axes/apps.py b/axes/apps.py index 413fca5..6aeaa53 100644 --- a/axes/apps.py +++ b/axes/apps.py @@ -9,7 +9,7 @@ log = getLogger(settings.AXES_LOGGER) class AppConfig(apps.AppConfig): - name = 'axes' + name = "axes" logging_initialized = False @classmethod @@ -31,15 +31,18 @@ class AppConfig(apps.AppConfig): return cls.logging_initialized = True - log.info('AXES: BEGIN LOG') - log.info('AXES: Using django-axes version %s', get_distribution('django-axes').version) + log.info("AXES: BEGIN LOG") + log.info( + "AXES: Using django-axes version %s", + get_distribution("django-axes").version, + ) if settings.AXES_ONLY_USER_FAILURES: - log.info('AXES: blocking by username only.') + log.info("AXES: blocking by username only.") elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: - log.info('AXES: blocking by combination of username and IP.') + log.info("AXES: blocking by combination of username and IP.") else: - log.info('AXES: blocking by IP only.') + log.info("AXES: blocking by IP only.") def ready(self): self.initialize() diff --git a/axes/attempts.py b/axes/attempts.py index 45a1796..fe5b3a4 100644 --- a/axes/attempts.py +++ b/axes/attempts.py @@ -6,11 +6,7 @@ from django.utils.timezone import datetime, now from axes.conf import settings from axes.models import AccessAttempt -from axes.helpers import ( - get_client_username, - get_client_parameters, - get_cool_off, -) +from axes.helpers import get_client_username, get_client_parameters, get_cool_off log = getLogger(settings.AXES_LOGGER) @@ -22,7 +18,9 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime: cool_off = get_cool_off() if cool_off is None: - raise TypeError('Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None') + raise TypeError( + "Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None" + ) if attempt_time is None: return now() - cool_off @@ -36,7 +34,9 @@ def filter_user_attempts(request, credentials: dict = None) -> QuerySet: username = get_client_username(request, credentials) - filter_kwargs = get_client_parameters(username, request.axes_ip_address, request.axes_user_agent) + filter_kwargs = get_client_parameters( + username, request.axes_ip_address, request.axes_user_agent + ) return AccessAttempt.objects.filter(**filter_kwargs) @@ -49,11 +49,13 @@ def get_user_attempts(request, credentials: dict = None) -> QuerySet: attempts = filter_user_attempts(request, credentials) if settings.AXES_COOLOFF_TIME is None: - log.debug('AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured') + log.debug( + "AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured" + ) return attempts threshold = get_cool_off_threshold(request.axes_attempt_time) - log.debug('AXES: Getting access attempts that are newer than %s', threshold) + log.debug("AXES: Getting access attempts that are newer than %s", threshold) return attempts.filter(attempt_time__gte=threshold) @@ -63,12 +65,18 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int: """ if settings.AXES_COOLOFF_TIME is None: - log.debug('AXES: Skipping clean for expired access attempts because no AXES_COOLOFF_TIME is configured') + log.debug( + "AXES: Skipping clean for expired access attempts because no AXES_COOLOFF_TIME is configured" + ) return 0 threshold = get_cool_off_threshold(attempt_time) count, _ = AccessAttempt.objects.filter(attempt_time__lt=threshold).delete() - log.info('AXES: Cleaned up %s expired access attempts from database that were older than %s', count, threshold) + log.info( + "AXES: Cleaned up %s expired access attempts from database that were older than %s", + count, + threshold, + ) return count @@ -80,7 +88,7 @@ def reset_user_attempts(request, credentials: dict = None) -> int: attempts = filter_user_attempts(request, credentials) count, _ = attempts.delete() - log.info('AXES: Reset %s access attempts from database.', count) + log.info("AXES: Reset %s access attempts from database.", count) return count @@ -95,11 +103,9 @@ def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool: this implementation fails gracefully and returns True. """ - username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username') + username_field = getattr(get_user_model(), "USERNAME_FIELD", "username") username_value = get_client_username(request, credentials) - kwargs = { - username_field: username_value - } + kwargs = {username_field: username_value} user_model = get_user_model() diff --git a/axes/backends.py b/axes/backends.py index f6c0a4f..5616979 100644 --- a/axes/backends.py +++ b/axes/backends.py @@ -1,6 +1,9 @@ from django.contrib.auth.backends import ModelBackend -from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired +from axes.exceptions import ( + AxesBackendPermissionDenied, + AxesBackendRequestParameterRequired, +) from axes.handlers.proxy import AxesProxyHandler from axes.helpers import get_credentials, get_lockout_message, toggleable @@ -17,7 +20,9 @@ class AxesBackend(ModelBackend): """ @toggleable - def authenticate(self, request, username: str = None, password: str = None, **kwargs: dict): + def authenticate( + self, request, username: str = None, password: str = None, **kwargs: dict + ): """ Checks user lockout status and raises an exception if user is not allowed to log in. @@ -30,7 +35,9 @@ class AxesBackend(ModelBackend): """ if request is None: - raise AxesBackendRequestParameterRequired('AxesBackend requires a request as an argument to authenticate') + raise AxesBackendRequestParameterRequired( + "AxesBackend requires a request as an argument to authenticate" + ) credentials = get_credentials(username=username, password=password, **kwargs) @@ -41,8 +48,8 @@ class AxesBackend(ModelBackend): # Its a bit weird to pass a context and expect a response value but its nice to get a "why" back. error_msg = get_lockout_message() - response_context = kwargs.get('response_context', {}) - response_context['error'] = error_msg + response_context = kwargs.get("response_context", {}) + response_context["error"] = error_msg # Raise an error that stops the authentication flows at django.contrib.auth.authenticate. # This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors. @@ -50,4 +57,6 @@ class AxesBackend(ModelBackend): # which is processed by axes.signals.log_user_login_failed which logs and flags the failed request. # The axes.middleware.AxesMiddleware further processes the flagged request into a readable response. - raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out') + raise AxesBackendPermissionDenied( + "AxesBackend detected that the given user is locked out" + ) diff --git a/axes/checks.py b/axes/checks.py index 41631ff..fabc2c9 100644 --- a/axes/checks.py +++ b/axes/checks.py @@ -1,4 +1,8 @@ -from django.core.checks import Tags, Warning, register # pylint: disable=redefined-builtin +from django.core.checks import ( # pylint: disable=redefined-builtin + Tags, + Warning, + register, +) from django.utils.module_loading import import_string from axes.backends import AxesBackend @@ -15,51 +19,51 @@ class Messages: MIDDLEWARE_INVALID = ( "You do not have 'axes.middleware.AxesMiddleware' in your settings.MIDDLEWARE." ) - BACKEND_INVALID = ( - "You do not have 'axes.backends.AxesBackend' or a subclass in your settings.AUTHENTICATION_BACKENDS." - ) - SETTING_DEPRECATED = ( - 'You have a deprecated setting {deprecated_setting} configured in your project settings' - ) + BACKEND_INVALID = "You do not have 'axes.backends.AxesBackend' or a subclass in your settings.AUTHENTICATION_BACKENDS." + SETTING_DEPRECATED = "You have a deprecated setting {deprecated_setting} configured in your project settings" class Hints: CACHE_INVALID = None MIDDLEWARE_INVALID = None - BACKEND_INVALID = 'AxesModelBackend was renamed to AxesBackend in django-axes version 5.0.' + BACKEND_INVALID = ( + "AxesModelBackend was renamed to AxesBackend in django-axes version 5.0." + ) SETTING_DEPRECATED = None class Codes: - CACHE_INVALID = 'axes.W001' - MIDDLEWARE_INVALID = 'axes.W002' - BACKEND_INVALID = 'axes.W003' - SETTING_DEPRECATED = 'axes.W004' + CACHE_INVALID = "axes.W001" + MIDDLEWARE_INVALID = "axes.W002" + BACKEND_INVALID = "axes.W003" + SETTING_DEPRECATED = "axes.W004" @register(Tags.security, Tags.caches, Tags.compatibility) def axes_cache_check(app_configs, **kwargs): # pylint: disable=unused-argument - axes_handler = getattr(settings, 'AXES_HANDLER', '') + axes_handler = getattr(settings, "AXES_HANDLER", "") - axes_cache_key = getattr(settings, 'AXES_CACHE', 'default') + axes_cache_key = getattr(settings, "AXES_CACHE", "default") axes_cache_config = settings.CACHES.get(axes_cache_key, {}) - axes_cache_backend = axes_cache_config.get('BACKEND', '') + axes_cache_backend = axes_cache_config.get("BACKEND", "") axes_cache_backend_incompatible = [ - 'django.core.cache.backends.dummy.DummyCache', - 'django.core.cache.backends.locmem.LocMemCache', - 'django.core.cache.backends.filebased.FileBasedCache', + "django.core.cache.backends.dummy.DummyCache", + "django.core.cache.backends.locmem.LocMemCache", + "django.core.cache.backends.filebased.FileBasedCache", ] warnings = [] - if axes_handler == 'axes.handlers.cache.AxesCacheHandler': + if axes_handler == "axes.handlers.cache.AxesCacheHandler": if axes_cache_backend in axes_cache_backend_incompatible: - warnings.append(Warning( - msg=Messages.CACHE_INVALID, - hint=Hints.CACHE_INVALID, - id=Codes.CACHE_INVALID, - )) + warnings.append( + Warning( + msg=Messages.CACHE_INVALID, + hint=Hints.CACHE_INVALID, + id=Codes.CACHE_INVALID, + ) + ) return warnings @@ -68,12 +72,14 @@ def axes_cache_check(app_configs, **kwargs): # pylint: disable=unused-argument def axes_middleware_check(app_configs, **kwargs): # pylint: disable=unused-argument warnings = [] - if 'axes.middleware.AxesMiddleware' not in settings.MIDDLEWARE: - warnings.append(Warning( - msg=Messages.MIDDLEWARE_INVALID, - hint=Hints.MIDDLEWARE_INVALID, - id=Codes.MIDDLEWARE_INVALID, - )) + if "axes.middleware.AxesMiddleware" not in settings.MIDDLEWARE: + warnings.append( + Warning( + msg=Messages.MIDDLEWARE_INVALID, + hint=Hints.MIDDLEWARE_INVALID, + id=Codes.MIDDLEWARE_INVALID, + ) + ) return warnings @@ -87,20 +93,26 @@ def axes_backend_check(app_configs, **kwargs): # pylint: disable=unused-argumen try: backend = import_string(name) except ModuleNotFoundError as e: - raise ModuleNotFoundError('Can not find module path defined in settings.AUTHENTICATION_BACKENDS') from e + raise ModuleNotFoundError( + "Can not find module path defined in settings.AUTHENTICATION_BACKENDS" + ) from e except ImportError as e: - raise ImportError('Can not import backend class defined in settings.AUTHENTICATION_BACKENDS') from e + raise ImportError( + "Can not import backend class defined in settings.AUTHENTICATION_BACKENDS" + ) from e if issubclass(backend, AxesBackend): found = True break if not found: - warnings.append(Warning( - msg=Messages.BACKEND_INVALID, - hint=Hints.BACKEND_INVALID, - id=Codes.BACKEND_INVALID, - )) + warnings.append( + Warning( + msg=Messages.BACKEND_INVALID, + hint=Hints.BACKEND_INVALID, + id=Codes.BACKEND_INVALID, + ) + ) return warnings @@ -109,18 +121,20 @@ def axes_backend_check(app_configs, **kwargs): # pylint: disable=unused-argumen def axes_deprecation_check(app_configs, **kwargs): # pylint: disable=unused-argument warnings = [] - deprecated_settings = [ - 'AXES_DISABLE_SUCCESS_ACCESS_LOG', - ] + deprecated_settings = ["AXES_DISABLE_SUCCESS_ACCESS_LOG"] for deprecated_setting in deprecated_settings: try: getattr(settings, deprecated_setting) - warnings.append(Warning( - msg=Messages.SETTING_DEPRECATED.format(deprecated_setting=deprecated_setting), - hint=None, - id=Codes.SETTING_DEPRECATED, - )) + warnings.append( + Warning( + msg=Messages.SETTING_DEPRECATED.format( + deprecated_setting=deprecated_setting + ), + hint=None, + id=Codes.SETTING_DEPRECATED, + ) + ) except AttributeError: pass diff --git a/axes/conf.py b/axes/conf.py index 3a75f04..67c5e47 100644 --- a/axes/conf.py +++ b/axes/conf.py @@ -27,10 +27,10 @@ class AxesAppConf(AppConf): USE_USER_AGENT = False # use a specific username field to retrieve from login POST data - USERNAME_FORM_FIELD = 'username' + USERNAME_FORM_FIELD = "username" # use a specific password field to retrieve from login POST data - PASSWORD_FORM_FIELD = 'password' # noqa + PASSWORD_FORM_FIELD = "password" # noqa # use a provided callable to transform the POSTed username into the one used in credentials USERNAME_CALLABLE = None @@ -40,9 +40,9 @@ class AxesAppConf(AppConf): DISABLE_ACCESS_LOG = False - HANDLER = 'axes.handlers.database.AxesDatabaseHandler' + HANDLER = "axes.handlers.database.AxesDatabaseHandler" - LOGGER = 'axes.watch_login' + LOGGER = "axes.watch_login" LOCKOUT_TEMPLATE = None @@ -64,13 +64,17 @@ class AxesAppConf(AppConf): IP_BLACKLIST = None # message to show when locked out and have cooloff enabled - COOLOFF_MESSAGE = _('Account locked: too many login attempts. Please try again later') + COOLOFF_MESSAGE = _( + "Account locked: too many login attempts. Please try again later" + ) # message to show when locked out and have cooloff disabled - PERMALOCK_MESSAGE = _('Account locked: too many login attempts. Contact an admin to unlock your account.') + PERMALOCK_MESSAGE = _( + "Account locked: too many login attempts. Contact an admin to unlock your account." + ) # if your deployment is using reverse proxies, set this value to 'left-most' or 'right-most' per your configuration - PROXY_ORDER = 'left-most' + PROXY_ORDER = "left-most" # if your deployment is using reverse proxies, set this value to the number of proxies in front of Django PROXY_COUNT = None @@ -82,9 +86,7 @@ class AxesAppConf(AppConf): # if your deployment is using reverse proxies, ensure that the header attributes are securely set by the proxy # ensure that the client can not spoof the headers by setting them and sending them through the proxy META_PRECEDENCE_ORDER = getattr( - settings, 'AXES_META_PRECEDENCE_ORDER', getattr( - settings, 'IPWARE_META_PRECEDENCE_ORDER', ( - 'REMOTE_ADDR', - ) - ) + settings, + "AXES_META_PRECEDENCE_ORDER", + getattr(settings, "IPWARE_META_PRECEDENCE_ORDER", ("REMOTE_ADDR",)), ) diff --git a/axes/handlers/base.py b/axes/handlers/base.py index a8acbcd..98d9fbc 100644 --- a/axes/handlers/base.py +++ b/axes/handlers/base.py @@ -30,7 +30,9 @@ class AxesHandler: # pylint: disable=unused-argument :raises NotImplementedError: if the handler does not support resetting attempts. """ - raise NotImplementedError('Reset for access attempts is not supported on this backend') + raise NotImplementedError( + "Reset for access attempts is not supported on this backend" + ) def reset_logs(self, *, age_days: int = None) -> int: """ @@ -39,7 +41,9 @@ class AxesHandler: # pylint: disable=unused-argument :raises NotImplementedError: if the handler does not support resetting logs. """ - raise NotImplementedError('Reset for access logs is not supported on this backend') + raise NotImplementedError( + "Reset for access logs is not supported on this backend" + ) def is_allowed(self, request, credentials: dict = None) -> bool: """ @@ -70,7 +74,7 @@ class AxesHandler: # pylint: disable=unused-argument return True - def user_login_failed(self, sender, credentials: dict, request = None, **kwargs): + def user_login_failed(self, sender, credentials: dict, request=None, **kwargs): """ Handles the Django ``django.contrib.auth.signals.user_login_failed`` authentication signal. """ @@ -95,7 +99,9 @@ class AxesHandler: # pylint: disable=unused-argument Handles the ``axes.models.AccessAttempt`` object post delete signal. """ - def is_blacklisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_blacklisted( + self, request, credentials: dict = None + ) -> bool: # pylint: disable=unused-argument """ Checks if the request or given credentials are blacklisted from access. """ @@ -105,7 +111,9 @@ class AxesHandler: # pylint: disable=unused-argument return False - def is_whitelisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument + def is_whitelisted( + self, request, credentials: dict = None + ) -> bool: # pylint: disable=unused-argument """ Checks if the request or given credentials are whitelisted for access. """ @@ -124,7 +132,9 @@ class AxesHandler: # pylint: disable=unused-argument """ if settings.AXES_LOCK_OUT_AT_FAILURE: - return self.get_failures(request, credentials) >= get_failure_limit(request, credentials) + return self.get_failures(request, credentials) >= get_failure_limit( + request, credentials + ) return False @@ -136,15 +146,18 @@ class AxesHandler: # pylint: disable=unused-argument if the ``settings.AXES_LOCK_OUT_AT_FAILURE`` flag is set to ``True``. """ - raise NotImplementedError('The Axes handler class needs a method definition for get_failures') + raise NotImplementedError( + "The Axes handler class needs a method definition for get_failures" + ) def is_admin_site(self, request) -> bool: """ Checks if the request is for admin site. """ if ( - settings.AXES_ONLY_ADMIN_SITE and hasattr(request, 'path') and - not re.match('^%s' % reverse('admin:index'), request.path) + settings.AXES_ONLY_ADMIN_SITE + and hasattr(request, "path") + and not re.match("^%s" % reverse("admin:index"), request.path) ): return True diff --git a/axes/handlers/cache.py b/axes/handlers/cache.py index 677ec27..cb409dd 100644 --- a/axes/handlers/cache.py +++ b/axes/handlers/cache.py @@ -30,11 +30,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals return self.cache.get(cache_key, default=0) def user_login_failed( - self, - sender, - credentials: dict, - request = None, - **kwargs + self, sender, credentials: dict, request=None, **kwargs ): # pylint: disable=too-many-locals """ When user login fails, save attempt record in cache and lock user out if necessary. @@ -43,64 +39,92 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals """ if request is None: - log.error('AXES: AxesCacheHandler.user_login_failed does not function without a request.') + log.error( + "AXES: AxesCacheHandler.user_login_failed does not function without a request." + ) return username = get_client_username(request, credentials) - client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) + client_str = get_client_str( + username, + request.axes_ip_address, + request.axes_user_agent, + request.axes_path_info, + ) if self.is_whitelisted(request, credentials): - log.info('AXES: Login failed from whitelisted client %s.', client_str) + log.info("AXES: Login failed from whitelisted client %s.", client_str) return failures_since_start = 1 + self.get_failures(request, credentials) if failures_since_start > 1: log.warning( - 'AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the cache.', + "AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the cache.", client_str, failures_since_start, get_failure_limit(request, credentials), ) else: log.warning( - 'AXES: New login failure by %s. Creating new record in the cache.', + "AXES: New login failure by %s. Creating new record in the cache.", client_str, ) cache_key = get_client_cache_key(request, credentials) self.cache.set(cache_key, failures_since_start, self.cache_timeout) - if settings.AXES_LOCK_OUT_AT_FAILURE and failures_since_start >= get_failure_limit(request, credentials): - log.warning('AXES: Locking out %s after repeated login failures.', client_str) + if ( + settings.AXES_LOCK_OUT_AT_FAILURE + and failures_since_start >= get_failure_limit(request, credentials) + ): + log.warning( + "AXES: Locking out %s after repeated login failures.", client_str + ) request.axes_locked_out = True user_locked_out.send( - 'axes', + "axes", request=request, username=username, ip_address=request.axes_ip_address, ) - def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in( + self, sender, request, user, **kwargs + ): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ username = user.get_username() credentials = get_credentials(username) - client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) + client_str = get_client_str( + username, + request.axes_ip_address, + request.axes_user_agent, + request.axes_path_info, + ) - log.info('AXES: Successful login by %s.', client_str) + log.info("AXES: Successful login by %s.", client_str) if settings.AXES_RESET_ON_SUCCESS: cache_key = get_client_cache_key(request, credentials) failures_since_start = self.cache.get(cache_key, default=0) self.cache.delete(cache_key) - log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str) + log.info( + "AXES: Deleted %d failed login attempts by %s from cache.", + failures_since_start, + client_str, + ) def user_logged_out(self, sender, request, user, **kwargs): username = user.get_username() if user else None - client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) + client_str = get_client_str( + username, + request.axes_ip_address, + request.axes_user_agent, + request.axes_path_info, + ) - log.info('AXES: Successful logout by %s.', client_str) + log.info("AXES: Successful logout by %s.", client_str) diff --git a/axes/handlers/database.py b/axes/handlers/database.py index c81de4c..78a6dda 100644 --- a/axes/handlers/database.py +++ b/axes/handlers/database.py @@ -40,24 +40,31 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals attempts = attempts.filter(username=username) count, _ = attempts.delete() - log.info('AXES: Reset %d access attempts from database.', count) + log.info("AXES: Reset %d access attempts from database.", count) return count def reset_logs(self, *, age_days: int = None) -> int: if age_days is None: count, _ = AccessLog.objects.all().delete() - log.info('AXES: Reset all %d access logs from database.', count) + log.info("AXES: Reset all %d access logs from database.", count) else: limit = timezone.now() - timezone.timedelta(days=age_days) count, _ = AccessLog.objects.filter(attempt_time__lte=limit).delete() - log.info('AXES: Reset %d access logs older than %d days from database.', count, age_days) + log.info( + "AXES: Reset %d access logs older than %d days from database.", + count, + age_days, + ) return count def get_failures(self, request, credentials: dict = None) -> int: attempts = get_user_attempts(request, credentials) - return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0 + return ( + attempts.aggregate(Max("failures_since_start"))["failures_since_start__max"] + or 0 + ) def is_locked(self, request, credentials: dict = None): if is_user_attempt_whitelisted(request, credentials): @@ -66,11 +73,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals return super().is_locked(request, credentials) def user_login_failed( - self, - sender, - credentials: dict, - request = None, - **kwargs + self, sender, credentials: dict, request=None, **kwargs ): # pylint: disable=too-many-locals """ When user login fails, save AccessAttempt record in database and lock user out if necessary. @@ -79,20 +82,27 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals """ if request is None: - log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.') + log.error( + "AXES: AxesDatabaseHandler.user_login_failed does not function without a request." + ) return # 1. database query: Clean up expired user attempts from the database before logging new attempts clean_expired_user_attempts(request.axes_attempt_time) username = get_client_username(request, credentials) - client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) + client_str = get_client_str( + username, + request.axes_ip_address, + request.axes_user_agent, + request.axes_path_info, + ) get_data = get_query_str(request.GET) post_data = get_query_str(request.POST) if self.is_whitelisted(request, credentials): - log.info('AXES: Login failed from whitelisted client %s.', client_str) + log.info("AXES: Login failed from whitelisted client %s.", client_str) return # 2. database query: Calculate the current maximum failure number from the existing attempts @@ -105,18 +115,18 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals # in order to bypass the defense mechanisms that are used by the site. log.warning( - 'AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.', + "AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.", client_str, failures_since_start, get_failure_limit(request, credentials), ) - separator = '\n---------\n' + separator = "\n---------\n" attempts = get_user_attempts(request, credentials) attempts.update( - get_data=Concat('get_data', Value(separator + get_data)), - post_data=Concat('post_data', Value(separator + post_data)), + get_data=Concat("get_data", Value(separator + get_data)), + post_data=Concat("post_data", Value(separator + post_data)), http_accept=request.axes_http_accept, path_info=request.axes_path_info, failures_since_start=failures_since_start, @@ -128,7 +138,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals # and this handler just records the available information for further use. log.warning( - 'AXES: New login failure by %s. Creating new record in the database.', + "AXES: New login failure by %s. Creating new record in the database.", client_str, ) @@ -144,19 +154,26 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals attempt_time=request.axes_attempt_time, ) - if settings.AXES_LOCK_OUT_AT_FAILURE and failures_since_start >= get_failure_limit(request, credentials): - log.warning('AXES: Locking out %s after repeated login failures.', client_str) + if ( + settings.AXES_LOCK_OUT_AT_FAILURE + and failures_since_start >= get_failure_limit(request, credentials) + ): + log.warning( + "AXES: Locking out %s after repeated login failures.", client_str + ) request.axes_locked_out = True user_locked_out.send( - 'axes', + "axes", request=request, username=username, ip_address=request.axes_ip_address, ) - def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + def user_logged_in( + self, sender, request, user, **kwargs + ): # pylint: disable=unused-argument """ When user logs in, update the AccessLog related to the user. """ @@ -166,9 +183,14 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals username = user.get_username() credentials = get_credentials(username) - client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) + client_str = get_client_str( + username, + request.axes_ip_address, + request.axes_user_agent, + request.axes_path_info, + ) - log.info('AXES: Successful login by %s.', client_str) + log.info("AXES: Successful login by %s.", client_str) if not settings.AXES_DISABLE_ACCESS_LOG: # 2. database query: Insert new access logs with login time @@ -184,9 +206,15 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals if settings.AXES_RESET_ON_SUCCESS: # 3. database query: Reset failed attempts for the logging in user count = reset_user_attempts(request, credentials) - log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str) + log.info( + "AXES: Deleted %d failed login attempts by %s from database.", + count, + client_str, + ) - def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument + def user_logged_out( + self, sender, request, user, **kwargs + ): # pylint: disable=unused-argument """ When user logs out, update the AccessLog related to the user. """ @@ -195,15 +223,17 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals clean_expired_user_attempts(request.axes_attempt_time) username = user.get_username() if user else None - client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info) + client_str = get_client_str( + username, + request.axes_ip_address, + request.axes_user_agent, + request.axes_path_info, + ) - log.info('AXES: Successful logout by %s.', client_str) + log.info("AXES: Successful logout by %s.", client_str) if username and not settings.AXES_DISABLE_ACCESS_LOG: # 2. database query: Update existing attempt logs with logout time AccessLog.objects.filter( - username=username, - logout_time__isnull=True, - ).update( - logout_time=request.axes_attempt_time, - ) + username=username, logout_time__isnull=True + ).update(logout_time=request.axes_attempt_time) diff --git a/axes/handlers/proxy.py b/axes/handlers/proxy.py index 7e3926e..8a97456 100644 --- a/axes/handlers/proxy.py +++ b/axes/handlers/proxy.py @@ -44,7 +44,9 @@ class AxesProxyHandler(AxesHandler): @classmethod def reset_attempts(cls, *, ip_address: str = None, username: str = None) -> int: - return cls.get_implementation().reset_attempts(ip_address=ip_address, username=username) + return cls.get_implementation().reset_attempts( + ip_address=ip_address, username=username + ) @classmethod def reset_logs(cls, *, age_days: int = None) -> int: @@ -57,7 +59,9 @@ class AxesProxyHandler(AxesHandler): """ if request is None: - log.error('AXES: AxesProxyHandler.update_request can not set request attributes to a None request') + log.error( + "AXES: AxesProxyHandler.update_request can not set request attributes to a None request" + ) return request.axes_locked_out = False @@ -81,7 +85,9 @@ class AxesProxyHandler(AxesHandler): @toggleable def user_login_failed(cls, sender, credentials: dict, request=None, **kwargs): cls.update_request(request) - return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs) + return cls.get_implementation().user_login_failed( + sender, credentials, request, **kwargs + ) @classmethod @toggleable diff --git a/axes/helpers.py b/axes/helpers.py index 3e757e5..e633821 100644 --- a/axes/helpers.py +++ b/axes/helpers.py @@ -4,7 +4,13 @@ from logging import getLogger from typing import Callable, Optional, Type, Union from django.core.cache import caches, BaseCache -from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonResponse, QueryDict +from django.http import ( + HttpRequest, + HttpResponse, + HttpResponseRedirect, + JsonResponse, + QueryDict, +) from django.shortcuts import render from django.utils.module_loading import import_string @@ -21,7 +27,7 @@ def get_cache() -> BaseCache: Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set. """ - return caches[getattr(settings, 'AXES_CACHE', 'default')] + return caches[getattr(settings, "AXES_CACHE", "default")] def get_cache_timeout() -> Optional[int]: @@ -76,22 +82,17 @@ def get_cool_off_iso8601(delta: timedelta) -> str: hours, minutes = divmod(minutes, 60) days, hours = divmod(hours, 24) - days_str = f'{days:.0f}D' if days else '' + days_str = f"{days:.0f}D" if days else "" - time_str = ''.join( - f'{value:.0f}{designator}' - for value, designator - in [ - [hours, 'H'], - [minutes, 'M'], - [seconds, 'S'], - ] + time_str = "".join( + f"{value:.0f}{designator}" + for value, designator in [[hours, "H"], [minutes, "M"], [seconds, "S"]] if value ) if time_str: - return f'P{days_str}T{time_str}' - return f'P{days_str}' + return f"P{days_str}T{time_str}" + return f"P{days_str}" def get_credentials(username: str = None, **kwargs) -> dict: @@ -122,19 +123,25 @@ def get_client_username(request, credentials: dict = None) -> str: """ if settings.AXES_USERNAME_CALLABLE: - log.debug('Using settings.AXES_USERNAME_CALLABLE to get username') + log.debug("Using settings.AXES_USERNAME_CALLABLE to get username") if callable(settings.AXES_USERNAME_CALLABLE): return settings.AXES_USERNAME_CALLABLE(request, credentials) if isinstance(settings.AXES_USERNAME_CALLABLE, str): return import_string(settings.AXES_USERNAME_CALLABLE)(request, credentials) - raise TypeError('settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None.') + raise TypeError( + "settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None." + ) if credentials: - log.debug('Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD') + log.debug( + "Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD" + ) return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None) - log.debug('Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD') + log.debug( + "Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD" + ) return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None) @@ -158,15 +165,15 @@ def get_client_ip_address(request) -> str: def get_client_user_agent(request) -> str: - return request.META.get('HTTP_USER_AGENT', '')[:255] + return request.META.get("HTTP_USER_AGENT", "")[:255] def get_client_path_info(request) -> str: - return request.META.get('PATH_INFO', '')[:255] + return request.META.get("PATH_INFO", "")[:255] def get_client_http_accept(request) -> str: - return request.META.get('HTTP_ACCEPT', '')[:1025] + return request.META.get("HTTP_ACCEPT", "")[:1025] def get_client_parameters(username: str, ip_address: str, user_agent: str) -> dict: @@ -181,24 +188,26 @@ def get_client_parameters(username: str, ip_address: str, user_agent: str) -> di if settings.AXES_ONLY_USER_FAILURES: # 1. Only individual usernames can be tracked with parametrization - filter_kwargs['username'] = username + filter_kwargs["username"] = username else: if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP: # 2. A combination of username and IP address can be used as well - filter_kwargs['username'] = username - filter_kwargs['ip_address'] = ip_address + filter_kwargs["username"] = username + filter_kwargs["ip_address"] = ip_address else: # 3. Default case is to track the IP address only, which is the most secure option - filter_kwargs['ip_address'] = ip_address + filter_kwargs["ip_address"] = ip_address if settings.AXES_USE_USER_AGENT: # 4. The HTTP User-Agent can be used to track e.g. one browser - filter_kwargs['user_agent'] = user_agent + filter_kwargs["user_agent"] = user_agent return filter_kwargs -def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str: +def get_client_str( + username: str, ip_address: str, user_agent: str, path_info: str +) -> str: """ Get a readable string that can be used in e.g. logging to distinguish client requests. @@ -209,9 +218,9 @@ def get_client_str(username: str, ip_address: str, user_agent: str, path_info: s if settings.AXES_VERBOSE: # Verbose mode logs every attribute that is available - client_dict['username'] = username - client_dict['ip_address'] = ip_address - client_dict['user_agent'] = user_agent + client_dict["username"] = username + client_dict["ip_address"] = ip_address + client_dict["user_agent"] = user_agent else: # Other modes initialize the attributes that are used for the actual lockouts client_dict = get_client_parameters(username, ip_address, user_agent) @@ -219,19 +228,15 @@ def get_client_str(username: str, ip_address: str, user_agent: str, path_info: s # Path info is always included as last component in the client string for traceability purposes if path_info and isinstance(path_info, (tuple, list)): path_info = path_info[0] - client_dict['path_info'] = path_info + client_dict["path_info"] = path_info # Template the internal dictionary representation into a readable and concatenated key: "value" format - template = ', '.join( - f'{key}: "{value}"' - for key, value - in client_dict.items() - ) + template = ", ".join(f'{key}: "{value}"' for key, value in client_dict.items()) # Wrap the internal dict into a single {key: "value"} bracing in the output # which requires double braces when done with the Python string templating system # i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call - template = '{{' + template + '}}' + template = "{{" + template + "}}" return template.format(client_dict) @@ -246,14 +251,10 @@ def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str: """ query_dict = query.copy() - query_dict.pop('password', None) + query_dict.pop("password", None) query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None) - query_str = '\n'.join( - f'{key}={value}' - for key, value - in query_dict.items() - ) + query_str = "\n".join(f"{key}={value}" for key, value in query_dict.items()) return query_str[:max_length] @@ -265,7 +266,7 @@ def get_failure_limit(request, credentials) -> int: return import_string(settings.AXES_FAILURE_LIMIT)(request, credentials) if isinstance(settings.AXES_FAILURE_LIMIT, int): return settings.AXES_FAILURE_LIMIT - raise TypeError('settings.AXES_FAILURE_LIMIT needs to be a callable or an integer') + raise TypeError("settings.AXES_FAILURE_LIMIT needs to be a callable or an integer") def get_lockout_message() -> str: @@ -277,39 +278,30 @@ def get_lockout_message() -> str: def get_lockout_response(request, credentials: dict = None) -> HttpResponse: status = 403 context = { - 'failure_limit': get_failure_limit(request, credentials), - 'username': get_client_username(request, credentials) or '' + "failure_limit": get_failure_limit(request, credentials), + "username": get_client_username(request, credentials) or "", } cool_off = get_cool_off() if cool_off: - context.update({ - 'cooloff_time': get_cool_off_iso8601(cool_off), # differing old name is kept for backwards compatibility - }) + context.update( + { + "cooloff_time": get_cool_off_iso8601( + cool_off + ) # differing old name is kept for backwards compatibility + } + ) if request.is_ajax(): - return JsonResponse( - context, - status=status, - ) + return JsonResponse(context, status=status) if settings.AXES_LOCKOUT_TEMPLATE: - return render( - request, - settings.AXES_LOCKOUT_TEMPLATE, - context, - status=status, - ) + return render(request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status) if settings.AXES_LOCKOUT_URL: - return HttpResponseRedirect( - settings.AXES_LOCKOUT_URL, - ) + return HttpResponseRedirect(settings.AXES_LOCKOUT_URL) - return HttpResponse( - get_lockout_message(), - status=status, - ) + return HttpResponse(get_lockout_message(), status=status) def is_ip_address_in_whitelist(ip_address: str) -> bool: @@ -331,10 +323,14 @@ def is_client_ip_address_whitelisted(request): Check if the given request refers to a whitelisted IP. """ - if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address): + if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist( + request.axes_ip_address + ): return True - if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address): + if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist( + request.axes_ip_address + ): return True return False @@ -348,7 +344,9 @@ def is_client_ip_address_blacklisted(request) -> bool: if is_ip_address_in_blacklist(request.axes_ip_address): return True - if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(request.axes_ip_address): + if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist( + request.axes_ip_address + ): return True return False @@ -359,13 +357,15 @@ def is_client_method_whitelisted(request) -> bool: Check if the given request uses a whitelisted method. """ - if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET': + if settings.AXES_NEVER_LOCKOUT_GET and request.method == "GET": return True return False -def get_client_cache_key(request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None) -> str: +def get_client_cache_key( + request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None +) -> str: """ Build cache key name from request or AccessAttempt object. @@ -385,9 +385,9 @@ def get_client_cache_key(request_or_attempt: Union[HttpRequest, AccessBase], cre filter_kwargs = get_client_parameters(username, ip_address, user_agent) - cache_key_components = ''.join(value for value in filter_kwargs.values() if value) + cache_key_components = "".join(value for value in filter_kwargs.values() if value) cache_key_digest = md5(cache_key_components.encode()).hexdigest() - cache_key = f'axes-{cache_key_digest}' + cache_key = f"axes-{cache_key_digest}" return cache_key @@ -406,4 +406,5 @@ def toggleable(func) -> Callable: def inner(*args, **kwargs): # pylint: disable=inconsistent-return-statements if settings.AXES_ENABLED: return func(*args, **kwargs) + return inner diff --git a/axes/management/commands/axes_list_attempts.py b/axes/management/commands/axes_list_attempts.py index 595eca9..ad4da8d 100644 --- a/axes/management/commands/axes_list_attempts.py +++ b/axes/management/commands/axes_list_attempts.py @@ -4,8 +4,10 @@ from axes.models import AccessAttempt class Command(BaseCommand): - help = 'List access attempts' + help = "List access attempts" def handle(self, *args, **options): # pylint: disable=unused-argument for obj in AccessAttempt.objects.all(): - self.stdout.write(f'{obj.ip_address}\t{obj.username}\t{obj.failures_since_start}') + self.stdout.write( + f"{obj.ip_address}\t{obj.username}\t{obj.failures_since_start}" + ) diff --git a/axes/management/commands/axes_reset.py b/axes/management/commands/axes_reset.py index 583a940..70187e5 100644 --- a/axes/management/commands/axes_reset.py +++ b/axes/management/commands/axes_reset.py @@ -4,12 +4,12 @@ from axes.utils import reset class Command(BaseCommand): - help = 'Reset all access attempts and lockouts' + help = "Reset all access attempts and lockouts" def handle(self, *args, **options): # pylint: disable=unused-argument count = reset() if count: - self.stdout.write(f'{count} attempts removed.') + self.stdout.write(f"{count} attempts removed.") else: - self.stdout.write('No attempts found.') + self.stdout.write("No attempts found.") diff --git a/axes/management/commands/axes_reset_ip.py b/axes/management/commands/axes_reset_ip.py index 5ac70b0..e043497 100644 --- a/axes/management/commands/axes_reset_ip.py +++ b/axes/management/commands/axes_reset_ip.py @@ -4,18 +4,18 @@ from axes.utils import reset class Command(BaseCommand): - help = 'Reset all access attempts and lockouts for given IP addresses' + help = "Reset all access attempts and lockouts for given IP addresses" def add_arguments(self, parser): - parser.add_argument('ip', nargs='+', type=str) + parser.add_argument("ip", nargs="+", type=str) def handle(self, *args, **options): count = 0 - for ip in options['ip']: + for ip in options["ip"]: count += reset(ip=ip) if count: - self.stdout.write(f'{count} attempts removed.') + self.stdout.write(f"{count} attempts removed.") else: - self.stdout.write('No attempts found.') + self.stdout.write("No attempts found.") diff --git a/axes/management/commands/axes_reset_logs.py b/axes/management/commands/axes_reset_logs.py index 8a197c8..4cdad84 100644 --- a/axes/management/commands/axes_reset_logs.py +++ b/axes/management/commands/axes_reset_logs.py @@ -4,19 +4,19 @@ from axes.handlers.proxy import AxesProxyHandler class Command(BaseCommand): - help = 'Reset access log records older than given days.' + help = "Reset access log records older than given days." def add_arguments(self, parser): parser.add_argument( - '--age', + "--age", type=int, default=30, - help='Maximum age for records to keep in days', + help="Maximum age for records to keep in days", ) def handle(self, *args, **options): - count = AxesProxyHandler.reset_logs(age_days=options['age']) + count = AxesProxyHandler.reset_logs(age_days=options["age"]) if count: - self.stdout.write(f'{count} logs removed.') + self.stdout.write(f"{count} logs removed.") else: - self.stdout.write('No logs found.') + self.stdout.write("No logs found.") diff --git a/axes/management/commands/axes_reset_username.py b/axes/management/commands/axes_reset_username.py index 86646d9..85bf9ef 100644 --- a/axes/management/commands/axes_reset_username.py +++ b/axes/management/commands/axes_reset_username.py @@ -4,18 +4,18 @@ from axes.utils import reset class Command(BaseCommand): - help = 'Reset all access attempts and lockouts for given usernames' + help = "Reset all access attempts and lockouts for given usernames" def add_arguments(self, parser): - parser.add_argument('username', nargs='+', type=str) + parser.add_argument("username", nargs="+", type=str) def handle(self, *args, **options): count = 0 - for username in options['username']: + for username in options["username"]: count += reset(username=username) if count: - self.stdout.write(f'{count} attempts removed.') + self.stdout.write(f"{count} attempts removed.") else: - self.stdout.write('No attempts found.') + self.stdout.write("No attempts found.") diff --git a/axes/middleware.py b/axes/middleware.py index 8868a6d..4ec1c0e 100644 --- a/axes/middleware.py +++ b/axes/middleware.py @@ -29,7 +29,7 @@ class AxesMiddleware: def __call__(self, request): response = self.get_response(request) - if getattr(request, 'axes_locked_out', None): + if getattr(request, "axes_locked_out", None): response = get_lockout_response(request) # type: ignore return response diff --git a/axes/migrations/0001_initial.py b/axes/migrations/0001_initial.py index 5c5d510..1535183 100644 --- a/axes/migrations/0001_initial.py +++ b/axes/migrations/0001_initial.py @@ -3,46 +3,70 @@ from django.db import migrations, models class Migration(migrations.Migration): - dependencies = [ - ] + dependencies = [] operations = [ migrations.CreateModel( - name='AccessAttempt', + name="AccessAttempt", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('user_agent', models.CharField(max_length=255)), - ('ip_address', models.GenericIPAddressField(null=True, verbose_name='IP Address')), - ('username', models.CharField(max_length=255, null=True)), - ('trusted', models.BooleanField(default=False)), - ('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')), - ('path_info', models.CharField(max_length=255, verbose_name='Path')), - ('attempt_time', models.DateTimeField(auto_now_add=True)), - ('get_data', models.TextField(verbose_name='GET Data')), - ('post_data', models.TextField(verbose_name='POST Data')), - ('failures_since_start', models.PositiveIntegerField(verbose_name='Failed Logins')), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("user_agent", models.CharField(max_length=255)), + ( + "ip_address", + models.GenericIPAddressField(null=True, verbose_name="IP Address"), + ), + ("username", models.CharField(max_length=255, null=True)), + ("trusted", models.BooleanField(default=False)), + ( + "http_accept", + models.CharField(max_length=1025, verbose_name="HTTP Accept"), + ), + ("path_info", models.CharField(max_length=255, verbose_name="Path")), + ("attempt_time", models.DateTimeField(auto_now_add=True)), + ("get_data", models.TextField(verbose_name="GET Data")), + ("post_data", models.TextField(verbose_name="POST Data")), + ( + "failures_since_start", + models.PositiveIntegerField(verbose_name="Failed Logins"), + ), ], - options={ - 'ordering': ['-attempt_time'], - 'abstract': False, - }, + options={"ordering": ["-attempt_time"], "abstract": False}, ), migrations.CreateModel( - name='AccessLog', + name="AccessLog", fields=[ - ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), - ('user_agent', models.CharField(max_length=255)), - ('ip_address', models.GenericIPAddressField(null=True, verbose_name='IP Address')), - ('username', models.CharField(max_length=255, null=True)), - ('trusted', models.BooleanField(default=False)), - ('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')), - ('path_info', models.CharField(max_length=255, verbose_name='Path')), - ('attempt_time', models.DateTimeField(auto_now_add=True)), - ('logout_time', models.DateTimeField(null=True, blank=True)), + ( + "id", + models.AutoField( + verbose_name="ID", + serialize=False, + auto_created=True, + primary_key=True, + ), + ), + ("user_agent", models.CharField(max_length=255)), + ( + "ip_address", + models.GenericIPAddressField(null=True, verbose_name="IP Address"), + ), + ("username", models.CharField(max_length=255, null=True)), + ("trusted", models.BooleanField(default=False)), + ( + "http_accept", + models.CharField(max_length=1025, verbose_name="HTTP Accept"), + ), + ("path_info", models.CharField(max_length=255, verbose_name="Path")), + ("attempt_time", models.DateTimeField(auto_now_add=True)), + ("logout_time", models.DateTimeField(null=True, blank=True)), ], - options={ - 'ordering': ['-attempt_time'], - 'abstract': False, - }, + options={"ordering": ["-attempt_time"], "abstract": False}, ), ] diff --git a/axes/migrations/0002_auto_20151217_2044.py b/axes/migrations/0002_auto_20151217_2044.py index 0d880d0..62bc371 100644 --- a/axes/migrations/0002_auto_20151217_2044.py +++ b/axes/migrations/0002_auto_20151217_2044.py @@ -3,49 +3,51 @@ from django.db import migrations, models class Migration(migrations.Migration): - dependencies = [ - ('axes', '0001_initial'), - ] + dependencies = [("axes", "0001_initial")] operations = [ migrations.AlterField( - model_name='accessattempt', - name='ip_address', - field=models.GenericIPAddressField(db_index=True, null=True, verbose_name='IP Address'), + model_name="accessattempt", + name="ip_address", + field=models.GenericIPAddressField( + db_index=True, null=True, verbose_name="IP Address" + ), ), migrations.AlterField( - model_name='accessattempt', - name='trusted', + model_name="accessattempt", + name="trusted", field=models.BooleanField(db_index=True, default=False), ), migrations.AlterField( - model_name='accessattempt', - name='user_agent', + model_name="accessattempt", + name="user_agent", field=models.CharField(db_index=True, max_length=255), ), migrations.AlterField( - model_name='accessattempt', - name='username', + model_name="accessattempt", + name="username", field=models.CharField(db_index=True, max_length=255, null=True), ), migrations.AlterField( - model_name='accesslog', - name='ip_address', - field=models.GenericIPAddressField(db_index=True, null=True, verbose_name='IP Address'), + model_name="accesslog", + name="ip_address", + field=models.GenericIPAddressField( + db_index=True, null=True, verbose_name="IP Address" + ), ), migrations.AlterField( - model_name='accesslog', - name='trusted', + model_name="accesslog", + name="trusted", field=models.BooleanField(db_index=True, default=False), ), migrations.AlterField( - model_name='accesslog', - name='user_agent', + model_name="accesslog", + name="user_agent", field=models.CharField(db_index=True, max_length=255), ), migrations.AlterField( - model_name='accesslog', - name='username', + model_name="accesslog", + name="username", field=models.CharField(db_index=True, max_length=255, null=True), ), ] diff --git a/axes/migrations/0003_auto_20160322_0929.py b/axes/migrations/0003_auto_20160322_0929.py index e337e54..18e3941 100644 --- a/axes/migrations/0003_auto_20160322_0929.py +++ b/axes/migrations/0003_auto_20160322_0929.py @@ -3,54 +3,56 @@ from django.db import models, migrations class Migration(migrations.Migration): - dependencies = [ - ('axes', '0002_auto_20151217_2044'), - ] + dependencies = [("axes", "0002_auto_20151217_2044")] operations = [ migrations.AlterField( - model_name='accessattempt', - name='failures_since_start', - field=models.PositiveIntegerField(verbose_name='Failed Logins'), + model_name="accessattempt", + name="failures_since_start", + field=models.PositiveIntegerField(verbose_name="Failed Logins"), ), migrations.AlterField( - model_name='accessattempt', - name='get_data', - field=models.TextField(verbose_name='GET Data'), + model_name="accessattempt", + name="get_data", + field=models.TextField(verbose_name="GET Data"), ), migrations.AlterField( - model_name='accessattempt', - name='http_accept', - field=models.CharField(verbose_name='HTTP Accept', max_length=1025), + model_name="accessattempt", + name="http_accept", + field=models.CharField(verbose_name="HTTP Accept", max_length=1025), ), migrations.AlterField( - model_name='accessattempt', - name='ip_address', - field=models.GenericIPAddressField(null=True, verbose_name='IP Address', db_index=True), + model_name="accessattempt", + name="ip_address", + field=models.GenericIPAddressField( + null=True, verbose_name="IP Address", db_index=True + ), ), migrations.AlterField( - model_name='accessattempt', - name='path_info', - field=models.CharField(verbose_name='Path', max_length=255), + model_name="accessattempt", + name="path_info", + field=models.CharField(verbose_name="Path", max_length=255), ), migrations.AlterField( - model_name='accessattempt', - name='post_data', - field=models.TextField(verbose_name='POST Data'), + model_name="accessattempt", + name="post_data", + field=models.TextField(verbose_name="POST Data"), ), migrations.AlterField( - model_name='accesslog', - name='http_accept', - field=models.CharField(verbose_name='HTTP Accept', max_length=1025), + model_name="accesslog", + name="http_accept", + field=models.CharField(verbose_name="HTTP Accept", max_length=1025), ), migrations.AlterField( - model_name='accesslog', - name='ip_address', - field=models.GenericIPAddressField(null=True, verbose_name='IP Address', db_index=True), + model_name="accesslog", + name="ip_address", + field=models.GenericIPAddressField( + null=True, verbose_name="IP Address", db_index=True + ), ), migrations.AlterField( - model_name='accesslog', - name='path_info', - field=models.CharField(verbose_name='Path', max_length=255), + model_name="accesslog", + name="path_info", + field=models.CharField(verbose_name="Path", max_length=255), ), ] diff --git a/axes/migrations/0004_auto_20181024_1538.py b/axes/migrations/0004_auto_20181024_1538.py index 215b18d..fc88826 100644 --- a/axes/migrations/0004_auto_20181024_1538.py +++ b/axes/migrations/0004_auto_20181024_1538.py @@ -3,52 +3,66 @@ from django.db import migrations, models class Migration(migrations.Migration): - dependencies = [ - ('axes', '0003_auto_20160322_0929'), - ] + dependencies = [("axes", "0003_auto_20160322_0929")] operations = [ migrations.AlterModelOptions( - name='accessattempt', - options={'verbose_name': 'access attempt', 'verbose_name_plural': 'access attempts'}, + name="accessattempt", + options={ + "verbose_name": "access attempt", + "verbose_name_plural": "access attempts", + }, ), migrations.AlterModelOptions( - name='accesslog', - options={'verbose_name': 'access log', 'verbose_name_plural': 'access logs'}, + name="accesslog", + options={ + "verbose_name": "access log", + "verbose_name_plural": "access logs", + }, ), migrations.AlterField( - model_name='accessattempt', - name='attempt_time', - field=models.DateTimeField(auto_now_add=True, verbose_name='Attempt Time'), + model_name="accessattempt", + name="attempt_time", + field=models.DateTimeField(auto_now_add=True, verbose_name="Attempt Time"), ), migrations.AlterField( - model_name='accessattempt', - name='user_agent', - field=models.CharField(db_index=True, max_length=255, verbose_name='User Agent'), + model_name="accessattempt", + name="user_agent", + field=models.CharField( + db_index=True, max_length=255, verbose_name="User Agent" + ), ), migrations.AlterField( - model_name='accessattempt', - name='username', - field=models.CharField(db_index=True, max_length=255, null=True, verbose_name='Username'), + model_name="accessattempt", + name="username", + field=models.CharField( + db_index=True, max_length=255, null=True, verbose_name="Username" + ), ), migrations.AlterField( - model_name='accesslog', - name='attempt_time', - field=models.DateTimeField(auto_now_add=True, verbose_name='Attempt Time'), + model_name="accesslog", + name="attempt_time", + field=models.DateTimeField(auto_now_add=True, verbose_name="Attempt Time"), ), migrations.AlterField( - model_name='accesslog', - name='logout_time', - field=models.DateTimeField(blank=True, null=True, verbose_name='Logout Time'), + model_name="accesslog", + name="logout_time", + field=models.DateTimeField( + blank=True, null=True, verbose_name="Logout Time" + ), ), migrations.AlterField( - model_name='accesslog', - name='user_agent', - field=models.CharField(db_index=True, max_length=255, verbose_name='User Agent'), + model_name="accesslog", + name="user_agent", + field=models.CharField( + db_index=True, max_length=255, verbose_name="User Agent" + ), ), migrations.AlterField( - model_name='accesslog', - name='username', - field=models.CharField(db_index=True, max_length=255, null=True, verbose_name='Username'), + model_name="accesslog", + name="username", + field=models.CharField( + db_index=True, max_length=255, null=True, verbose_name="Username" + ), ), ] diff --git a/axes/migrations/0005_remove_accessattempt_trusted.py b/axes/migrations/0005_remove_accessattempt_trusted.py index 9e004a0..c7711fb 100644 --- a/axes/migrations/0005_remove_accessattempt_trusted.py +++ b/axes/migrations/0005_remove_accessattempt_trusted.py @@ -3,13 +3,6 @@ from django.db import migrations class Migration(migrations.Migration): - dependencies = [ - ('axes', '0004_auto_20181024_1538'), - ] + dependencies = [("axes", "0004_auto_20181024_1538")] - operations = [ - migrations.RemoveField( - model_name='accessattempt', - name='trusted', - ), - ] + operations = [migrations.RemoveField(model_name="accessattempt", name="trusted")] diff --git a/axes/migrations/0006_remove_accesslog_trusted.py b/axes/migrations/0006_remove_accesslog_trusted.py index 756522b..347e0f0 100644 --- a/axes/migrations/0006_remove_accesslog_trusted.py +++ b/axes/migrations/0006_remove_accesslog_trusted.py @@ -5,13 +5,6 @@ from django.db import migrations class Migration(migrations.Migration): - dependencies = [ - ('axes', '0005_remove_accessattempt_trusted'), - ] + dependencies = [("axes", "0005_remove_accessattempt_trusted")] - operations = [ - migrations.RemoveField( - model_name='accesslog', - name='trusted', - ), - ] + operations = [migrations.RemoveField(model_name="accesslog", name="trusted")] diff --git a/axes/models.py b/axes/models.py index 8bf1dc4..a5a807c 100644 --- a/axes/models.py +++ b/axes/models.py @@ -3,77 +3,45 @@ from django.utils.translation import gettext_lazy as _ class AccessBase(models.Model): - user_agent = models.CharField( - _('User Agent'), - max_length=255, - db_index=True, - ) + user_agent = models.CharField(_("User Agent"), max_length=255, db_index=True) - ip_address = models.GenericIPAddressField( - _('IP Address'), - null=True, - db_index=True, - ) + ip_address = models.GenericIPAddressField(_("IP Address"), null=True, db_index=True) - username = models.CharField( - _('Username'), - max_length=255, - null=True, - db_index=True, - ) + username = models.CharField(_("Username"), max_length=255, null=True, db_index=True) - http_accept = models.CharField( - _('HTTP Accept'), - max_length=1025, - ) + http_accept = models.CharField(_("HTTP Accept"), max_length=1025) - path_info = models.CharField( - _('Path'), - max_length=255, - ) + path_info = models.CharField(_("Path"), max_length=255) - attempt_time = models.DateTimeField( - _('Attempt Time'), - auto_now_add=True, - ) + attempt_time = models.DateTimeField(_("Attempt Time"), auto_now_add=True) class Meta: - app_label = 'axes' + app_label = "axes" abstract = True - ordering = ['-attempt_time'] + ordering = ["-attempt_time"] class AccessAttempt(AccessBase): - get_data = models.TextField( - _('GET Data'), - ) + get_data = models.TextField(_("GET Data")) - post_data = models.TextField( - _('POST Data'), - ) + post_data = models.TextField(_("POST Data")) - failures_since_start = models.PositiveIntegerField( - _('Failed Logins'), - ) + failures_since_start = models.PositiveIntegerField(_("Failed Logins")) def __str__(self): - return f'Attempted Access: {self.attempt_time}' + return f"Attempted Access: {self.attempt_time}" class Meta: - verbose_name = _('access attempt') - verbose_name_plural = _('access attempts') + verbose_name = _("access attempt") + verbose_name_plural = _("access attempts") class AccessLog(AccessBase): - logout_time = models.DateTimeField( - _('Logout Time'), - null=True, - blank=True, - ) + logout_time = models.DateTimeField(_("Logout Time"), null=True, blank=True) def __str__(self): - return f'Access Log for {self.username} @ {self.attempt_time}' + return f"Access Log for {self.username} @ {self.attempt_time}" class Meta: - verbose_name = _('access log') - verbose_name_plural = _('access logs') + verbose_name = _("access log") + verbose_name_plural = _("access logs") diff --git a/axes/signals.py b/axes/signals.py index 0e8002d..fd13562 100644 --- a/axes/signals.py +++ b/axes/signals.py @@ -1,6 +1,10 @@ from logging import getLogger -from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed +from django.contrib.auth.signals import ( + user_logged_in, + user_logged_out, + user_login_failed, +) from django.core.signals import setting_changed from django.db.models.signals import post_save, post_delete from django.dispatch import receiver @@ -13,7 +17,7 @@ from axes.handlers.proxy import AxesProxyHandler log = getLogger(settings.AXES_LOGGER) -user_locked_out = Signal(providing_args=['request', 'username', 'ip_address']) +user_locked_out = Signal(providing_args=["request", "username", "ip_address"]) @receiver(user_login_failed) @@ -42,11 +46,13 @@ def handle_post_delete_access_attempt(*args, **kwargs): @receiver(setting_changed) -def handle_setting_changed(sender, setting, value, enter, **kwargs): # pylint: disable=unused-argument +def handle_setting_changed( + sender, setting, value, enter, **kwargs +): # pylint: disable=unused-argument """ Reinitialize handler implementation if a relevant setting changes in e.g. application reconfiguration or during testing. """ - if setting == 'AXES_HANDLER': + if setting == "AXES_HANDLER": AxesProxyHandler.get_implementation(force=True) diff --git a/axes/tests/base.py b/axes/tests/base.py index 69ca054..54c9cd3 100644 --- a/axes/tests/base.py +++ b/axes/tests/base.py @@ -32,18 +32,18 @@ class AxesTestCase(TestCase): Test case using custom settings for testing. """ - VALID_USERNAME = 'axes-valid-username' - VALID_PASSWORD = 'axes-valid-password' - VALID_EMAIL = 'axes-valid-email@example.com' - VALID_USER_AGENT = 'axes-user-agent' - VALID_IP_ADDRESS = '127.0.0.1' + VALID_USERNAME = "axes-valid-username" + VALID_PASSWORD = "axes-valid-password" + VALID_EMAIL = "axes-valid-email@example.com" + VALID_USER_AGENT = "axes-user-agent" + VALID_IP_ADDRESS = "127.0.0.1" - INVALID_USERNAME = 'axes-invalid-username' - INVALID_PASSWORD = 'axes-invalid-password' - INVALID_EMAIL = 'axes-invalid-email@example.com' + INVALID_USERNAME = "axes-invalid-username" + INVALID_PASSWORD = "axes-invalid-password" + INVALID_EMAIL = "axes-invalid-email@example.com" - LOCKED_MESSAGE = 'Account locked: too many login attempts.' - LOGOUT_MESSAGE = 'Logged out' + LOCKED_MESSAGE = "Account locked: too many login attempts." + LOGOUT_MESSAGE = "Logged out" LOGIN_FORM_KEY = '' STATUS_SUCCESS = 200 @@ -61,19 +61,17 @@ class AxesTestCase(TestCase): self.ip_address = self.VALID_IP_ADDRESS self.user_agent = self.VALID_USER_AGENT - self.path_info = reverse('admin:login') + self.path_info = reverse("admin:login") self.user = get_user_model().objects.create_superuser( - username=self.username, - password=self.password, - email=self.email, + username=self.username, password=self.password, email=self.email ) self.request = HttpRequest() - self.request.method = 'POST' - self.request.META['REMOTE_ADDR'] = self.ip_address - self.request.META['HTTP_USER_AGENT'] = self.user_agent - self.request.META['PATH_INFO'] = self.path_info + self.request.method = "POST" + self.request.META["REMOTE_ADDR"] = self.ip_address + self.request.META["HTTP_USER_AGENT"] = self.user_agent + self.request.META["PATH_INFO"] = self.path_info self.request.axes_attempt_time = now() self.request.axes_ip_address = get_client_ip_address(self.request) @@ -88,9 +86,9 @@ class AxesTestCase(TestCase): def get_kwargs_with_defaults(self, **kwargs): defaults = { - 'user_agent': self.user_agent, - 'ip_address': self.ip_address, - 'username': self.username, + "user_agent": self.user_agent, + "ip_address": self.ip_address, + "username": self.username, } defaults.update(kwargs) @@ -98,7 +96,7 @@ class AxesTestCase(TestCase): def create_attempt(self, **kwargs): kwargs = self.get_kwargs_with_defaults(**kwargs) - kwargs.setdefault('failures_since_start', 1) + kwargs.setdefault("failures_since_start", 1) return AccessAttempt.objects.create(**kwargs) def create_log(self, **kwargs): @@ -118,24 +116,17 @@ class AxesTestCase(TestCase): if is_valid_username: username = self.VALID_USERNAME else: - username = ''.join( - choice(ascii_letters + digits) - for _ in range(10) - ) + username = "".join(choice(ascii_letters + digits) for _ in range(10)) if is_valid_password: password = self.VALID_PASSWORD else: password = self.INVALID_PASSWORD - post_data = { - 'username': username, - 'password': password, - **kwargs - } + post_data = {"username": username, "password": password, **kwargs} return self.client.post( - reverse('admin:login'), + reverse("admin:login"), post_data, REMOTE_ADDR=self.ip_address, HTTP_USER_AGENT=self.user_agent, @@ -143,14 +134,16 @@ class AxesTestCase(TestCase): def logout(self): return self.client.post( - reverse('admin:logout'), + reverse("admin:logout"), REMOTE_ADDR=self.ip_address, HTTP_USER_AGENT=self.user_agent, ) def check_login(self): response = self.login(is_valid_username=True, is_valid_password=True) - self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True) + self.assertNotContains( + response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True + ) def almost_lockout(self): for _ in range(1, get_failure_limit(None, None)): @@ -166,14 +159,18 @@ class AxesTestCase(TestCase): if settings.AXES_LOCK_OUT_AT_FAILURE == True: self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED) else: - self.assertNotContains(response, self.LOCKED_MESSAGE, status_code=self.STATUS_SUCCESS) + self.assertNotContains( + response, self.LOCKED_MESSAGE, status_code=self.STATUS_SUCCESS + ) def cool_off(self): sleep(get_cool_off().total_seconds()) def check_logout(self): response = self.logout() - self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS) + self.assertContains( + response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS + ) def check_handler(self): """ @@ -186,4 +183,3 @@ class AxesTestCase(TestCase): self.cool_off() self.check_login() self.check_logout() - diff --git a/axes/tests/settings.py b/axes/tests/settings.py index 7c231fe..fe0cec4 100644 --- a/axes/tests/settings.py +++ b/axes/tests/settings.py @@ -1,87 +1,67 @@ -DATABASES = { - 'default': { - 'ENGINE': 'django.db.backends.sqlite3', - 'NAME': ':memory:', - } -} +DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}} CACHES = { - 'default': { + "default": { # This cache backend is OK to use in development and testing # but has the potential to break production setups with more than on process # due to each process having their own local memory based cache - 'BACKEND': 'django.core.cache.backends.locmem.LocMemCache', + "BACKEND": "django.core.cache.backends.locmem.LocMemCache" } } SITE_ID = 1 MIDDLEWARE = [ - 'django.middleware.common.CommonMiddleware', - 'django.contrib.sessions.middleware.SessionMiddleware', - 'django.contrib.auth.middleware.AuthenticationMiddleware', - 'django.contrib.messages.middleware.MessageMiddleware', - - 'axes.middleware.AxesMiddleware', + "django.middleware.common.CommonMiddleware", + "django.contrib.sessions.middleware.SessionMiddleware", + "django.contrib.auth.middleware.AuthenticationMiddleware", + "django.contrib.messages.middleware.MessageMiddleware", + "axes.middleware.AxesMiddleware", ] AUTHENTICATION_BACKENDS = [ - 'axes.backends.AxesBackend', - - 'django.contrib.auth.backends.ModelBackend', + "axes.backends.AxesBackend", + "django.contrib.auth.backends.ModelBackend", ] -PASSWORD_HASHERS = [ - 'django.contrib.auth.hashers.MD5PasswordHasher', -] +PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"] -ROOT_URLCONF = 'axes.tests.urls' +ROOT_URLCONF = "axes.tests.urls" INSTALLED_APPS = [ - 'django.contrib.auth', - 'django.contrib.contenttypes', - 'django.contrib.sessions', - 'django.contrib.sites', - 'django.contrib.messages', - 'django.contrib.admin', - - 'axes', + "django.contrib.auth", + "django.contrib.contenttypes", + "django.contrib.sessions", + "django.contrib.sites", + "django.contrib.messages", + "django.contrib.admin", + "axes", ] TEMPLATES = [ { - 'BACKEND': 'django.template.backends.django.DjangoTemplates', - 'DIRS': [], - 'APP_DIRS': True, - 'OPTIONS': { - 'context_processors': [ - 'django.template.context_processors.debug', - 'django.template.context_processors.request', - 'django.contrib.auth.context_processors.auth', - 'django.contrib.messages.context_processors.messages', - ], + "BACKEND": "django.template.backends.django.DjangoTemplates", + "DIRS": [], + "APP_DIRS": True, + "OPTIONS": { + "context_processors": [ + "django.template.context_processors.debug", + "django.template.context_processors.request", + "django.contrib.auth.context_processors.auth", + "django.contrib.messages.context_processors.messages", + ] }, - }, + } ] LOGGING = { - 'version': 1, - 'disable_existing_loggers': False, - 'handlers': { - 'console': { - 'class': 'logging.StreamHandler', - }, - }, - 'loggers': { - 'axes': { - 'handlers': ['console'], - 'level': 'INFO', - 'propagate': False, - }, - }, + "version": 1, + "disable_existing_loggers": False, + "handlers": {"console": {"class": "logging.StreamHandler"}}, + "loggers": {"axes": {"handlers": ["console"], "level": "INFO", "propagate": False}}, } -SECRET_KEY = 'too-secret-for-test' +SECRET_KEY = "too-secret-for-test" USE_I18N = False @@ -89,6 +69,6 @@ USE_L10N = False USE_TZ = False -LOGIN_REDIRECT_URL = '/admin/' +LOGIN_REDIRECT_URL = "/admin/" AXES_FAILURE_LIMIT = 10 diff --git a/axes/tests/test_attempts.py b/axes/tests/test_attempts.py index 6cc1a76..b474528 100644 --- a/axes/tests/test_attempts.py +++ b/axes/tests/test_attempts.py @@ -16,7 +16,7 @@ class GetCoolOffThresholdTestCase(AxesTestCase): def test_get_cool_off_threshold(self): timestamp = now() - with patch('axes.attempts.now', return_value=timestamp): + with patch("axes.attempts.now", return_value=timestamp): attempt_time = timestamp threshold_now = get_cool_off_threshold(attempt_time) @@ -51,24 +51,28 @@ class ResetTestCase(AxesTestCase): class UserWhitelistTestCase(AxesTestCase): def setUp(self): self.user_model = get_user_model() - self.user = self.user_model.objects.create(username='jane.doe') + self.user = self.user_model.objects.create(username="jane.doe") self.request = HttpRequest() def test_is_client_username_whitelisted(self): - with patch.object(self.user_model, 'nolockout', True, create=True): - self.assertTrue(is_user_attempt_whitelisted( - self.request, - {self.user_model.USERNAME_FIELD: self.user.username}, - )) + with patch.object(self.user_model, "nolockout", True, create=True): + self.assertTrue( + is_user_attempt_whitelisted( + self.request, {self.user_model.USERNAME_FIELD: self.user.username} + ) + ) def test_is_client_username_whitelisted_not(self): - self.assertFalse(is_user_attempt_whitelisted( - self.request, - {self.user_model.USERNAME_FIELD: self.user.username}, - )) + self.assertFalse( + is_user_attempt_whitelisted( + self.request, {self.user_model.USERNAME_FIELD: self.user.username} + ) + ) def test_is_client_username_whitelisted_does_not_exist(self): - self.assertFalse(is_user_attempt_whitelisted( - self.request, - {self.user_model.USERNAME_FIELD: 'not.' + self.user.username}, - )) + self.assertFalse( + is_user_attempt_whitelisted( + self.request, + {self.user_model.USERNAME_FIELD: "not." + self.user.username}, + ) + ) diff --git a/axes/tests/test_backends.py b/axes/tests/test_backends.py index 6d338c3..e279427 100644 --- a/axes/tests/test_backends.py +++ b/axes/tests/test_backends.py @@ -1,7 +1,10 @@ from unittest.mock import patch, MagicMock from axes.backends import AxesBackend -from axes.exceptions import AxesBackendRequestParameterRequired, AxesBackendPermissionDenied +from axes.exceptions import ( + AxesBackendRequestParameterRequired, + AxesBackendPermissionDenied, +) from axes.tests.base import AxesTestCase @@ -12,7 +15,7 @@ class BackendTestCase(AxesTestCase): with self.assertRaises(AxesBackendRequestParameterRequired): AxesBackend().authenticate(request) - @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=False) + @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False) def test_authenticate_raises_on_locked_request(self, _): request = MagicMock() diff --git a/axes/tests/test_checks.py b/axes/tests/test_checks.py index f81a531..bfc56a7 100644 --- a/axes/tests/test_checks.py +++ b/axes/tests/test_checks.py @@ -8,32 +8,37 @@ from axes.tests.base import AxesTestCase class CacheCheckTestCase(AxesTestCase): @override_settings( - AXES_HANDLER='axes.handlers.cache.AxesCacheHandler', - CACHES={'default': {'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'axes_cache'}}, + AXES_HANDLER="axes.handlers.cache.AxesCacheHandler", + CACHES={ + "default": { + "BACKEND": "django.core.cache.backends.db.DatabaseCache", + "LOCATION": "axes_cache", + } + }, ) def test_cache_check(self): warnings = run_checks() self.assertEqual(warnings, []) @override_settings( - AXES_HANDLER='axes.handlers.cache.AxesCacheHandler', - CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}, + AXES_HANDLER="axes.handlers.cache.AxesCacheHandler", + CACHES={ + "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"} + }, ) def test_cache_check_warnings(self): warnings = run_checks() warning = Warning( - msg=Messages.CACHE_INVALID, - hint=Hints.CACHE_INVALID, - id=Codes.CACHE_INVALID, + msg=Messages.CACHE_INVALID, hint=Hints.CACHE_INVALID, id=Codes.CACHE_INVALID ) - self.assertEqual(warnings, [ - warning, - ]) + self.assertEqual(warnings, [warning]) @override_settings( - AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler', - CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}}, + AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler", + CACHES={ + "default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"} + }, ) def test_cache_check_does_not_produce_check_warnings_with_database_handler(self): warnings = run_checks() @@ -41,11 +46,7 @@ class CacheCheckTestCase(AxesTestCase): class MiddlewareCheckTestCase(AxesTestCase): - @modify_settings( - MIDDLEWARE={ - 'remove': ['axes.middleware.AxesMiddleware'] - }, - ) + @modify_settings(MIDDLEWARE={"remove": ["axes.middleware.AxesMiddleware"]}) def test_cache_check_warnings(self): warnings = run_checks() warning = Warning( @@ -54,9 +55,7 @@ class MiddlewareCheckTestCase(AxesTestCase): id=Codes.MIDDLEWARE_INVALID, ) - self.assertEqual(warnings, [ - warning, - ]) + self.assertEqual(warnings, [warning]) class AxesSpecializedBackend(AxesBackend): @@ -64,11 +63,7 @@ class AxesSpecializedBackend(AxesBackend): class BackendCheckTestCase(AxesTestCase): - @modify_settings( - AUTHENTICATION_BACKENDS={ - 'remove': ['axes.backends.AxesBackend'] - }, - ) + @modify_settings(AUTHENTICATION_BACKENDS={"remove": ["axes.backends.AxesBackend"]}) def test_backend_missing(self): warnings = run_checks() warning = Warning( @@ -77,27 +72,23 @@ class BackendCheckTestCase(AxesTestCase): id=Codes.BACKEND_INVALID, ) - self.assertEqual(warnings, [ - warning, - ]) + self.assertEqual(warnings, [warning]) @override_settings( - AUTHENTICATION_BACKENDS=['axes.tests.test_checks.AxesSpecializedBackend'] + AUTHENTICATION_BACKENDS=["axes.tests.test_checks.AxesSpecializedBackend"] ) def test_specialized_backend(self): warnings = run_checks() self.assertEqual(warnings, []) @override_settings( - AUTHENTICATION_BACKENDS=['axes.tests.test_checks.AxesNotDefinedBackend'] + AUTHENTICATION_BACKENDS=["axes.tests.test_checks.AxesNotDefinedBackend"] ) def test_import_error(self): with self.assertRaises(ImportError): run_checks() - @override_settings( - AUTHENTICATION_BACKENDS=['module.not_defined'] - ) + @override_settings(AUTHENTICATION_BACKENDS=["module.not_defined"]) def test_module_not_found_error(self): with self.assertRaises(ModuleNotFoundError): run_checks() @@ -106,16 +97,14 @@ class BackendCheckTestCase(AxesTestCase): class DeprecatedSettingsTestCase(AxesTestCase): def setUp(self): self.disable_success_access_log_warning = Warning( - msg=Messages.SETTING_DEPRECATED.format(deprecated_setting='AXES_DISABLE_SUCCESS_ACCESS_LOG'), + msg=Messages.SETTING_DEPRECATED.format( + deprecated_setting="AXES_DISABLE_SUCCESS_ACCESS_LOG" + ), hint=Hints.SETTING_DEPRECATED, id=Codes.SETTING_DEPRECATED, ) - @override_settings( - AXES_DISABLE_SUCCESS_ACCESS_LOG=True, - ) + @override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True) def test_deprecated_success_access_log_flag(self): warnings = run_checks() - self.assertEqual(warnings, [ - self.disable_success_access_log_warning, - ]) + self.assertEqual(warnings, [self.disable_success_access_log_warning]) diff --git a/axes/tests/test_decorators.py b/axes/tests/test_decorators.py index e4440f0..1447aa9 100644 --- a/axes/tests/test_decorators.py +++ b/axes/tests/test_decorators.py @@ -7,34 +7,34 @@ from axes.tests.base import AxesTestCase class DecoratorTestCase(AxesTestCase): - SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched') - LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out') + SUCCESS_RESPONSE = HttpResponse(status=200, content="Dispatched") + LOCKOUT_RESPONSE = HttpResponse(status=403, content="Locked out") def setUp(self): self.request = MagicMock() self.cls = MagicMock(return_value=self.request) self.func = MagicMock(return_value=self.SUCCESS_RESPONSE) - @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=False) - @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False) + @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE) def test_axes_dispatch_locks_out(self, _, __): response = axes_dispatch(self.func)(self.request) self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content) - @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=True) - @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True) + @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE) def test_axes_dispatch_dispatches(self, _, __): response = axes_dispatch(self.func)(self.request) self.assertEqual(response.content, self.SUCCESS_RESPONSE.content) - @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=False) - @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False) + @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE) def test_axes_form_invalid_locks_out(self, _, __): response = axes_form_invalid(self.func)(self.cls) self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content) - @patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=True) - @patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE) + @patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True) + @patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE) def test_axes_form_invalid_dispatches(self, _, __): response = axes_form_invalid(self.func)(self.cls) self.assertEqual(response.content, self.SUCCESS_RESPONSE.content) diff --git a/axes/tests/test_handlers.py b/axes/tests/test_handlers.py index 56bcf9d..d869864 100644 --- a/axes/tests/test_handlers.py +++ b/axes/tests/test_handlers.py @@ -12,7 +12,7 @@ from axes.models import AccessAttempt, AccessLog from axes.tests.base import AxesTestCase -@override_settings(AXES_HANDLER='axes.handlers.base.AxesHandler') +@override_settings(AXES_HANDLER="axes.handlers.base.AxesHandler") class AxesHandlerTestCase(AxesTestCase): def test_base_handler_reset_attempts_raises(self): with self.assertRaises(NotImplementedError): @@ -26,20 +26,19 @@ class AxesHandlerTestCase(AxesTestCase): with self.assertRaises(NotImplementedError): AxesProxyHandler.is_allowed(self.request, {}) - @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) + @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"]) def test_is_allowed_with_blacklisted_ip_address(self): self.assertFalse(AxesProxyHandler.is_allowed(self.request)) @override_settings( - AXES_NEVER_LOCKOUT_WHITELIST=True, - AXES_IP_WHITELIST=['127.0.0.1'], + AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=["127.0.0.1"] ) def test_is_allowed_with_whitelisted_ip_address(self): self.assertTrue(AxesProxyHandler.is_allowed(self.request)) @override_settings(AXES_NEVER_LOCKOUT_GET=True) def test_is_allowed_with_whitelisted_method(self): - self.request.method = 'GET' + self.request.method = "GET" self.assertTrue(AxesProxyHandler.is_allowed(self.request)) @override_settings(AXES_LOCK_OUT_AT_FAILURE=False) @@ -49,7 +48,7 @@ class AxesHandlerTestCase(AxesTestCase): @override_settings(AXES_ONLY_ADMIN_SITE=True) def test_only_admin_site(self): request = MagicMock() - request.path = '/test/' + request.path = "/test/" self.assertTrue(AxesProxyHandler.is_allowed(self.request)) @@ -61,38 +60,38 @@ class AxesProxyHandlerTestCase(AxesTestCase): self.user = MagicMock() self.instance = MagicMock() - @patch('axes.handlers.proxy.AxesProxyHandler.implementation', None) + @patch("axes.handlers.proxy.AxesProxyHandler.implementation", None) def test_setting_changed_signal_triggers_handler_reimport(self): - self.assertIsNone(AxesProxyHandler.implementation) + self.assertIsNone(AxesProxyHandler.implementation) - with self.settings(AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler'): - self.assertIsNotNone(AxesProxyHandler.implementation) + with self.settings(AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler"): + self.assertIsNotNone(AxesProxyHandler.implementation) - @patch('axes.handlers.proxy.AxesProxyHandler.implementation') + @patch("axes.handlers.proxy.AxesProxyHandler.implementation") def test_user_login_failed(self, handler): self.assertFalse(handler.user_login_failed.called) AxesProxyHandler.user_login_failed(self.sender, self.credentials, self.request) self.assertTrue(handler.user_login_failed.called) - @patch('axes.handlers.proxy.AxesProxyHandler.implementation') + @patch("axes.handlers.proxy.AxesProxyHandler.implementation") def test_user_logged_in(self, handler): self.assertFalse(handler.user_logged_in.called) AxesProxyHandler.user_logged_in(self.sender, self.request, self.user) self.assertTrue(handler.user_logged_in.called) - @patch('axes.handlers.proxy.AxesProxyHandler.implementation') + @patch("axes.handlers.proxy.AxesProxyHandler.implementation") def test_user_logged_out(self, handler): self.assertFalse(handler.user_logged_out.called) AxesProxyHandler.user_logged_out(self.sender, self.request, self.user) self.assertTrue(handler.user_logged_out.called) - @patch('axes.handlers.proxy.AxesProxyHandler.implementation') + @patch("axes.handlers.proxy.AxesProxyHandler.implementation") def test_post_save_access_attempt(self, handler): self.assertFalse(handler.post_save_access_attempt.called) AxesProxyHandler.post_save_access_attempt(self.instance) self.assertTrue(handler.post_save_access_attempt.called) - @patch('axes.handlers.proxy.AxesProxyHandler.implementation') + @patch("axes.handlers.proxy.AxesProxyHandler.implementation") def test_post_delete_access_attempt(self, handler): self.assertFalse(handler.post_delete_access_attempt.called) AxesProxyHandler.post_delete_access_attempt(self.instance) @@ -102,24 +101,31 @@ class AxesProxyHandlerTestCase(AxesTestCase): class AxesHandlerBaseTestCase(AxesTestCase): def check_whitelist(self, log): with override_settings( - AXES_NEVER_LOCKOUT_WHITELIST=True, - AXES_IP_WHITELIST=[self.ip_address], + AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=[self.ip_address] ): - AxesProxyHandler.user_login_failed(sender=None, request=self.request, credentials=self.credentials) - client_str = get_client_str(self.username, self.ip_address, self.user_agent, self.path_info) - log.info.assert_called_with('AXES: Login failed from whitelisted client %s.', client_str) + AxesProxyHandler.user_login_failed( + sender=None, request=self.request, credentials=self.credentials + ) + client_str = get_client_str( + self.username, self.ip_address, self.user_agent, self.path_info + ) + log.info.assert_called_with( + "AXES: Login failed from whitelisted client %s.", client_str + ) def check_empty_request(self, log, handler): AxesProxyHandler.user_login_failed(sender=None, credentials={}, request=None) - log.error.assert_called_with(f'AXES: {handler}.user_login_failed does not function without a request.') + log.error.assert_called_with( + f"AXES: {handler}.user_login_failed does not function without a request." + ) def test_is_admin_site(self): request = MagicMock() tests = ( # (AXES_ONLY_ADMIN_SITE, URL, Expected) - (True, '/test/', True), - (True, reverse('admin:index'), False), - (False, '/test/', False), - (False, reverse('admin:index'), False), + (True, "/test/", True), + (True, reverse("admin:index"), False), + (False, "/test/", False), + (False, reverse("admin:index"), False), ) for setting_value, url, expected in tests: @@ -129,7 +135,7 @@ class AxesHandlerBaseTestCase(AxesTestCase): @override_settings( - AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler', + AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler", AXES_COOLOFF_TIME=timedelta(seconds=1), AXES_RESET_ON_SUCCESS=True, ) @@ -148,7 +154,7 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): self.create_log() then = timezone.now() - timezone.timedelta(days=90) - with patch('django.utils.timezone.now', return_value=then): + with patch("django.utils.timezone.now", return_value=then): self.create_log() self.assertEqual(AccessLog.objects.count(), 2) @@ -167,7 +173,7 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): def test_handler_callable_failure_limit(self): self.check_handler() - @override_settings(AXES_FAILURE_LIMIT='axes.tests.base.custom_failure_limit') + @override_settings(AXES_FAILURE_LIMIT="axes.tests.base.custom_failure_limit") def test_handler_str_failure_limit(self): self.check_handler() @@ -180,22 +186,22 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase): def test_handler_without_lockout(self): self.check_handler() - @patch('axes.handlers.database.log') + @patch("axes.handlers.database.log") def test_empty_request(self, log): - self.check_empty_request(log, 'AxesDatabaseHandler') + self.check_empty_request(log, "AxesDatabaseHandler") - @patch('axes.handlers.database.log') + @patch("axes.handlers.database.log") def test_whitelist(self, log): self.check_whitelist(log) - @patch('axes.handlers.database.is_user_attempt_whitelisted', return_value=True) + @patch("axes.handlers.database.is_user_attempt_whitelisted", return_value=True) def test_user_whitelisted(self, is_whitelisted): self.assertFalse(AxesProxyHandler().is_locked(self.request, self.credentials)) self.assertEqual(1, is_whitelisted.call_count) @override_settings( - AXES_HANDLER='axes.handlers.cache.AxesCacheHandler', + AXES_HANDLER="axes.handlers.cache.AxesCacheHandler", AXES_COOLOFF_TIME=timedelta(seconds=1), ) class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase): @@ -211,18 +217,16 @@ class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase): def test_handler_without_lockout(self): self.check_handler() - @patch('axes.handlers.cache.log') + @patch("axes.handlers.cache.log") def test_empty_request(self, log): - self.check_empty_request(log, 'AxesCacheHandler') + self.check_empty_request(log, "AxesCacheHandler") - @patch('axes.handlers.cache.log') + @patch("axes.handlers.cache.log") def test_whitelist(self, log): self.check_whitelist(log) -@override_settings( - AXES_HANDLER='axes.handlers.dummy.AxesDummyHandler', -) +@override_settings(AXES_HANDLER="axes.handlers.dummy.AxesDummyHandler") class AxesDummyHandlerTestCase(AxesHandlerBaseTestCase): def test_handler(self): for _ in range(settings.AXES_FAILURE_LIMIT): diff --git a/axes/tests/test_helpers.py b/axes/tests/test_helpers.py index 728853b..4fad35f 100644 --- a/axes/tests/test_helpers.py +++ b/axes/tests/test_helpers.py @@ -24,6 +24,6 @@ class AxesHelpersTestCase(AxesTestCase): def test_get_cool_off_callable(self): self.assertEqual(get_cool_off(), timedelta(seconds=30)) - @override_settings(AXES_COOLOFF_TIME='axes.tests.test_helpers.get_cool_off_str') + @override_settings(AXES_COOLOFF_TIME="axes.tests.test_helpers.get_cool_off_str") def test_get_cool_off_str(self): self.assertEqual(get_cool_off(), timedelta(seconds=30)) diff --git a/axes/tests/test_logging.py b/axes/tests/test_logging.py index c97d48e..a49e041 100644 --- a/axes/tests/test_logging.py +++ b/axes/tests/test_logging.py @@ -8,8 +8,8 @@ from axes.models import AccessAttempt, AccessLog from axes.tests.base import AxesTestCase -@patch('axes.apps.AppConfig.logging_initialized', False) -@patch('axes.apps.log') +@patch("axes.apps.AppConfig.logging_initialized", False) +@patch("axes.apps.log") class AppsTestCase(AxesTestCase): def test_axes_config_log_re_entrant(self, log): """ @@ -22,7 +22,7 @@ class AppsTestCase(AxesTestCase): AppConfig.initialize() self.assertTrue( calls == log.info.call_count and calls > 0, - 'AxesConfig.initialize needs to be re-entrant', + "AxesConfig.initialize needs to be re-entrant", ) @override_settings(AXES_VERBOSE=False) @@ -33,17 +33,17 @@ class AppsTestCase(AxesTestCase): @override_settings(AXES_ONLY_USER_FAILURES=True) def test_axes_config_log_user_only(self, log): AppConfig.initialize() - log.info.assert_called_with('AXES: blocking by username only.') + log.info.assert_called_with("AXES: blocking by username only.") @override_settings(AXES_ONLY_USER_FAILURES=False) def test_axes_config_log_ip_only(self, log): AppConfig.initialize() - log.info.assert_called_with('AXES: blocking by IP only.') + log.info.assert_called_with("AXES: blocking by IP only.") @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) def test_axes_config_log_user_ip(self, log): AppConfig.initialize() - log.info.assert_called_with('AXES: blocking by combination of username and IP.') + log.info.assert_called_with("AXES: blocking by combination of username and IP.") class AccessLogTestCase(AxesTestCase): @@ -53,12 +53,12 @@ class AccessLogTestCase(AxesTestCase): """ self.login(is_valid_username=True, is_valid_password=True) - self.assertIsNone(AccessLog.objects.latest('id').logout_time) + self.assertIsNone(AccessLog.objects.latest("id").logout_time) - response = self.client.get(reverse('admin:logout')) - self.assertContains(response, 'Logged out') + response = self.client.get(reverse("admin:logout")) + self.assertContains(response, "Logged out") - self.assertIsNotNone(AccessLog.objects.latest('id').logout_time) + self.assertIsNotNone(AccessLog.objects.latest("id").logout_time) def test_log_data_truncated(self): """ @@ -66,21 +66,19 @@ class AccessLogTestCase(AxesTestCase): """ # An impossibly large post dict - extra_data = {'a' * x: x for x in range(1024)} + extra_data = {"a" * x: x for x in range(1024)} self.login(**extra_data) - self.assertEqual( - len(AccessAttempt.objects.latest('id').post_data), 1024 - ) + self.assertEqual(len(AccessAttempt.objects.latest("id").post_data), 1024) @override_settings(AXES_DISABLE_ACCESS_LOG=True) def test_valid_logout_without_success_log(self): AccessLog.objects.all().delete() response = self.login(is_valid_username=True, is_valid_password=True) - response = self.client.get(reverse('admin:logout')) + response = self.client.get(reverse("admin:logout")) self.assertEqual(AccessLog.objects.all().count(), 0) - self.assertContains(response, 'Logged out', html=True) + self.assertContains(response, "Logged out", html=True) @override_settings(AXES_DISABLE_ACCESS_LOG=True) def test_valid_login_without_success_log(self): @@ -100,10 +98,10 @@ class AccessLogTestCase(AxesTestCase): AccessLog.objects.all().delete() response = self.login(is_valid_username=True, is_valid_password=True) - response = self.client.get(reverse('admin:logout')) + response = self.client.get(reverse("admin:logout")) self.assertEqual(AccessLog.objects.count(), 0) - self.assertContains(response, 'Logged out', html=True) + self.assertContains(response, "Logged out", html=True) @override_settings(AXES_DISABLE_ACCESS_LOG=True) def test_non_valid_login_without_log(self): diff --git a/axes/tests/test_login.py b/axes/tests/test_login.py index 51adbd8..0c22bbb 100644 --- a/axes/tests/test_login.py +++ b/axes/tests/test_login.py @@ -23,13 +23,13 @@ class DjangoLoginTestCase(TestCase): self.request = HttpRequest() self.request.session = engine.SessionStore() - self.username = 'john.doe' - self.password = 'hunter2' + self.username = "john.doe" + self.password = "hunter2" self.user = get_user_model().objects.create(username=self.username) self.user.set_password(self.password) self.user.save() - self.user.backend = 'django.contrib.auth.backends.ModelBackend' + self.user.backend = "django.contrib.auth.backends.ModelBackend" class DjangoContribAuthLoginTestCase(DjangoLoginTestCase): @@ -62,60 +62,52 @@ class LoginTestCase(AxesTestCase): Always allow attempted logins for a different user from a different IP. """ - IP_1 = '10.1.1.1' - IP_2 = '10.2.2.2' - USER_1 = 'valid-user-1' - USER_2 = 'valid-user-2' - EMAIL_1 = 'valid-email-1@example.com' - EMAIL_2 = 'valid-email-2@example.com' + IP_1 = "10.1.1.1" + IP_2 = "10.2.2.2" + USER_1 = "valid-user-1" + USER_2 = "valid-user-2" + EMAIL_1 = "valid-email-1@example.com" + EMAIL_2 = "valid-email-2@example.com" VALID_USERNAME = USER_1 VALID_EMAIL = EMAIL_1 - VALID_PASSWORD = 'valid-password' + VALID_PASSWORD = "valid-password" VALID_IP_ADDRESS = IP_1 - WRONG_PASSWORD = 'wrong-password' - LOCKED_MESSAGE = 'Account locked: too many login attempts.' + WRONG_PASSWORD = "wrong-password" + LOCKED_MESSAGE = "Account locked: too many login attempts." LOGIN_FORM_KEY = '' ALLOWED = 302 BLOCKED = 403 - def _login(self, username, password, ip_addr='127.0.0.1', **kwargs): + def _login(self, username, password, ip_addr="127.0.0.1", **kwargs): """ Login a user and get the response. IP address can be configured to test IP blocking functionality. """ - post_data = { - 'username': username, - 'password': password, - } + post_data = {"username": username, "password": password} post_data.update(kwargs) return self.client.post( - reverse('admin:login'), + reverse("admin:login"), post_data, REMOTE_ADDR=ip_addr, - HTTP_USER_AGENT='test-browser' + HTTP_USER_AGENT="test-browser", ) def _lockout_user_from_ip(self, username, ip_addr): for _ in range(settings.AXES_FAILURE_LIMIT): response = self._login( - username=username, - password=self.WRONG_PASSWORD, - ip_addr=ip_addr, + username=username, password=self.WRONG_PASSWORD, ip_addr=ip_addr ) return response def _lockout_user1_from_ip1(self): - return self._lockout_user_from_ip( - username=self.USER_1, - ip_addr=self.IP_1, - ) + return self._lockout_user_from_ip(username=self.USER_1, ip_addr=self.IP_1) def setUp(self): """ @@ -138,7 +130,9 @@ class LoginTestCase(AxesTestCase): """ response = self._login(self.username, self.password) - self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True) + self.assertNotContains( + response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True + ) def test_lockout_limit_once(self): """ @@ -188,10 +182,7 @@ class LoginTestCase(AxesTestCase): # test until one try before the limit for _ in range(1, settings.AXES_FAILURE_LIMIT): - response = self.login( - is_valid_username=True, - is_valid_password=False, - ) + response = self.login(is_valid_username=True, is_valid_password=False) # Check if we are in the same login page self.assertContains(response, self.LOGIN_FORM_KEY, html=True) @@ -224,7 +215,9 @@ class LoginTestCase(AxesTestCase): response = self._login(self.username, self.password) # Check if we are still in the login page - self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True) + self.assertNotContains( + response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True + ) # now create failure_limit + 1 failed logins and then we should still # be able to login with valid_username @@ -233,7 +226,9 @@ class LoginTestCase(AxesTestCase): # Check if we can still log in with valid user response = self._login(self.username, self.password) - self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True) + self.assertNotContains( + response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True + ) # Test for true and false positives when blocking by IP *OR* user (default) # Cache disabled. Default settings. @@ -242,11 +237,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is still blocked from IP 1. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) def test_lockout_by_ip_allows_when_same_user_diff_ip_without_cache(self): @@ -254,11 +245,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 can still login from IP 2. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) def test_lockout_by_ip_blocks_when_diff_user_same_ip_without_cache(self): @@ -266,11 +253,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 is also locked out from IP 1. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) def test_lockout_by_ip_allows_when_diff_user_diff_ip_without_cache(self): @@ -278,11 +261,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 2. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) # Test for true and false positives when blocking by user only. @@ -293,11 +272,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is still blocked from IP 1. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) @override_settings(AXES_ONLY_USER_FAILURES=True) @@ -306,11 +281,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is also locked out from IP 2. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.BLOCKED) @override_settings(AXES_ONLY_USER_FAILURES=True) @@ -319,11 +290,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 1. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_ONLY_USER_FAILURES=True) @@ -332,20 +299,16 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 2. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_ONLY_USER_FAILURES=True) def test_lockout_by_user_with_empty_username_allows_other_users_without_cache(self): # User with empty username is locked out from IP 1. - self._lockout_user_from_ip(username='', ip_addr=self.IP_1) + self._lockout_user_from_ip(username="", ip_addr=self.IP_1) # Still possible to access the login page - response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1) + response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1) self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True) # Test for true and false positives when blocking by user and IP together. @@ -356,11 +319,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is still blocked from IP 1. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @@ -369,11 +328,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 can still login from IP 2. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @@ -382,11 +337,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 1. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @@ -395,20 +346,18 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 2. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) - def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_without_cache(self): + def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_without_cache( + self + ): # User with empty username is locked out from IP 1. - self._lockout_user_from_ip(username='', ip_addr=self.IP_1) + self._lockout_user_from_ip(username="", ip_addr=self.IP_1) # Still possible to access the login page - response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1) + response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1) self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True) # Test for true and false positives when blocking by IP *OR* user (default) @@ -418,11 +367,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is still blocked from IP 1. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) def test_lockout_by_ip_allows_when_same_user_diff_ip_using_cache(self): @@ -430,11 +375,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 can still login from IP 2. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) def test_lockout_by_ip_blocks_when_diff_user_same_ip_using_cache(self): @@ -442,11 +383,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 is also locked out from IP 1. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) def test_lockout_by_ip_allows_when_diff_user_diff_ip_using_cache(self): @@ -454,20 +391,16 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 2. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_ONLY_USER_FAILURES=True) def test_lockout_by_user_with_empty_username_allows_other_users_using_cache(self): # User with empty username is locked out from IP 1. - self._lockout_user_from_ip(username='', ip_addr=self.IP_1) + self._lockout_user_from_ip(username="", ip_addr=self.IP_1) # Still possible to access the login page - response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1) + response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1) self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True) # Test for true and false positives when blocking by user only. @@ -478,11 +411,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is still blocked from IP 1. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) @override_settings(AXES_ONLY_USER_FAILURES=True) @@ -491,11 +420,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is also locked out from IP 2. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.BLOCKED) @override_settings(AXES_ONLY_USER_FAILURES=True) @@ -504,11 +429,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 1. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_ONLY_USER_FAILURES=True) @@ -517,11 +438,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 2. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) # Test for true and false positives when blocking by user and IP together. @@ -532,11 +449,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 is still blocked from IP 1. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.BLOCKED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @@ -545,11 +458,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 1 can still login from IP 2. - response = self._login( - self.USER_1, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @@ -558,11 +467,7 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 1. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_1 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @@ -571,18 +476,16 @@ class LoginTestCase(AxesTestCase): self._lockout_user1_from_ip1() # User 2 can still login from IP 2. - response = self._login( - self.USER_2, - self.VALID_PASSWORD, - ip_addr=self.IP_2 - ) + response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2) self.assertEqual(response.status_code, self.ALLOWED) @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) - def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_using_cache(self): + def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_using_cache( + self + ): # User with empty username is locked out from IP 1. - self._lockout_user_from_ip(username='', ip_addr=self.IP_1) + self._lockout_user_from_ip(username="", ip_addr=self.IP_1) # Still possible to access the login page - response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1) + response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1) self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True) diff --git a/axes/tests/test_management.py b/axes/tests/test_management.py index 4753489..e95b03b 100644 --- a/axes/tests/test_management.py +++ b/axes/tests/test_management.py @@ -10,104 +10,100 @@ from axes.tests.base import AxesTestCase class ResetAccessLogsManagementCommandTestCase(AxesTestCase): def setUp(self): - self.msg_not_found = 'No logs found.\n' - self.msg_num_found = '{} logs removed.\n' + self.msg_not_found = "No logs found.\n" + self.msg_num_found = "{} logs removed.\n" days_3 = timezone.now() - timezone.timedelta(days=3) - with patch('django.utils.timezone.now', Mock(return_value=days_3)): + with patch("django.utils.timezone.now", Mock(return_value=days_3)): AccessLog.objects.create() days_13 = timezone.now() - timezone.timedelta(days=9) - with patch('django.utils.timezone.now', Mock(return_value=days_13)): + with patch("django.utils.timezone.now", Mock(return_value=days_13)): AccessLog.objects.create() days_30 = timezone.now() - timezone.timedelta(days=27) - with patch('django.utils.timezone.now', Mock(return_value=days_30)): + with patch("django.utils.timezone.now", Mock(return_value=days_30)): AccessLog.objects.create() def test_axes_delete_access_logs_default(self): out = StringIO() - call_command('axes_reset_logs', stdout=out) + call_command("axes_reset_logs", stdout=out) self.assertEqual(self.msg_not_found, out.getvalue()) def test_axes_delete_access_logs_older_than_2_days(self): out = StringIO() - call_command('axes_reset_logs', age=2, stdout=out) + call_command("axes_reset_logs", age=2, stdout=out) self.assertEqual(self.msg_num_found.format(3), out.getvalue()) def test_axes_delete_access_logs_older_than_4_days(self): out = StringIO() - call_command('axes_reset_logs', age=4, stdout=out) + call_command("axes_reset_logs", age=4, stdout=out) self.assertEqual(self.msg_num_found.format(2), out.getvalue()) def test_axes_delete_access_logs_older_than_16_days(self): out = StringIO() - call_command('axes_reset_logs', age=16, stdout=out) + call_command("axes_reset_logs", age=16, stdout=out) self.assertEqual(self.msg_num_found.format(1), out.getvalue()) class ManagementCommandTestCase(AxesTestCase): def setUp(self): AccessAttempt.objects.create( - username='jane.doe', - ip_address='10.0.0.1', - failures_since_start='4', + username="jane.doe", ip_address="10.0.0.1", failures_since_start="4" ) AccessAttempt.objects.create( - username='john.doe', - ip_address='10.0.0.2', - failures_since_start='15', + username="john.doe", ip_address="10.0.0.2", failures_since_start="15" ) def test_axes_list_attempts(self): out = StringIO() - call_command('axes_list_attempts', stdout=out) + call_command("axes_list_attempts", stdout=out) - expected = '10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n' + expected = "10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n" self.assertEqual(expected, out.getvalue()) def test_axes_reset(self): out = StringIO() - call_command('axes_reset', stdout=out) + call_command("axes_reset", stdout=out) - expected = '2 attempts removed.\n' + expected = "2 attempts removed.\n" self.assertEqual(expected, out.getvalue()) def test_axes_reset_not_found(self): out = StringIO() - call_command('axes_reset', stdout=out) + call_command("axes_reset", stdout=out) out = StringIO() - call_command('axes_reset', stdout=out) + call_command("axes_reset", stdout=out) - expected = 'No attempts found.\n' + expected = "No attempts found.\n" self.assertEqual(expected, out.getvalue()) def test_axes_reset_ip(self): out = StringIO() - call_command('axes_reset_ip', '10.0.0.1', stdout=out) + call_command("axes_reset_ip", "10.0.0.1", stdout=out) - expected = '1 attempts removed.\n' + expected = "1 attempts removed.\n" self.assertEqual(expected, out.getvalue()) def test_axes_reset_ip_not_found(self): out = StringIO() - call_command('axes_reset_ip', '10.0.0.3', stdout=out) + call_command("axes_reset_ip", "10.0.0.3", stdout=out) - expected = 'No attempts found.\n' + expected = "No attempts found.\n" self.assertEqual(expected, out.getvalue()) def test_axes_reset_username(self): out = StringIO() - call_command('axes_reset_username', 'john.doe', stdout=out) + call_command("axes_reset_username", "john.doe", stdout=out) - expected = '1 attempts removed.\n' + expected = "1 attempts removed.\n" self.assertEqual(expected, out.getvalue()) def test_axes_reset_username_not_found(self): out = StringIO() - call_command('axes_reset_username', 'ivan.renko', stdout=out) + call_command("axes_reset_username", "ivan.renko", stdout=out) - expected = 'No attempts found.\n' + expected = "No attempts found.\n" self.assertEqual(expected, out.getvalue()) diff --git a/axes/tests/test_models.py b/axes/tests/test_models.py index d5ebadf..754cfc8 100644 --- a/axes/tests/test_models.py +++ b/axes/tests/test_models.py @@ -13,23 +13,22 @@ class ModelsTestCase(AxesTestCase): self.failures_since_start = 42 self.access_attempt = AccessAttempt( - failures_since_start=self.failures_since_start, + failures_since_start=self.failures_since_start ) self.access_log = AccessLog() def test_access_attempt_str(self): - self.assertIn('Access', str(self.access_attempt)) + self.assertIn("Access", str(self.access_attempt)) def test_access_log_str(self): - self.assertIn('Access', str(self.access_log)) + self.assertIn("Access", str(self.access_log)) class MigrationsTestCase(AxesTestCase): def test_missing_migrations(self): executor = MigrationExecutor(connection) autodetector = MigrationAutodetector( - executor.loader.project_state(), - ProjectState.from_apps(apps), + executor.loader.project_state(), ProjectState.from_apps(apps) ) changes = autodetector.changes(graph=executor.loader.graph) diff --git a/axes/tests/test_utils.py b/axes/tests/test_utils.py index 98a87eb..2d6f13a 100644 --- a/axes/tests/test_utils.py +++ b/axes/tests/test_utils.py @@ -62,22 +62,14 @@ class TimestampTestCase(AxesTestCase): """ expected = { - timedelta(days=1, hours=25, minutes=42, seconds=8): - 'P2DT1H42M8S', - timedelta(days=7, seconds=342): - 'P7DT5M42S', - timedelta(days=0, hours=2, minutes=42): - 'PT2H42M', - timedelta(hours=20, seconds=42): - 'PT20H42S', - timedelta(seconds=300): - 'PT5M', - timedelta(seconds=9005): - 'PT2H30M5S', - timedelta(minutes=9005): - 'P6DT6H5M', - timedelta(days=15): - 'P15D' + timedelta(days=1, hours=25, minutes=42, seconds=8): "P2DT1H42M8S", + timedelta(days=7, seconds=342): "P7DT5M42S", + timedelta(days=0, hours=2, minutes=42): "PT2H42M", + timedelta(hours=20, seconds=42): "PT20H42S", + timedelta(seconds=300): "PT5M", + timedelta(seconds=9005): "PT2H30M5S", + timedelta(minutes=9005): "P6DT6H5M", + timedelta(days=15): "P15D", } for delta, iso_duration in expected.items(): @@ -93,34 +85,38 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" - expected = self.get_expected_client_str(username, ip_address, user_agent, path_info) + expected = self.get_expected_client_str( + username, ip_address, user_agent, path_info + ) actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @override_settings(AXES_VERBOSE=True) def test_verbose_ip_only_client_details_tuple(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = ('admin', 'login') + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = ("admin", "login") - expected = self.get_expected_client_str(username, ip_address, user_agent, path_info[0]) + expected = self.get_expected_client_str( + username, ip_address, user_agent, path_info[0] + ) actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @override_settings(AXES_VERBOSE=False) def test_non_verbose_ip_only_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}' actual = get_client_str(username, ip_address, user_agent, path_info) @@ -130,12 +126,14 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_ONLY_USER_FAILURES=True) @override_settings(AXES_VERBOSE=True) def test_verbose_user_only_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" - expected = self.get_expected_client_str(username, ip_address, user_agent, path_info) + expected = self.get_expected_client_str( + username, ip_address, user_agent, path_info + ) actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -143,10 +141,10 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_ONLY_USER_FAILURES=True) @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_only_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" expected = '{username: "test@example.com", path_info: "/admin/"}' actual = get_client_str(username, ip_address, user_agent, path_info) @@ -156,12 +154,14 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @override_settings(AXES_VERBOSE=True) def test_verbose_user_ip_combo_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" - expected = self.get_expected_client_str(username, ip_address, user_agent, path_info) + expected = self.get_expected_client_str( + username, ip_address, user_agent, path_info + ) actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -169,10 +169,10 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True) @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_ip_combo_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}' actual = get_client_str(username, ip_address, user_agent, path_info) @@ -182,12 +182,14 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_USE_USER_AGENT=True) @override_settings(AXES_VERBOSE=True) def test_verbose_user_agent_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" - expected = self.get_expected_client_str(username, ip_address, user_agent, path_info) + expected = self.get_expected_client_str( + username, ip_address, user_agent, path_info + ) actual = get_client_str(username, ip_address, user_agent, path_info) self.assertEqual(expected, actual) @@ -195,10 +197,10 @@ class ClientStringTestCase(AxesTestCase): @override_settings(AXES_USE_USER_AGENT=True) @override_settings(AXES_VERBOSE=False) def test_non_verbose_user_agent_client_details(self): - username = 'test@example.com' - ip_address = '127.0.0.1' - user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)' - path_info = '/admin/' + username = "test@example.com" + ip_address = "127.0.0.1" + user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)" + path_info = "/admin/" expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}' actual = get_client_str(username, ip_address, user_agent, path_info) @@ -207,13 +209,13 @@ class ClientStringTestCase(AxesTestCase): class ClientParametersTestCase(AxesTestCase): - @override_settings( - AXES_ONLY_USER_FAILURES=True, - ) + @override_settings(AXES_ONLY_USER_FAILURES=True) def test_get_filter_kwargs_user(self): self.assertEqual( - dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), - {'username': self.username}, + dict( + get_client_parameters(self.username, self.ip_address, self.user_agent) + ), + {"username": self.username}, ) @override_settings( @@ -223,8 +225,10 @@ class ClientParametersTestCase(AxesTestCase): ) def test_get_filter_kwargs_ip(self): self.assertEqual( - dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), - {'ip_address': self.ip_address}, + dict( + get_client_parameters(self.username, self.ip_address, self.user_agent) + ), + {"ip_address": self.ip_address}, ) @override_settings( @@ -234,8 +238,10 @@ class ClientParametersTestCase(AxesTestCase): ) def test_get_filter_kwargs_user_and_ip(self): self.assertEqual( - dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), - {'username': self.username, 'ip_address': self.ip_address}, + dict( + get_client_parameters(self.username, self.ip_address, self.user_agent) + ), + {"username": self.username, "ip_address": self.ip_address}, ) @override_settings( @@ -245,8 +251,10 @@ class ClientParametersTestCase(AxesTestCase): ) def test_get_filter_kwargs_ip_and_agent(self): self.assertEqual( - dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), - {'ip_address': self.ip_address, 'user_agent': self.user_agent}, + dict( + get_client_parameters(self.username, self.ip_address, self.user_agent) + ), + {"ip_address": self.ip_address, "user_agent": self.user_agent}, ) @override_settings( @@ -256,8 +264,14 @@ class ClientParametersTestCase(AxesTestCase): ) def test_get_filter_kwargs_user_ip_agent(self): self.assertEqual( - dict(get_client_parameters(self.username, self.ip_address, self.user_agent)), - {'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent}, + dict( + get_client_parameters(self.username, self.ip_address, self.user_agent) + ), + { + "username": self.username, + "ip_address": self.ip_address, + "user_agent": self.user_agent, + }, ) @@ -268,29 +282,25 @@ class ClientCacheKeyTestCase(AxesTestCase): """ cache_hash_digest = md5(self.ip_address.encode()).hexdigest() - cache_hash_key = f'axes-{cache_hash_digest}' + cache_hash_key = f"axes-{cache_hash_digest}" # Getting cache key from request request_factory = RequestFactory() request = request_factory.post( - '/admin/login/', - data={ - 'username': self.username, - 'password': 'test', - }, + "/admin/login/", data={"username": self.username, "password": "test"} ) self.assertEqual(cache_hash_key, get_client_cache_key(request)) # Getting cache key from AccessAttempt Object attempt = AccessAttempt( - user_agent='', + user_agent="", ip_address=self.ip_address, username=self.username, - get_data='', - post_data='', - http_accept=request.META.get('HTTP_ACCEPT', ''), - path_info=request.META.get('PATH_INFO', ''), + get_data="", + post_data="", + http_accept=request.META.get("HTTP_ACCEPT", ""), + path_info=request.META.get("PATH_INFO", ""), failures_since_start=0, ) @@ -301,19 +311,16 @@ class ClientCacheKeyTestCase(AxesTestCase): Simulate an empty IP address in the request. """ - empty_ip_address = '' + empty_ip_address = "" cache_hash_digest = md5(empty_ip_address.encode()).hexdigest() - cache_hash_key = f'axes-{cache_hash_digest}' + cache_hash_key = f"axes-{cache_hash_digest}" # Getting cache key from request request_factory = RequestFactory() request = request_factory.post( - '/admin/login/', - data={ - 'username': self.username, - 'password': 'test', - }, + "/admin/login/", + data={"username": self.username, "password": "test"}, REMOTE_ADDR=empty_ip_address, ) @@ -321,13 +328,13 @@ class ClientCacheKeyTestCase(AxesTestCase): # Getting cache key from AccessAttempt Object attempt = AccessAttempt( - user_agent='', + user_agent="", ip_address=empty_ip_address, username=self.username, - get_data='', - post_data='', - http_accept=request.META.get('HTTP_ACCEPT', ''), - path_info=request.META.get('PATH_INFO', ''), + get_data="", + post_data="", + http_accept=request.META.get("HTTP_ACCEPT", ""), + path_info=request.META.get("PATH_INFO", ""), failures_since_start=0, ) @@ -340,110 +347,108 @@ class ClientCacheKeyTestCase(AxesTestCase): ip_address = self.ip_address cache_hash_digest = md5(ip_address.encode()).hexdigest() - cache_hash_key = f'axes-{cache_hash_digest}' + cache_hash_key = f"axes-{cache_hash_digest}" # Getting cache key from request request_factory = RequestFactory() request = request_factory.post( - '/admin/login/', - data={ - 'username': self.username, - 'password': 'test' - } + "/admin/login/", data={"username": self.username, "password": "test"} ) # Difference between the upper test: new call signature with credentials - credentials = {'username': self.username} + credentials = {"username": self.username} self.assertEqual(cache_hash_key, get_client_cache_key(request, credentials)) # Getting cache key from AccessAttempt Object attempt = AccessAttempt( - user_agent='', + user_agent="", ip_address=ip_address, username=self.username, - get_data='', - post_data='', - http_accept=request.META.get('HTTP_ACCEPT', ''), - path_info=request.META.get('PATH_INFO', ''), + get_data="", + post_data="", + http_accept=request.META.get("HTTP_ACCEPT", ""), + path_info=request.META.get("PATH_INFO", ""), failures_since_start=0, ) self.assertEqual(cache_hash_key, get_client_cache_key(attempt)) class UsernameTestCase(AxesTestCase): - @override_settings(AXES_USERNAME_FORM_FIELD='username') + @override_settings(AXES_USERNAME_FORM_FIELD="username") def test_default_get_client_username(self): - expected = 'test-username' + expected = "test-username" request = HttpRequest() - request.POST['username'] = expected + request.POST["username"] = expected actual = get_client_username(request) self.assertEqual(expected, actual) - @override_settings(AXES_USERNAME_FORM_FIELD='username') + @override_settings(AXES_USERNAME_FORM_FIELD="username") def test_default_get_client_username_credentials(self): - expected = 'test-username' - expected_in_credentials = 'test-credentials-username' + expected = "test-username" + expected_in_credentials = "test-credentials-username" request = HttpRequest() - request.POST['username'] = expected - credentials = { - 'username': expected_in_credentials - } + request.POST["username"] = expected + credentials = {"username": expected_in_credentials} actual = get_client_username(request, credentials) self.assertEqual(expected_in_credentials, actual) def sample_customize_username(request, credentials): - return 'prefixed-' + request.POST.get('username') + return "prefixed-" + request.POST.get("username") - @override_settings(AXES_USERNAME_FORM_FIELD='username') + @override_settings(AXES_USERNAME_FORM_FIELD="username") @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username) def test_custom_get_client_username_from_request(self): - provided = 'test-username' - expected = 'prefixed-' + provided - provided_in_credentials = 'test-credentials-username' + provided = "test-username" + expected = "prefixed-" + provided + provided_in_credentials = "test-credentials-username" request = HttpRequest() - request.POST['username'] = provided - credentials = {'username': provided_in_credentials} + request.POST["username"] = provided + credentials = {"username": provided_in_credentials} actual = get_client_username(request, credentials) self.assertEqual(expected, actual) def sample_customize_username_credentials(request, credentials): - return 'prefixed-' + credentials.get('username') + return "prefixed-" + credentials.get("username") - @override_settings(AXES_USERNAME_FORM_FIELD='username') + @override_settings(AXES_USERNAME_FORM_FIELD="username") @override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_credentials) def test_custom_get_client_username_from_credentials(self): - provided = 'test-username' - provided_in_credentials = 'test-credentials-username' - expected_in_credentials = 'prefixed-' + provided_in_credentials + provided = "test-username" + provided_in_credentials = "test-credentials-username" + expected_in_credentials = "prefixed-" + provided_in_credentials request = HttpRequest() - request.POST['username'] = provided - credentials = {'username': provided_in_credentials} + request.POST["username"] = provided + credentials = {"username": provided_in_credentials} actual = get_client_username(request, credentials) self.assertEqual(expected_in_credentials, actual) - @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover + @override_settings( + AXES_USERNAME_CALLABLE=lambda request, credentials: "example" + ) # pragma: no cover def test_get_client_username(self): - self.assertEqual(get_client_username(HttpRequest(), {}), 'example') + self.assertEqual(get_client_username(HttpRequest(), {}), "example") @override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover def test_get_client_username_invalid_callable_too_few_arguments(self): with self.assertRaises(TypeError): get_client_username(HttpRequest(), {}) - @override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover + @override_settings( + AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None + ) # pragma: no cover def test_get_client_username_invalid_callable_too_many_arguments(self): with self.assertRaises(TypeError): get_client_username(HttpRequest(), {}) @@ -453,73 +458,70 @@ class UsernameTestCase(AxesTestCase): with self.assertRaises(TypeError): get_client_username(HttpRequest(), {}) - @override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username') + @override_settings(AXES_USERNAME_CALLABLE="axes.tests.test_utils.get_username") def test_get_client_username_str(self): - self.assertEqual( - get_client_username(HttpRequest(), {}), - 'username', - ) + self.assertEqual(get_client_username(HttpRequest(), {}), "username") def get_username(request, credentials: dict) -> str: - return 'username' + return "username" class IPWhitelistTestCase(AxesTestCase): def setUp(self): self.request = HttpRequest() - self.request.method = 'POST' - self.request.META['REMOTE_ADDR'] = '127.0.0.1' - self.request.axes_ip_address = '127.0.0.1' + self.request.method = "POST" + self.request.META["REMOTE_ADDR"] = "127.0.0.1" + self.request.axes_ip_address = "127.0.0.1" @override_settings(AXES_IP_WHITELIST=None) def test_ip_in_whitelist_none(self): - self.assertFalse(is_ip_address_in_whitelist('127.0.0.2')) + self.assertFalse(is_ip_address_in_whitelist("127.0.0.2")) - @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + @override_settings(AXES_IP_WHITELIST=["127.0.0.1"]) def test_ip_in_whitelist(self): - self.assertTrue(is_ip_address_in_whitelist('127.0.0.1')) - self.assertFalse(is_ip_address_in_whitelist('127.0.0.2')) + self.assertTrue(is_ip_address_in_whitelist("127.0.0.1")) + self.assertFalse(is_ip_address_in_whitelist("127.0.0.2")) @override_settings(AXES_IP_BLACKLIST=None) def test_ip_in_blacklist_none(self): - self.assertFalse(is_ip_address_in_blacklist('127.0.0.2')) + self.assertFalse(is_ip_address_in_blacklist("127.0.0.2")) - @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) + @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"]) def test_ip_in_blacklist(self): - self.assertTrue(is_ip_address_in_blacklist('127.0.0.1')) - self.assertFalse(is_ip_address_in_blacklist('127.0.0.2')) + self.assertTrue(is_ip_address_in_blacklist("127.0.0.1")) + self.assertFalse(is_ip_address_in_blacklist("127.0.0.2")) - @override_settings(AXES_IP_BLACKLIST=['127.0.0.1']) + @override_settings(AXES_IP_BLACKLIST=["127.0.0.1"]) def test_is_client_ip_address_blacklisted_ip_in_blacklist(self): self.assertTrue(is_client_ip_address_blacklisted(self.request)) - @override_settings(AXES_IP_BLACKLIST=['127.0.0.2']) + @override_settings(AXES_IP_BLACKLIST=["127.0.0.2"]) def test_is_is_client_ip_address_blacklisted_ip_not_in_blacklist(self): self.assertFalse(is_client_ip_address_blacklisted(self.request)) @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + @override_settings(AXES_IP_WHITELIST=["127.0.0.1"]) def test_is_client_ip_address_blacklisted_ip_in_whitelist(self): self.assertFalse(is_client_ip_address_blacklisted(self.request)) @override_settings(AXES_ONLY_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.2']) + @override_settings(AXES_IP_WHITELIST=["127.0.0.2"]) def test_is_already_locked_ip_not_in_whitelist(self): self.assertTrue(is_client_ip_address_blacklisted(self.request)) @override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + @override_settings(AXES_IP_WHITELIST=["127.0.0.1"]) def test_is_client_ip_address_whitelisted_never_lockout(self): self.assertTrue(is_client_ip_address_whitelisted(self.request)) @override_settings(AXES_ONLY_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.1']) + @override_settings(AXES_IP_WHITELIST=["127.0.0.1"]) def test_is_client_ip_address_whitelisted_only_allow(self): self.assertTrue(is_client_ip_address_whitelisted(self.request)) @override_settings(AXES_ONLY_WHITELIST=True) - @override_settings(AXES_IP_WHITELIST=['127.0.0.2']) + @override_settings(AXES_IP_WHITELIST=["127.0.0.2"]) def test_is_client_ip_address_whitelisted_not(self): self.assertFalse(is_client_ip_address_whitelisted(self.request)) @@ -527,7 +529,7 @@ class IPWhitelistTestCase(AxesTestCase): class MethodWhitelistTestCase(AxesTestCase): def setUp(self): self.request = HttpRequest() - self.request.method = 'GET' + self.request.method = "GET" @override_settings(AXES_NEVER_LOCKOUT_GET=True) def test_is_client_method_whitelisted(self): @@ -546,14 +548,14 @@ class LockoutResponseTestCase(AxesTestCase): def test_get_lockout_response_cool_off(self): get_lockout_response(request=self.request) - @override_settings(AXES_LOCKOUT_TEMPLATE='example.html') - @patch('axes.helpers.render') + @override_settings(AXES_LOCKOUT_TEMPLATE="example.html") + @patch("axes.helpers.render") def test_get_lockout_response_lockout_template(self, render): self.assertFalse(render.called) get_lockout_response(request=self.request) self.assertTrue(render.called) - @override_settings(AXES_LOCKOUT_URL='https://example.com') + @override_settings(AXES_LOCKOUT_URL="https://example.com") def test_get_lockout_response_lockout_url(self): response = get_lockout_response(request=self.request) self.assertEqual(type(response), HttpResponseRedirect) diff --git a/axes/tests/urls.py b/axes/tests/urls.py index d52422e..bd22e9e 100644 --- a/axes/tests/urls.py +++ b/axes/tests/urls.py @@ -2,6 +2,4 @@ from django.conf.urls import url from django.contrib import admin -urlpatterns = [ - url(r'^admin/', admin.site.urls), -] +urlpatterns = [url(r"^admin/", admin.site.urls)] diff --git a/requirements.txt b/requirements.txt index e4bc8ad..ac813a4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ -e . +black==19.3b0 coverage==4.5.4 mypy==0.730; python_version < '3.8' and python_implementation != 'PyPy' prospector==1.1.6.2 ; python_version < '3.8' and python_implementation != 'PyPy' diff --git a/tox.ini b/tox.ini index da4cdcc..5cdfe48 100644 --- a/tox.ini +++ b/tox.ini @@ -26,7 +26,8 @@ deps = usedevelop = True commands = pytest - {py36,py37}: prospector {py36,py37}: mypy axes + {py36,py37}: prospector + {py36,py37}: black --check --diff axes setenv = PYTHONDONTWRITEBYTECODE=1