mirror of
https://github.com/jazzband/django-axes.git
synced 2026-03-16 22:30:23 +00:00
Improve management commands, docs, and tests
Fixes #362 Signed-off-by: Aleksi Häkli <aleksi.hakli@iki.fi>
This commit is contained in:
parent
99175dc57f
commit
ecadddbf5d
8 changed files with 187 additions and 91 deletions
|
|
@ -4,6 +4,11 @@ Changes
|
|||
5.0.0 (WIP)
|
||||
-----------
|
||||
|
||||
- Improve managment commands and separate commands for resetting
|
||||
all access attempts, attempts by IP and attempts by username.
|
||||
Add tests for the management commands for better coverage.
|
||||
[aleksihakli]
|
||||
|
||||
- Add a Django native authentication stack that utilizes
|
||||
``AUTHENTICATION_BACKENDS``, ``MIDDLEWARE``, and signal handlers
|
||||
for tracking login attempts and implementing user lockouts.
|
||||
|
|
|
|||
|
|
@ -6,10 +6,9 @@ from axes.models import AccessAttempt
|
|||
|
||||
|
||||
class Command(BaseCommand):
|
||||
args = ''
|
||||
help = ('List registered login attempts')
|
||||
help = 'List access attempts'
|
||||
|
||||
def handle(self, *args, **kwargs): # pylint: disable=unused-argument
|
||||
def handle(self, *args, **options): # pylint: disable=unused-argument
|
||||
for obj in AccessAttempt.objects.all():
|
||||
self.stdout.write('{ip}\t{username}\t{failures}'.format(
|
||||
ip=obj.ip_address,
|
||||
|
|
|
|||
|
|
@ -6,22 +6,12 @@ from axes.utils import reset
|
|||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = ("resets any lockouts or failed login records. If called with an "
|
||||
"IP, resets only for that IP")
|
||||
help = 'Reset all access attempts and lockouts'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('ip', nargs='*')
|
||||
def handle(self, *args, **options): # pylint: disable=unused-argument
|
||||
count = reset()
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
count = 0
|
||||
if kwargs and kwargs.get('ip'):
|
||||
for ip in kwargs['ip'][1:]:
|
||||
count += reset(ip=ip)
|
||||
if count:
|
||||
self.stdout.write('{0} attempts removed.'.format(count))
|
||||
else:
|
||||
count = reset()
|
||||
|
||||
if kwargs['verbosity']:
|
||||
if count:
|
||||
self.stdout.write('{0} attempts removed.'.format(count))
|
||||
else:
|
||||
self.stdout.write('No attempts found.')
|
||||
self.stdout.write('No attempts found.')
|
||||
|
|
|
|||
23
axes/management/commands/axes_reset_ip.py
Normal file
23
axes/management/commands/axes_reset_ip.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from axes.utils import reset
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Reset all access attempts and lockouts for given IP addresses'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('ip', nargs='+', type=str)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
count = 0
|
||||
|
||||
for ip in options['ip']:
|
||||
count += reset(ip=ip)
|
||||
|
||||
if count:
|
||||
self.stdout.write('{0} attempts removed.'.format(count))
|
||||
else:
|
||||
self.stdout.write('No attempts found.')
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from axes.utils import reset
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = ("Resets any lockouts or failed login records. If called with an "
|
||||
"User name, resets only for that User name")
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('username')
|
||||
|
||||
def handle(self, *args, **kwargs):
|
||||
count = 0
|
||||
count += reset(username=kwargs['username'])
|
||||
if kwargs['verbosity']:
|
||||
if count:
|
||||
self.stdout.write('{0} attempts removed.'.format(count))
|
||||
else:
|
||||
self.stdout.write('No attempts found.')
|
||||
23
axes/management/commands/axes_reset_username.py
Normal file
23
axes/management/commands/axes_reset_username.py
Normal file
|
|
@ -0,0 +1,23 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from axes.utils import reset
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Reset all access attempts and lockouts for given usernames'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument('username', nargs='+', type=str)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
count = 0
|
||||
|
||||
for username in options['username']:
|
||||
count += reset(username=username)
|
||||
|
||||
if count:
|
||||
self.stdout.write('{0} attempts removed.'.format(count))
|
||||
else:
|
||||
self.stdout.write('No attempts found.')
|
||||
68
axes/tests/test_management_commands.py
Normal file
68
axes/tests/test_management_commands.py
Normal file
|
|
@ -0,0 +1,68 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
from io import StringIO
|
||||
from sys import version_info
|
||||
from unittest import skipIf
|
||||
|
||||
from django.core.management import call_command
|
||||
from django.test import TestCase
|
||||
|
||||
from axes.models import AccessAttempt
|
||||
|
||||
|
||||
@skipIf(version_info < (3, ), 'Not supported on Python 2.x')
|
||||
class ManagementCommandTestCase(TestCase):
|
||||
def setUp(self):
|
||||
AccessAttempt.objects.create(
|
||||
username='jane.doe',
|
||||
ip_address='10.0.0.1',
|
||||
failures_since_start='4',
|
||||
)
|
||||
|
||||
AccessAttempt.objects.create(
|
||||
username='john.doe',
|
||||
ip_address='10.0.0.2',
|
||||
failures_since_start='15',
|
||||
)
|
||||
|
||||
def test_axes_list_attempts(self):
|
||||
out = StringIO()
|
||||
call_command('axes_list_attempts', stdout=out)
|
||||
|
||||
expected = '10.0.0.1\tjane.doe\t4\n10.0.0.2\tjohn.doe\t15\n'
|
||||
self.assertEqual(expected, out.getvalue())
|
||||
|
||||
def test_axes_reset(self):
|
||||
out = StringIO()
|
||||
call_command('axes_reset', stdout=out)
|
||||
|
||||
expected = '2 attempts removed.\n'
|
||||
self.assertEqual(expected, out.getvalue())
|
||||
|
||||
def test_axes_reset_ip(self):
|
||||
out = StringIO()
|
||||
call_command('axes_reset_ip', '10.0.0.1', stdout=out)
|
||||
|
||||
expected = '1 attempts removed.\n'
|
||||
self.assertEqual(expected, out.getvalue())
|
||||
|
||||
def test_axes_reset_ip_not_found(self):
|
||||
out = StringIO()
|
||||
call_command('axes_reset_ip', '10.0.0.3', stdout=out)
|
||||
|
||||
expected = 'No attempts found.\n'
|
||||
self.assertEqual(expected, out.getvalue())
|
||||
|
||||
def test_axes_reset_username(self):
|
||||
out = StringIO()
|
||||
call_command('axes_reset_username', 'john.doe', stdout=out)
|
||||
|
||||
expected = '1 attempts removed.\n'
|
||||
self.assertEqual(expected, out.getvalue())
|
||||
|
||||
def test_axes_reset_username_not_found(self):
|
||||
out = StringIO()
|
||||
call_command('axes_reset_username', 'ivan.renko', stdout=out)
|
||||
|
||||
expected = 'No attempts found.\n'
|
||||
self.assertEqual(expected, out.getvalue())
|
||||
110
docs/usage.rst
110
docs/usage.rst
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
Usage
|
||||
=====
|
||||
|
||||
``django-axes`` listens to signals from ``django.contrib.auth.signals`` to
|
||||
log access attempts:
|
||||
|
||||
|
|
@ -16,27 +17,29 @@ log the access attempts.
|
|||
Quickstart
|
||||
----------
|
||||
|
||||
Once ``axes`` is in your ``INSTALLED_APPS`` in your project settings file,
|
||||
you can login and logout of your application via the ``django.contrib.auth``
|
||||
views. The access attempts will be logged and visible in the "Access Attempts"
|
||||
secion of the admin app.
|
||||
Once ``axes`` is in your ``INSTALLED_APPS`` in your project settings file, you can
|
||||
login and logout of your application via the ``django.contrib.auth`` views.
|
||||
The attempts will be logged and visible in the "Access Attempts" section in admin.
|
||||
|
||||
By default, django-axes will lock out repeated attempts from the same IP
|
||||
address. You can allow this IP to attempt again by deleting the relevant
|
||||
``AccessAttempt`` records in the admin.
|
||||
By default, django-axes will lock out repeated access attempts from the same IP address.
|
||||
You can allow this IP to attempt again by deleting relevant ``AccessAttempt`` records.
|
||||
|
||||
You can also use the ``axes_reset`` and ``axes_reset_user`` management commands
|
||||
using Django's ``manage.py``.
|
||||
Records can be deleted, for example, by using the Django admin application.
|
||||
|
||||
You can also use the ``axes_reset``, ``axes_reset_ip``, and ``axes_reset_user``
|
||||
management commands with the Django ``manage.py`` command helpers:
|
||||
|
||||
* ``manage.py axes_reset`` will reset all lockouts and access records.
|
||||
* ``manage.py axes_reset ip`` will clear lockout/records for ip
|
||||
* ``manage.py axes_reset_user username`` will clear lockout/records for an username
|
||||
* ``manage.py axes_reset_ip ip [ip ...]``
|
||||
will clear lockouts and records for the given IP addresses.
|
||||
* ``manage.py axes_reset_user username [username ...]``
|
||||
will clear lockouts and records for the given usernames.
|
||||
|
||||
In your code, you can use ``from axes.utils import reset``.
|
||||
In your code, you can use the ``axes.utils.reset`` function.
|
||||
|
||||
* ``reset()`` will reset all lockouts and access records.
|
||||
* ``reset(ip=ip)`` will clear lockout/records for ip
|
||||
* ``reset(username=username)`` will clear lockout/records for a username
|
||||
* ``reset(ip=ip)`` will clear lockouts and records for the given IP address.
|
||||
* ``reset(username=username)`` will clear lockouts and records for the given username.
|
||||
|
||||
Example usage
|
||||
-------------
|
||||
|
|
@ -58,63 +61,70 @@ them as per the example.
|
|||
|
||||
*views.py:* ::
|
||||
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.http import JsonResponse, HttpResponse
|
||||
from django.contrib.auth.signals import user_logged_in,\
|
||||
user_logged_out,\
|
||||
user_login_failed
|
||||
import json
|
||||
from django.utils.decorators import method_decorator
|
||||
from django.contrib.auth import signals
|
||||
from django.views import View
|
||||
from django.views.decorators.csrf import csrf_exempt
|
||||
|
||||
from axes.decorators import axes_dispatch
|
||||
|
||||
from myapp.forms import LoginForm
|
||||
from myapp.auth import custom_authenticate, custom_login
|
||||
|
||||
from axes.decorators import axes_dispatch
|
||||
|
||||
@method_decorator(axes_dispatch, name='dispatch')
|
||||
@method_decorator(csrf_exempt, name='dispatch')
|
||||
class Login(View):
|
||||
''' Custom login view that takes JSON credentials '''
|
||||
"""
|
||||
Custom login view that takes JSON credentials
|
||||
"""
|
||||
|
||||
http_method_names = ['post',]
|
||||
http_method_names = ['post']
|
||||
|
||||
def post(self, request):
|
||||
# decode post json to dict & validate
|
||||
post_data = json.loads(request.body.decode('utf-8'))
|
||||
form = LoginForm(post_data)
|
||||
form = LoginForm(request.POST)
|
||||
|
||||
if not form.is_valid():
|
||||
# inform axes of failed login
|
||||
user_login_failed.send(
|
||||
sender = User,
|
||||
request = request,
|
||||
credentials = {
|
||||
'username': form.cleaned_data.get('username')
|
||||
}
|
||||
# inform django-axes of failed login
|
||||
signals.user_login_failed.send(
|
||||
sender=User,
|
||||
request=request,
|
||||
credentials={
|
||||
'username': form.cleaned_data.get('username'),
|
||||
},
|
||||
)
|
||||
return HttpResponse(status=400)
|
||||
|
||||
user = custom_authenticate(
|
||||
request = request,
|
||||
username = form.cleaned_data.get('username'),
|
||||
password = form.cleaned_data.get('password'),
|
||||
request=request,
|
||||
username=form.cleaned_data.get('username'),
|
||||
password=form.cleaned_data.get('password'),
|
||||
)
|
||||
|
||||
if user is not None:
|
||||
custom_login(request, user)
|
||||
user_logged_in.send(
|
||||
sender = User,
|
||||
request = request,
|
||||
user = user,
|
||||
|
||||
signals.user_logged_in.send(
|
||||
sender=User,
|
||||
request=request,
|
||||
user=user,
|
||||
)
|
||||
return JsonResponse({'message':'success!'}, status=200)
|
||||
else:
|
||||
user_login_failed.send(
|
||||
sender = User,
|
||||
request = request,
|
||||
credentials = {
|
||||
'username':form.cleaned_data.get('username')
|
||||
},
|
||||
)
|
||||
return HttpResponse(status=403)
|
||||
|
||||
return JsonResponse({
|
||||
'message':'success'
|
||||
}, status=200)
|
||||
|
||||
# inform django-axes of failed login
|
||||
signals.user_login_failed.send(
|
||||
sender=User,
|
||||
request=request,
|
||||
credentials={
|
||||
'username': form.cleaned_data.get('username'),
|
||||
},
|
||||
)
|
||||
|
||||
return HttpResponse(status=403)
|
||||
|
||||
*urls.py:* ::
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue