Upgrade CI tooling to use automatic code formatting

This commit is contained in:
Aleksi Häkli 2019-09-28 19:27:50 +03:00
parent d94c53f999
commit 93bb73552e
No known key found for this signature in database
GPG key ID: 3E7146964D726BBE
43 changed files with 1033 additions and 1074 deletions

View file

@ -4,4 +4,4 @@ ignore-paths:
pep8:
options:
max-line-length: 119
max-line-length: 142

View file

@ -1,5 +1,5 @@
from pkg_resources import get_distribution
default_app_config = 'axes.apps.AppConfig'
default_app_config = "axes.apps.AppConfig"
__version__ = get_distribution('django-axes').version
__version__ = get_distribution("django-axes").version

View file

@ -7,50 +7,36 @@ from axes.models import AccessAttempt, AccessLog
@admin.register(AccessAttempt)
class AccessAttemptAdmin(admin.ModelAdmin):
list_display = (
'attempt_time',
'ip_address',
'user_agent',
'username',
'path_info',
'failures_since_start',
"attempt_time",
"ip_address",
"user_agent",
"username",
"path_info",
"failures_since_start",
)
list_filter = [
'attempt_time',
'path_info',
]
list_filter = ["attempt_time", "path_info"]
search_fields = [
'ip_address',
'username',
'user_agent',
'path_info',
]
search_fields = ["ip_address", "username", "user_agent", "path_info"]
date_hierarchy = 'attempt_time'
date_hierarchy = "attempt_time"
fieldsets = (
(None, {
'fields': ('path_info', 'failures_since_start')
}),
(_('Form Data'), {
'fields': ('get_data', 'post_data')
}),
(_('Meta Data'), {
'fields': ('user_agent', 'ip_address', 'http_accept')
})
(None, {"fields": ("path_info", "failures_since_start")}),
(_("Form Data"), {"fields": ("get_data", "post_data")}),
(_("Meta Data"), {"fields": ("user_agent", "ip_address", "http_accept")}),
)
readonly_fields = [
'user_agent',
'ip_address',
'username',
'http_accept',
'path_info',
'attempt_time',
'get_data',
'post_data',
'failures_since_start'
"user_agent",
"ip_address",
"username",
"http_accept",
"path_info",
"attempt_time",
"get_data",
"post_data",
"failures_since_start",
]
def has_add_permission(self, request):
@ -60,46 +46,33 @@ class AccessAttemptAdmin(admin.ModelAdmin):
@admin.register(AccessLog)
class AccessLogAdmin(admin.ModelAdmin):
list_display = (
'attempt_time',
'logout_time',
'ip_address',
'username',
'user_agent',
'path_info',
"attempt_time",
"logout_time",
"ip_address",
"username",
"user_agent",
"path_info",
)
list_filter = [
'attempt_time',
'logout_time',
'path_info',
]
list_filter = ["attempt_time", "logout_time", "path_info"]
search_fields = [
'ip_address',
'user_agent',
'username',
'path_info',
]
search_fields = ["ip_address", "user_agent", "username", "path_info"]
date_hierarchy = 'attempt_time'
date_hierarchy = "attempt_time"
fieldsets = (
(None, {
'fields': ('path_info',)
}),
(_('Meta Data'), {
'fields': ('user_agent', 'ip_address', 'http_accept')
})
(None, {"fields": ("path_info",)}),
(_("Meta Data"), {"fields": ("user_agent", "ip_address", "http_accept")}),
)
readonly_fields = [
'user_agent',
'ip_address',
'username',
'http_accept',
'path_info',
'attempt_time',
'logout_time'
"user_agent",
"ip_address",
"username",
"http_accept",
"path_info",
"attempt_time",
"logout_time",
]
def has_add_permission(self, request):

View file

@ -9,7 +9,7 @@ log = getLogger(settings.AXES_LOGGER)
class AppConfig(apps.AppConfig):
name = 'axes'
name = "axes"
logging_initialized = False
@classmethod
@ -31,15 +31,18 @@ class AppConfig(apps.AppConfig):
return
cls.logging_initialized = True
log.info('AXES: BEGIN LOG')
log.info('AXES: Using django-axes version %s', get_distribution('django-axes').version)
log.info("AXES: BEGIN LOG")
log.info(
"AXES: Using django-axes version %s",
get_distribution("django-axes").version,
)
if settings.AXES_ONLY_USER_FAILURES:
log.info('AXES: blocking by username only.')
log.info("AXES: blocking by username only.")
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
log.info('AXES: blocking by combination of username and IP.')
log.info("AXES: blocking by combination of username and IP.")
else:
log.info('AXES: blocking by IP only.')
log.info("AXES: blocking by IP only.")
def ready(self):
self.initialize()

View file

@ -6,11 +6,7 @@ from django.utils.timezone import datetime, now
from axes.conf import settings
from axes.models import AccessAttempt
from axes.helpers import (
get_client_username,
get_client_parameters,
get_cool_off,
)
from axes.helpers import get_client_username, get_client_parameters, get_cool_off
log = getLogger(settings.AXES_LOGGER)
@ -22,7 +18,9 @@ def get_cool_off_threshold(attempt_time: datetime = None) -> datetime:
cool_off = get_cool_off()
if cool_off is None:
raise TypeError('Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None')
raise TypeError(
"Cool off threshold can not be calculated with settings.AXES_COOLOFF_TIME set to None"
)
if attempt_time is None:
return now() - cool_off
@ -36,7 +34,9 @@ def filter_user_attempts(request, credentials: dict = None) -> QuerySet:
username = get_client_username(request, credentials)
filter_kwargs = get_client_parameters(username, request.axes_ip_address, request.axes_user_agent)
filter_kwargs = get_client_parameters(
username, request.axes_ip_address, request.axes_user_agent
)
return AccessAttempt.objects.filter(**filter_kwargs)
@ -49,11 +49,13 @@ def get_user_attempts(request, credentials: dict = None) -> QuerySet:
attempts = filter_user_attempts(request, credentials)
if settings.AXES_COOLOFF_TIME is None:
log.debug('AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured')
log.debug(
"AXES: Getting all access attempts from database because no AXES_COOLOFF_TIME is configured"
)
return attempts
threshold = get_cool_off_threshold(request.axes_attempt_time)
log.debug('AXES: Getting access attempts that are newer than %s', threshold)
log.debug("AXES: Getting access attempts that are newer than %s", threshold)
return attempts.filter(attempt_time__gte=threshold)
@ -63,12 +65,18 @@ def clean_expired_user_attempts(attempt_time: datetime = None) -> int:
"""
if settings.AXES_COOLOFF_TIME is None:
log.debug('AXES: Skipping clean for expired access attempts because no AXES_COOLOFF_TIME is configured')
log.debug(
"AXES: Skipping clean for expired access attempts because no AXES_COOLOFF_TIME is configured"
)
return 0
threshold = get_cool_off_threshold(attempt_time)
count, _ = AccessAttempt.objects.filter(attempt_time__lt=threshold).delete()
log.info('AXES: Cleaned up %s expired access attempts from database that were older than %s', count, threshold)
log.info(
"AXES: Cleaned up %s expired access attempts from database that were older than %s",
count,
threshold,
)
return count
@ -80,7 +88,7 @@ def reset_user_attempts(request, credentials: dict = None) -> int:
attempts = filter_user_attempts(request, credentials)
count, _ = attempts.delete()
log.info('AXES: Reset %s access attempts from database.', count)
log.info("AXES: Reset %s access attempts from database.", count)
return count
@ -95,11 +103,9 @@ def is_user_attempt_whitelisted(request, credentials: dict = None) -> bool:
this implementation fails gracefully and returns True.
"""
username_field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
username_field = getattr(get_user_model(), "USERNAME_FIELD", "username")
username_value = get_client_username(request, credentials)
kwargs = {
username_field: username_value
}
kwargs = {username_field: username_value}
user_model = get_user_model()

View file

@ -1,6 +1,9 @@
from django.contrib.auth.backends import ModelBackend
from axes.exceptions import AxesBackendPermissionDenied, AxesBackendRequestParameterRequired
from axes.exceptions import (
AxesBackendPermissionDenied,
AxesBackendRequestParameterRequired,
)
from axes.handlers.proxy import AxesProxyHandler
from axes.helpers import get_credentials, get_lockout_message, toggleable
@ -17,7 +20,9 @@ class AxesBackend(ModelBackend):
"""
@toggleable
def authenticate(self, request, username: str = None, password: str = None, **kwargs: dict):
def authenticate(
self, request, username: str = None, password: str = None, **kwargs: dict
):
"""
Checks user lockout status and raises an exception if user is not allowed to log in.
@ -30,7 +35,9 @@ class AxesBackend(ModelBackend):
"""
if request is None:
raise AxesBackendRequestParameterRequired('AxesBackend requires a request as an argument to authenticate')
raise AxesBackendRequestParameterRequired(
"AxesBackend requires a request as an argument to authenticate"
)
credentials = get_credentials(username=username, password=password, **kwargs)
@ -41,8 +48,8 @@ class AxesBackend(ModelBackend):
# Its a bit weird to pass a context and expect a response value but its nice to get a "why" back.
error_msg = get_lockout_message()
response_context = kwargs.get('response_context', {})
response_context['error'] = error_msg
response_context = kwargs.get("response_context", {})
response_context["error"] = error_msg
# Raise an error that stops the authentication flows at django.contrib.auth.authenticate.
# This error stops bubbling up at the authenticate call which catches backend PermissionDenied errors.
@ -50,4 +57,6 @@ class AxesBackend(ModelBackend):
# which is processed by axes.signals.log_user_login_failed which logs and flags the failed request.
# The axes.middleware.AxesMiddleware further processes the flagged request into a readable response.
raise AxesBackendPermissionDenied('AxesBackend detected that the given user is locked out')
raise AxesBackendPermissionDenied(
"AxesBackend detected that the given user is locked out"
)

View file

@ -1,4 +1,8 @@
from django.core.checks import Tags, Warning, register # pylint: disable=redefined-builtin
from django.core.checks import ( # pylint: disable=redefined-builtin
Tags,
Warning,
register,
)
from django.utils.module_loading import import_string
from axes.backends import AxesBackend
@ -15,51 +19,51 @@ class Messages:
MIDDLEWARE_INVALID = (
"You do not have 'axes.middleware.AxesMiddleware' in your settings.MIDDLEWARE."
)
BACKEND_INVALID = (
"You do not have 'axes.backends.AxesBackend' or a subclass in your settings.AUTHENTICATION_BACKENDS."
)
SETTING_DEPRECATED = (
'You have a deprecated setting {deprecated_setting} configured in your project settings'
)
BACKEND_INVALID = "You do not have 'axes.backends.AxesBackend' or a subclass in your settings.AUTHENTICATION_BACKENDS."
SETTING_DEPRECATED = "You have a deprecated setting {deprecated_setting} configured in your project settings"
class Hints:
CACHE_INVALID = None
MIDDLEWARE_INVALID = None
BACKEND_INVALID = 'AxesModelBackend was renamed to AxesBackend in django-axes version 5.0.'
BACKEND_INVALID = (
"AxesModelBackend was renamed to AxesBackend in django-axes version 5.0."
)
SETTING_DEPRECATED = None
class Codes:
CACHE_INVALID = 'axes.W001'
MIDDLEWARE_INVALID = 'axes.W002'
BACKEND_INVALID = 'axes.W003'
SETTING_DEPRECATED = 'axes.W004'
CACHE_INVALID = "axes.W001"
MIDDLEWARE_INVALID = "axes.W002"
BACKEND_INVALID = "axes.W003"
SETTING_DEPRECATED = "axes.W004"
@register(Tags.security, Tags.caches, Tags.compatibility)
def axes_cache_check(app_configs, **kwargs): # pylint: disable=unused-argument
axes_handler = getattr(settings, 'AXES_HANDLER', '')
axes_handler = getattr(settings, "AXES_HANDLER", "")
axes_cache_key = getattr(settings, 'AXES_CACHE', 'default')
axes_cache_key = getattr(settings, "AXES_CACHE", "default")
axes_cache_config = settings.CACHES.get(axes_cache_key, {})
axes_cache_backend = axes_cache_config.get('BACKEND', '')
axes_cache_backend = axes_cache_config.get("BACKEND", "")
axes_cache_backend_incompatible = [
'django.core.cache.backends.dummy.DummyCache',
'django.core.cache.backends.locmem.LocMemCache',
'django.core.cache.backends.filebased.FileBasedCache',
"django.core.cache.backends.dummy.DummyCache",
"django.core.cache.backends.locmem.LocMemCache",
"django.core.cache.backends.filebased.FileBasedCache",
]
warnings = []
if axes_handler == 'axes.handlers.cache.AxesCacheHandler':
if axes_handler == "axes.handlers.cache.AxesCacheHandler":
if axes_cache_backend in axes_cache_backend_incompatible:
warnings.append(Warning(
msg=Messages.CACHE_INVALID,
hint=Hints.CACHE_INVALID,
id=Codes.CACHE_INVALID,
))
warnings.append(
Warning(
msg=Messages.CACHE_INVALID,
hint=Hints.CACHE_INVALID,
id=Codes.CACHE_INVALID,
)
)
return warnings
@ -68,12 +72,14 @@ def axes_cache_check(app_configs, **kwargs): # pylint: disable=unused-argument
def axes_middleware_check(app_configs, **kwargs): # pylint: disable=unused-argument
warnings = []
if 'axes.middleware.AxesMiddleware' not in settings.MIDDLEWARE:
warnings.append(Warning(
msg=Messages.MIDDLEWARE_INVALID,
hint=Hints.MIDDLEWARE_INVALID,
id=Codes.MIDDLEWARE_INVALID,
))
if "axes.middleware.AxesMiddleware" not in settings.MIDDLEWARE:
warnings.append(
Warning(
msg=Messages.MIDDLEWARE_INVALID,
hint=Hints.MIDDLEWARE_INVALID,
id=Codes.MIDDLEWARE_INVALID,
)
)
return warnings
@ -87,20 +93,26 @@ def axes_backend_check(app_configs, **kwargs): # pylint: disable=unused-argumen
try:
backend = import_string(name)
except ModuleNotFoundError as e:
raise ModuleNotFoundError('Can not find module path defined in settings.AUTHENTICATION_BACKENDS') from e
raise ModuleNotFoundError(
"Can not find module path defined in settings.AUTHENTICATION_BACKENDS"
) from e
except ImportError as e:
raise ImportError('Can not import backend class defined in settings.AUTHENTICATION_BACKENDS') from e
raise ImportError(
"Can not import backend class defined in settings.AUTHENTICATION_BACKENDS"
) from e
if issubclass(backend, AxesBackend):
found = True
break
if not found:
warnings.append(Warning(
msg=Messages.BACKEND_INVALID,
hint=Hints.BACKEND_INVALID,
id=Codes.BACKEND_INVALID,
))
warnings.append(
Warning(
msg=Messages.BACKEND_INVALID,
hint=Hints.BACKEND_INVALID,
id=Codes.BACKEND_INVALID,
)
)
return warnings
@ -109,18 +121,20 @@ def axes_backend_check(app_configs, **kwargs): # pylint: disable=unused-argumen
def axes_deprecation_check(app_configs, **kwargs): # pylint: disable=unused-argument
warnings = []
deprecated_settings = [
'AXES_DISABLE_SUCCESS_ACCESS_LOG',
]
deprecated_settings = ["AXES_DISABLE_SUCCESS_ACCESS_LOG"]
for deprecated_setting in deprecated_settings:
try:
getattr(settings, deprecated_setting)
warnings.append(Warning(
msg=Messages.SETTING_DEPRECATED.format(deprecated_setting=deprecated_setting),
hint=None,
id=Codes.SETTING_DEPRECATED,
))
warnings.append(
Warning(
msg=Messages.SETTING_DEPRECATED.format(
deprecated_setting=deprecated_setting
),
hint=None,
id=Codes.SETTING_DEPRECATED,
)
)
except AttributeError:
pass

View file

@ -27,10 +27,10 @@ class AxesAppConf(AppConf):
USE_USER_AGENT = False
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = 'username'
USERNAME_FORM_FIELD = "username"
# use a specific password field to retrieve from login POST data
PASSWORD_FORM_FIELD = 'password' # noqa
PASSWORD_FORM_FIELD = "password" # noqa
# use a provided callable to transform the POSTed username into the one used in credentials
USERNAME_CALLABLE = None
@ -40,9 +40,9 @@ class AxesAppConf(AppConf):
DISABLE_ACCESS_LOG = False
HANDLER = 'axes.handlers.database.AxesDatabaseHandler'
HANDLER = "axes.handlers.database.AxesDatabaseHandler"
LOGGER = 'axes.watch_login'
LOGGER = "axes.watch_login"
LOCKOUT_TEMPLATE = None
@ -64,13 +64,17 @@ class AxesAppConf(AppConf):
IP_BLACKLIST = None
# message to show when locked out and have cooloff enabled
COOLOFF_MESSAGE = _('Account locked: too many login attempts. Please try again later')
COOLOFF_MESSAGE = _(
"Account locked: too many login attempts. Please try again later"
)
# message to show when locked out and have cooloff disabled
PERMALOCK_MESSAGE = _('Account locked: too many login attempts. Contact an admin to unlock your account.')
PERMALOCK_MESSAGE = _(
"Account locked: too many login attempts. Contact an admin to unlock your account."
)
# if your deployment is using reverse proxies, set this value to 'left-most' or 'right-most' per your configuration
PROXY_ORDER = 'left-most'
PROXY_ORDER = "left-most"
# if your deployment is using reverse proxies, set this value to the number of proxies in front of Django
PROXY_COUNT = None
@ -82,9 +86,7 @@ class AxesAppConf(AppConf):
# if your deployment is using reverse proxies, ensure that the header attributes are securely set by the proxy
# ensure that the client can not spoof the headers by setting them and sending them through the proxy
META_PRECEDENCE_ORDER = getattr(
settings, 'AXES_META_PRECEDENCE_ORDER', getattr(
settings, 'IPWARE_META_PRECEDENCE_ORDER', (
'REMOTE_ADDR',
)
)
settings,
"AXES_META_PRECEDENCE_ORDER",
getattr(settings, "IPWARE_META_PRECEDENCE_ORDER", ("REMOTE_ADDR",)),
)

View file

@ -30,7 +30,9 @@ class AxesHandler: # pylint: disable=unused-argument
:raises NotImplementedError: if the handler does not support resetting attempts.
"""
raise NotImplementedError('Reset for access attempts is not supported on this backend')
raise NotImplementedError(
"Reset for access attempts is not supported on this backend"
)
def reset_logs(self, *, age_days: int = None) -> int:
"""
@ -39,7 +41,9 @@ class AxesHandler: # pylint: disable=unused-argument
:raises NotImplementedError: if the handler does not support resetting logs.
"""
raise NotImplementedError('Reset for access logs is not supported on this backend')
raise NotImplementedError(
"Reset for access logs is not supported on this backend"
)
def is_allowed(self, request, credentials: dict = None) -> bool:
"""
@ -70,7 +74,7 @@ class AxesHandler: # pylint: disable=unused-argument
return True
def user_login_failed(self, sender, credentials: dict, request = None, **kwargs):
def user_login_failed(self, sender, credentials: dict, request=None, **kwargs):
"""
Handles the Django ``django.contrib.auth.signals.user_login_failed`` authentication signal.
"""
@ -95,7 +99,9 @@ class AxesHandler: # pylint: disable=unused-argument
Handles the ``axes.models.AccessAttempt`` object post delete signal.
"""
def is_blacklisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_blacklisted(
self, request, credentials: dict = None
) -> bool: # pylint: disable=unused-argument
"""
Checks if the request or given credentials are blacklisted from access.
"""
@ -105,7 +111,9 @@ class AxesHandler: # pylint: disable=unused-argument
return False
def is_whitelisted(self, request, credentials: dict = None) -> bool: # pylint: disable=unused-argument
def is_whitelisted(
self, request, credentials: dict = None
) -> bool: # pylint: disable=unused-argument
"""
Checks if the request or given credentials are whitelisted for access.
"""
@ -124,7 +132,9 @@ class AxesHandler: # pylint: disable=unused-argument
"""
if settings.AXES_LOCK_OUT_AT_FAILURE:
return self.get_failures(request, credentials) >= get_failure_limit(request, credentials)
return self.get_failures(request, credentials) >= get_failure_limit(
request, credentials
)
return False
@ -136,15 +146,18 @@ class AxesHandler: # pylint: disable=unused-argument
if the ``settings.AXES_LOCK_OUT_AT_FAILURE`` flag is set to ``True``.
"""
raise NotImplementedError('The Axes handler class needs a method definition for get_failures')
raise NotImplementedError(
"The Axes handler class needs a method definition for get_failures"
)
def is_admin_site(self, request) -> bool:
"""
Checks if the request is for admin site.
"""
if (
settings.AXES_ONLY_ADMIN_SITE and hasattr(request, 'path') and
not re.match('^%s' % reverse('admin:index'), request.path)
settings.AXES_ONLY_ADMIN_SITE
and hasattr(request, "path")
and not re.match("^%s" % reverse("admin:index"), request.path)
):
return True

View file

@ -30,11 +30,7 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
return self.cache.get(cache_key, default=0)
def user_login_failed(
self,
sender,
credentials: dict,
request = None,
**kwargs
self, sender, credentials: dict, request=None, **kwargs
): # pylint: disable=too-many-locals
"""
When user login fails, save attempt record in cache and lock user out if necessary.
@ -43,64 +39,92 @@ class AxesCacheHandler(AxesHandler): # pylint: disable=too-many-locals
"""
if request is None:
log.error('AXES: AxesCacheHandler.user_login_failed does not function without a request.')
log.error(
"AXES: AxesCacheHandler.user_login_failed does not function without a request."
)
return
username = get_client_username(request, credentials)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
client_str = get_client_str(
username,
request.axes_ip_address,
request.axes_user_agent,
request.axes_path_info,
)
if self.is_whitelisted(request, credentials):
log.info('AXES: Login failed from whitelisted client %s.', client_str)
log.info("AXES: Login failed from whitelisted client %s.", client_str)
return
failures_since_start = 1 + self.get_failures(request, credentials)
if failures_since_start > 1:
log.warning(
'AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the cache.',
"AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the cache.",
client_str,
failures_since_start,
get_failure_limit(request, credentials),
)
else:
log.warning(
'AXES: New login failure by %s. Creating new record in the cache.',
"AXES: New login failure by %s. Creating new record in the cache.",
client_str,
)
cache_key = get_client_cache_key(request, credentials)
self.cache.set(cache_key, failures_since_start, self.cache_timeout)
if settings.AXES_LOCK_OUT_AT_FAILURE and failures_since_start >= get_failure_limit(request, credentials):
log.warning('AXES: Locking out %s after repeated login failures.', client_str)
if (
settings.AXES_LOCK_OUT_AT_FAILURE
and failures_since_start >= get_failure_limit(request, credentials)
):
log.warning(
"AXES: Locking out %s after repeated login failures.", client_str
)
request.axes_locked_out = True
user_locked_out.send(
'axes',
"axes",
request=request,
username=username,
ip_address=request.axes_ip_address,
)
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(
self, sender, request, user, **kwargs
): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
username = user.get_username()
credentials = get_credentials(username)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
client_str = get_client_str(
username,
request.axes_ip_address,
request.axes_user_agent,
request.axes_path_info,
)
log.info('AXES: Successful login by %s.', client_str)
log.info("AXES: Successful login by %s.", client_str)
if settings.AXES_RESET_ON_SUCCESS:
cache_key = get_client_cache_key(request, credentials)
failures_since_start = self.cache.get(cache_key, default=0)
self.cache.delete(cache_key)
log.info('AXES: Deleted %d failed login attempts by %s from cache.', failures_since_start, client_str)
log.info(
"AXES: Deleted %d failed login attempts by %s from cache.",
failures_since_start,
client_str,
)
def user_logged_out(self, sender, request, user, **kwargs):
username = user.get_username() if user else None
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
client_str = get_client_str(
username,
request.axes_ip_address,
request.axes_user_agent,
request.axes_path_info,
)
log.info('AXES: Successful logout by %s.', client_str)
log.info("AXES: Successful logout by %s.", client_str)

View file

@ -40,24 +40,31 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
attempts = attempts.filter(username=username)
count, _ = attempts.delete()
log.info('AXES: Reset %d access attempts from database.', count)
log.info("AXES: Reset %d access attempts from database.", count)
return count
def reset_logs(self, *, age_days: int = None) -> int:
if age_days is None:
count, _ = AccessLog.objects.all().delete()
log.info('AXES: Reset all %d access logs from database.', count)
log.info("AXES: Reset all %d access logs from database.", count)
else:
limit = timezone.now() - timezone.timedelta(days=age_days)
count, _ = AccessLog.objects.filter(attempt_time__lte=limit).delete()
log.info('AXES: Reset %d access logs older than %d days from database.', count, age_days)
log.info(
"AXES: Reset %d access logs older than %d days from database.",
count,
age_days,
)
return count
def get_failures(self, request, credentials: dict = None) -> int:
attempts = get_user_attempts(request, credentials)
return attempts.aggregate(Max('failures_since_start'))['failures_since_start__max'] or 0
return (
attempts.aggregate(Max("failures_since_start"))["failures_since_start__max"]
or 0
)
def is_locked(self, request, credentials: dict = None):
if is_user_attempt_whitelisted(request, credentials):
@ -66,11 +73,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
return super().is_locked(request, credentials)
def user_login_failed(
self,
sender,
credentials: dict,
request = None,
**kwargs
self, sender, credentials: dict, request=None, **kwargs
): # pylint: disable=too-many-locals
"""
When user login fails, save AccessAttempt record in database and lock user out if necessary.
@ -79,20 +82,27 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
"""
if request is None:
log.error('AXES: AxesDatabaseHandler.user_login_failed does not function without a request.')
log.error(
"AXES: AxesDatabaseHandler.user_login_failed does not function without a request."
)
return
# 1. database query: Clean up expired user attempts from the database before logging new attempts
clean_expired_user_attempts(request.axes_attempt_time)
username = get_client_username(request, credentials)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
client_str = get_client_str(
username,
request.axes_ip_address,
request.axes_user_agent,
request.axes_path_info,
)
get_data = get_query_str(request.GET)
post_data = get_query_str(request.POST)
if self.is_whitelisted(request, credentials):
log.info('AXES: Login failed from whitelisted client %s.', client_str)
log.info("AXES: Login failed from whitelisted client %s.", client_str)
return
# 2. database query: Calculate the current maximum failure number from the existing attempts
@ -105,18 +115,18 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
# in order to bypass the defense mechanisms that are used by the site.
log.warning(
'AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.',
"AXES: Repeated login failure by %s. Count = %d of %d. Updating existing record in the database.",
client_str,
failures_since_start,
get_failure_limit(request, credentials),
)
separator = '\n---------\n'
separator = "\n---------\n"
attempts = get_user_attempts(request, credentials)
attempts.update(
get_data=Concat('get_data', Value(separator + get_data)),
post_data=Concat('post_data', Value(separator + post_data)),
get_data=Concat("get_data", Value(separator + get_data)),
post_data=Concat("post_data", Value(separator + post_data)),
http_accept=request.axes_http_accept,
path_info=request.axes_path_info,
failures_since_start=failures_since_start,
@ -128,7 +138,7 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
# and this handler just records the available information for further use.
log.warning(
'AXES: New login failure by %s. Creating new record in the database.',
"AXES: New login failure by %s. Creating new record in the database.",
client_str,
)
@ -144,19 +154,26 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
attempt_time=request.axes_attempt_time,
)
if settings.AXES_LOCK_OUT_AT_FAILURE and failures_since_start >= get_failure_limit(request, credentials):
log.warning('AXES: Locking out %s after repeated login failures.', client_str)
if (
settings.AXES_LOCK_OUT_AT_FAILURE
and failures_since_start >= get_failure_limit(request, credentials)
):
log.warning(
"AXES: Locking out %s after repeated login failures.", client_str
)
request.axes_locked_out = True
user_locked_out.send(
'axes',
"axes",
request=request,
username=username,
ip_address=request.axes_ip_address,
)
def user_logged_in(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_in(
self, sender, request, user, **kwargs
): # pylint: disable=unused-argument
"""
When user logs in, update the AccessLog related to the user.
"""
@ -166,9 +183,14 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
username = user.get_username()
credentials = get_credentials(username)
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
client_str = get_client_str(
username,
request.axes_ip_address,
request.axes_user_agent,
request.axes_path_info,
)
log.info('AXES: Successful login by %s.', client_str)
log.info("AXES: Successful login by %s.", client_str)
if not settings.AXES_DISABLE_ACCESS_LOG:
# 2. database query: Insert new access logs with login time
@ -184,9 +206,15 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
if settings.AXES_RESET_ON_SUCCESS:
# 3. database query: Reset failed attempts for the logging in user
count = reset_user_attempts(request, credentials)
log.info('AXES: Deleted %d failed login attempts by %s from database.', count, client_str)
log.info(
"AXES: Deleted %d failed login attempts by %s from database.",
count,
client_str,
)
def user_logged_out(self, sender, request, user, **kwargs): # pylint: disable=unused-argument
def user_logged_out(
self, sender, request, user, **kwargs
): # pylint: disable=unused-argument
"""
When user logs out, update the AccessLog related to the user.
"""
@ -195,15 +223,17 @@ class AxesDatabaseHandler(AxesHandler): # pylint: disable=too-many-locals
clean_expired_user_attempts(request.axes_attempt_time)
username = user.get_username() if user else None
client_str = get_client_str(username, request.axes_ip_address, request.axes_user_agent, request.axes_path_info)
client_str = get_client_str(
username,
request.axes_ip_address,
request.axes_user_agent,
request.axes_path_info,
)
log.info('AXES: Successful logout by %s.', client_str)
log.info("AXES: Successful logout by %s.", client_str)
if username and not settings.AXES_DISABLE_ACCESS_LOG:
# 2. database query: Update existing attempt logs with logout time
AccessLog.objects.filter(
username=username,
logout_time__isnull=True,
).update(
logout_time=request.axes_attempt_time,
)
username=username, logout_time__isnull=True
).update(logout_time=request.axes_attempt_time)

View file

@ -44,7 +44,9 @@ class AxesProxyHandler(AxesHandler):
@classmethod
def reset_attempts(cls, *, ip_address: str = None, username: str = None) -> int:
return cls.get_implementation().reset_attempts(ip_address=ip_address, username=username)
return cls.get_implementation().reset_attempts(
ip_address=ip_address, username=username
)
@classmethod
def reset_logs(cls, *, age_days: int = None) -> int:
@ -57,7 +59,9 @@ class AxesProxyHandler(AxesHandler):
"""
if request is None:
log.error('AXES: AxesProxyHandler.update_request can not set request attributes to a None request')
log.error(
"AXES: AxesProxyHandler.update_request can not set request attributes to a None request"
)
return
request.axes_locked_out = False
@ -81,7 +85,9 @@ class AxesProxyHandler(AxesHandler):
@toggleable
def user_login_failed(cls, sender, credentials: dict, request=None, **kwargs):
cls.update_request(request)
return cls.get_implementation().user_login_failed(sender, credentials, request, **kwargs)
return cls.get_implementation().user_login_failed(
sender, credentials, request, **kwargs
)
@classmethod
@toggleable

View file

@ -4,7 +4,13 @@ from logging import getLogger
from typing import Callable, Optional, Type, Union
from django.core.cache import caches, BaseCache
from django.http import HttpRequest, HttpResponse, HttpResponseRedirect, JsonResponse, QueryDict
from django.http import (
HttpRequest,
HttpResponse,
HttpResponseRedirect,
JsonResponse,
QueryDict,
)
from django.shortcuts import render
from django.utils.module_loading import import_string
@ -21,7 +27,7 @@ def get_cache() -> BaseCache:
Get the cache instance Axes is configured to use with ``settings.AXES_CACHE`` and use ``'default'`` if not set.
"""
return caches[getattr(settings, 'AXES_CACHE', 'default')]
return caches[getattr(settings, "AXES_CACHE", "default")]
def get_cache_timeout() -> Optional[int]:
@ -76,22 +82,17 @@ def get_cool_off_iso8601(delta: timedelta) -> str:
hours, minutes = divmod(minutes, 60)
days, hours = divmod(hours, 24)
days_str = f'{days:.0f}D' if days else ''
days_str = f"{days:.0f}D" if days else ""
time_str = ''.join(
f'{value:.0f}{designator}'
for value, designator
in [
[hours, 'H'],
[minutes, 'M'],
[seconds, 'S'],
]
time_str = "".join(
f"{value:.0f}{designator}"
for value, designator in [[hours, "H"], [minutes, "M"], [seconds, "S"]]
if value
)
if time_str:
return f'P{days_str}T{time_str}'
return f'P{days_str}'
return f"P{days_str}T{time_str}"
return f"P{days_str}"
def get_credentials(username: str = None, **kwargs) -> dict:
@ -122,19 +123,25 @@ def get_client_username(request, credentials: dict = None) -> str:
"""
if settings.AXES_USERNAME_CALLABLE:
log.debug('Using settings.AXES_USERNAME_CALLABLE to get username')
log.debug("Using settings.AXES_USERNAME_CALLABLE to get username")
if callable(settings.AXES_USERNAME_CALLABLE):
return settings.AXES_USERNAME_CALLABLE(request, credentials)
if isinstance(settings.AXES_USERNAME_CALLABLE, str):
return import_string(settings.AXES_USERNAME_CALLABLE)(request, credentials)
raise TypeError('settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None.')
raise TypeError(
"settings.AXES_USERNAME_CALLABLE needs to be a string, callable, or None."
)
if credentials:
log.debug('Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD')
log.debug(
"Using parameter credentials to get username with key settings.AXES_USERNAME_FORM_FIELD"
)
return credentials.get(settings.AXES_USERNAME_FORM_FIELD, None)
log.debug('Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD')
log.debug(
"Using parameter request.POST to get username with key settings.AXES_USERNAME_FORM_FIELD"
)
return request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
@ -158,15 +165,15 @@ def get_client_ip_address(request) -> str:
def get_client_user_agent(request) -> str:
return request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
return request.META.get("HTTP_USER_AGENT", "<unknown>")[:255]
def get_client_path_info(request) -> str:
return request.META.get('PATH_INFO', '<unknown>')[:255]
return request.META.get("PATH_INFO", "<unknown>")[:255]
def get_client_http_accept(request) -> str:
return request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
return request.META.get("HTTP_ACCEPT", "<unknown>")[:1025]
def get_client_parameters(username: str, ip_address: str, user_agent: str) -> dict:
@ -181,24 +188,26 @@ def get_client_parameters(username: str, ip_address: str, user_agent: str) -> di
if settings.AXES_ONLY_USER_FAILURES:
# 1. Only individual usernames can be tracked with parametrization
filter_kwargs['username'] = username
filter_kwargs["username"] = username
else:
if settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
# 2. A combination of username and IP address can be used as well
filter_kwargs['username'] = username
filter_kwargs['ip_address'] = ip_address
filter_kwargs["username"] = username
filter_kwargs["ip_address"] = ip_address
else:
# 3. Default case is to track the IP address only, which is the most secure option
filter_kwargs['ip_address'] = ip_address
filter_kwargs["ip_address"] = ip_address
if settings.AXES_USE_USER_AGENT:
# 4. The HTTP User-Agent can be used to track e.g. one browser
filter_kwargs['user_agent'] = user_agent
filter_kwargs["user_agent"] = user_agent
return filter_kwargs
def get_client_str(username: str, ip_address: str, user_agent: str, path_info: str) -> str:
def get_client_str(
username: str, ip_address: str, user_agent: str, path_info: str
) -> str:
"""
Get a readable string that can be used in e.g. logging to distinguish client requests.
@ -209,9 +218,9 @@ def get_client_str(username: str, ip_address: str, user_agent: str, path_info: s
if settings.AXES_VERBOSE:
# Verbose mode logs every attribute that is available
client_dict['username'] = username
client_dict['ip_address'] = ip_address
client_dict['user_agent'] = user_agent
client_dict["username"] = username
client_dict["ip_address"] = ip_address
client_dict["user_agent"] = user_agent
else:
# Other modes initialize the attributes that are used for the actual lockouts
client_dict = get_client_parameters(username, ip_address, user_agent)
@ -219,19 +228,15 @@ def get_client_str(username: str, ip_address: str, user_agent: str, path_info: s
# Path info is always included as last component in the client string for traceability purposes
if path_info and isinstance(path_info, (tuple, list)):
path_info = path_info[0]
client_dict['path_info'] = path_info
client_dict["path_info"] = path_info
# Template the internal dictionary representation into a readable and concatenated key: "value" format
template = ', '.join(
f'{key}: "{value}"'
for key, value
in client_dict.items()
)
template = ", ".join(f'{key}: "{value}"' for key, value in client_dict.items())
# Wrap the internal dict into a single {key: "value"} bracing in the output
# which requires double braces when done with the Python string templating system
# i.e. {{key: "value"}} becomes {key: "value"} when run through a .format() call
template = '{{' + template + '}}'
template = "{{" + template + "}}"
return template.format(client_dict)
@ -246,14 +251,10 @@ def get_query_str(query: Type[QueryDict], max_length: int = 1024) -> str:
"""
query_dict = query.copy()
query_dict.pop('password', None)
query_dict.pop("password", None)
query_dict.pop(settings.AXES_PASSWORD_FORM_FIELD, None)
query_str = '\n'.join(
f'{key}={value}'
for key, value
in query_dict.items()
)
query_str = "\n".join(f"{key}={value}" for key, value in query_dict.items())
return query_str[:max_length]
@ -265,7 +266,7 @@ def get_failure_limit(request, credentials) -> int:
return import_string(settings.AXES_FAILURE_LIMIT)(request, credentials)
if isinstance(settings.AXES_FAILURE_LIMIT, int):
return settings.AXES_FAILURE_LIMIT
raise TypeError('settings.AXES_FAILURE_LIMIT needs to be a callable or an integer')
raise TypeError("settings.AXES_FAILURE_LIMIT needs to be a callable or an integer")
def get_lockout_message() -> str:
@ -277,39 +278,30 @@ def get_lockout_message() -> str:
def get_lockout_response(request, credentials: dict = None) -> HttpResponse:
status = 403
context = {
'failure_limit': get_failure_limit(request, credentials),
'username': get_client_username(request, credentials) or ''
"failure_limit": get_failure_limit(request, credentials),
"username": get_client_username(request, credentials) or "",
}
cool_off = get_cool_off()
if cool_off:
context.update({
'cooloff_time': get_cool_off_iso8601(cool_off), # differing old name is kept for backwards compatibility
})
context.update(
{
"cooloff_time": get_cool_off_iso8601(
cool_off
) # differing old name is kept for backwards compatibility
}
)
if request.is_ajax():
return JsonResponse(
context,
status=status,
)
return JsonResponse(context, status=status)
if settings.AXES_LOCKOUT_TEMPLATE:
return render(
request,
settings.AXES_LOCKOUT_TEMPLATE,
context,
status=status,
)
return render(request, settings.AXES_LOCKOUT_TEMPLATE, context, status=status)
if settings.AXES_LOCKOUT_URL:
return HttpResponseRedirect(
settings.AXES_LOCKOUT_URL,
)
return HttpResponseRedirect(settings.AXES_LOCKOUT_URL)
return HttpResponse(
get_lockout_message(),
status=status,
)
return HttpResponse(get_lockout_message(), status=status)
def is_ip_address_in_whitelist(ip_address: str) -> bool:
@ -331,10 +323,14 @@ def is_client_ip_address_whitelisted(request):
Check if the given request refers to a whitelisted IP.
"""
if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address):
if settings.AXES_NEVER_LOCKOUT_WHITELIST and is_ip_address_in_whitelist(
request.axes_ip_address
):
return True
if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(request.axes_ip_address):
if settings.AXES_ONLY_WHITELIST and is_ip_address_in_whitelist(
request.axes_ip_address
):
return True
return False
@ -348,7 +344,9 @@ def is_client_ip_address_blacklisted(request) -> bool:
if is_ip_address_in_blacklist(request.axes_ip_address):
return True
if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(request.axes_ip_address):
if settings.AXES_ONLY_WHITELIST and not is_ip_address_in_whitelist(
request.axes_ip_address
):
return True
return False
@ -359,13 +357,15 @@ def is_client_method_whitelisted(request) -> bool:
Check if the given request uses a whitelisted method.
"""
if settings.AXES_NEVER_LOCKOUT_GET and request.method == 'GET':
if settings.AXES_NEVER_LOCKOUT_GET and request.method == "GET":
return True
return False
def get_client_cache_key(request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None) -> str:
def get_client_cache_key(
request_or_attempt: Union[HttpRequest, AccessBase], credentials: dict = None
) -> str:
"""
Build cache key name from request or AccessAttempt object.
@ -385,9 +385,9 @@ def get_client_cache_key(request_or_attempt: Union[HttpRequest, AccessBase], cre
filter_kwargs = get_client_parameters(username, ip_address, user_agent)
cache_key_components = ''.join(value for value in filter_kwargs.values() if value)
cache_key_components = "".join(value for value in filter_kwargs.values() if value)
cache_key_digest = md5(cache_key_components.encode()).hexdigest()
cache_key = f'axes-{cache_key_digest}'
cache_key = f"axes-{cache_key_digest}"
return cache_key
@ -406,4 +406,5 @@ def toggleable(func) -> Callable:
def inner(*args, **kwargs): # pylint: disable=inconsistent-return-statements
if settings.AXES_ENABLED:
return func(*args, **kwargs)
return inner

View file

@ -4,8 +4,10 @@ from axes.models import AccessAttempt
class Command(BaseCommand):
help = 'List access attempts'
help = "List access attempts"
def handle(self, *args, **options): # pylint: disable=unused-argument
for obj in AccessAttempt.objects.all():
self.stdout.write(f'{obj.ip_address}\t{obj.username}\t{obj.failures_since_start}')
self.stdout.write(
f"{obj.ip_address}\t{obj.username}\t{obj.failures_since_start}"
)

View file

@ -4,12 +4,12 @@ from axes.utils import reset
class Command(BaseCommand):
help = 'Reset all access attempts and lockouts'
help = "Reset all access attempts and lockouts"
def handle(self, *args, **options): # pylint: disable=unused-argument
count = reset()
if count:
self.stdout.write(f'{count} attempts removed.')
self.stdout.write(f"{count} attempts removed.")
else:
self.stdout.write('No attempts found.')
self.stdout.write("No attempts found.")

View file

@ -4,18 +4,18 @@ from axes.utils import reset
class Command(BaseCommand):
help = 'Reset all access attempts and lockouts for given IP addresses'
help = "Reset all access attempts and lockouts for given IP addresses"
def add_arguments(self, parser):
parser.add_argument('ip', nargs='+', type=str)
parser.add_argument("ip", nargs="+", type=str)
def handle(self, *args, **options):
count = 0
for ip in options['ip']:
for ip in options["ip"]:
count += reset(ip=ip)
if count:
self.stdout.write(f'{count} attempts removed.')
self.stdout.write(f"{count} attempts removed.")
else:
self.stdout.write('No attempts found.')
self.stdout.write("No attempts found.")

View file

@ -4,19 +4,19 @@ from axes.handlers.proxy import AxesProxyHandler
class Command(BaseCommand):
help = 'Reset access log records older than given days.'
help = "Reset access log records older than given days."
def add_arguments(self, parser):
parser.add_argument(
'--age',
"--age",
type=int,
default=30,
help='Maximum age for records to keep in days',
help="Maximum age for records to keep in days",
)
def handle(self, *args, **options):
count = AxesProxyHandler.reset_logs(age_days=options['age'])
count = AxesProxyHandler.reset_logs(age_days=options["age"])
if count:
self.stdout.write(f'{count} logs removed.')
self.stdout.write(f"{count} logs removed.")
else:
self.stdout.write('No logs found.')
self.stdout.write("No logs found.")

View file

@ -4,18 +4,18 @@ from axes.utils import reset
class Command(BaseCommand):
help = 'Reset all access attempts and lockouts for given usernames'
help = "Reset all access attempts and lockouts for given usernames"
def add_arguments(self, parser):
parser.add_argument('username', nargs='+', type=str)
parser.add_argument("username", nargs="+", type=str)
def handle(self, *args, **options):
count = 0
for username in options['username']:
for username in options["username"]:
count += reset(username=username)
if count:
self.stdout.write(f'{count} attempts removed.')
self.stdout.write(f"{count} attempts removed.")
else:
self.stdout.write('No attempts found.')
self.stdout.write("No attempts found.")

View file

@ -29,7 +29,7 @@ class AxesMiddleware:
def __call__(self, request):
response = self.get_response(request)
if getattr(request, 'axes_locked_out', None):
if getattr(request, "axes_locked_out", None):
response = get_lockout_response(request) # type: ignore
return response

View file

@ -3,46 +3,70 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
]
dependencies = []
operations = [
migrations.CreateModel(
name='AccessAttempt',
name="AccessAttempt",
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('user_agent', models.CharField(max_length=255)),
('ip_address', models.GenericIPAddressField(null=True, verbose_name='IP Address')),
('username', models.CharField(max_length=255, null=True)),
('trusted', models.BooleanField(default=False)),
('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')),
('path_info', models.CharField(max_length=255, verbose_name='Path')),
('attempt_time', models.DateTimeField(auto_now_add=True)),
('get_data', models.TextField(verbose_name='GET Data')),
('post_data', models.TextField(verbose_name='POST Data')),
('failures_since_start', models.PositiveIntegerField(verbose_name='Failed Logins')),
(
"id",
models.AutoField(
verbose_name="ID",
serialize=False,
auto_created=True,
primary_key=True,
),
),
("user_agent", models.CharField(max_length=255)),
(
"ip_address",
models.GenericIPAddressField(null=True, verbose_name="IP Address"),
),
("username", models.CharField(max_length=255, null=True)),
("trusted", models.BooleanField(default=False)),
(
"http_accept",
models.CharField(max_length=1025, verbose_name="HTTP Accept"),
),
("path_info", models.CharField(max_length=255, verbose_name="Path")),
("attempt_time", models.DateTimeField(auto_now_add=True)),
("get_data", models.TextField(verbose_name="GET Data")),
("post_data", models.TextField(verbose_name="POST Data")),
(
"failures_since_start",
models.PositiveIntegerField(verbose_name="Failed Logins"),
),
],
options={
'ordering': ['-attempt_time'],
'abstract': False,
},
options={"ordering": ["-attempt_time"], "abstract": False},
),
migrations.CreateModel(
name='AccessLog',
name="AccessLog",
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('user_agent', models.CharField(max_length=255)),
('ip_address', models.GenericIPAddressField(null=True, verbose_name='IP Address')),
('username', models.CharField(max_length=255, null=True)),
('trusted', models.BooleanField(default=False)),
('http_accept', models.CharField(max_length=1025, verbose_name='HTTP Accept')),
('path_info', models.CharField(max_length=255, verbose_name='Path')),
('attempt_time', models.DateTimeField(auto_now_add=True)),
('logout_time', models.DateTimeField(null=True, blank=True)),
(
"id",
models.AutoField(
verbose_name="ID",
serialize=False,
auto_created=True,
primary_key=True,
),
),
("user_agent", models.CharField(max_length=255)),
(
"ip_address",
models.GenericIPAddressField(null=True, verbose_name="IP Address"),
),
("username", models.CharField(max_length=255, null=True)),
("trusted", models.BooleanField(default=False)),
(
"http_accept",
models.CharField(max_length=1025, verbose_name="HTTP Accept"),
),
("path_info", models.CharField(max_length=255, verbose_name="Path")),
("attempt_time", models.DateTimeField(auto_now_add=True)),
("logout_time", models.DateTimeField(null=True, blank=True)),
],
options={
'ordering': ['-attempt_time'],
'abstract': False,
},
options={"ordering": ["-attempt_time"], "abstract": False},
),
]

View file

@ -3,49 +3,51 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('axes', '0001_initial'),
]
dependencies = [("axes", "0001_initial")]
operations = [
migrations.AlterField(
model_name='accessattempt',
name='ip_address',
field=models.GenericIPAddressField(db_index=True, null=True, verbose_name='IP Address'),
model_name="accessattempt",
name="ip_address",
field=models.GenericIPAddressField(
db_index=True, null=True, verbose_name="IP Address"
),
),
migrations.AlterField(
model_name='accessattempt',
name='trusted',
model_name="accessattempt",
name="trusted",
field=models.BooleanField(db_index=True, default=False),
),
migrations.AlterField(
model_name='accessattempt',
name='user_agent',
model_name="accessattempt",
name="user_agent",
field=models.CharField(db_index=True, max_length=255),
),
migrations.AlterField(
model_name='accessattempt',
name='username',
model_name="accessattempt",
name="username",
field=models.CharField(db_index=True, max_length=255, null=True),
),
migrations.AlterField(
model_name='accesslog',
name='ip_address',
field=models.GenericIPAddressField(db_index=True, null=True, verbose_name='IP Address'),
model_name="accesslog",
name="ip_address",
field=models.GenericIPAddressField(
db_index=True, null=True, verbose_name="IP Address"
),
),
migrations.AlterField(
model_name='accesslog',
name='trusted',
model_name="accesslog",
name="trusted",
field=models.BooleanField(db_index=True, default=False),
),
migrations.AlterField(
model_name='accesslog',
name='user_agent',
model_name="accesslog",
name="user_agent",
field=models.CharField(db_index=True, max_length=255),
),
migrations.AlterField(
model_name='accesslog',
name='username',
model_name="accesslog",
name="username",
field=models.CharField(db_index=True, max_length=255, null=True),
),
]

View file

@ -3,54 +3,56 @@ from django.db import models, migrations
class Migration(migrations.Migration):
dependencies = [
('axes', '0002_auto_20151217_2044'),
]
dependencies = [("axes", "0002_auto_20151217_2044")]
operations = [
migrations.AlterField(
model_name='accessattempt',
name='failures_since_start',
field=models.PositiveIntegerField(verbose_name='Failed Logins'),
model_name="accessattempt",
name="failures_since_start",
field=models.PositiveIntegerField(verbose_name="Failed Logins"),
),
migrations.AlterField(
model_name='accessattempt',
name='get_data',
field=models.TextField(verbose_name='GET Data'),
model_name="accessattempt",
name="get_data",
field=models.TextField(verbose_name="GET Data"),
),
migrations.AlterField(
model_name='accessattempt',
name='http_accept',
field=models.CharField(verbose_name='HTTP Accept', max_length=1025),
model_name="accessattempt",
name="http_accept",
field=models.CharField(verbose_name="HTTP Accept", max_length=1025),
),
migrations.AlterField(
model_name='accessattempt',
name='ip_address',
field=models.GenericIPAddressField(null=True, verbose_name='IP Address', db_index=True),
model_name="accessattempt",
name="ip_address",
field=models.GenericIPAddressField(
null=True, verbose_name="IP Address", db_index=True
),
),
migrations.AlterField(
model_name='accessattempt',
name='path_info',
field=models.CharField(verbose_name='Path', max_length=255),
model_name="accessattempt",
name="path_info",
field=models.CharField(verbose_name="Path", max_length=255),
),
migrations.AlterField(
model_name='accessattempt',
name='post_data',
field=models.TextField(verbose_name='POST Data'),
model_name="accessattempt",
name="post_data",
field=models.TextField(verbose_name="POST Data"),
),
migrations.AlterField(
model_name='accesslog',
name='http_accept',
field=models.CharField(verbose_name='HTTP Accept', max_length=1025),
model_name="accesslog",
name="http_accept",
field=models.CharField(verbose_name="HTTP Accept", max_length=1025),
),
migrations.AlterField(
model_name='accesslog',
name='ip_address',
field=models.GenericIPAddressField(null=True, verbose_name='IP Address', db_index=True),
model_name="accesslog",
name="ip_address",
field=models.GenericIPAddressField(
null=True, verbose_name="IP Address", db_index=True
),
),
migrations.AlterField(
model_name='accesslog',
name='path_info',
field=models.CharField(verbose_name='Path', max_length=255),
model_name="accesslog",
name="path_info",
field=models.CharField(verbose_name="Path", max_length=255),
),
]

View file

@ -3,52 +3,66 @@ from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('axes', '0003_auto_20160322_0929'),
]
dependencies = [("axes", "0003_auto_20160322_0929")]
operations = [
migrations.AlterModelOptions(
name='accessattempt',
options={'verbose_name': 'access attempt', 'verbose_name_plural': 'access attempts'},
name="accessattempt",
options={
"verbose_name": "access attempt",
"verbose_name_plural": "access attempts",
},
),
migrations.AlterModelOptions(
name='accesslog',
options={'verbose_name': 'access log', 'verbose_name_plural': 'access logs'},
name="accesslog",
options={
"verbose_name": "access log",
"verbose_name_plural": "access logs",
},
),
migrations.AlterField(
model_name='accessattempt',
name='attempt_time',
field=models.DateTimeField(auto_now_add=True, verbose_name='Attempt Time'),
model_name="accessattempt",
name="attempt_time",
field=models.DateTimeField(auto_now_add=True, verbose_name="Attempt Time"),
),
migrations.AlterField(
model_name='accessattempt',
name='user_agent',
field=models.CharField(db_index=True, max_length=255, verbose_name='User Agent'),
model_name="accessattempt",
name="user_agent",
field=models.CharField(
db_index=True, max_length=255, verbose_name="User Agent"
),
),
migrations.AlterField(
model_name='accessattempt',
name='username',
field=models.CharField(db_index=True, max_length=255, null=True, verbose_name='Username'),
model_name="accessattempt",
name="username",
field=models.CharField(
db_index=True, max_length=255, null=True, verbose_name="Username"
),
),
migrations.AlterField(
model_name='accesslog',
name='attempt_time',
field=models.DateTimeField(auto_now_add=True, verbose_name='Attempt Time'),
model_name="accesslog",
name="attempt_time",
field=models.DateTimeField(auto_now_add=True, verbose_name="Attempt Time"),
),
migrations.AlterField(
model_name='accesslog',
name='logout_time',
field=models.DateTimeField(blank=True, null=True, verbose_name='Logout Time'),
model_name="accesslog",
name="logout_time",
field=models.DateTimeField(
blank=True, null=True, verbose_name="Logout Time"
),
),
migrations.AlterField(
model_name='accesslog',
name='user_agent',
field=models.CharField(db_index=True, max_length=255, verbose_name='User Agent'),
model_name="accesslog",
name="user_agent",
field=models.CharField(
db_index=True, max_length=255, verbose_name="User Agent"
),
),
migrations.AlterField(
model_name='accesslog',
name='username',
field=models.CharField(db_index=True, max_length=255, null=True, verbose_name='Username'),
model_name="accesslog",
name="username",
field=models.CharField(
db_index=True, max_length=255, null=True, verbose_name="Username"
),
),
]

View file

@ -3,13 +3,6 @@ from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('axes', '0004_auto_20181024_1538'),
]
dependencies = [("axes", "0004_auto_20181024_1538")]
operations = [
migrations.RemoveField(
model_name='accessattempt',
name='trusted',
),
]
operations = [migrations.RemoveField(model_name="accessattempt", name="trusted")]

View file

@ -5,13 +5,6 @@ from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('axes', '0005_remove_accessattempt_trusted'),
]
dependencies = [("axes", "0005_remove_accessattempt_trusted")]
operations = [
migrations.RemoveField(
model_name='accesslog',
name='trusted',
),
]
operations = [migrations.RemoveField(model_name="accesslog", name="trusted")]

View file

@ -3,77 +3,45 @@ from django.utils.translation import gettext_lazy as _
class AccessBase(models.Model):
user_agent = models.CharField(
_('User Agent'),
max_length=255,
db_index=True,
)
user_agent = models.CharField(_("User Agent"), max_length=255, db_index=True)
ip_address = models.GenericIPAddressField(
_('IP Address'),
null=True,
db_index=True,
)
ip_address = models.GenericIPAddressField(_("IP Address"), null=True, db_index=True)
username = models.CharField(
_('Username'),
max_length=255,
null=True,
db_index=True,
)
username = models.CharField(_("Username"), max_length=255, null=True, db_index=True)
http_accept = models.CharField(
_('HTTP Accept'),
max_length=1025,
)
http_accept = models.CharField(_("HTTP Accept"), max_length=1025)
path_info = models.CharField(
_('Path'),
max_length=255,
)
path_info = models.CharField(_("Path"), max_length=255)
attempt_time = models.DateTimeField(
_('Attempt Time'),
auto_now_add=True,
)
attempt_time = models.DateTimeField(_("Attempt Time"), auto_now_add=True)
class Meta:
app_label = 'axes'
app_label = "axes"
abstract = True
ordering = ['-attempt_time']
ordering = ["-attempt_time"]
class AccessAttempt(AccessBase):
get_data = models.TextField(
_('GET Data'),
)
get_data = models.TextField(_("GET Data"))
post_data = models.TextField(
_('POST Data'),
)
post_data = models.TextField(_("POST Data"))
failures_since_start = models.PositiveIntegerField(
_('Failed Logins'),
)
failures_since_start = models.PositiveIntegerField(_("Failed Logins"))
def __str__(self):
return f'Attempted Access: {self.attempt_time}'
return f"Attempted Access: {self.attempt_time}"
class Meta:
verbose_name = _('access attempt')
verbose_name_plural = _('access attempts')
verbose_name = _("access attempt")
verbose_name_plural = _("access attempts")
class AccessLog(AccessBase):
logout_time = models.DateTimeField(
_('Logout Time'),
null=True,
blank=True,
)
logout_time = models.DateTimeField(_("Logout Time"), null=True, blank=True)
def __str__(self):
return f'Access Log for {self.username} @ {self.attempt_time}'
return f"Access Log for {self.username} @ {self.attempt_time}"
class Meta:
verbose_name = _('access log')
verbose_name_plural = _('access logs')
verbose_name = _("access log")
verbose_name_plural = _("access logs")

View file

@ -1,6 +1,10 @@
from logging import getLogger
from django.contrib.auth.signals import user_logged_in, user_logged_out, user_login_failed
from django.contrib.auth.signals import (
user_logged_in,
user_logged_out,
user_login_failed,
)
from django.core.signals import setting_changed
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
@ -13,7 +17,7 @@ from axes.handlers.proxy import AxesProxyHandler
log = getLogger(settings.AXES_LOGGER)
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
user_locked_out = Signal(providing_args=["request", "username", "ip_address"])
@receiver(user_login_failed)
@ -42,11 +46,13 @@ def handle_post_delete_access_attempt(*args, **kwargs):
@receiver(setting_changed)
def handle_setting_changed(sender, setting, value, enter, **kwargs): # pylint: disable=unused-argument
def handle_setting_changed(
sender, setting, value, enter, **kwargs
): # pylint: disable=unused-argument
"""
Reinitialize handler implementation if a relevant setting changes
in e.g. application reconfiguration or during testing.
"""
if setting == 'AXES_HANDLER':
if setting == "AXES_HANDLER":
AxesProxyHandler.get_implementation(force=True)

View file

@ -32,18 +32,18 @@ class AxesTestCase(TestCase):
Test case using custom settings for testing.
"""
VALID_USERNAME = 'axes-valid-username'
VALID_PASSWORD = 'axes-valid-password'
VALID_EMAIL = 'axes-valid-email@example.com'
VALID_USER_AGENT = 'axes-user-agent'
VALID_IP_ADDRESS = '127.0.0.1'
VALID_USERNAME = "axes-valid-username"
VALID_PASSWORD = "axes-valid-password"
VALID_EMAIL = "axes-valid-email@example.com"
VALID_USER_AGENT = "axes-user-agent"
VALID_IP_ADDRESS = "127.0.0.1"
INVALID_USERNAME = 'axes-invalid-username'
INVALID_PASSWORD = 'axes-invalid-password'
INVALID_EMAIL = 'axes-invalid-email@example.com'
INVALID_USERNAME = "axes-invalid-username"
INVALID_PASSWORD = "axes-invalid-password"
INVALID_EMAIL = "axes-invalid-email@example.com"
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
LOGOUT_MESSAGE = 'Logged out'
LOCKED_MESSAGE = "Account locked: too many login attempts."
LOGOUT_MESSAGE = "Logged out"
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
STATUS_SUCCESS = 200
@ -61,19 +61,17 @@ class AxesTestCase(TestCase):
self.ip_address = self.VALID_IP_ADDRESS
self.user_agent = self.VALID_USER_AGENT
self.path_info = reverse('admin:login')
self.path_info = reverse("admin:login")
self.user = get_user_model().objects.create_superuser(
username=self.username,
password=self.password,
email=self.email,
username=self.username, password=self.password, email=self.email
)
self.request = HttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = self.ip_address
self.request.META['HTTP_USER_AGENT'] = self.user_agent
self.request.META['PATH_INFO'] = self.path_info
self.request.method = "POST"
self.request.META["REMOTE_ADDR"] = self.ip_address
self.request.META["HTTP_USER_AGENT"] = self.user_agent
self.request.META["PATH_INFO"] = self.path_info
self.request.axes_attempt_time = now()
self.request.axes_ip_address = get_client_ip_address(self.request)
@ -88,9 +86,9 @@ class AxesTestCase(TestCase):
def get_kwargs_with_defaults(self, **kwargs):
defaults = {
'user_agent': self.user_agent,
'ip_address': self.ip_address,
'username': self.username,
"user_agent": self.user_agent,
"ip_address": self.ip_address,
"username": self.username,
}
defaults.update(kwargs)
@ -98,7 +96,7 @@ class AxesTestCase(TestCase):
def create_attempt(self, **kwargs):
kwargs = self.get_kwargs_with_defaults(**kwargs)
kwargs.setdefault('failures_since_start', 1)
kwargs.setdefault("failures_since_start", 1)
return AccessAttempt.objects.create(**kwargs)
def create_log(self, **kwargs):
@ -118,24 +116,17 @@ class AxesTestCase(TestCase):
if is_valid_username:
username = self.VALID_USERNAME
else:
username = ''.join(
choice(ascii_letters + digits)
for _ in range(10)
)
username = "".join(choice(ascii_letters + digits) for _ in range(10))
if is_valid_password:
password = self.VALID_PASSWORD
else:
password = self.INVALID_PASSWORD
post_data = {
'username': username,
'password': password,
**kwargs
}
post_data = {"username": username, "password": password, **kwargs}
return self.client.post(
reverse('admin:login'),
reverse("admin:login"),
post_data,
REMOTE_ADDR=self.ip_address,
HTTP_USER_AGENT=self.user_agent,
@ -143,14 +134,16 @@ class AxesTestCase(TestCase):
def logout(self):
return self.client.post(
reverse('admin:logout'),
reverse("admin:logout"),
REMOTE_ADDR=self.ip_address,
HTTP_USER_AGENT=self.user_agent,
)
def check_login(self):
response = self.login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True)
self.assertNotContains(
response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
)
def almost_lockout(self):
for _ in range(1, get_failure_limit(None, None)):
@ -166,14 +159,18 @@ class AxesTestCase(TestCase):
if settings.AXES_LOCK_OUT_AT_FAILURE == True:
self.assertContains(response, self.LOCKED_MESSAGE, status_code=self.BLOCKED)
else:
self.assertNotContains(response, self.LOCKED_MESSAGE, status_code=self.STATUS_SUCCESS)
self.assertNotContains(
response, self.LOCKED_MESSAGE, status_code=self.STATUS_SUCCESS
)
def cool_off(self):
sleep(get_cool_off().total_seconds())
def check_logout(self):
response = self.logout()
self.assertContains(response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS)
self.assertContains(
response, self.LOGOUT_MESSAGE, status_code=self.STATUS_SUCCESS
)
def check_handler(self):
"""
@ -186,4 +183,3 @@ class AxesTestCase(TestCase):
self.cool_off()
self.check_login()
self.check_logout()

View file

@ -1,87 +1,67 @@
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': ':memory:',
}
}
DATABASES = {"default": {"ENGINE": "django.db.backends.sqlite3", "NAME": ":memory:"}}
CACHES = {
'default': {
"default": {
# This cache backend is OK to use in development and testing
# but has the potential to break production setups with more than on process
# due to each process having their own local memory based cache
'BACKEND': 'django.core.cache.backends.locmem.LocMemCache',
"BACKEND": "django.core.cache.backends.locmem.LocMemCache"
}
}
SITE_ID = 1
MIDDLEWARE = [
'django.middleware.common.CommonMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'axes.middleware.AxesMiddleware',
"django.middleware.common.CommonMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"axes.middleware.AxesMiddleware",
]
AUTHENTICATION_BACKENDS = [
'axes.backends.AxesBackend',
'django.contrib.auth.backends.ModelBackend',
"axes.backends.AxesBackend",
"django.contrib.auth.backends.ModelBackend",
]
PASSWORD_HASHERS = [
'django.contrib.auth.hashers.MD5PasswordHasher',
]
PASSWORD_HASHERS = ["django.contrib.auth.hashers.MD5PasswordHasher"]
ROOT_URLCONF = 'axes.tests.urls'
ROOT_URLCONF = "axes.tests.urls"
INSTALLED_APPS = [
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.sites',
'django.contrib.messages',
'django.contrib.admin',
'axes',
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.sites",
"django.contrib.messages",
"django.contrib.admin",
"axes",
]
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
]
},
},
}
]
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
},
},
'loggers': {
'axes': {
'handlers': ['console'],
'level': 'INFO',
'propagate': False,
},
},
"version": 1,
"disable_existing_loggers": False,
"handlers": {"console": {"class": "logging.StreamHandler"}},
"loggers": {"axes": {"handlers": ["console"], "level": "INFO", "propagate": False}},
}
SECRET_KEY = 'too-secret-for-test'
SECRET_KEY = "too-secret-for-test"
USE_I18N = False
@ -89,6 +69,6 @@ USE_L10N = False
USE_TZ = False
LOGIN_REDIRECT_URL = '/admin/'
LOGIN_REDIRECT_URL = "/admin/"
AXES_FAILURE_LIMIT = 10

View file

@ -16,7 +16,7 @@ class GetCoolOffThresholdTestCase(AxesTestCase):
def test_get_cool_off_threshold(self):
timestamp = now()
with patch('axes.attempts.now', return_value=timestamp):
with patch("axes.attempts.now", return_value=timestamp):
attempt_time = timestamp
threshold_now = get_cool_off_threshold(attempt_time)
@ -51,24 +51,28 @@ class ResetTestCase(AxesTestCase):
class UserWhitelistTestCase(AxesTestCase):
def setUp(self):
self.user_model = get_user_model()
self.user = self.user_model.objects.create(username='jane.doe')
self.user = self.user_model.objects.create(username="jane.doe")
self.request = HttpRequest()
def test_is_client_username_whitelisted(self):
with patch.object(self.user_model, 'nolockout', True, create=True):
self.assertTrue(is_user_attempt_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: self.user.username},
))
with patch.object(self.user_model, "nolockout", True, create=True):
self.assertTrue(
is_user_attempt_whitelisted(
self.request, {self.user_model.USERNAME_FIELD: self.user.username}
)
)
def test_is_client_username_whitelisted_not(self):
self.assertFalse(is_user_attempt_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: self.user.username},
))
self.assertFalse(
is_user_attempt_whitelisted(
self.request, {self.user_model.USERNAME_FIELD: self.user.username}
)
)
def test_is_client_username_whitelisted_does_not_exist(self):
self.assertFalse(is_user_attempt_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: 'not.' + self.user.username},
))
self.assertFalse(
is_user_attempt_whitelisted(
self.request,
{self.user_model.USERNAME_FIELD: "not." + self.user.username},
)
)

View file

@ -1,7 +1,10 @@
from unittest.mock import patch, MagicMock
from axes.backends import AxesBackend
from axes.exceptions import AxesBackendRequestParameterRequired, AxesBackendPermissionDenied
from axes.exceptions import (
AxesBackendRequestParameterRequired,
AxesBackendPermissionDenied,
)
from axes.tests.base import AxesTestCase
@ -12,7 +15,7 @@ class BackendTestCase(AxesTestCase):
with self.assertRaises(AxesBackendRequestParameterRequired):
AxesBackend().authenticate(request)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=False)
@patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
def test_authenticate_raises_on_locked_request(self, _):
request = MagicMock()

View file

@ -8,32 +8,37 @@ from axes.tests.base import AxesTestCase
class CacheCheckTestCase(AxesTestCase):
@override_settings(
AXES_HANDLER='axes.handlers.cache.AxesCacheHandler',
CACHES={'default': {'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'axes_cache'}},
AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
CACHES={
"default": {
"BACKEND": "django.core.cache.backends.db.DatabaseCache",
"LOCATION": "axes_cache",
}
},
)
def test_cache_check(self):
warnings = run_checks()
self.assertEqual(warnings, [])
@override_settings(
AXES_HANDLER='axes.handlers.cache.AxesCacheHandler',
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
CACHES={
"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
},
)
def test_cache_check_warnings(self):
warnings = run_checks()
warning = Warning(
msg=Messages.CACHE_INVALID,
hint=Hints.CACHE_INVALID,
id=Codes.CACHE_INVALID,
msg=Messages.CACHE_INVALID, hint=Hints.CACHE_INVALID, id=Codes.CACHE_INVALID
)
self.assertEqual(warnings, [
warning,
])
self.assertEqual(warnings, [warning])
@override_settings(
AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler',
CACHES={'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'}},
AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
CACHES={
"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"}
},
)
def test_cache_check_does_not_produce_check_warnings_with_database_handler(self):
warnings = run_checks()
@ -41,11 +46,7 @@ class CacheCheckTestCase(AxesTestCase):
class MiddlewareCheckTestCase(AxesTestCase):
@modify_settings(
MIDDLEWARE={
'remove': ['axes.middleware.AxesMiddleware']
},
)
@modify_settings(MIDDLEWARE={"remove": ["axes.middleware.AxesMiddleware"]})
def test_cache_check_warnings(self):
warnings = run_checks()
warning = Warning(
@ -54,9 +55,7 @@ class MiddlewareCheckTestCase(AxesTestCase):
id=Codes.MIDDLEWARE_INVALID,
)
self.assertEqual(warnings, [
warning,
])
self.assertEqual(warnings, [warning])
class AxesSpecializedBackend(AxesBackend):
@ -64,11 +63,7 @@ class AxesSpecializedBackend(AxesBackend):
class BackendCheckTestCase(AxesTestCase):
@modify_settings(
AUTHENTICATION_BACKENDS={
'remove': ['axes.backends.AxesBackend']
},
)
@modify_settings(AUTHENTICATION_BACKENDS={"remove": ["axes.backends.AxesBackend"]})
def test_backend_missing(self):
warnings = run_checks()
warning = Warning(
@ -77,27 +72,23 @@ class BackendCheckTestCase(AxesTestCase):
id=Codes.BACKEND_INVALID,
)
self.assertEqual(warnings, [
warning,
])
self.assertEqual(warnings, [warning])
@override_settings(
AUTHENTICATION_BACKENDS=['axes.tests.test_checks.AxesSpecializedBackend']
AUTHENTICATION_BACKENDS=["axes.tests.test_checks.AxesSpecializedBackend"]
)
def test_specialized_backend(self):
warnings = run_checks()
self.assertEqual(warnings, [])
@override_settings(
AUTHENTICATION_BACKENDS=['axes.tests.test_checks.AxesNotDefinedBackend']
AUTHENTICATION_BACKENDS=["axes.tests.test_checks.AxesNotDefinedBackend"]
)
def test_import_error(self):
with self.assertRaises(ImportError):
run_checks()
@override_settings(
AUTHENTICATION_BACKENDS=['module.not_defined']
)
@override_settings(AUTHENTICATION_BACKENDS=["module.not_defined"])
def test_module_not_found_error(self):
with self.assertRaises(ModuleNotFoundError):
run_checks()
@ -106,16 +97,14 @@ class BackendCheckTestCase(AxesTestCase):
class DeprecatedSettingsTestCase(AxesTestCase):
def setUp(self):
self.disable_success_access_log_warning = Warning(
msg=Messages.SETTING_DEPRECATED.format(deprecated_setting='AXES_DISABLE_SUCCESS_ACCESS_LOG'),
msg=Messages.SETTING_DEPRECATED.format(
deprecated_setting="AXES_DISABLE_SUCCESS_ACCESS_LOG"
),
hint=Hints.SETTING_DEPRECATED,
id=Codes.SETTING_DEPRECATED,
)
@override_settings(
AXES_DISABLE_SUCCESS_ACCESS_LOG=True,
)
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_deprecated_success_access_log_flag(self):
warnings = run_checks()
self.assertEqual(warnings, [
self.disable_success_access_log_warning,
])
self.assertEqual(warnings, [self.disable_success_access_log_warning])

View file

@ -7,34 +7,34 @@ from axes.tests.base import AxesTestCase
class DecoratorTestCase(AxesTestCase):
SUCCESS_RESPONSE = HttpResponse(status=200, content='Dispatched')
LOCKOUT_RESPONSE = HttpResponse(status=403, content='Locked out')
SUCCESS_RESPONSE = HttpResponse(status=200, content="Dispatched")
LOCKOUT_RESPONSE = HttpResponse(status=403, content="Locked out")
def setUp(self):
self.request = MagicMock()
self.cls = MagicMock(return_value=self.request)
self.func = MagicMock(return_value=self.SUCCESS_RESPONSE)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=False)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
@patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
@patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
def test_axes_dispatch_locks_out(self, _, __):
response = axes_dispatch(self.func)(self.request)
self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=True)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
@patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True)
@patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
def test_axes_dispatch_dispatches(self, _, __):
response = axes_dispatch(self.func)(self.request)
self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=False)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
@patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=False)
@patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
def test_axes_form_invalid_locks_out(self, _, __):
response = axes_form_invalid(self.func)(self.cls)
self.assertEqual(response.content, self.LOCKOUT_RESPONSE.content)
@patch('axes.handlers.proxy.AxesProxyHandler.is_allowed', return_value=True)
@patch('axes.decorators.get_lockout_response', return_value=LOCKOUT_RESPONSE)
@patch("axes.handlers.proxy.AxesProxyHandler.is_allowed", return_value=True)
@patch("axes.decorators.get_lockout_response", return_value=LOCKOUT_RESPONSE)
def test_axes_form_invalid_dispatches(self, _, __):
response = axes_form_invalid(self.func)(self.cls)
self.assertEqual(response.content, self.SUCCESS_RESPONSE.content)

View file

@ -12,7 +12,7 @@ from axes.models import AccessAttempt, AccessLog
from axes.tests.base import AxesTestCase
@override_settings(AXES_HANDLER='axes.handlers.base.AxesHandler')
@override_settings(AXES_HANDLER="axes.handlers.base.AxesHandler")
class AxesHandlerTestCase(AxesTestCase):
def test_base_handler_reset_attempts_raises(self):
with self.assertRaises(NotImplementedError):
@ -26,20 +26,19 @@ class AxesHandlerTestCase(AxesTestCase):
with self.assertRaises(NotImplementedError):
AxesProxyHandler.is_allowed(self.request, {})
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
@override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
def test_is_allowed_with_blacklisted_ip_address(self):
self.assertFalse(AxesProxyHandler.is_allowed(self.request))
@override_settings(
AXES_NEVER_LOCKOUT_WHITELIST=True,
AXES_IP_WHITELIST=['127.0.0.1'],
AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=["127.0.0.1"]
)
def test_is_allowed_with_whitelisted_ip_address(self):
self.assertTrue(AxesProxyHandler.is_allowed(self.request))
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
def test_is_allowed_with_whitelisted_method(self):
self.request.method = 'GET'
self.request.method = "GET"
self.assertTrue(AxesProxyHandler.is_allowed(self.request))
@override_settings(AXES_LOCK_OUT_AT_FAILURE=False)
@ -49,7 +48,7 @@ class AxesHandlerTestCase(AxesTestCase):
@override_settings(AXES_ONLY_ADMIN_SITE=True)
def test_only_admin_site(self):
request = MagicMock()
request.path = '/test/'
request.path = "/test/"
self.assertTrue(AxesProxyHandler.is_allowed(self.request))
@ -61,38 +60,38 @@ class AxesProxyHandlerTestCase(AxesTestCase):
self.user = MagicMock()
self.instance = MagicMock()
@patch('axes.handlers.proxy.AxesProxyHandler.implementation', None)
@patch("axes.handlers.proxy.AxesProxyHandler.implementation", None)
def test_setting_changed_signal_triggers_handler_reimport(self):
self.assertIsNone(AxesProxyHandler.implementation)
self.assertIsNone(AxesProxyHandler.implementation)
with self.settings(AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler'):
self.assertIsNotNone(AxesProxyHandler.implementation)
with self.settings(AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler"):
self.assertIsNotNone(AxesProxyHandler.implementation)
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
@patch("axes.handlers.proxy.AxesProxyHandler.implementation")
def test_user_login_failed(self, handler):
self.assertFalse(handler.user_login_failed.called)
AxesProxyHandler.user_login_failed(self.sender, self.credentials, self.request)
self.assertTrue(handler.user_login_failed.called)
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
@patch("axes.handlers.proxy.AxesProxyHandler.implementation")
def test_user_logged_in(self, handler):
self.assertFalse(handler.user_logged_in.called)
AxesProxyHandler.user_logged_in(self.sender, self.request, self.user)
self.assertTrue(handler.user_logged_in.called)
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
@patch("axes.handlers.proxy.AxesProxyHandler.implementation")
def test_user_logged_out(self, handler):
self.assertFalse(handler.user_logged_out.called)
AxesProxyHandler.user_logged_out(self.sender, self.request, self.user)
self.assertTrue(handler.user_logged_out.called)
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
@patch("axes.handlers.proxy.AxesProxyHandler.implementation")
def test_post_save_access_attempt(self, handler):
self.assertFalse(handler.post_save_access_attempt.called)
AxesProxyHandler.post_save_access_attempt(self.instance)
self.assertTrue(handler.post_save_access_attempt.called)
@patch('axes.handlers.proxy.AxesProxyHandler.implementation')
@patch("axes.handlers.proxy.AxesProxyHandler.implementation")
def test_post_delete_access_attempt(self, handler):
self.assertFalse(handler.post_delete_access_attempt.called)
AxesProxyHandler.post_delete_access_attempt(self.instance)
@ -102,24 +101,31 @@ class AxesProxyHandlerTestCase(AxesTestCase):
class AxesHandlerBaseTestCase(AxesTestCase):
def check_whitelist(self, log):
with override_settings(
AXES_NEVER_LOCKOUT_WHITELIST=True,
AXES_IP_WHITELIST=[self.ip_address],
AXES_NEVER_LOCKOUT_WHITELIST=True, AXES_IP_WHITELIST=[self.ip_address]
):
AxesProxyHandler.user_login_failed(sender=None, request=self.request, credentials=self.credentials)
client_str = get_client_str(self.username, self.ip_address, self.user_agent, self.path_info)
log.info.assert_called_with('AXES: Login failed from whitelisted client %s.', client_str)
AxesProxyHandler.user_login_failed(
sender=None, request=self.request, credentials=self.credentials
)
client_str = get_client_str(
self.username, self.ip_address, self.user_agent, self.path_info
)
log.info.assert_called_with(
"AXES: Login failed from whitelisted client %s.", client_str
)
def check_empty_request(self, log, handler):
AxesProxyHandler.user_login_failed(sender=None, credentials={}, request=None)
log.error.assert_called_with(f'AXES: {handler}.user_login_failed does not function without a request.')
log.error.assert_called_with(
f"AXES: {handler}.user_login_failed does not function without a request."
)
def test_is_admin_site(self):
request = MagicMock()
tests = ( # (AXES_ONLY_ADMIN_SITE, URL, Expected)
(True, '/test/', True),
(True, reverse('admin:index'), False),
(False, '/test/', False),
(False, reverse('admin:index'), False),
(True, "/test/", True),
(True, reverse("admin:index"), False),
(False, "/test/", False),
(False, reverse("admin:index"), False),
)
for setting_value, url, expected in tests:
@ -129,7 +135,7 @@ class AxesHandlerBaseTestCase(AxesTestCase):
@override_settings(
AXES_HANDLER='axes.handlers.database.AxesDatabaseHandler',
AXES_HANDLER="axes.handlers.database.AxesDatabaseHandler",
AXES_COOLOFF_TIME=timedelta(seconds=1),
AXES_RESET_ON_SUCCESS=True,
)
@ -148,7 +154,7 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
self.create_log()
then = timezone.now() - timezone.timedelta(days=90)
with patch('django.utils.timezone.now', return_value=then):
with patch("django.utils.timezone.now", return_value=then):
self.create_log()
self.assertEqual(AccessLog.objects.count(), 2)
@ -167,7 +173,7 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
def test_handler_callable_failure_limit(self):
self.check_handler()
@override_settings(AXES_FAILURE_LIMIT='axes.tests.base.custom_failure_limit')
@override_settings(AXES_FAILURE_LIMIT="axes.tests.base.custom_failure_limit")
def test_handler_str_failure_limit(self):
self.check_handler()
@ -180,22 +186,22 @@ class AxesDatabaseHandlerTestCase(AxesHandlerBaseTestCase):
def test_handler_without_lockout(self):
self.check_handler()
@patch('axes.handlers.database.log')
@patch("axes.handlers.database.log")
def test_empty_request(self, log):
self.check_empty_request(log, 'AxesDatabaseHandler')
self.check_empty_request(log, "AxesDatabaseHandler")
@patch('axes.handlers.database.log')
@patch("axes.handlers.database.log")
def test_whitelist(self, log):
self.check_whitelist(log)
@patch('axes.handlers.database.is_user_attempt_whitelisted', return_value=True)
@patch("axes.handlers.database.is_user_attempt_whitelisted", return_value=True)
def test_user_whitelisted(self, is_whitelisted):
self.assertFalse(AxesProxyHandler().is_locked(self.request, self.credentials))
self.assertEqual(1, is_whitelisted.call_count)
@override_settings(
AXES_HANDLER='axes.handlers.cache.AxesCacheHandler',
AXES_HANDLER="axes.handlers.cache.AxesCacheHandler",
AXES_COOLOFF_TIME=timedelta(seconds=1),
)
class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase):
@ -211,18 +217,16 @@ class AxesCacheHandlerTestCase(AxesHandlerBaseTestCase):
def test_handler_without_lockout(self):
self.check_handler()
@patch('axes.handlers.cache.log')
@patch("axes.handlers.cache.log")
def test_empty_request(self, log):
self.check_empty_request(log, 'AxesCacheHandler')
self.check_empty_request(log, "AxesCacheHandler")
@patch('axes.handlers.cache.log')
@patch("axes.handlers.cache.log")
def test_whitelist(self, log):
self.check_whitelist(log)
@override_settings(
AXES_HANDLER='axes.handlers.dummy.AxesDummyHandler',
)
@override_settings(AXES_HANDLER="axes.handlers.dummy.AxesDummyHandler")
class AxesDummyHandlerTestCase(AxesHandlerBaseTestCase):
def test_handler(self):
for _ in range(settings.AXES_FAILURE_LIMIT):

View file

@ -24,6 +24,6 @@ class AxesHelpersTestCase(AxesTestCase):
def test_get_cool_off_callable(self):
self.assertEqual(get_cool_off(), timedelta(seconds=30))
@override_settings(AXES_COOLOFF_TIME='axes.tests.test_helpers.get_cool_off_str')
@override_settings(AXES_COOLOFF_TIME="axes.tests.test_helpers.get_cool_off_str")
def test_get_cool_off_str(self):
self.assertEqual(get_cool_off(), timedelta(seconds=30))

View file

@ -8,8 +8,8 @@ from axes.models import AccessAttempt, AccessLog
from axes.tests.base import AxesTestCase
@patch('axes.apps.AppConfig.logging_initialized', False)
@patch('axes.apps.log')
@patch("axes.apps.AppConfig.logging_initialized", False)
@patch("axes.apps.log")
class AppsTestCase(AxesTestCase):
def test_axes_config_log_re_entrant(self, log):
"""
@ -22,7 +22,7 @@ class AppsTestCase(AxesTestCase):
AppConfig.initialize()
self.assertTrue(
calls == log.info.call_count and calls > 0,
'AxesConfig.initialize needs to be re-entrant',
"AxesConfig.initialize needs to be re-entrant",
)
@override_settings(AXES_VERBOSE=False)
@ -33,17 +33,17 @@ class AppsTestCase(AxesTestCase):
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_axes_config_log_user_only(self, log):
AppConfig.initialize()
log.info.assert_called_with('AXES: blocking by username only.')
log.info.assert_called_with("AXES: blocking by username only.")
@override_settings(AXES_ONLY_USER_FAILURES=False)
def test_axes_config_log_ip_only(self, log):
AppConfig.initialize()
log.info.assert_called_with('AXES: blocking by IP only.')
log.info.assert_called_with("AXES: blocking by IP only.")
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_axes_config_log_user_ip(self, log):
AppConfig.initialize()
log.info.assert_called_with('AXES: blocking by combination of username and IP.')
log.info.assert_called_with("AXES: blocking by combination of username and IP.")
class AccessLogTestCase(AxesTestCase):
@ -53,12 +53,12 @@ class AccessLogTestCase(AxesTestCase):
"""
self.login(is_valid_username=True, is_valid_password=True)
self.assertIsNone(AccessLog.objects.latest('id').logout_time)
self.assertIsNone(AccessLog.objects.latest("id").logout_time)
response = self.client.get(reverse('admin:logout'))
self.assertContains(response, 'Logged out')
response = self.client.get(reverse("admin:logout"))
self.assertContains(response, "Logged out")
self.assertIsNotNone(AccessLog.objects.latest('id').logout_time)
self.assertIsNotNone(AccessLog.objects.latest("id").logout_time)
def test_log_data_truncated(self):
"""
@ -66,21 +66,19 @@ class AccessLogTestCase(AxesTestCase):
"""
# An impossibly large post dict
extra_data = {'a' * x: x for x in range(1024)}
extra_data = {"a" * x: x for x in range(1024)}
self.login(**extra_data)
self.assertEqual(
len(AccessAttempt.objects.latest('id').post_data), 1024
)
self.assertEqual(len(AccessAttempt.objects.latest("id").post_data), 1024)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_valid_logout_without_success_log(self):
AccessLog.objects.all().delete()
response = self.login(is_valid_username=True, is_valid_password=True)
response = self.client.get(reverse('admin:logout'))
response = self.client.get(reverse("admin:logout"))
self.assertEqual(AccessLog.objects.all().count(), 0)
self.assertContains(response, 'Logged out', html=True)
self.assertContains(response, "Logged out", html=True)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_valid_login_without_success_log(self):
@ -100,10 +98,10 @@ class AccessLogTestCase(AxesTestCase):
AccessLog.objects.all().delete()
response = self.login(is_valid_username=True, is_valid_password=True)
response = self.client.get(reverse('admin:logout'))
response = self.client.get(reverse("admin:logout"))
self.assertEqual(AccessLog.objects.count(), 0)
self.assertContains(response, 'Logged out', html=True)
self.assertContains(response, "Logged out", html=True)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_non_valid_login_without_log(self):

View file

@ -23,13 +23,13 @@ class DjangoLoginTestCase(TestCase):
self.request = HttpRequest()
self.request.session = engine.SessionStore()
self.username = 'john.doe'
self.password = 'hunter2'
self.username = "john.doe"
self.password = "hunter2"
self.user = get_user_model().objects.create(username=self.username)
self.user.set_password(self.password)
self.user.save()
self.user.backend = 'django.contrib.auth.backends.ModelBackend'
self.user.backend = "django.contrib.auth.backends.ModelBackend"
class DjangoContribAuthLoginTestCase(DjangoLoginTestCase):
@ -62,60 +62,52 @@ class LoginTestCase(AxesTestCase):
Always allow attempted logins for a different user from a different IP.
"""
IP_1 = '10.1.1.1'
IP_2 = '10.2.2.2'
USER_1 = 'valid-user-1'
USER_2 = 'valid-user-2'
EMAIL_1 = 'valid-email-1@example.com'
EMAIL_2 = 'valid-email-2@example.com'
IP_1 = "10.1.1.1"
IP_2 = "10.2.2.2"
USER_1 = "valid-user-1"
USER_2 = "valid-user-2"
EMAIL_1 = "valid-email-1@example.com"
EMAIL_2 = "valid-email-2@example.com"
VALID_USERNAME = USER_1
VALID_EMAIL = EMAIL_1
VALID_PASSWORD = 'valid-password'
VALID_PASSWORD = "valid-password"
VALID_IP_ADDRESS = IP_1
WRONG_PASSWORD = 'wrong-password'
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
WRONG_PASSWORD = "wrong-password"
LOCKED_MESSAGE = "Account locked: too many login attempts."
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
ALLOWED = 302
BLOCKED = 403
def _login(self, username, password, ip_addr='127.0.0.1', **kwargs):
def _login(self, username, password, ip_addr="127.0.0.1", **kwargs):
"""
Login a user and get the response.
IP address can be configured to test IP blocking functionality.
"""
post_data = {
'username': username,
'password': password,
}
post_data = {"username": username, "password": password}
post_data.update(kwargs)
return self.client.post(
reverse('admin:login'),
reverse("admin:login"),
post_data,
REMOTE_ADDR=ip_addr,
HTTP_USER_AGENT='test-browser'
HTTP_USER_AGENT="test-browser",
)
def _lockout_user_from_ip(self, username, ip_addr):
for _ in range(settings.AXES_FAILURE_LIMIT):
response = self._login(
username=username,
password=self.WRONG_PASSWORD,
ip_addr=ip_addr,
username=username, password=self.WRONG_PASSWORD, ip_addr=ip_addr
)
return response
def _lockout_user1_from_ip1(self):
return self._lockout_user_from_ip(
username=self.USER_1,
ip_addr=self.IP_1,
)
return self._lockout_user_from_ip(username=self.USER_1, ip_addr=self.IP_1)
def setUp(self):
"""
@ -138,7 +130,9 @@ class LoginTestCase(AxesTestCase):
"""
response = self._login(self.username, self.password)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True)
self.assertNotContains(
response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
)
def test_lockout_limit_once(self):
"""
@ -188,10 +182,7 @@ class LoginTestCase(AxesTestCase):
# test until one try before the limit
for _ in range(1, settings.AXES_FAILURE_LIMIT):
response = self.login(
is_valid_username=True,
is_valid_password=False,
)
response = self.login(is_valid_username=True, is_valid_password=False)
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY, html=True)
@ -224,7 +215,9 @@ class LoginTestCase(AxesTestCase):
response = self._login(self.username, self.password)
# Check if we are still in the login page
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True)
self.assertNotContains(
response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
)
# now create failure_limit + 1 failed logins and then we should still
# be able to login with valid_username
@ -233,7 +226,9 @@ class LoginTestCase(AxesTestCase):
# Check if we can still log in with valid user
response = self._login(self.username, self.password)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True)
self.assertNotContains(
response, self.LOGIN_FORM_KEY, status_code=self.ALLOWED, html=True
)
# Test for true and false positives when blocking by IP *OR* user (default)
# Cache disabled. Default settings.
@ -242,11 +237,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_same_user_diff_ip_without_cache(self):
@ -254,11 +245,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
def test_lockout_by_ip_blocks_when_diff_user_same_ip_without_cache(self):
@ -266,11 +253,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 is also locked out from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_diff_user_diff_ip_without_cache(self):
@ -278,11 +261,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by user only.
@ -293,11 +272,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@ -306,11 +281,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is also locked out from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@ -319,11 +290,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@ -332,20 +299,16 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_with_empty_username_allows_other_users_without_cache(self):
# User with empty username is locked out from IP 1.
self._lockout_user_from_ip(username='', ip_addr=self.IP_1)
self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
# Still possible to access the login page
response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1)
response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
# Test for true and false positives when blocking by user and IP together.
@ -356,11 +319,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@ -369,11 +328,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@ -382,11 +337,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@ -395,20 +346,18 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_without_cache(self):
def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_without_cache(
self
):
# User with empty username is locked out from IP 1.
self._lockout_user_from_ip(username='', ip_addr=self.IP_1)
self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
# Still possible to access the login page
response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1)
response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
# Test for true and false positives when blocking by IP *OR* user (default)
@ -418,11 +367,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_same_user_diff_ip_using_cache(self):
@ -430,11 +375,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
def test_lockout_by_ip_blocks_when_diff_user_same_ip_using_cache(self):
@ -442,11 +383,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 is also locked out from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_diff_user_diff_ip_using_cache(self):
@ -454,20 +391,16 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_with_empty_username_allows_other_users_using_cache(self):
# User with empty username is locked out from IP 1.
self._lockout_user_from_ip(username='', ip_addr=self.IP_1)
self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
# Still possible to access the login page
response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1)
response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)
# Test for true and false positives when blocking by user only.
@ -478,11 +411,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@ -491,11 +420,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is also locked out from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@ -504,11 +429,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@ -517,11 +438,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by user and IP together.
@ -532,11 +449,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@ -545,11 +458,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_1, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@ -558,11 +467,7 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_1)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@ -571,18 +476,16 @@ class LoginTestCase(AxesTestCase):
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
response = self._login(self.USER_2, self.VALID_PASSWORD, ip_addr=self.IP_2)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_using_cache(self):
def test_lockout_by_user_and_ip_with_empty_username_allows_other_users_using_cache(
self
):
# User with empty username is locked out from IP 1.
self._lockout_user_from_ip(username='', ip_addr=self.IP_1)
self._lockout_user_from_ip(username="", ip_addr=self.IP_1)
# Still possible to access the login page
response = self.client.get(reverse('admin:login'), REMOTE_ADDR=self.IP_1)
response = self.client.get(reverse("admin:login"), REMOTE_ADDR=self.IP_1)
self.assertContains(response, self.LOGIN_FORM_KEY, status_code=200, html=True)

View file

@ -10,104 +10,100 @@ from axes.tests.base import AxesTestCase
class ResetAccessLogsManagementCommandTestCase(AxesTestCase):
def setUp(self):
self.msg_not_found = 'No logs found.\n'
self.msg_num_found = '{} logs removed.\n'
self.msg_not_found = "No logs found.\n"
self.msg_num_found = "{} logs removed.\n"
days_3 = timezone.now() - timezone.timedelta(days=3)
with patch('django.utils.timezone.now', Mock(return_value=days_3)):
with patch("django.utils.timezone.now", Mock(return_value=days_3)):
AccessLog.objects.create()
days_13 = timezone.now() - timezone.timedelta(days=9)
with patch('django.utils.timezone.now', Mock(return_value=days_13)):
with patch("django.utils.timezone.now", Mock(return_value=days_13)):
AccessLog.objects.create()
days_30 = timezone.now() - timezone.timedelta(days=27)
with patch('django.utils.timezone.now', Mock(return_value=days_30)):
with patch("django.utils.timezone.now", Mock(return_value=days_30)):
AccessLog.objects.create()
def test_axes_delete_access_logs_default(self):
out = StringIO()
call_command('axes_reset_logs', stdout=out)
call_command("axes_reset_logs", stdout=out)
self.assertEqual(self.msg_not_found, out.getvalue())
def test_axes_delete_access_logs_older_than_2_days(self):
out = StringIO()
call_command('axes_reset_logs', age=2, stdout=out)
call_command("axes_reset_logs", age=2, stdout=out)
self.assertEqual(self.msg_num_found.format(3), out.getvalue())
def test_axes_delete_access_logs_older_than_4_days(self):
out = StringIO()
call_command('axes_reset_logs', age=4, stdout=out)
call_command("axes_reset_logs", age=4, stdout=out)
self.assertEqual(self.msg_num_found.format(2), out.getvalue())
def test_axes_delete_access_logs_older_than_16_days(self):
out = StringIO()
call_command('axes_reset_logs', age=16, stdout=out)
call_command("axes_reset_logs", age=16, stdout=out)
self.assertEqual(self.msg_num_found.format(1), out.getvalue())
class ManagementCommandTestCase(AxesTestCase):
def setUp(self):
AccessAttempt.objects.create(
username='jane.doe',
ip_address='10.0.0.1',
failures_since_start='4',
username="jane.doe", ip_address="10.0.0.1", failures_since_start="4"
)
AccessAttempt.objects.create(
username='john.doe',
ip_address='10.0.0.2',
failures_since_start='15',
username="john.doe", ip_address="10.0.0.2", failures_since_start="15"
)
def test_axes_list_attempts(self):
out = StringIO()
call_command('axes_list_attempts', stdout=out)
call_command("axes_list_attempts", stdout=out)
expected = '10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n'
expected = "10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n"
self.assertEqual(expected, out.getvalue())
def test_axes_reset(self):
out = StringIO()
call_command('axes_reset', stdout=out)
call_command("axes_reset", stdout=out)
expected = '2 attempts removed.\n'
expected = "2 attempts removed.\n"
self.assertEqual(expected, out.getvalue())
def test_axes_reset_not_found(self):
out = StringIO()
call_command('axes_reset', stdout=out)
call_command("axes_reset", stdout=out)
out = StringIO()
call_command('axes_reset', stdout=out)
call_command("axes_reset", stdout=out)
expected = 'No attempts found.\n'
expected = "No attempts found.\n"
self.assertEqual(expected, out.getvalue())
def test_axes_reset_ip(self):
out = StringIO()
call_command('axes_reset_ip', '10.0.0.1', stdout=out)
call_command("axes_reset_ip", "10.0.0.1", stdout=out)
expected = '1 attempts removed.\n'
expected = "1 attempts removed.\n"
self.assertEqual(expected, out.getvalue())
def test_axes_reset_ip_not_found(self):
out = StringIO()
call_command('axes_reset_ip', '10.0.0.3', stdout=out)
call_command("axes_reset_ip", "10.0.0.3", stdout=out)
expected = 'No attempts found.\n'
expected = "No attempts found.\n"
self.assertEqual(expected, out.getvalue())
def test_axes_reset_username(self):
out = StringIO()
call_command('axes_reset_username', 'john.doe', stdout=out)
call_command("axes_reset_username", "john.doe", stdout=out)
expected = '1 attempts removed.\n'
expected = "1 attempts removed.\n"
self.assertEqual(expected, out.getvalue())
def test_axes_reset_username_not_found(self):
out = StringIO()
call_command('axes_reset_username', 'ivan.renko', stdout=out)
call_command("axes_reset_username", "ivan.renko", stdout=out)
expected = 'No attempts found.\n'
expected = "No attempts found.\n"
self.assertEqual(expected, out.getvalue())

View file

@ -13,23 +13,22 @@ class ModelsTestCase(AxesTestCase):
self.failures_since_start = 42
self.access_attempt = AccessAttempt(
failures_since_start=self.failures_since_start,
failures_since_start=self.failures_since_start
)
self.access_log = AccessLog()
def test_access_attempt_str(self):
self.assertIn('Access', str(self.access_attempt))
self.assertIn("Access", str(self.access_attempt))
def test_access_log_str(self):
self.assertIn('Access', str(self.access_log))
self.assertIn("Access", str(self.access_log))
class MigrationsTestCase(AxesTestCase):
def test_missing_migrations(self):
executor = MigrationExecutor(connection)
autodetector = MigrationAutodetector(
executor.loader.project_state(),
ProjectState.from_apps(apps),
executor.loader.project_state(), ProjectState.from_apps(apps)
)
changes = autodetector.changes(graph=executor.loader.graph)

View file

@ -62,22 +62,14 @@ class TimestampTestCase(AxesTestCase):
"""
expected = {
timedelta(days=1, hours=25, minutes=42, seconds=8):
'P2DT1H42M8S',
timedelta(days=7, seconds=342):
'P7DT5M42S',
timedelta(days=0, hours=2, minutes=42):
'PT2H42M',
timedelta(hours=20, seconds=42):
'PT20H42S',
timedelta(seconds=300):
'PT5M',
timedelta(seconds=9005):
'PT2H30M5S',
timedelta(minutes=9005):
'P6DT6H5M',
timedelta(days=15):
'P15D'
timedelta(days=1, hours=25, minutes=42, seconds=8): "P2DT1H42M8S",
timedelta(days=7, seconds=342): "P7DT5M42S",
timedelta(days=0, hours=2, minutes=42): "PT2H42M",
timedelta(hours=20, seconds=42): "PT20H42S",
timedelta(seconds=300): "PT5M",
timedelta(seconds=9005): "PT2H30M5S",
timedelta(minutes=9005): "P6DT6H5M",
timedelta(days=15): "P15D",
}
for delta, iso_duration in expected.items():
@ -93,34 +85,38 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_VERBOSE=True)
def test_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = self.get_expected_client_str(username, ip_address, user_agent, path_info)
expected = self.get_expected_client_str(
username, ip_address, user_agent, path_info
)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_VERBOSE=True)
def test_verbose_ip_only_client_details_tuple(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = ('admin', 'login')
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = ("admin", "login")
expected = self.get_expected_client_str(username, ip_address, user_agent, path_info[0])
expected = self.get_expected_client_str(
username, ip_address, user_agent, path_info[0]
)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = '{ip_address: "127.0.0.1", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
@ -130,12 +126,14 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_ONLY_USER_FAILURES=True)
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_only_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = self.get_expected_client_str(username, ip_address, user_agent, path_info)
expected = self.get_expected_client_str(
username, ip_address, user_agent, path_info
)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -143,10 +141,10 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_ONLY_USER_FAILURES=True)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_only_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = '{username: "test@example.com", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
@ -156,12 +154,14 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = self.get_expected_client_str(username, ip_address, user_agent, path_info)
expected = self.get_expected_client_str(
username, ip_address, user_agent, path_info
)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -169,10 +169,10 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = '{username: "test@example.com", ip_address: "127.0.0.1", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
@ -182,12 +182,14 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_USE_USER_AGENT=True)
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = self.get_expected_client_str(username, ip_address, user_agent, path_info)
expected = self.get_expected_client_str(
username, ip_address, user_agent, path_info
)
actual = get_client_str(username, ip_address, user_agent, path_info)
self.assertEqual(expected, actual)
@ -195,10 +197,10 @@ class ClientStringTestCase(AxesTestCase):
@override_settings(AXES_USE_USER_AGENT=True)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip_address = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
username = "test@example.com"
ip_address = "127.0.0.1"
user_agent = "Googlebot/2.1 (+http://www.googlebot.com/bot.html)"
path_info = "/admin/"
expected = '{ip_address: "127.0.0.1", user_agent: "Googlebot/2.1 (+http://www.googlebot.com/bot.html)", path_info: "/admin/"}'
actual = get_client_str(username, ip_address, user_agent, path_info)
@ -207,13 +209,13 @@ class ClientStringTestCase(AxesTestCase):
class ClientParametersTestCase(AxesTestCase):
@override_settings(
AXES_ONLY_USER_FAILURES=True,
)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_get_filter_kwargs_user(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username},
dict(
get_client_parameters(self.username, self.ip_address, self.user_agent)
),
{"username": self.username},
)
@override_settings(
@ -223,8 +225,10 @@ class ClientParametersTestCase(AxesTestCase):
)
def test_get_filter_kwargs_ip(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'ip_address': self.ip_address},
dict(
get_client_parameters(self.username, self.ip_address, self.user_agent)
),
{"ip_address": self.ip_address},
)
@override_settings(
@ -234,8 +238,10 @@ class ClientParametersTestCase(AxesTestCase):
)
def test_get_filter_kwargs_user_and_ip(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username, 'ip_address': self.ip_address},
dict(
get_client_parameters(self.username, self.ip_address, self.user_agent)
),
{"username": self.username, "ip_address": self.ip_address},
)
@override_settings(
@ -245,8 +251,10 @@ class ClientParametersTestCase(AxesTestCase):
)
def test_get_filter_kwargs_ip_and_agent(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'ip_address': self.ip_address, 'user_agent': self.user_agent},
dict(
get_client_parameters(self.username, self.ip_address, self.user_agent)
),
{"ip_address": self.ip_address, "user_agent": self.user_agent},
)
@override_settings(
@ -256,8 +264,14 @@ class ClientParametersTestCase(AxesTestCase):
)
def test_get_filter_kwargs_user_ip_agent(self):
self.assertEqual(
dict(get_client_parameters(self.username, self.ip_address, self.user_agent)),
{'username': self.username, 'ip_address': self.ip_address, 'user_agent': self.user_agent},
dict(
get_client_parameters(self.username, self.ip_address, self.user_agent)
),
{
"username": self.username,
"ip_address": self.ip_address,
"user_agent": self.user_agent,
},
)
@ -268,29 +282,25 @@ class ClientCacheKeyTestCase(AxesTestCase):
"""
cache_hash_digest = md5(self.ip_address.encode()).hexdigest()
cache_hash_key = f'axes-{cache_hash_digest}'
cache_hash_key = f"axes-{cache_hash_digest}"
# Getting cache key from request
request_factory = RequestFactory()
request = request_factory.post(
'/admin/login/',
data={
'username': self.username,
'password': 'test',
},
"/admin/login/", data={"username": self.username, "password": "test"}
)
self.assertEqual(cache_hash_key, get_client_cache_key(request))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
user_agent="<unknown>",
ip_address=self.ip_address,
username=self.username,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
get_data="",
post_data="",
http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
path_info=request.META.get("PATH_INFO", "<unknown>"),
failures_since_start=0,
)
@ -301,19 +311,16 @@ class ClientCacheKeyTestCase(AxesTestCase):
Simulate an empty IP address in the request.
"""
empty_ip_address = ''
empty_ip_address = ""
cache_hash_digest = md5(empty_ip_address.encode()).hexdigest()
cache_hash_key = f'axes-{cache_hash_digest}'
cache_hash_key = f"axes-{cache_hash_digest}"
# Getting cache key from request
request_factory = RequestFactory()
request = request_factory.post(
'/admin/login/',
data={
'username': self.username,
'password': 'test',
},
"/admin/login/",
data={"username": self.username, "password": "test"},
REMOTE_ADDR=empty_ip_address,
)
@ -321,13 +328,13 @@ class ClientCacheKeyTestCase(AxesTestCase):
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
user_agent="<unknown>",
ip_address=empty_ip_address,
username=self.username,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
get_data="",
post_data="",
http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
path_info=request.META.get("PATH_INFO", "<unknown>"),
failures_since_start=0,
)
@ -340,110 +347,108 @@ class ClientCacheKeyTestCase(AxesTestCase):
ip_address = self.ip_address
cache_hash_digest = md5(ip_address.encode()).hexdigest()
cache_hash_key = f'axes-{cache_hash_digest}'
cache_hash_key = f"axes-{cache_hash_digest}"
# Getting cache key from request
request_factory = RequestFactory()
request = request_factory.post(
'/admin/login/',
data={
'username': self.username,
'password': 'test'
}
"/admin/login/", data={"username": self.username, "password": "test"}
)
# Difference between the upper test: new call signature with credentials
credentials = {'username': self.username}
credentials = {"username": self.username}
self.assertEqual(cache_hash_key, get_client_cache_key(request, credentials))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
user_agent="<unknown>",
ip_address=ip_address,
username=self.username,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
get_data="",
post_data="",
http_accept=request.META.get("HTTP_ACCEPT", "<unknown>"),
path_info=request.META.get("PATH_INFO", "<unknown>"),
failures_since_start=0,
)
self.assertEqual(cache_hash_key, get_client_cache_key(attempt))
class UsernameTestCase(AxesTestCase):
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_FORM_FIELD="username")
def test_default_get_client_username(self):
expected = 'test-username'
expected = "test-username"
request = HttpRequest()
request.POST['username'] = expected
request.POST["username"] = expected
actual = get_client_username(request)
self.assertEqual(expected, actual)
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_FORM_FIELD="username")
def test_default_get_client_username_credentials(self):
expected = 'test-username'
expected_in_credentials = 'test-credentials-username'
expected = "test-username"
expected_in_credentials = "test-credentials-username"
request = HttpRequest()
request.POST['username'] = expected
credentials = {
'username': expected_in_credentials
}
request.POST["username"] = expected
credentials = {"username": expected_in_credentials}
actual = get_client_username(request, credentials)
self.assertEqual(expected_in_credentials, actual)
def sample_customize_username(request, credentials):
return 'prefixed-' + request.POST.get('username')
return "prefixed-" + request.POST.get("username")
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_FORM_FIELD="username")
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username)
def test_custom_get_client_username_from_request(self):
provided = 'test-username'
expected = 'prefixed-' + provided
provided_in_credentials = 'test-credentials-username'
provided = "test-username"
expected = "prefixed-" + provided
provided_in_credentials = "test-credentials-username"
request = HttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
request.POST["username"] = provided
credentials = {"username": provided_in_credentials}
actual = get_client_username(request, credentials)
self.assertEqual(expected, actual)
def sample_customize_username_credentials(request, credentials):
return 'prefixed-' + credentials.get('username')
return "prefixed-" + credentials.get("username")
@override_settings(AXES_USERNAME_FORM_FIELD='username')
@override_settings(AXES_USERNAME_FORM_FIELD="username")
@override_settings(AXES_USERNAME_CALLABLE=sample_customize_username_credentials)
def test_custom_get_client_username_from_credentials(self):
provided = 'test-username'
provided_in_credentials = 'test-credentials-username'
expected_in_credentials = 'prefixed-' + provided_in_credentials
provided = "test-username"
provided_in_credentials = "test-credentials-username"
expected_in_credentials = "prefixed-" + provided_in_credentials
request = HttpRequest()
request.POST['username'] = provided
credentials = {'username': provided_in_credentials}
request.POST["username"] = provided
credentials = {"username": provided_in_credentials}
actual = get_client_username(request, credentials)
self.assertEqual(expected_in_credentials, actual)
@override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials: 'example') # pragma: no cover
@override_settings(
AXES_USERNAME_CALLABLE=lambda request, credentials: "example"
) # pragma: no cover
def test_get_client_username(self):
self.assertEqual(get_client_username(HttpRequest(), {}), 'example')
self.assertEqual(get_client_username(HttpRequest(), {}), "example")
@override_settings(AXES_USERNAME_CALLABLE=lambda request: None) # pragma: no cover
def test_get_client_username_invalid_callable_too_few_arguments(self):
with self.assertRaises(TypeError):
get_client_username(HttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None) # pragma: no cover
@override_settings(
AXES_USERNAME_CALLABLE=lambda request, credentials, extra: None
) # pragma: no cover
def test_get_client_username_invalid_callable_too_many_arguments(self):
with self.assertRaises(TypeError):
get_client_username(HttpRequest(), {})
@ -453,73 +458,70 @@ class UsernameTestCase(AxesTestCase):
with self.assertRaises(TypeError):
get_client_username(HttpRequest(), {})
@override_settings(AXES_USERNAME_CALLABLE='axes.tests.test_utils.get_username')
@override_settings(AXES_USERNAME_CALLABLE="axes.tests.test_utils.get_username")
def test_get_client_username_str(self):
self.assertEqual(
get_client_username(HttpRequest(), {}),
'username',
)
self.assertEqual(get_client_username(HttpRequest(), {}), "username")
def get_username(request, credentials: dict) -> str:
return 'username'
return "username"
class IPWhitelistTestCase(AxesTestCase):
def setUp(self):
self.request = HttpRequest()
self.request.method = 'POST'
self.request.META['REMOTE_ADDR'] = '127.0.0.1'
self.request.axes_ip_address = '127.0.0.1'
self.request.method = "POST"
self.request.META["REMOTE_ADDR"] = "127.0.0.1"
self.request.axes_ip_address = "127.0.0.1"
@override_settings(AXES_IP_WHITELIST=None)
def test_ip_in_whitelist_none(self):
self.assertFalse(is_ip_address_in_whitelist('127.0.0.2'))
self.assertFalse(is_ip_address_in_whitelist("127.0.0.2"))
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
@override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
def test_ip_in_whitelist(self):
self.assertTrue(is_ip_address_in_whitelist('127.0.0.1'))
self.assertFalse(is_ip_address_in_whitelist('127.0.0.2'))
self.assertTrue(is_ip_address_in_whitelist("127.0.0.1"))
self.assertFalse(is_ip_address_in_whitelist("127.0.0.2"))
@override_settings(AXES_IP_BLACKLIST=None)
def test_ip_in_blacklist_none(self):
self.assertFalse(is_ip_address_in_blacklist('127.0.0.2'))
self.assertFalse(is_ip_address_in_blacklist("127.0.0.2"))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
@override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
def test_ip_in_blacklist(self):
self.assertTrue(is_ip_address_in_blacklist('127.0.0.1'))
self.assertFalse(is_ip_address_in_blacklist('127.0.0.2'))
self.assertTrue(is_ip_address_in_blacklist("127.0.0.1"))
self.assertFalse(is_ip_address_in_blacklist("127.0.0.2"))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.1'])
@override_settings(AXES_IP_BLACKLIST=["127.0.0.1"])
def test_is_client_ip_address_blacklisted_ip_in_blacklist(self):
self.assertTrue(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_IP_BLACKLIST=['127.0.0.2'])
@override_settings(AXES_IP_BLACKLIST=["127.0.0.2"])
def test_is_is_client_ip_address_blacklisted_ip_not_in_blacklist(self):
self.assertFalse(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
@override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
def test_is_client_ip_address_blacklisted_ip_in_whitelist(self):
self.assertFalse(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.2'])
@override_settings(AXES_IP_WHITELIST=["127.0.0.2"])
def test_is_already_locked_ip_not_in_whitelist(self):
self.assertTrue(is_client_ip_address_blacklisted(self.request))
@override_settings(AXES_NEVER_LOCKOUT_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
@override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
def test_is_client_ip_address_whitelisted_never_lockout(self):
self.assertTrue(is_client_ip_address_whitelisted(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.1'])
@override_settings(AXES_IP_WHITELIST=["127.0.0.1"])
def test_is_client_ip_address_whitelisted_only_allow(self):
self.assertTrue(is_client_ip_address_whitelisted(self.request))
@override_settings(AXES_ONLY_WHITELIST=True)
@override_settings(AXES_IP_WHITELIST=['127.0.0.2'])
@override_settings(AXES_IP_WHITELIST=["127.0.0.2"])
def test_is_client_ip_address_whitelisted_not(self):
self.assertFalse(is_client_ip_address_whitelisted(self.request))
@ -527,7 +529,7 @@ class IPWhitelistTestCase(AxesTestCase):
class MethodWhitelistTestCase(AxesTestCase):
def setUp(self):
self.request = HttpRequest()
self.request.method = 'GET'
self.request.method = "GET"
@override_settings(AXES_NEVER_LOCKOUT_GET=True)
def test_is_client_method_whitelisted(self):
@ -546,14 +548,14 @@ class LockoutResponseTestCase(AxesTestCase):
def test_get_lockout_response_cool_off(self):
get_lockout_response(request=self.request)
@override_settings(AXES_LOCKOUT_TEMPLATE='example.html')
@patch('axes.helpers.render')
@override_settings(AXES_LOCKOUT_TEMPLATE="example.html")
@patch("axes.helpers.render")
def test_get_lockout_response_lockout_template(self, render):
self.assertFalse(render.called)
get_lockout_response(request=self.request)
self.assertTrue(render.called)
@override_settings(AXES_LOCKOUT_URL='https://example.com')
@override_settings(AXES_LOCKOUT_URL="https://example.com")
def test_get_lockout_response_lockout_url(self):
response = get_lockout_response(request=self.request)
self.assertEqual(type(response), HttpResponseRedirect)

View file

@ -2,6 +2,4 @@ from django.conf.urls import url
from django.contrib import admin
urlpatterns = [
url(r'^admin/', admin.site.urls),
]
urlpatterns = [url(r"^admin/", admin.site.urls)]

View file

@ -1,4 +1,5 @@
-e .
black==19.3b0
coverage==4.5.4
mypy==0.730; python_version < '3.8' and python_implementation != 'PyPy'
prospector==1.1.6.2 ; python_version < '3.8' and python_implementation != 'PyPy'

View file

@ -26,7 +26,8 @@ deps =
usedevelop = True
commands =
pytest
{py36,py37}: prospector
{py36,py37}: mypy axes
{py36,py37}: prospector
{py36,py37}: black --check --diff axes
setenv =
PYTHONDONTWRITEBYTECODE=1