Added support for Django signals, big code refactor and improvements

This commit is contained in:
Camilo Nova 2017-07-20 09:06:41 -05:00
parent 85ece72541
commit ba37442252
33 changed files with 1688 additions and 2131 deletions

14
.gitignore vendored
View file

@ -1,9 +1,11 @@
django_axes.egg-info
*.egg-info
*.pyc
build
dist
.hg
.coverage
.DS_Store
.project
.pydevproject
.tox
build/
dist/
docs/_build
.tox/
tox.ini
test.db

View file

@ -1,18 +1,19 @@
sudo: false
language: python
cache: pip
python:
- 2.7
- 3.5
env:
- DJANGO="Django>=1.8,<1.9"
- DJANGO="Django>=1.9,<1.10"
- DJANGO="Django>=1.10,<1.11"
install:
- pip install -q $DJANGO coveralls
script:
- coverage run -a --source=axes runtests.py
- coverage run -a --source=axes runtests_proxy.py
- coverage run -a --source=axes runtests_num_proxies.py
- coverage run -a --source=axes runtests_proxy_custom_header.py
- coverage report
- 3.6
install: pip install tox-travis
script: tox
after_success:
- coveralls
deploy:
provider: pypi
user: jazzband
distributions: "sdist bdist_wheel"
password:
secure: VD+63Tnv0VYNfFQv9f1KZ0k79HSX8veNk4dTy42Hriteci50z5uSQdZMnqqD83xQJa4VF6N7DHkxHnBVOWLCqGQZeYqR/5BuDFNUewcr6O14dk31HvxMsWDaN1KW0Qwtus8ZrztwGhZtZ/92ODA6luHI4mCTzqX0gcG0/aKd75s=
on:
tags: true
repo: jazzband/django-axes
python: 3.6

View file

@ -1,6 +1,14 @@
Changes
=======
3.0.0 (2017-11-17)
------------------
- BREAKING CHANGES. Support for Django >= 1.11 and signals, see issue #215.
Drop support for Python < 3.6
[camilonova]
2.3.3 (2017-07-20)
------------------

View file

@ -1,4 +1,3 @@
include LICENSE README.rst CHANGES.txt
recursive-include axes *.py
include .travis.yml
include Makefile

216
Makefile
View file

@ -1,216 +0,0 @@
# https://github.com/aclark4life/project-makefile
#
# The MIT License (MIT)
#
# Copyright (c) 2016 Alex Clark
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
.DEFAULT_GOAL=git-commit-edit-push
APP=app
MESSAGE="Update"
PROJECT=project
TMP:=$(shell echo `tmp`)
commit: git-commit-auto-push
co: git-checkout-branches
db: django-migrate django-su
db-clean: django-db-clean-postgres
django-start: django-init
fe-init: npm-init npm-install grunt-init grunt-serve
fe: npm-install grunt-serve
freeze: python-pip-freeze
heroku: heroku-push
install: python3.5-virtualenv python-pip-install
lint: python-flake python-yapf python-wc
migrate: django-migrate
push: git-push
plone-start: plone-init
readme: python-package-readme-test
release: python-package-release
releasetest: python-package-release-test
serve: python-serve
sphinx-start: sphinx-init
static: django-static
test: django-test
vm: vagrant-up
vm-down: vagrant-suspend
# Django
django-db-clean-postgres:
-dropdb $(PROJECT)-$(APP)
-createdb $(PROJECT)-$(APP)
django-db-clean-sqlite:
-rm -f $(PROJECT)-$(APP).sqlite3
django-init:
-mkdir -p $(PROJECT)/$(APP)
-django-admin startproject $(PROJECT) .
-django-admin startapp $(APP) $(PROJECT)/$(APP)
django-install:
$(MAKE) python-virtualenv
bin/pip install Django
django-migrate:
python manage.py migrate
django-migrations:
python manage.py makemigrations $(APP)
django-migrations-clean:
rm -rf $(PROJECT)/$(APP)/migrations
$(MAKE) django-migrations
django-serve:
python manage.py runserver
django-test:
python manage.py test
django-shell:
python manage.py shell
django-static:
python manage.py collectstatic --noinput
django-su:
python manage.py createsuperuser
# Git
REMOTE_BRANCHES=`git branch -a |\
grep remote |\
grep -v HEAD |\
grep -v master`
git-checkout-branches:
-for i in $(REMOTE_BRANCHES) ; do \
git checkout -t $$i ; done
git-commit-auto-push:
git commit -a -m $(MESSAGE)
$(MAKE) git-push
git-commit-edit-push:
git commit -a
$(MAKE) git-push
git-push:
git push
# Heroku
heroku-debug-on:
heroku config:set DEBUG=1
heroku-debug-off:
heroku config:unset DEBUG
heroku-push:
git push heroku
heroku-shell:
heroku run bash
# Misc
help:
@echo "\nPlease run \`make\` with one of these targets:\n"
@$(MAKE) -pRrq -f $(lastword $(MAKEFILE_LIST)) : 2>/dev/null | awk -v RS= -F:\
'/^# File/,/^# Finished Make data base/ {if ($$1 !~ "^[#.]") {print $$1}}'\
| sort | egrep -v -e '^[^[:alnum:]]' -e '^$@$$' | xargs | tr ' ' '\n' | awk\
'{print " - "$$0}'
@echo "\n"
review:
open -a "Sublime Text 2" `find $(PROJECT) -name \*.py | grep -v __init__.py`\
`find $(PROJECT) -name \*.html`
# Node
npm-init:
npm init
npm-install:
npm install
grunt-init:
npm install grunt
grunt-init Gruntfile
grunt-serve:
grunt serve
# Plone
plone-heroku:
-@createuser -s plone > /dev/null 2>&1
-@createdb -U plone plone > /dev/null 2>&1
@export PORT=8080 && \
export USERNAME=admin && \
export PASSWORD=admin && \
bin/buildout -c heroku.cfg
plone-init:
plock --force --no-cache --no-virtualenv .
plone-install:
$(MAKE) install
bin/buildout
plone-db-sync:
bin/buildout -c database.cfg
plone-serve:
@echo "Zope about to handle requests here:\n\n\thttp://localhost:8080\n"
@bin/plone fg
# Python
python-clean-pyc:
find . -name \*.pyc | xargs rm -v
python-flake:
-flake8 *.py
-flake8 $(PROJECT)/*.py
-flake8 $(PROJECT)/$(APP)/*.py
python-package-check:
check-manifest
pyroma .
python-package-readme-test:
rst2html.py README.rst > readme.html; open readme.html
python-package-release:
python setup.py sdist --format=gztar,zip upload
python-package-release-test:
python setup.py sdist --format=gztar,zip upload -r test
python-package-test:
python setup.py test
python-pip-freeze:
bin/pip freeze | sort > $(TMP)/requirements.txt
mv -f $(TMP)/requirements.txt .
python-pip-install:
bin/pip install -r requirements.txt
python-serve:
@echo "\n\tServing HTTP on http://0.0.0.0:8000\n"
python -m SimpleHTTPServer
python-virtualenv:
virtualenv .
python-yapf:
-yapf -i *.py
-yapf -i -e $(PROJECT)/urls.py $(PROJECT)/*.py
-yapf -i $(PROJECT)/$(APP)/*.py
python-wc:
-wc -l *.py
-wc -l $(PROJECT)/*.py
-wc -l $(PROJECT)/$(APP)/*.py
# Sphinx
sphinx-init:
sphinx-quickstart -q -p "Python Project" -a "Alex Clark" -v 0.0.1 doc
sphinx-serve:
@echo "\nServing HTTP on http://0.0.0.0:8085 ...\n"
pushd _build/html; python -m SimpleHTTPServer 8085; popd
# Vagrant
vagrant-box-update:
vagrant box update
vagrant-clean:
vagrant destroy
vagrant-down:
vagrant suspend
vagrant-init:
vagrant init ubuntu/trusty64
vagrant up --provider virtualbox
vagrant-up:
vagrant up --provision
# Django-axes
python3.5-virtualenv:
virtualenv-3.5 .

View file

@ -1,4 +1,4 @@
__version__ = '2.3.3'
__version__ = '3.0.0'
default_app_config = 'axes.apps.AppConfig'

View file

@ -5,7 +5,9 @@ class AppConfig(apps.AppConfig):
name = 'axes'
def ready(self):
from django.contrib.auth import views as auth_views
from django.contrib.auth.views import LoginView
from django.utils.decorators import method_decorator
from axes.decorators import watch_login
auth_views.login = watch_login(auth_views.login)
LoginView.dispatch = method_decorator(watch_login)(LoginView.dispatch)

207
axes/attempts.py Normal file
View file

@ -0,0 +1,207 @@
from datetime import timedelta
from hashlib import md5
from django.contrib.auth import get_user_model
from django.core.cache import cache
from django.utils import timezone
from axes.conf import settings
from axes.models import AccessAttempt
from axes.utils import get_ip
def _query_user_attempts(request):
"""Returns access attempt record if it exists.
Otherwise return None.
"""
ip = get_ip(request)
username = request.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
if settings.AXES_ONLY_USER_FAILURES:
attempts = AccessAttempt.objects.filter(username=username)
elif settings.AXES_USE_USER_AGENT:
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
attempts = AccessAttempt.objects.filter(
user_agent=ua, ip_address=ip, username=username, trusted=True
)
else:
attempts = AccessAttempt.objects.filter(
ip_address=ip, username=username, trusted=True
)
if not attempts:
params = {'trusted': False}
if settings.AXES_ONLY_USER_FAILURES:
params['username'] = username
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
params['username'] = username
params['ip_address'] = ip
else:
params['ip_address'] = ip
if settings.AXES_USE_USER_AGENT:
params['user_agent'] = ua
attempts = AccessAttempt.objects.filter(**params)
return attempts
def get_cache_key(request_or_obj):
"""
Build cache key name from request or AccessAttempt object.
:param request_or_obj: Request or AccessAttempt object
:return cache-key: String, key to be used in cache system
"""
if isinstance(request_or_obj, AccessAttempt):
ip = request_or_obj.ip_address
un = request_or_obj.username
ua = request_or_obj.user_agent
else:
ip = get_ip(request_or_obj)
un = request_or_obj.POST.get(settings.AXES_USERNAME_FORM_FIELD, None)
ua = request_or_obj.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip = ip.encode('utf-8') if ip else ''.encode('utf-8')
un = un.encode('utf-8') if un else ''.encode('utf-8')
ua = ua.encode('utf-8') if ua else ''.encode('utf-8')
if settings.AXES_ONLY_USER_FAILURES:
attributes = un
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
attributes = ip + un
else:
attributes = ip
if settings.AXES_USE_USER_AGENT:
attributes += ua
cache_hash_key = 'axes-{}'.format(md5(attributes).hexdigest())
return cache_hash_key
def get_cache_timeout():
"""Returns timeout according to COOLOFF_TIME."""
cache_timeout = None
cool_off = settings.AXES_COOLOFF_TIME
if cool_off:
if (isinstance(cool_off, int) or isinstance(cool_off, float)):
cache_timeout = timedelta(hours=cool_off).total_seconds()
else:
cache_timeout = cool_off.total_seconds()
return cache_timeout
def get_user_attempts(request):
force_reload = False
attempts = _query_user_attempts(request)
cache_hash_key = get_cache_key(request)
cache_timeout = get_cache_timeout()
cool_off = settings.AXES_COOLOFF_TIME
if cool_off:
if (isinstance(cool_off, int) or isinstance(cool_off, float)):
cool_off = timedelta(hours=cool_off)
for attempt in attempts:
if attempt.attempt_time + cool_off < timezone.now():
if attempt.trusted:
attempt.failures_since_start = 0
attempt.save()
cache.set(cache_hash_key, 0, cache_timeout)
else:
attempt.delete()
force_reload = True
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
cache.set(
cache_hash_key, failures_cached - 1, cache_timeout
)
# If objects were deleted, we need to update the queryset to reflect this,
# so force a reload.
if force_reload:
attempts = _query_user_attempts(request)
return attempts
def ip_in_whitelist(ip):
if not settings.AXES_IP_WHITELIST:
return False
return ip in settings.AXES_IP_WHITELIST
def ip_in_blacklist(ip):
if not settings.AXES_IP_BLACKLIST:
return False
return ip in settings.AXES_IP_BLACKLIST
def is_user_lockable(request):
"""Check if the user has a profile with nolockout
If so, then return the value to see if this user is special
and doesn't get their account locked out
"""
if hasattr(request.user, 'nolockout'):
return not request.user.nolockout
if request.method != 'POST':
return True
try:
field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
kwargs = {
field: request.POST.get(settings.AXES_USERNAME_FORM_FIELD)
}
user = get_user_model().objects.get(**kwargs)
if hasattr(user, 'nolockout'):
# need to invert since we need to return
# false for users that can't be blocked
return not user.nolockout
except get_user_model().DoesNotExist:
# not a valid user
return True
# Default behavior for a user to be lockable
return True
def is_already_locked(request):
ip = get_ip(request)
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
return False
if settings.AXES_ONLY_WHITELIST and not ip_in_whitelist(ip):
return True
if ip_in_blacklist(ip):
return True
if not is_user_lockable(request):
return False
cache_hash_key = get_cache_key(request)
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
return (
failures_cached >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE
)
else:
for attempt in get_user_attempts(request):
if (
attempt.failures_since_start >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE
):
return True
return False

57
axes/conf.py Normal file
View file

@ -0,0 +1,57 @@
from django.conf import settings
from appconf import AppConf
class MyAppConf(AppConf):
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = False
# if we are behind a proxy, we need to know how many proxies there are
NUM_PROXIES = 0
# behind a reverse proxy, look for the ip address using this value
REVERSE_PROXY_HEADER = 'HTTP_X_FORWARDED_FOR'
# see if the user has overridden the failure limit
FAILURE_LIMIT = 3
# see if the user has set axes to lock out logins after failure limit
LOCK_OUT_AT_FAILURE = True
USE_USER_AGENT = False
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = 'username'
# use a specific password field to retrieve from login POST data
PASSWORD_FORM_FIELD = 'password'
# only check user name and not location or user_agent
ONLY_USER_FAILURES = False
# lock out user from particular IP based on combination USER+IP
LOCK_OUT_BY_COMBINATION_USER_AND_IP = False
DISABLE_ACCESS_LOG = False
DISABLE_SUCCESS_ACCESS_LOG = False
LOGGER = 'axes.watch_login'
LOCKOUT_TEMPLATE = None
LOCKOUT_URL = None
COOLOFF_TIME = None
VERBOSE = True
# whitelist and blacklist
# TODO: convert the strings to IPv4 on startup to avoid type conversion during processing
NEVER_LOCKOUT_WHITELIST = False
ONLY_WHITELIST = False
IP_WHITELIST = None
IP_BLACKLIST = None

View file

@ -1,642 +1,92 @@
from datetime import timedelta
import json
import logging
from socket import inet_pton, AF_INET6, error
from hashlib import md5
from django.contrib.auth import get_user_model
from django.contrib.auth import logout
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render
from django.utils import six
from django.utils import timezone as datetime
from django.core.cache import cache
from axes.models import AccessAttempt
from axes.models import AccessLog
from axes.settings import *
from axes.signals import user_locked_out
from axes import get_version
from axes.conf import settings
from axes.attempts import is_already_locked
from axes.utils import iso8601
import axes
from axes.signals import * # load all signals
log = logging.getLogger(LOGGER)
if VERBOSE:
log = logging.getLogger(settings.AXES_LOGGER)
if settings.AXES_VERBOSE:
log.info('AXES: BEGIN LOG')
log.info('AXES: Using django-axes ' + axes.get_version())
if AXES_ONLY_USER_FAILURES:
log.info('AXES: Using django-axes ' + get_version())
if settings.AXES_ONLY_USER_FAILURES:
log.info('AXES: blocking by username only.')
elif LOCK_OUT_BY_COMBINATION_USER_AND_IP:
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
log.info('AXES: blocking by combination of username and IP.')
else:
log.info('AXES: blocking by IP only.')
if BEHIND_REVERSE_PROXY:
if settings.AXES_BEHIND_REVERSE_PROXY:
log.debug('AXES: Axes is configured to be behind reverse proxy')
log.debug('AXES: Looking for header value %s', REVERSE_PROXY_HEADER)
log.debug(
'AXES: Looking for header value %s', settings.AXES_REVERSE_PROXY_HEADER
)
log.debug(
'AXES: Number of proxies configured: {} '
'(please check this if you are using a custom header)'.format(
NUM_PROXIES
settings.AXES_NUM_PROXIES
)
)
def get_client_str(username, ip_address, user_agent=None, path_info=None):
if VERBOSE:
if isinstance(path_info, tuple):
path_info = path_info[0]
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
return details.format(username, ip_address, user_agent, path_info)
if AXES_ONLY_USER_FAILURES:
client = username
elif LOCK_OUT_BY_COMBINATION_USER_AND_IP:
client = '{0} from {1}'.format(username, ip_address)
else:
client = ip_address
if USE_USER_AGENT:
return client + '(user-agent={0})'.format(user_agent)
return client
def log_successful_attempt(username, ip_address,
user_agent=None, path_info=None):
client = get_client_str(username, ip_address, user_agent, path_info)
msg = 'AXES: Successful login by {0}. Creating access record.'
log.info(msg.format(client))
def log_initial_attempt(username, ip_address, user_agent, path_info):
client = get_client_str(username, ip_address, user_agent, path_info)
msg = 'AXES: New login failure by {0}. Creating access record.'
log.info(msg.format(client))
def log_repeated_attempt(username, ip_address, user_agent, path_info,
fail_count):
client = get_client_str(username, ip_address, user_agent, path_info)
fail_msg = 'AXES: Repeated login failure by {0}. Updating access record.'
count_msg = 'Count = {0} of {1}'.format(fail_count, FAILURE_LIMIT)
log.info('{0} {1}'.format(fail_msg.format(client), count_msg))
def log_lockout(username, ip_address, user_agent, path_info):
client = get_client_str(username, ip_address, user_agent, path_info)
msg = 'AXES: locked out {0} after repeated login attempts.'
log.warn(msg.format(client))
def log_decorated_call(func, args=None, kwargs=None):
log.info('AXES: Calling decorated function: %s' % func.__name__)
if args:
log.info('args: %s' % str(args))
if kwargs:
log.info('kwargs: %s' % kwargs)
def is_ipv6(ip):
try:
inet_pton(AF_INET6, ip)
except (OSError, error):
return False
return True
def get_ip(request):
"""Parse IP address from REMOTE_ADDR or
AXES_REVERSE_PROXY_HEADER if AXES_BEHIND_REVERSE_PROXY is set."""
if BEHIND_REVERSE_PROXY:
# For requests originating from behind a reverse proxy,
# resolve the IP address from the given AXES_REVERSE_PROXY_HEADER.
# AXES_REVERSE_PROXY_HEADER defaults to HTTP_X_FORWARDED_FOR,
# which is the Django name for the HTTP X-Forwarder-For header.
# Please see RFC7239 for additional information:
# https://tools.ietf.org/html/rfc7239#section-5
# The REVERSE_PROXY_HEADER HTTP header is a list
# of potentionally unsecure IPs, for example:
# X-Forwarded-For: 1.1.1.1, 11.11.11.11:8080, 111.111.111.111
ip_str = request.META.get(REVERSE_PROXY_HEADER, '')
# We need to know the number of proxies present in the request chain
# in order to securely calculate the one IP that is the real client IP.
#
# This is because IP headers can have multiple IPs in different
# configurations, with e.g. the X-Forwarded-For header containing
# the originating client IP, proxies and possibly spoofed values.
#
# If you are using a special header for client calculation such as the
# X-Real-IP or the like with nginx, please check this configuration.
#
# Please see discussion for more information:
# https://github.com/jazzband/django-axes/issues/224
ip_list = [ip.strip() for ip in ip_str.split(',')]
# Pick the nth last IP in the given list of addresses after parsing
if len(ip_list) >= NUM_PROXIES:
ip = ip_list[-NUM_PROXIES]
# Fix IIS adding client port number to the
# 'X-Forwarded-For' header (strip port)
if not is_ipv6(ip):
ip = ip.split(':', 1)[0]
# If nth last is not found, default to no IP and raise a warning
else:
ip = ''
raise Warning(
'AXES: Axes is configured for operation behind a '
'reverse proxy but received too few IPs in the HTTP '
'AXES_REVERSE_PROXY_HEADER. Check your '
'AXES_NUM_PROXIES configuration. '
'Header name: {0}, value: {1}'.format(
REVERSE_PROXY_HEADER, ip_str
)
)
if not ip:
raise Warning(
'AXES: Axes is configured for operation behind a reverse '
'proxy but could not find a suitable IP in the specified '
'HTTP header. Check your proxy server settings to make '
'sure correct headers are being passed to Django in '
'AXES_REVERSE_PROXY_HEADER. '
'Header name: {0}, value: {1}'.format(
REVERSE_PROXY_HEADER, ip_str
)
)
return ip
return request.META.get('REMOTE_ADDR', '')
def query2str(items, max_length=1024):
"""Turns a dictionary into an easy-to-read list of key-value pairs.
If there's a field called "password" it will be excluded from the output.
The length of the output is limited to max_length to avoid a DoS attack
via excessively large payloads.
"""
return '\n'.join([
'%s=%s' % (k, v) for k, v in six.iteritems(items)
if k != PASSWORD_FORM_FIELD
][:int(max_length / 2)])[:max_length]
def ip_in_whitelist(ip):
if IP_WHITELIST is not None:
return ip in IP_WHITELIST
return False
def ip_in_blacklist(ip):
if IP_BLACKLIST is not None:
return ip in IP_BLACKLIST
return False
def is_user_lockable(request):
"""Check if the user has a profile with nolockout
If so, then return the value to see if this user is special
and doesn't get their account locked out
"""
if hasattr(request.user, 'nolockout'):
return not request.user.nolockout
if request.method != 'POST':
return True
try:
field = getattr(get_user_model(), 'USERNAME_FIELD', 'username')
kwargs = {
field: request.POST.get(USERNAME_FORM_FIELD)
}
user = get_user_model().objects.get(**kwargs)
if hasattr(user, 'nolockout'):
# need to invert since we need to return
# false for users that can't be blocked
return not user.nolockout
except get_user_model().DoesNotExist:
# not a valid user
return True
# Default behavior for a user to be lockable
return True
def _get_user_attempts(request):
"""Returns access attempt record if it exists.
Otherwise return None.
"""
ip = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
if AXES_ONLY_USER_FAILURES:
attempts = AccessAttempt.objects.filter(username=username)
elif USE_USER_AGENT:
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
attempts = AccessAttempt.objects.filter(
user_agent=ua, ip_address=ip, username=username, trusted=True
)
else:
attempts = AccessAttempt.objects.filter(
ip_address=ip, username=username, trusted=True
)
if not attempts:
params = {'trusted': False}
if AXES_ONLY_USER_FAILURES:
params['username'] = username
elif LOCK_OUT_BY_COMBINATION_USER_AND_IP:
params['username'] = username
params['ip_address'] = ip
else:
params['ip_address'] = ip
if USE_USER_AGENT:
params['user_agent'] = ua
attempts = AccessAttempt.objects.filter(**params)
return attempts
def get_user_attempts(request):
objects_deleted = False
attempts = _get_user_attempts(request)
cache_hash_key = get_cache_key(request)
cache_timeout = get_cache_timeout()
if COOLOFF_TIME:
for attempt in attempts:
if attempt.attempt_time + COOLOFF_TIME < datetime.now():
if attempt.trusted:
attempt.failures_since_start = 0
attempt.save()
cache.set(cache_hash_key, 0, cache_timeout)
else:
attempt.delete()
objects_deleted = True
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
cache.set(cache_hash_key,
failures_cached - 1,
cache_timeout)
# If objects were deleted, we need to update the queryset to reflect this,
# so force a reload.
if objects_deleted:
attempts = _get_user_attempts(request)
return attempts
def is_login_failed(response):
return (
response and
not response.has_header('location') and
response.status_code != 302
)
def is_ajax_login_failed(response):
return (
response and
response.status_code != 302 and
response.status_code != 200
)
def watch_login(func):
"""
Used to decorate the django.contrib.admin.site.login method.
"""
# Don't decorate multiple times
if func.__name__ == 'decorated_login':
return func
def decorated_login(request, *args, **kwargs):
# share some useful information
if func.__name__ != 'decorated_login' and VERBOSE:
log_decorated_call(func, args, kwargs)
# TODO: create a class to hold the attempts records and perform checks
# with its methods? or just store attempts=get_user_attempts here and
# pass it to the functions
# also no need to keep accessing these:
# ip = request.META.get('REMOTE_ADDR', '')
# ua = request.META.get('HTTP_USER_AGENT', '<unknown>')
# username = request.POST.get(USERNAME_FORM_FIELD, None)
# if the request is currently under lockout, do not proceed to the
def inner(request, *args, **kwargs):
# If the request is currently under lockout, do not proceed to the
# login function, go directly to lockout url, do not pass go, do not
# collect messages about this login attempt
if is_already_locked(request):
return lockout_response(request)
# call the login function
response = func(request, *args, **kwargs)
return func(request, *args, **kwargs)
if func.__name__ == 'decorated_login':
# if we're dealing with this function itself, don't bother checking
# for invalid login attempts. I suppose there's a bunch of
# recursion going on here that used to cause one failed login
# attempt to generate 10+ failed access attempt records (with 3
# failed attempts each supposedly)
return response
if request.method == 'POST':
# see if the login was successful
if request.is_ajax():
login_unsuccessful = is_ajax_login_failed(response)
else:
login_unsuccessful = is_login_failed(response)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
if not DISABLE_ACCESS_LOG:
username = request.POST.get(USERNAME_FORM_FIELD, None)
ip_address = get_ip(request)
if login_unsuccessful or not DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=not login_unsuccessful,
)
if not login_unsuccessful and not DISABLE_SUCCESS_ACCESS_LOG:
log_successful_attempt(username, ip_address,
user_agent, path_info)
if check_request(request, login_unsuccessful):
return response
return lockout_response(request)
return response
return decorated_login
return inner
def lockout_response(request):
context = {
'failure_limit': FAILURE_LIMIT,
'username': request.POST.get(USERNAME_FORM_FIELD, '')
'failure_limit': settings.AXES_FAILURE_LIMIT,
'username': request.POST.get(settings.AXES_USERNAME_FORM_FIELD, '')
}
if request.is_ajax():
if COOLOFF_TIME:
context.update({'cooloff_time': iso8601(COOLOFF_TIME)})
cool_off = settings.AXES_COOLOFF_TIME
if cool_off:
if (isinstance(cool_off, int) or isinstance(cool_off, float)):
cool_off = timedelta(hours=cool_off)
context.update({
'cooloff_time': iso8601(cool_off)
})
if request.is_ajax():
return HttpResponse(
json.dumps(context),
content_type='application/json',
status=403,
)
elif LOCKOUT_TEMPLATE:
if COOLOFF_TIME:
context.update({'cooloff_time': iso8601(COOLOFF_TIME)})
elif settings.AXES_LOCKOUT_TEMPLATE:
return render(
request, settings.AXES_LOCKOUT_TEMPLATE, context, status=403
)
return render(request, LOCKOUT_TEMPLATE, context, status=403)
elif LOCKOUT_URL:
return HttpResponseRedirect(LOCKOUT_URL)
elif settings.AXES_LOCKOUT_URL:
return HttpResponseRedirect(settings.AXES_LOCKOUT_URL)
else:
msg = 'Account locked: too many login attempts. {0}'
if COOLOFF_TIME:
if settings.AXES_COOLOFF_TIME:
msg = msg.format('Please try again later.')
else:
msg = msg.format('Contact an admin to unlock your account.')
return HttpResponse(msg, status=403)
def is_already_locked(request):
ip = get_ip(request)
if NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip):
return False
if ONLY_WHITELIST and not ip_in_whitelist(ip):
return True
if ip_in_blacklist(ip):
return True
if not is_user_lockable(request):
return False
cache_hash_key = get_cache_key(request)
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
return failures_cached >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE
else:
for attempt in get_user_attempts(request):
if attempt.failures_since_start >= FAILURE_LIMIT and \
LOCK_OUT_AT_FAILURE:
return True
return False
def check_request(request, login_unsuccessful):
ip_address = get_ip(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
failures = 0
attempts = get_user_attempts(request)
cache_hash_key = get_cache_key(request)
cache_timeout = get_cache_timeout()
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
if login_unsuccessful:
# add a failed attempt for this user
failures += 1
cache.set(cache_hash_key, failures, cache_timeout)
# Create an AccessAttempt record if the login wasn't successful
# has already attempted, update the info
if len(attempts):
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = \
request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
attempt.path_info = path_info
attempt.failures_since_start = failures
attempt.attempt_time = datetime.now()
attempt.save()
log_repeated_attempt(username, ip_address,
user_agent, path_info, failures)
else:
create_new_failure_records(request, failures)
else:
# user logged in -- forget the failed attempts
failures = 0
trusted_record_exists = False
for attempt in attempts:
if not attempt.trusted:
attempt.delete()
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
cache.set(cache_hash_key,
failures_cached - 1,
cache_timeout)
else:
trusted_record_exists = True
attempt.failures_since_start = 0
attempt.save()
cache.set(cache_hash_key, 0, cache_timeout)
if trusted_record_exists is False:
create_new_trusted_record(request)
if NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
return True
user_lockable = is_user_lockable(request)
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if failures >= FAILURE_LIMIT and LOCK_OUT_AT_FAILURE and user_lockable:
# We log them out in case they actually managed to enter the correct
# password
if hasattr(request, 'user') and request.user.is_authenticated():
logout(request)
username = request.POST.get(USERNAME_FORM_FIELD, None)
log_lockout(username, ip_address, user_agent, path_info)
# send signal when someone is locked out.
user_locked_out.send(
'axes', request=request, username=username, ip_address=ip_address
)
# if a trusted login has violated lockout, revoke trust
for attempt in [a for a in attempts if a.trusted]:
attempt.delete()
create_new_failure_records(request, failures)
return False
return True
def create_new_failure_records(request, failures):
ip = get_ip(request)
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
username = request.POST.get(USERNAME_FORM_FIELD, None)
path_info = request.META.get('PATH_INFO', '<unknown>'),
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
AccessAttempt.objects.create(
user_agent=ua,
ip_address=ip,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=path_info,
failures_since_start=failures,
)
username = request.POST.get(USERNAME_FORM_FIELD, None)
log_initial_attempt(username, ip, ua, path_info)
def create_new_trusted_record(request):
ip = get_ip(request)
ua = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
username = request.POST.get(USERNAME_FORM_FIELD, None)
if not username:
return False
AccessAttempt.objects.create(
user_agent=ua,
ip_address=ip,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
trusted=True
)
def get_cache_key(request_or_object):
"""
Build cache key name from request or AccessAttempt object.
:param request_or_object: Request or AccessAttempt object
:return cache-key: String, key to be used in cache system
"""
if isinstance(request_or_object, AccessAttempt):
ip = request_or_object.ip_address
un = request_or_object.username
ua = request_or_object.user_agent
else:
ip = get_ip(request_or_object)
un = request_or_object.POST.get(USERNAME_FORM_FIELD, None)
ua = request_or_object.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
ip = ip.encode('utf-8') if ip else ''.encode('utf-8')
un = un.encode('utf-8') if un else ''.encode('utf-8')
ua = ua.encode('utf-8') if ua else ''.encode('utf-8')
if AXES_ONLY_USER_FAILURES:
attributes = un
elif LOCK_OUT_BY_COMBINATION_USER_AND_IP:
attributes = ip+un
else:
attributes = ip
if USE_USER_AGENT:
attributes += ua
cache_hash_key = 'axes-{}'.format(md5(attributes).hexdigest())
return cache_hash_key
def get_cache_timeout():
"Returns timeout according to COOLOFF_TIME."
cache_timeout = None
if COOLOFF_TIME:
cache_timeout = COOLOFF_TIME.total_seconds()
return cache_timeout

View file

@ -1,5 +1,4 @@
from django.db import models
from django.utils import six
class CommonAccess(models.Model):
@ -64,8 +63,8 @@ class AccessAttempt(CommonAccess):
def failures(self):
return self.failures_since_start
def __unicode__(self):
return six.u('Attempted Access: %s') % self.attempt_time
def __str__(self):
return 'Attempted Access: %s' % self.attempt_time
class AccessLog(CommonAccess):
@ -74,7 +73,5 @@ class AccessLog(CommonAccess):
blank=True,
)
def __unicode__(self):
return six.u('Access Log for %s @ %s') % (
self.username, self.attempt_time
)
def __str__(self):
return 'Access Log for %s @ %s' % (self.username, self.attempt_time)

View file

@ -1,61 +0,0 @@
from datetime import timedelta
from django.conf import settings
# see if the user has overridden the failure limit
FAILURE_LIMIT = getattr(settings, 'AXES_LOGIN_FAILURE_LIMIT', 3)
# see if the user has set axes to lock out logins after failure limit
LOCK_OUT_AT_FAILURE = getattr(settings, 'AXES_LOCK_OUT_AT_FAILURE', True)
USE_USER_AGENT = getattr(settings, 'AXES_USE_USER_AGENT', False)
# use a specific username field to retrieve from login POST data
USERNAME_FORM_FIELD = getattr(settings, 'AXES_USERNAME_FORM_FIELD', 'username')
# use a specific password field to retrieve from login POST data
PASSWORD_FORM_FIELD = getattr(settings, 'AXES_PASSWORD_FORM_FIELD', 'password')
# only check user name and not location or user_agent
AXES_ONLY_USER_FAILURES = getattr(settings, 'AXES_ONLY_USER_FAILURES', False)
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = getattr(settings, 'AXES_BEHIND_REVERSE_PROXY', False)
# if we are behind a proxy, we need to know how many proxies there are
NUM_PROXIES = getattr(settings, 'AXES_NUM_PROXIES', 0)
# if the django app is behind a reverse proxy, look for the ip address using this HTTP header value
REVERSE_PROXY_HEADER = \
getattr(settings, 'AXES_REVERSE_PROXY_HEADER', 'HTTP_X_FORWARDED_FOR')
# lock out user from particular IP based on combination USER+IP
LOCK_OUT_BY_COMBINATION_USER_AND_IP = \
getattr(settings, 'AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP', False)
COOLOFF_TIME = getattr(settings, 'AXES_COOLOFF_TIME', None)
if (isinstance(COOLOFF_TIME, int) or isinstance(COOLOFF_TIME, float)):
COOLOFF_TIME = timedelta(hours=COOLOFF_TIME)
DISABLE_ACCESS_LOG = getattr(settings, 'AXES_DISABLE_ACCESS_LOG', False)
DISABLE_SUCCESS_ACCESS_LOG = getattr(settings, 'AXES_DISABLE_SUCCESS_ACCESS_LOG', False)
LOGGER = getattr(settings, 'AXES_LOGGER', 'axes.watch_login')
LOCKOUT_TEMPLATE = getattr(settings, 'AXES_LOCKOUT_TEMPLATE', None)
LOCKOUT_URL = getattr(settings, 'AXES_LOCKOUT_URL', None)
VERBOSE = getattr(settings, 'AXES_VERBOSE', True)
# whitelist and blacklist
# TODO: convert the strings to IPv4 on startup to avoid type conversion during processing
NEVER_LOCKOUT_WHITELIST = \
getattr(settings, 'AXES_NEVER_LOCKOUT_WHITELIST', False)
ONLY_WHITELIST = getattr(settings, 'AXES_ONLY_ALLOW_WHITELIST', False)
IP_WHITELIST = getattr(settings, 'AXES_IP_WHITELIST', None)
IP_BLACKLIST = getattr(settings, 'AXES_IP_BLACKLIST', None)

View file

@ -1,38 +1,162 @@
import logging
from django.contrib.auth.signals import user_logged_in
from django.contrib.auth.signals import user_logged_out
from django.contrib.auth.signals import user_login_failed
from django.core.cache import cache
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver
from django.dispatch import Signal
from django.utils.timezone import now
from django.contrib.auth.signals import user_logged_out
from django.db.models.signals import post_save, post_delete
from django.core.cache import cache
from django.utils import timezone
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.attempts import get_cache_timeout
from axes.attempts import get_user_attempts
from axes.attempts import is_user_lockable
from axes.attempts import ip_in_whitelist
from axes.models import AccessLog, AccessAttempt
from axes.settings import DISABLE_ACCESS_LOG
from axes.utils import get_client_str
from axes.utils import get_ip
from axes.utils import query2str
log = logging.getLogger(settings.AXES_LOGGER)
user_locked_out = Signal(providing_args=['request', 'username', 'ip_address'])
if not DISABLE_ACCESS_LOG:
@receiver(user_logged_out)
def log_user_lockout(sender, request, user, signal, *args, **kwargs):
""" When a user logs out, update the access log
"""
if not user:
return
access_logs = AccessLog.objects.filter(
@receiver(user_login_failed)
def log_user_login_failed(sender, credentials, request, **kwargs):
""" Create an AccessAttempt record if the login wasn't successful
"""
ip_address = get_ip(request)
username = credentials['username']
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
if settings.AXES_NEVER_LOCKOUT_WHITELIST and ip_in_whitelist(ip_address):
return
failures = 1
attempts = get_user_attempts(request)
cache_hash_key = get_cache_key(request)
cache_timeout = get_cache_timeout()
failures_cached = cache.get(cache_hash_key)
if failures_cached is not None:
failures = failures_cached
else:
for attempt in attempts:
failures = max(failures, attempt.failures_since_start)
# add a failed attempt for this user
failures += 1
cache.set(cache_hash_key, failures, cache_timeout)
# has already attempted, update the info
if len(attempts):
for attempt in attempts:
attempt.get_data = '%s\n---------\n%s' % (
attempt.get_data,
query2str(request.GET),
)
attempt.post_data = '%s\n---------\n%s' % (
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = http_accept
attempt.path_info = path_info
attempt.failures_since_start = failures
attempt.attempt_time = timezone.now()
attempt.save()
fail_msg = 'AXES: Repeated login failure by {0}.'.format(
get_client_str(username, ip_address, user_agent, path_info)
)
count_msg = 'Count = {0} of {1}'.format(
failures, settings.AXES_FAILURE_LIMIT
)
log.info('{0} {1}'.format(fail_msg, count_msg))
else:
# Record failed attempt. Whether or not the IP address or user agent is
# used in counting failures is handled elsewhere, so we just record
# everything here.
AccessAttempt.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
get_data=query2str(request.GET),
post_data=query2str(request.POST),
http_accept=http_accept,
path_info=path_info,
failures_since_start=failures,
)
log.info(
'AXES: New login failure by {0}. Creating access record.'.format(
get_client_str(username, ip_address, user_agent, path_info)
)
)
# no matter what, we want to lock them out if they're past the number of
# attempts allowed, unless the user is set to notlockable
if (
failures >= settings.AXES_FAILURE_LIMIT and
settings.AXES_LOCK_OUT_AT_FAILURE and
is_user_lockable(request)
):
log.warn('AXES: locked out {0} after repeated login attempts.'.format(
get_client_str(username, ip_address, user_agent, path_info)
))
# send signal when someone is locked out.
user_locked_out.send(
'axes', request=request, username=username, ip_address=ip_address
)
@receiver(user_logged_in)
def log_user_logged_in(sender, request, user, **kwargs):
""" When a user logs in, update the access log
"""
username = user.get_username()
ip_address = get_ip(request)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
log.info('AXES: Successful login by {0}.'.format(
get_client_str(username, ip_address, user_agent, path_info)
))
if not settings.AXES_DISABLE_SUCCESS_ACCESS_LOG:
AccessLog.objects.create(
user_agent=user_agent,
ip_address=ip_address,
username=username,
http_accept=http_accept,
path_info=path_info,
trusted=True,
)
@receiver(user_logged_out)
def log_user_logged_out(sender, request, user, **kwargs):
""" When a user logs out, update the access log
"""
log.info('AXES: Successful logout by {0}.'.format(user))
if user and not settings.AXES_DISABLE_ACCESS_LOG:
AccessLog.objects.filter(
username=user.get_username(),
logout_time__isnull=True,
).order_by('-attempt_time')[0:1]
if access_logs:
access_log = access_logs[0]
access_log.logout_time = now()
access_log.save()
).update(logout_time=timezone.now())
@receiver(post_save, sender=AccessAttempt)
def update_cache_after_save(instance, **kwargs):
from axes.decorators import get_cache_timeout, get_cache_key
cache_hash_key = get_cache_key(instance)
if not cache.get(cache_hash_key):
cache_timeout = get_cache_timeout()
@ -41,6 +165,5 @@ def update_cache_after_save(instance, **kwargs):
@receiver(post_delete, sender=AccessAttempt)
def delete_cache_after_delete(instance, **kwargs):
from axes.decorators import get_cache_key
cache_hash_key = get_cache_key(instance)
cache.delete(cache_hash_key)

View file

@ -5,6 +5,12 @@ DATABASES = {
}
}
CACHES = {
'default': {
'BACKEND': 'django.core.cache.backends.dummy.DummyCache'
}
}
SITE_ID = 1
MIDDLEWARE_CLASSES = (
@ -42,6 +48,23 @@ TEMPLATES = [
},
]
LOGGING = {
'version': 1,
'disable_existing_loggers': False,
'handlers': {
'console': {
'class': 'logging.StreamHandler',
},
},
'loggers': {
'axes': {
'handlers': ['console'],
'level': 'INFO',
'propagate': False,
},
},
}
SECRET_KEY = 'too-secret-for-test'
USE_I18N = False

View file

@ -1,5 +0,0 @@
from .test_settings import *
AXES_BEHIND_REVERSE_PROXY = True
AXES_REVERSE_PROXY_HEADER = 'HTTP_X_FORWARDED_FOR'
AXES_NUM_PROXIES = 2

View file

@ -1,3 +0,0 @@
from .test_settings import *
AXES_BEHIND_REVERSE_PROXY = True

View file

@ -1,3 +0,0 @@
from .test_settings_proxy import *
AXES_REVERSE_PROXY_HEADER = 'HTTP_X_AXES_CUSTOM_HEADER'

View file

@ -1,16 +1,6 @@
from django.conf.urls import url
from django.contrib import admin
try:
# django < 1.10
from django.conf.urls import patterns
from django.conf.urls import include
urlpatterns = patterns(
'',
url(r'^admin/', include(admin.site.urls)),
)
except ImportError:
urlpatterns = [
url(r'^admin/', admin.site.urls),
]
urlpatterns = [
url(r'^admin/', admin.site.urls),
]

File diff suppressed because it is too large Load diff

0
axes/tests/__init__.py Normal file
View file

View file

@ -0,0 +1,381 @@
from unittest.mock import patch
import datetime
import hashlib
import json
import random
import string
import time
from django.test import TestCase, override_settings
from django.urls import reverse
from django.contrib.auth.models import User
from django.test.client import RequestFactory
from axes.conf import settings
from axes.attempts import get_cache_key
from axes.models import AccessAttempt, AccessLog
from axes.signals import user_locked_out
from axes.utils import reset
@override_settings(AXES_COOLOFF_TIME=datetime.timedelta(seconds=2))
class AccessAttemptTest(TestCase):
"""Test case using custom settings for testing
"""
VALID_USERNAME = 'valid-username'
VALID_PASSWORD = 'valid-password'
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
def _login(self, is_valid_username=False, is_valid_password=False,
is_json=False, **kwargs):
"""Login a user. A valid credential is used when is_valid_username is True,
otherwise it will use a random string to make a failed login.
"""
if is_valid_username:
# Use a valid username
username = self.VALID_USERNAME
else:
# Generate a wrong random username
chars = string.ascii_uppercase + string.digits
username = ''.join(random.choice(chars) for x in range(10))
if is_valid_password:
password = self.VALID_PASSWORD
else:
password = 'invalid-password'
headers = {
'user_agent': 'test-browser'
}
post_data = {
'username': username,
'password': password,
'this_is_the_login_form': 1,
}
post_data.update(kwargs)
if is_json:
headers.update({
'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
'content_type': 'application/json',
})
post_data = json.dumps(post_data)
response = self.client.post(
reverse('admin:login'), post_data, **headers
)
return response
def setUp(self):
"""Create a valid user for login
"""
self.user = User.objects.create_superuser(
username=self.VALID_USERNAME,
email='test@example.com',
password=self.VALID_PASSWORD,
)
def test_failure_limit_once(self):
"""Tests the login lock trying to login one more time
than failure limit
"""
# test until one try before the limit
for i in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_failure_limit_many(self):
"""Tests the login lock trying to login a lot of times more
than failure limit
"""
for i in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login()
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# We should get a locked message each time we try again
for i in range(0, random.randrange(1, settings.AXES_FAILURE_LIMIT)):
response = self._login()
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_valid_login(self):
"""Tests a valid login for a real username
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
def test_valid_logout(self):
"""Tests a valid logout and make sure the logout_time is updated
"""
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEquals(AccessLog.objects.latest('id').logout_time, None)
response = self.client.get(reverse('admin:logout'))
self.assertNotEquals(AccessLog.objects.latest('id').logout_time, None)
self.assertContains(response, 'Logged out')
def test_cooling_off(self):
"""Tests if the cooling time allows a user to login
"""
self.test_failure_limit_once()
# Wait for the cooling off period
time.sleep(settings.AXES_COOLOFF_TIME.total_seconds())
# It should be possible to login again, make sure it is.
self.test_valid_login()
def test_cooling_off_for_trusted_user(self):
"""Test the cooling time for a trusted user
"""
# Test successful login-logout, this makes the user trusted.
self.test_valid_logout()
# Try the cooling off time
self.test_cooling_off()
def test_long_user_agent_valid(self):
"""Tests if can handle a long user agent
"""
long_user_agent = 'ie6' * 1024
response = self._login(
is_valid_username=True,
is_valid_password=True,
user_agent=long_user_agent,
)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
def test_long_user_agent_not_valid(self):
"""Tests if can handle a long user agent with failure
"""
long_user_agent = 'ie6' * 1024
for i in range(0, settings.AXES_FAILURE_LIMIT + 1):
response = self._login(user_agent=long_user_agent)
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
def test_reset_ip(self):
"""Tests if can reset an ip address
"""
# Make a lockout
self.test_failure_limit_once()
# Reset the ip so we can try again
reset(ip='127.0.0.1')
# Make a login attempt again
self.test_valid_login()
def test_reset_all(self):
"""Tests if can reset all attempts
"""
# Make a lockout
self.test_failure_limit_once()
# Reset all attempts so we can try again
reset()
# Make a login attempt again
self.test_valid_login()
@patch('axes.utils.get_ip', return_value='127.0.0.1')
def test_get_cache_key(self, get_ip_mock):
""" Test the cache key format"""
# Getting cache key from request
ip_address = '127.0.0.1'
cache_hash_key = 'axes-{}'.format(
hashlib.md5(ip_address.encode()).hexdigest()
)
request_factory = RequestFactory()
request = request_factory.post('/admin/login/',
data={
'username': self.VALID_USERNAME,
'password': 'test'
})
self.assertEqual(cache_hash_key, get_cache_key(request))
# Getting cache key from AccessAttempt Object
attempt = AccessAttempt(
user_agent='<unknown>',
ip_address=ip_address,
username=self.VALID_USERNAME,
get_data='',
post_data='',
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
failures_since_start=0,
)
self.assertEqual(cache_hash_key, get_cache_key(attempt))
def test_send_lockout_signal(self):
"""Test if the lockout signal is emitted
"""
# this "hack" is needed so we don't have to use global variables or python3 features
class Scope(object): pass
scope = Scope()
scope.signal_received = 0
def signal_handler(request, username, ip_address, *args, **kwargs):
scope.signal_received += 1
self.assertIsNotNone(request)
# Connect signal handler
user_locked_out.connect(signal_handler)
# Make a lockout
self.test_failure_limit_once()
self.assertEquals(scope.signal_received, 1)
reset()
# Make another lockout
self.test_failure_limit_once()
self.assertEquals(scope.signal_received, 2)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_combination_user_and_ip(self):
"""Tests the login lock with a valid username and invalid password
when AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP is True
"""
# test until one try before the limit
for i in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login(
is_valid_username=True,
is_valid_password=False,
)
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(is_valid_username=True, is_valid_password=False)
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_only(self):
"""Tests the login lock with a valid username and invalid password
when AXES_ONLY_USER_FAILURES is True
"""
# test until one try before the limit
for i in range(1, settings.AXES_FAILURE_LIMIT):
response = self._login(
is_valid_username=True,
is_valid_password=False,
)
# Check if we are in the same login page
self.assertContains(response, self.LOGIN_FORM_KEY)
# So, we shouldn't have gotten a lock-out yet.
# But we should get one now
response = self._login(is_valid_username=True, is_valid_password=False)
self.assertContains(response, self.LOCKED_MESSAGE, status_code=403)
# reset the username only and make sure we can log in now even though
# our IP has failed each time
reset(username=AccessAttemptTest.VALID_USERNAME)
response = self._login(
is_valid_username=True,
is_valid_password=True,
)
# Check if we are still in the login page
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
# now create failure_limit + 1 failed logins and then we should still
# be able to login with valid_username
for i in range(1, settings.AXES_FAILURE_LIMIT + 1):
response = self._login(
is_valid_username=False,
is_valid_password=False,
)
# Check if we can still log in with valid user
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertNotContains(response, self.LOGIN_FORM_KEY, status_code=302)
def test_log_data_truncated(self):
"""Tests that query2str properly truncates data to the
max_length (default 1024)
"""
# An impossibly large post dict
extra_data = {string.ascii_letters * x: x for x in range(0, 1000)}
self._login(**extra_data)
self.assertEquals(
len(AccessAttempt.objects.latest('id').post_data), 1024
)
def test_json_response(self):
"""Tests response content type and status code for the ajax request
"""
self.test_failure_limit_once()
response = self._login(is_json=True)
self.assertEquals(response.status_code, 403)
self.assertEquals(response.get('Content-Type'), 'application/json')
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_valid_logout_without_success_log(self):
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
response = self.client.get(reverse('admin:logout'))
self.assertEquals(AccessLog.objects.all().count(), 0)
self.assertContains(response, 'Logged out')
@override_settings(AXES_DISABLE_SUCCESS_ACCESS_LOG=True)
def test_valid_login_without_success_log(self):
"""
A valid login doesn't generate an AccessLog when
`DISABLE_SUCCESS_ACCESS_LOG=True`.
"""
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEqual(response.status_code, 302)
self.assertEqual(AccessLog.objects.all().count(), 0)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_valid_logout_without_log(self):
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=True)
response = self.client.get(reverse('admin:logout'))
self.assertEquals(AccessLog.objects.first().logout_time, None)
self.assertContains(response, 'Logged out')
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_non_valid_login_without_log(self):
"""
A non-valid login does generate an AccessLog when
`DISABLE_ACCESS_LOG=True`.
"""
AccessLog.objects.all().delete()
response = self._login(is_valid_username=True, is_valid_password=False)
self.assertEquals(response.status_code, 200)
self.assertEquals(AccessLog.objects.all().count(), 0)
@override_settings(AXES_DISABLE_ACCESS_LOG=True)
def test_check_is_not_made_on_GET(self):
AccessLog.objects.all().delete()
response = self.client.get(reverse('admin:login'))
self.assertEqual(response.status_code, 200)
response = self._login(is_valid_username=True, is_valid_password=True)
self.assertEqual(response.status_code, 302)
response = self.client.get(reverse('admin:index'))
self.assertEqual(response.status_code, 200)

View file

@ -0,0 +1,391 @@
import json
from django.test import TestCase, override_settings
from django.urls import reverse
from django.contrib.auth.models import User
from axes.conf import settings
class AccessAttemptConfigTest(TestCase):
""" This set of tests checks for lockouts under different configurations
and circumstances to prevent false positives and false negatives.
Always block attempted logins for the same user from the same IP.
Always allow attempted logins for a different user from a different IP.
"""
IP_1 = '10.1.1.1'
IP_2 = '10.2.2.2'
USER_1 = 'valid-user-1'
USER_2 = 'valid-user-2'
VALID_PASSWORD = 'valid-password'
WRONG_PASSWORD = 'wrong-password'
LOCKED_MESSAGE = 'Account locked: too many login attempts.'
LOGIN_FORM_KEY = '<input type="submit" value="Log in" />'
ALLOWED = 302
BLOCKED = 403
def _login(self, username, password, ip_addr='127.0.0.1',
is_json=False, **kwargs):
"""Login a user and get the response.
IP address can be configured to test IP blocking functionality.
"""
headers = {
'user_agent': 'test-browser'
}
post_data = {
'username': username,
'password': password,
'this_is_the_login_form': 1,
}
post_data.update(kwargs)
if is_json:
headers.update({
'HTTP_X_REQUESTED_WITH': 'XMLHttpRequest',
'content_type': 'application/json',
})
post_data = json.dumps(post_data)
response = self.client.post(
reverse('admin:login'), post_data, REMOTE_ADDR=ip_addr, **headers
)
return response
def _lockout_user1_from_ip1(self):
for i in range(1, settings.AXES_FAILURE_LIMIT + 1):
response = self._login(
username=self.USER_1,
password=self.WRONG_PASSWORD,
ip_addr=self.IP_1
)
return response
def setUp(self):
"""Create two valid users for authentication.
"""
self.user = User.objects.create_superuser(
username=self.USER_1,
email='test_1@example.com',
password=self.VALID_PASSWORD,
)
self.user = User.objects.create_superuser(
username=self.USER_2,
email='test_2@example.com',
password=self.VALID_PASSWORD,
)
# Test for true and false positives when blocking by IP *OR* user (default)
# Cache disabled. Default settings.
def test_lockout_by_ip_blocks_when_same_user_same_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_same_user_diff_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
def test_lockout_by_ip_blocks_when_diff_user_same_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 is also locked out from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_diff_user_diff_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by user only.
# Cache disabled. When AXES_ONLY_USER_FAILURES = True
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_blocks_when_same_user_same_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_blocks_when_same_user_diff_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is also locked out from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_allows_when_diff_user_same_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_allows_when_diff_user_diff_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by user and IP together.
# Cache disabled. When LOCK_OUT_BY_COMBINATION_USER_AND_IP = True
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_blocks_when_same_user_same_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_allows_when_same_user_diff_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_allows_when_diff_user_diff_ip_without_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by IP *OR* user (default)
# With cache enabled. Default criteria.
def test_lockout_by_ip_blocks_when_same_user_same_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_same_user_diff_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
def test_lockout_by_ip_blocks_when_diff_user_same_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 is also locked out from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
def test_lockout_by_ip_allows_when_diff_user_diff_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by user only.
# With cache enabled. When AXES_ONLY_USER_FAILURES = True
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_blocks_when_same_user_same_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_blocks_when_same_user_diff_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is also locked out from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_allows_when_diff_user_same_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_ONLY_USER_FAILURES=True)
def test_lockout_by_user_allows_when_diff_user_diff_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
# Test for true and false positives when blocking by user and IP together.
# With cache enabled. When LOCK_OUT_BY_COMBINATION_USER_AND_IP = True
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_blocks_when_same_user_same_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 is still blocked from IP 1.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.BLOCKED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_allows_when_same_user_diff_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 1 can still login from IP 2.
response = self._login(
self.USER_1,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_allows_when_diff_user_same_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 1.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_1
)
self.assertEqual(response.status_code, self.ALLOWED)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
def test_lockout_by_user_and_ip_allows_when_diff_user_diff_ip_using_cache(self):
# User 1 is locked out from IP 1.
self._lockout_user1_from_ip1()
# User 2 can still login from IP 2.
response = self._login(
self.USER_2,
self.VALID_PASSWORD,
ip_addr=self.IP_2
)
self.assertEqual(response.status_code, self.ALLOWED)

109
axes/tests/test_proxy.py Normal file
View file

@ -0,0 +1,109 @@
from django.test import TestCase, override_settings
from axes.conf import settings
from axes.utils import get_ip
class MockRequest:
def __init__(self):
self.META = dict()
@override_settings(AXES_BEHIND_REVERSE_PROXY=True)
class GetIPProxyTest(TestCase):
"""Test get_ip returns correct addresses with proxy
"""
def setUp(self):
self.request = MockRequest()
def test_iis_ipv4_port_stripping(self):
self.ip = '192.168.1.1'
valid_headers = [
'192.168.1.1:6112',
'192.168.1.1:6033, 192.168.1.2:9001',
]
for header in valid_headers:
self.request.META['HTTP_X_FORWARDED_FOR'] = header
self.assertEqual(self.ip, get_ip(self.request))
def test_valid_ipv4_parsing(self):
self.ip = '192.168.1.1'
valid_headers = [
'192.168.1.1',
'192.168.1.1, 192.168.1.2',
' 192.168.1.1 , 192.168.1.2 ',
' 192.168.1.1 , 2001:db8:cafe::17 ',
]
for header in valid_headers:
self.request.META['HTTP_X_FORWARDED_FOR'] = header
self.assertEqual(self.ip, get_ip(self.request))
def test_valid_ipv6_parsing(self):
self.ip = '2001:db8:cafe::17'
valid_headers = [
'2001:db8:cafe::17',
'2001:db8:cafe::17 , 2001:db8:cafe::18',
'2001:db8:cafe::17, 2001:db8:cafe::18, 192.168.1.1',
]
for header in valid_headers:
self.request.META['HTTP_X_FORWARDED_FOR'] = header
self.assertEqual(self.ip, get_ip(self.request))
@override_settings(AXES_BEHIND_REVERSE_PROXY=True)
@override_settings(AXES_REVERSE_PROXY_HEADER='HTTP_X_FORWARDED_FOR')
@override_settings(AXES_NUM_PROXIES=2)
class GetIPNumProxiesTest(TestCase):
"""Test that get_ip returns the correct last IP when NUM_PROXIES is configured
"""
def setUp(self):
self.request = MockRequest()
def test_header_ordering(self):
self.ip = '2.2.2.2'
valid_headers = [
'4.4.4.4, 3.3.3.3, 2.2.2.2, 1.1.1.1',
' 3.3.3.3, 2.2.2.2, 1.1.1.1',
' 2.2.2.2, 1.1.1.1',
]
for header in valid_headers:
self.request.META[settings.AXES_REVERSE_PROXY_HEADER] = header
self.assertEqual(self.ip, get_ip(self.request))
def test_invalid_headers_too_few(self):
self.request.META[settings.AXES_REVERSE_PROXY_HEADER] = '1.1.1.1'
with self.assertRaises(Warning):
get_ip(self.request)
def test_invalid_headers_no_ip(self):
self.request.META[settings.AXES_REVERSE_PROXY_HEADER] = ''
with self.assertRaises(Warning):
get_ip(self.request)
@override_settings(AXES_BEHIND_REVERSE_PROXY=True)
@override_settings(AXES_REVERSE_PROXY_HEADER='HTTP_X_AXES_CUSTOM_HEADER')
class GetIPProxyCustomHeaderTest(TestCase):
"""Test that get_ip returns correct addresses with a custom proxy header
"""
def setUp(self):
self.request = MockRequest()
def test_custom_header_parsing(self):
self.ip = '2001:db8:cafe::17'
valid_headers = [
' 2001:db8:cafe::17 , 2001:db8:cafe::18',
]
for header in valid_headers:
self.request.META[settings.AXES_REVERSE_PROXY_HEADER] = header
self.assertEqual(self.ip, get_ip(self.request))

143
axes/tests/test_utils.py Normal file
View file

@ -0,0 +1,143 @@
import datetime
from django.test import TestCase, override_settings
from django.utils import six
from axes.utils import iso8601, is_ipv6, get_client_str
class UtilsTest(TestCase):
def test_iso8601(self):
"""Tests iso8601 correctly translates datetime.timdelta to ISO 8601
formatted duration."""
EXPECTED = {
datetime.timedelta(days=1, hours=25, minutes=42, seconds=8):
'P2DT1H42M8S',
datetime.timedelta(days=7, seconds=342):
'P7DT5M42S',
datetime.timedelta(days=0, hours=2, minutes=42):
'PT2H42M',
datetime.timedelta(hours=20, seconds=42):
'PT20H42S',
datetime.timedelta(seconds=300):
'PT5M',
datetime.timedelta(seconds=9005):
'PT2H30M5S',
datetime.timedelta(minutes=9005):
'P6DT6H5M',
datetime.timedelta(days=15):
'P15D'
}
for timedelta, iso_duration in six.iteritems(EXPECTED):
self.assertEqual(iso8601(timedelta), iso_duration)
def test_is_ipv6(self):
self.assertTrue(is_ipv6('ff80::220:16ff:fec9:1'))
self.assertFalse(is_ipv6('67.255.125.204'))
self.assertFalse(is_ipv6('foo'))
@override_settings(AXES_VERBOSE=True)
def test_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_ip_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = ip
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_ONLY_USER_FAILURES=True)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_only_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = username
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP=True)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_ip_combo_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = '{0} from {1}'.format(username, ip)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_USE_USER_AGENT=True)
@override_settings(AXES_VERBOSE=True)
def test_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
expected = details.format(username, ip, user_agent, path_info)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)
@override_settings(AXES_USE_USER_AGENT=True)
@override_settings(AXES_VERBOSE=False)
def test_non_verbose_user_agent_client_details(self):
username = 'test@example.com'
ip = '127.0.0.1'
user_agent = 'Googlebot/2.1 (+http://www.googlebot.com/bot.html)'
path_info = '/admin/'
expected = ip + '(user-agent={0})'.format(user_agent)
actual = get_client_str(username, ip, user_agent, path_info)
self.assertEqual(expected, actual)

View file

@ -1,8 +1,123 @@
from django.core.cache import cache
from socket import inet_pton, AF_INET6, error
from django.core.cache import cache
from django.utils import six
from axes.conf import settings
from axes.models import AccessAttempt
def query2str(items, max_length=1024):
"""Turns a dictionary into an easy-to-read list of key-value pairs.
If there's a field called "password" it will be excluded from the output.
The length of the output is limited to max_length to avoid a DoS attack
via excessively large payloads.
"""
return '\n'.join([
'%s=%s' % (k, v) for k, v in six.iteritems(items)
if k != settings.AXES_PASSWORD_FORM_FIELD
][:int(max_length / 2)])[:max_length]
def get_client_str(username, ip_address, user_agent=None, path_info=None):
if settings.AXES_VERBOSE:
if isinstance(path_info, tuple):
path_info = path_info[0]
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
return details.format(username, ip_address, user_agent, path_info)
if settings.AXES_ONLY_USER_FAILURES:
client = username
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
client = '{0} from {1}'.format(username, ip_address)
else:
client = ip_address
if settings.AXES_USE_USER_AGENT:
client += '(user-agent={0})'.format(user_agent)
return client
def is_ipv6(ip):
try:
inet_pton(AF_INET6, ip)
except (OSError, error):
return False
return True
def get_ip(request):
"""Parse IP address from REMOTE_ADDR or
AXES_REVERSE_PROXY_HEADER if AXES_BEHIND_REVERSE_PROXY is set."""
if settings.AXES_BEHIND_REVERSE_PROXY:
# For requests originating from behind a reverse proxy,
# resolve the IP address from the given AXES_REVERSE_PROXY_HEADER.
# AXES_REVERSE_PROXY_HEADER defaults to HTTP_X_FORWARDED_FOR,
# which is the Django name for the HTTP X-Forwarder-For header.
# Please see RFC7239 for additional information:
# https://tools.ietf.org/html/rfc7239#section-5
# The REVERSE_PROXY_HEADER HTTP header is a list
# of potentionally unsecure IPs, for example:
# X-Forwarded-For: 1.1.1.1, 11.11.11.11:8080, 111.111.111.111
ip_str = request.META.get(settings.AXES_REVERSE_PROXY_HEADER, '')
# We need to know the number of proxies present in the request chain
# in order to securely calculate the one IP that is the real client IP.
#
# This is because IP headers can have multiple IPs in different
# configurations, with e.g. the X-Forwarded-For header containing
# the originating client IP, proxies and possibly spoofed values.
#
# If you are using a special header for client calculation such as the
# X-Real-IP or the like with nginx, please check this configuration.
#
# Please see discussion for more information:
# https://github.com/jazzband/django-axes/issues/224
ip_list = [ip.strip() for ip in ip_str.split(',')]
# Pick the nth last IP in the given list of addresses after parsing
if len(ip_list) >= settings.AXES_NUM_PROXIES:
ip = ip_list[-settings.AXES_NUM_PROXIES]
# Fix IIS adding client port number to the
# 'X-Forwarded-For' header (strip port)
if not is_ipv6(ip):
ip = ip.split(':', 1)[0]
# If nth last is not found, default to no IP and raise a warning
else:
ip = ''
raise Warning(
'AXES: Axes is configured for operation behind a '
'reverse proxy but received too few IPs in the HTTP '
'AXES_REVERSE_PROXY_HEADER. Check your '
'AXES_NUM_PROXIES configuration. '
'Header name: {0}, value: {1}'.format(
settings.AXES_REVERSE_PROXY_HEADER, ip_str
)
)
if not ip:
raise Warning(
'AXES: Axes is configured for operation behind a reverse '
'proxy but could not find a suitable IP in the specified '
'HTTP header. Check your proxy server settings to make '
'sure correct headers are being passed to Django in '
'AXES_REVERSE_PROXY_HEADER. '
'Header name: {0}, value: {1}'.format(
settings.AXES_REVERSE_PROXY_HEADER, ip_str
)
)
return ip
return request.META.get('REMOTE_ADDR', '')
def reset(ip=None, username=None):
"""Reset records that match ip or username, and
return the count of removed attempts.
@ -45,4 +160,4 @@ def iso8601(timestamp):
for value, designator in zip(time_values, time_designators)
if value]
)
return u'P' + date + (u'T' + time if time else '')
return 'P' + date + ('T' + time if time else '')

View file

@ -47,18 +47,18 @@ source_suffix = '.rst'
master_doc = 'index'
# General information about the project.
project = u'Django Axes'
copyright = u'2016, jazzband'
author = u'jazzband'
project = 'Django Axes'
copyright = '2016, jazzband'
author = 'jazzband'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = u'2.2.0'
version = '2.2.0'
# The full version, including alpha/beta/rc tags.
release = u'2.2.0'
release = '2.2.0'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
@ -228,8 +228,8 @@ latex_elements = {
# (source start file, target name, title,
# author, documentclass [howto, manual, or own class]).
latex_documents = [
(master_doc, 'DjangoAxes.tex', u'Django Axes Documentation',
u'jazzband', 'manual'),
(master_doc, 'DjangoAxes.tex', 'Django Axes Documentation',
'jazzband', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
@ -258,7 +258,7 @@ latex_documents = [
# One entry per manual page. List of tuples
# (source start file, name, description, authors, manual section).
man_pages = [
(master_doc, 'djangoaxes', u'Django Axes Documentation',
(master_doc, 'djangoaxes', 'Django Axes Documentation',
[author], 1)
]
@ -272,7 +272,7 @@ man_pages = [
# (source start file, target name, title, author,
# dir menu entry, description, category)
texinfo_documents = [
(master_doc, 'DjangoAxes', u'Django Axes Documentation',
(master_doc, 'DjangoAxes', 'Django Axes Documentation',
author, 'DjangoAxes', 'One line description of project.',
'Miscellaneous'),
]

View file

@ -1,4 +1,3 @@
Django
sphinx-rtd-theme
mock
-e .

View file

@ -1,5 +1,4 @@
#!/usr/bin/env python
import os
import sys
@ -8,18 +7,10 @@ from django.conf import settings
from django.test.utils import get_runner
def run_tests(settings_module, *modules):
os.environ['DJANGO_SETTINGS_MODULE'] = settings_module
if __name__ == '__main__':
os.environ['DJANGO_SETTINGS_MODULE'] = 'axes.test_settings'
django.setup()
TestRunner = get_runner(settings)
test_runner = TestRunner()
failures = test_runner.run_tests(*modules)
failures = test_runner.run_tests(['axes.tests'])
sys.exit(bool(failures))
if __name__ == '__main__':
run_tests('axes.test_settings', [
'axes.tests.AccessAttemptTest',
'axes.tests.AccessAttemptConfigTest',
'axes.tests.UtilsTest',
])

View file

@ -1,8 +0,0 @@
#!/usr/bin/env python
from runtests import run_tests
if __name__ == '__main__':
run_tests('axes.test_settings_num_proxies', [
'axes.tests.GetIPNumProxiesTest',
])

View file

@ -1,8 +0,0 @@
#!/usr/bin/env python
from runtests import run_tests
if __name__ == '__main__':
run_tests('axes.test_settings_proxy', [
'axes.tests.GetIPProxyTest',
])

View file

@ -1,8 +0,0 @@
#!/usr/bin/env python
from runtests import run_tests
if __name__ == '__main__':
run_tests('axes.test_settings_proxy_custom_header', [
'axes.tests.GetIPProxyCustomHeaderTest',
])

View file

@ -20,7 +20,7 @@ setup(
url='https://github.com/django-pci/django-axes',
license='MIT',
package_dir={'axes': 'axes'},
install_requires=['pytz'],
install_requires=['pytz', 'django-appconf'],
include_package_data=True,
packages=find_packages(),
classifiers=[

22
tox.ini Normal file
View file

@ -0,0 +1,22 @@
[tox]
envlist =
py{36}-django-111
py{36}-django-20
py{36}-django-master
[testenv]
deps =
django-appconf
coveralls
django-111: Django>=1.11,<2.0
django-20: Django>=2.0a1,<2.1
django-master: https://github.com/django/django/archive/master.tar.gz
usedevelop = True
ignore_outcome =
django-20: True
django-master: True
commands =
coverage run -a --source=axes runtests.py -v2
coverage report
setenv =
PYTHONDONTWRITEBYTECODE=1