mirror of
https://github.com/jazzband/django-defender.git
synced 2026-03-16 22:10:32 +00:00
Allow decoration of functions beyond the admin login (#86)
* Allow decoration of functions beyond the admin login * Exclude tests file from coverage * Allow installing django 1.11 * Add python 3.6 for testing
This commit is contained in:
parent
d2b712eade
commit
b985d17beb
6 changed files with 149 additions and 33 deletions
|
|
@ -1,2 +1,3 @@
|
|||
[run]
|
||||
omit = *_settings.py, defender/*migrations/*, defender/exampleapp/*
|
||||
omit = *_settings.py, defender/*migrations/*, defender/exampleapp/*, *test.py,
|
||||
*__init__.py, *tests.py, *urls.py
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ python:
|
|||
- "3.3"
|
||||
- "3.4"
|
||||
- "3.5"
|
||||
- "3.6"
|
||||
|
||||
env:
|
||||
- DJANGO=1.8
|
||||
|
|
@ -35,6 +36,12 @@ matrix:
|
|||
env: DJANGO=1.10
|
||||
- python: "3.3"
|
||||
env: DJANGO=1.11
|
||||
- python: "3.6"
|
||||
env: DJANGO=1.8
|
||||
- python: "3.6"
|
||||
env: DJANGO=1.9
|
||||
- python: "3.6"
|
||||
env: DJANGO=1.10
|
||||
|
||||
after_success:
|
||||
- coveralls --verbose
|
||||
|
|
|
|||
|
|
@ -1,38 +1,54 @@
|
|||
from . import utils
|
||||
|
||||
import functools
|
||||
|
||||
def watch_login(func):
|
||||
|
||||
def watch_login(status_code=302, msg=''):
|
||||
"""
|
||||
Used to decorate the django.contrib.admin.site.login method.
|
||||
Used to decorate the django.contrib.admin.site.login method or
|
||||
any other function you want to protect by brute forcing.
|
||||
To make it work on normal functions just pass the status code that should
|
||||
indicate a failure and/or a string that will be checked within the
|
||||
response body.
|
||||
"""
|
||||
def decorated_login(func):
|
||||
@functools.wraps(func)
|
||||
def wrapper(request, *args, **kwargs):
|
||||
# if the request is currently under lockout, do not proceed to the
|
||||
# login function, go directly to lockout url, do not pass go, do not
|
||||
# collect messages about this login attempt
|
||||
if utils.is_already_locked(request):
|
||||
return utils.lockout_response(request)
|
||||
|
||||
def decorated_login(request, *args, **kwargs):
|
||||
# if the request is currently under lockout, do not proceed to the
|
||||
# login function, go directly to lockout url, do not pass go, do not
|
||||
# collect messages about this login attempt
|
||||
if utils.is_already_locked(request):
|
||||
return utils.lockout_response(request)
|
||||
# call the login function
|
||||
response = func(request, *args, **kwargs)
|
||||
|
||||
# call the login function
|
||||
response = func(request, *args, **kwargs)
|
||||
if request.method == 'POST':
|
||||
# see if the login was successful
|
||||
if status_code == 302: # standard Django login view
|
||||
login_unsuccessful = (
|
||||
response and
|
||||
not response.has_header('location') and
|
||||
response.status_code != status_code
|
||||
)
|
||||
else:
|
||||
# If msg is not passed the last condition will be evaluated
|
||||
# always to True so the first 2 will decide the result.
|
||||
login_unsuccessful = (
|
||||
response and response.status_code == status_code
|
||||
and msg in response.content.decode('utf-8')
|
||||
)
|
||||
|
||||
if request.method == 'POST':
|
||||
# see if the login was successful
|
||||
login_unsuccessful = (
|
||||
response and
|
||||
not response.has_header('location') and
|
||||
response.status_code != 302
|
||||
)
|
||||
# ideally make this background task, but to keep simple, keeping
|
||||
# it inline for now.
|
||||
utils.add_login_attempt_to_db(request, not login_unsuccessful)
|
||||
|
||||
# ideally make this background task, but to keep simple, keeping
|
||||
# it inline for now.
|
||||
utils.add_login_attempt_to_db(request, not login_unsuccessful)
|
||||
if utils.check_request(request, login_unsuccessful):
|
||||
return response
|
||||
|
||||
if utils.check_request(request, login_unsuccessful):
|
||||
return response
|
||||
return utils.lockout_response(request)
|
||||
|
||||
return utils.lockout_response(request)
|
||||
|
||||
return response
|
||||
return response
|
||||
|
||||
return wrapper
|
||||
return decorated_login
|
||||
|
|
|
|||
|
|
@ -9,7 +9,6 @@ class FailedLoginMiddleware(object):
|
|||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(FailedLoginMiddleware, self).__init__(*args, **kwargs)
|
||||
|
||||
# Watch the auth login.
|
||||
# Monkey-patch only once - otherwise we would be recording
|
||||
# failed attempts multiple times!
|
||||
|
|
@ -18,9 +17,10 @@ class FailedLoginMiddleware(object):
|
|||
# `LoginView` class-based view
|
||||
try:
|
||||
from django.contrib.auth.views import LoginView
|
||||
watch_login_method = method_decorator(watch_login)
|
||||
our_decorator = watch_login()
|
||||
watch_login_method = method_decorator(our_decorator)
|
||||
LoginView.dispatch = watch_login_method(LoginView.dispatch)
|
||||
except ImportError: # Django < 1.11
|
||||
auth_views.login = watch_login(auth_views.login)
|
||||
auth_views.login = watch_login()(auth_views.login)
|
||||
|
||||
FailedLoginMiddleware.patched = True
|
||||
|
|
|
|||
|
|
@ -3,20 +3,25 @@ import string
|
|||
import time
|
||||
from distutils.version import StrictVersion
|
||||
|
||||
from mock import patch
|
||||
# Python 3 has mock in the stdlib
|
||||
try:
|
||||
from mock import patch
|
||||
except ImportError:
|
||||
from unittest.mock import patch
|
||||
|
||||
from django import get_version
|
||||
from django.contrib.auth.models import User
|
||||
from django.contrib.auth.models import AnonymousUser
|
||||
from django.contrib.sessions.backends.db import SessionStore
|
||||
from django.core.urlresolvers import reverse
|
||||
from django.http import HttpRequest
|
||||
from django.http import HttpRequest, HttpResponse
|
||||
from django.test.client import RequestFactory
|
||||
from redis.client import Redis
|
||||
|
||||
from . import utils
|
||||
from . import config
|
||||
from .connection import parse_redis_url, get_redis_connection
|
||||
from .decorators import watch_login
|
||||
from .models import AccessAttempt
|
||||
from .test import DefenderTestCase, DefenderTransactionTestCase
|
||||
|
||||
|
|
@ -687,7 +692,7 @@ class AccessAttemptTest(DefenderTestCase):
|
|||
|
||||
# try logging in with the same username, but different IPs.
|
||||
# we shouldn't be locked.
|
||||
for i in range(0, config.FAILURE_LIMIT+10):
|
||||
for i in range(0, config.FAILURE_LIMIT + 10):
|
||||
ip = '74.125.126.{0}'.format(i)
|
||||
response = self._login(username=username, remote_addr=ip)
|
||||
# Check if we are in the same login page
|
||||
|
|
@ -727,6 +732,92 @@ class AccessAttemptTest(DefenderTestCase):
|
|||
data_out = utils.get_blocked_usernames()
|
||||
self.assertEqual(data_out, [])
|
||||
|
||||
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
|
||||
@patch('defender.config.FAILURE_LIMIT', 3)
|
||||
def test_login_blocked_for_non_standard_login_views_without_msg(self):
|
||||
"""
|
||||
Check that a view wich returns the expected status code is causing
|
||||
the user to be locked out when we do not expect a specific message
|
||||
to be returned.
|
||||
"""
|
||||
|
||||
@watch_login(status_code=401)
|
||||
def fake_api_401_login_view_without_msg(request):
|
||||
return HttpResponse(status=401)
|
||||
|
||||
request_factory = RequestFactory()
|
||||
request = request_factory.post('api/login')
|
||||
request.user = AnonymousUser()
|
||||
request.session = SessionStore()
|
||||
|
||||
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
|
||||
|
||||
for _ in range(3):
|
||||
fake_api_401_login_view_without_msg(request)
|
||||
|
||||
data_out = utils.get_blocked_ips()
|
||||
self.assertEqual(data_out, [])
|
||||
|
||||
fake_api_401_login_view_without_msg(request)
|
||||
|
||||
data_out = utils.get_blocked_ips()
|
||||
self.assertEqual(data_out, ['192.168.24.24'])
|
||||
|
||||
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
|
||||
@patch('defender.config.FAILURE_LIMIT', 3)
|
||||
def test_login_blocked_for_non_standard_login_views_with_msg(self):
|
||||
"""
|
||||
Check that a view wich returns the expected status code and the
|
||||
expected message is causing the IP to be locked out.
|
||||
"""
|
||||
@watch_login(status_code=401, msg='Invalid credentials')
|
||||
def fake_api_401_login_view_without_msg(request):
|
||||
return HttpResponse('Sorry, Invalid credentials',
|
||||
status=401)
|
||||
|
||||
request_factory = RequestFactory()
|
||||
request = request_factory.post('api/login')
|
||||
request.user = AnonymousUser()
|
||||
request.session = SessionStore()
|
||||
|
||||
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
|
||||
|
||||
for _ in range(3):
|
||||
fake_api_401_login_view_without_msg(request)
|
||||
|
||||
data_out = utils.get_blocked_ips()
|
||||
self.assertEqual(data_out, [])
|
||||
|
||||
fake_api_401_login_view_without_msg(request)
|
||||
|
||||
data_out = utils.get_blocked_ips()
|
||||
self.assertEqual(data_out, ['192.168.24.24'])
|
||||
|
||||
@patch('defender.config.BEHIND_REVERSE_PROXY', True)
|
||||
@patch('defender.config.FAILURE_LIMIT', 3)
|
||||
def test_login_non_blocked_for_non_standard_login_views_different_msg(self):
|
||||
"""
|
||||
Check that a view wich returns the expected status code but not the
|
||||
expected message is not causing the IP to be locked out.
|
||||
"""
|
||||
@watch_login(status_code=401, msg='Invalid credentials')
|
||||
def fake_api_401_login_view_without_msg(request):
|
||||
return HttpResponse('Ups, wrong credentials',
|
||||
status=401)
|
||||
|
||||
request_factory = RequestFactory()
|
||||
request = request_factory.post('api/login')
|
||||
request.user = AnonymousUser()
|
||||
request.session = SessionStore()
|
||||
|
||||
request.META['HTTP_X_FORWARDED_FOR'] = '192.168.24.24'
|
||||
|
||||
for _ in range(4):
|
||||
fake_api_401_login_view_without_msg(request)
|
||||
|
||||
data_out = utils.get_blocked_ips()
|
||||
self.assertEqual(data_out, [])
|
||||
|
||||
|
||||
class DefenderTestCaseTest(DefenderTestCase):
|
||||
"""Make sure that we're cleaning the cache between tests"""
|
||||
|
|
@ -759,6 +850,7 @@ class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
|
|||
|
||||
|
||||
class TestUtils(DefenderTestCase):
|
||||
|
||||
def test_username_blocking(self):
|
||||
username = 'foo'
|
||||
self.assertFalse(utils.is_user_already_locked(username))
|
||||
|
|
|
|||
2
setup.py
2
setup.py
|
|
@ -69,7 +69,7 @@ setup(name='django-defender',
|
|||
include_package_data=True,
|
||||
packages=get_packages('defender'),
|
||||
package_data=get_package_data('defender'),
|
||||
install_requires=['Django>=1.8,<=1.10', 'redis>=2.10.3,<3.0',
|
||||
install_requires=['Django>=1.8,<=1.11', 'redis>=2.10.3,<3.0',
|
||||
'hiredis>=0.2.0,<1.0', 'mockredispy>=2.9.0.11,<3.0'],
|
||||
tests_require=['mock', 'mockredispy', 'coverage', 'celery', 'django-redis-cache'],
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in a new issue