Add new config options and update logic/tests to account for them

This commit is contained in:
djmore4 2022-10-11 12:33:05 -04:00 committed by Ken Cochrane
parent dffa7c3ba3
commit 177f2ecce8
6 changed files with 166 additions and 16 deletions

View file

@ -350,8 +350,20 @@ These should be defined in your ``settings.py`` file.
* ``DEFENDER_DISABLE_IP_LOCKOUT``\ : Boolean: If this is True, it will not lockout the users IP address, it will only lockout the username. [Default: False]
* ``DEFENDER_DISABLE_USERNAME_LOCKOUT``\ : Boolean: If this is True, it will not lockout usernames, it will only lockout IP addresess. [Default: False]
* ``DEFENDER_COOLOFF_TIME``\ : Int: If set, defines a period of inactivity after which
old failed login attempts will be forgotten. An integer, will be interpreted as a
number of seconds. If ``0``\ , the locks will not expire. [Default: ``300``\ ]
old failed login attempts and username/ip lockouts will be forgotten. An integer,
will be interpreted as a number of seconds. If 0, neither the failed login attempts
nor the username/ip locks will expire. [Default: ``300``\ ]
* ``DEFENDER_ATTEMPT_COOLOFF_TIME``\ : Int: If set, overrides the period of inactivity
after which old failed login attempts will be forgotten set by DEFENDER_COOLOFF_TIME.
An integer, will be interpreted as a number of seconds. If 0, the failed login
attempts will not expire. [Default: ``DEFENDER_COOLOFF_TIME``\ ]
* ``DEFENDER_LOCKOUT_COOLOFF_TIME``\ : Int or List: If set, overrides the period of
inactivity after which username/ip lockouts will be forgotten set by
DEFENDER_COOLOFF_TIME. An integer, will be interpreted as a number of seconds.
A list of integers, will be interpreted as a number of seconds for users with
the integer's index being how many previous lockouts (up to some maximum) occurred
in the last ``DEFENDER_ACCESS_ATTEMPT_EXPIRATION`` hours. If the property is set to
0 or [], the username/ip lockout will not expire. [Default: ``DEFENDER_COOLOFF_TIME``\ ]
* ``DEFENDER_LOCKOUT_TEMPLATE``\ : String: [Default: ``None``\ ] If set, specifies a template to render when a user is locked out. Template receives the following context variables:
* ``cooloff_time_seconds``\ : The cool off time in seconds
@ -434,7 +446,7 @@ There's sample ``BasicAuthenticationDefender`` class based on ``djangorestframew
"Your account is locked for {cooloff_time_seconds} seconds" \
"".format(
failure_limit=config.FAILURE_LIMIT,
cooloff_time_seconds=config.COOLOFF_TIME
cooloff_time_seconds=config.LOCKOUT_COOLOFF_TIME
)
raise exceptions.AuthenticationFailed(_(detail))
@ -520,7 +532,7 @@ Below is a sample ``BasicAuthenticationDefender`` class based on ``rest_framewor
detail = "You have attempted to login {failure_limit} times with no success. "
.format(
failure_limit=config.FAILURE_LIMIT,
cooloff_time_seconds=config.COOLOFF_TIME
cooloff_time_seconds=config.LOCKOUT_COOLOFF_TIME
)
raise exceptions.AuthenticationFailed(_(detail))

View file

@ -54,8 +54,31 @@ REVERSE_PROXY_HEADER = get_setting(
)
try:
# how long to wait before the bad login attempt gets forgotten. in seconds.
# how long to wait before the bad login attempt/lockout gets forgotten, in seconds.
COOLOFF_TIME = int(get_setting("DEFENDER_COOLOFF_TIME", 300)) # seconds
try:
# how long to wait before the bad login attempt gets forgotten, in seconds.
ATTEMPT_COOLOFF_TIME = int(get_setting("DEFENDER_ATTEMPT_COOLOFF_TIME", COOLOFF_TIME)) # measured in seconds
except ValueError:
raise Exception("DEFENDER_ATTEMPT_COOLOFF_TIME needs to be an integer")
try:
# how long to wait before a lockout gets forgotten, in seconds.
LOCKOUT_COOLOFF_TIMES = [int(get_setting("DEFENDER_LOCKOUT_COOLOFF_TIME", COOLOFF_TIME))] # measured in seconds
except TypeError:
try:
cooloff_times = get_setting("DEFENDER_LOCKOUT_COOLOFF_TIME", [COOLOFF_TIME]) # measured in seconds
for index, cooloff_time in enumerate(cooloff_times):
cooloff_times[index] = int(cooloff_time)
if not len(cooloff_times):
raise TypeError()
LOCKOUT_COOLOFF_TIMES = cooloff_times
except (TypeError, ValueError):
raise Exception("DEFENDER_LOCKOUT_COOLOFF_TIME needs to be an integer or list of integers having at least one element")
except ValueError:
raise Exception("DEFENDER_LOCKOUT_COOLOFF_TIME needs to be an integer or list of integers having at least one element")
except ValueError: # pragma: no cover
raise Exception("DEFENDER_COOLOFF_TIME needs to be an integer") # pragma: no cover

View file

@ -1,4 +1,8 @@
from datetime import datetime, timedelta
from defender import config
from .models import AccessAttempt
from django.db.models import Q
def store_login_attempt(
@ -13,3 +17,45 @@ def store_login_attempt(
path_info=path_info,
login_valid=login_valid,
)
def get_approx_account_lockouts_from_login_attempts(ip_address=None, username=None):
"""Get the approximate number of account lockouts in a period of ACCESS_ATTEMPT_EXPIRATION hours.
This is approximate because we do not consider the time between these failed
login attempts to be relevant.
Args:
ip_address (str, optional): IP address to search for. Can be used in conjunction with username for filtering when DISABLE_IP_LOCKOUT is False. Defaults to None.
username (str, optional): Username to search for. Can be used in conjunction with ip_address for filtering when DISABLE_USERNAME_LOCKOUT is False. Defaults to None.
Returns:
int: The minimum of the count of logged failure attempts and the length of the LOCKOUT_COOLOFF_TIMES - 1, or 0 dependant on either configuration or argument parameters (ie. both ip_address and username being None).
"""
# TODO: Possibly add logic to temporarily store this info in the cache
# to help mitigate any potential performance impact this could have.
if not config.STORE_ACCESS_ATTEMPTS or not (ip_address or username):
# If we're not storing login attempts OR both ip_address and username are
# None we should return 0.
return 0
q = Q(attempt_time__gte=datetime.now() - timedelta(hours=config.ACCESS_ATTEMPT_EXPIRATION))
failure_limit = config.FAILURE_LIMIT
if (ip_address and username \
and config.LOCKOUT_BY_IP_USERNAME \
and not (config.DISABLE_IP_LOCKOUT or config.DISABLE_USERNAME_LOCKOUT)
):
q = q & Q(ip_address=ip_address) & Q(username=username)
elif ip_address and not config.DISABLE_IP_LOCKOUT:
failure_limit = config.IP_FAILURE_LIMIT
q = q & Q(ip_address=ip_address)
elif username and not config.DISABLE_USERNAME_LOCKOUT:
failure_limit = config.USERNAME_FAILURE_LIMIT
q = q & Q(username=username)
else:
# If we've made it this far and didn't hit one of the other if or elif
# conditions, we're in an inappropriate context.
# TODO: Maybe we should throw an exception here instead of returning 0?
return 0
return min(AccessAttempt.objects.filter(q).count() // failure_limit, len(config.LOCKOUT_COOLOFF_TIMES) - 1)

View file

@ -14,6 +14,7 @@ MIDDLEWARE = (
"django.contrib.sessions.middleware.SessionMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"defender.middleware.FailedLoginMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
)
ROOT_URLCONF = "defender.test_urls"

View file

@ -286,7 +286,7 @@ class AccessAttemptTest(DefenderTestCase):
"""
self.test_failure_limit_by_ip_once()
# Wait for the cooling off period
time.sleep(config.COOLOFF_TIME)
time.sleep(config.ATTEMPT_COOLOFF_TIME)
if config.MOCK_REDIS:
# mock redis require that we expire on our own
@ -926,6 +926,64 @@ class AccessAttemptTest(DefenderTestCase):
data_out = utils.get_blocked_ips()
self.assertEqual(data_out, [])
@patch("defender.config.ATTEMPT_COOLOFF_TIME", "a")
def test_bad_attempt_cooloff_configuration(self):
self.assertRaises(Exception)
@patch("defender.config.ATTEMPT_COOLOFF_TIME", ["a"])
def test_bad_attempt_cooloff_configuration_with_list(self):
self.assertRaises(Exception)
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", "a")
def test_bad_lockout_cooloff_configuration(self):
self.assertRaises(Exception)
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", [300, "a"])
def test_bad_list_lockout_cooloff_configuration(self):
self.assertRaises(Exception)
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", [300, dict(a="a")])
def test_bad_list_with_dict_lockout_cooloff_configuration(self):
self.assertRaises(Exception)
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", [2, 4])
def test_lockout_cooloff_correctly_scales_with_ip_when_set(self):
self.test_failure_limit_by_ip_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[0])
self.test_valid_login()
self.test_failure_limit_by_ip_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[0])
self.test_blocked_ip_cannot_login()
utils.reset_failed_attempts(ip_address="127.0.0.1")
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", [2, 4])
def test_lockout_cooloff_correctly_scales_with_username_when_set(self):
self.test_failure_limit_by_username_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[0])
self.test_valid_login()
self.test_failure_limit_by_username_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[0])
self.test_blocked_username_cannot_login()
utils.reset_failed_attempts(username=VALID_USERNAME)
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", [2, 4])
def test_lockout_correctly_releases_after_scaling_with_ip(self):
self.test_failure_limit_by_ip_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[0])
self.test_valid_login()
self.test_failure_limit_by_ip_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[1])
self.test_valid_login()
@patch("defender.config.LOCKOUT_COOLOFF_TIMES", [2, 4])
def test_lockout_correctly_releases_after_scaling_with_username(self):
self.test_failure_limit_by_username_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[0])
self.test_valid_login()
self.test_failure_limit_by_username_once()
time.sleep(config.LOCKOUT_COOLOFF_TIMES[1])
self.test_valid_login()
class SignalTest(DefenderTestCase):
""" Test that signals are properly sent when blocking usernames and IPs.

View file

@ -10,7 +10,7 @@ from django.utils.module_loading import import_string
from .connection import get_redis_connection
from . import config
from .data import store_login_attempt
from .data import get_approx_account_lockouts_from_login_attempts, store_login_attempt
from .signals import (
send_username_block_signal,
send_ip_block_signal,
@ -166,8 +166,8 @@ def increment_key(key):
""" given a key increment the value """
pipe = REDIS_SERVER.pipeline()
pipe.incr(key, 1)
if config.COOLOFF_TIME:
pipe.expire(key, config.COOLOFF_TIME)
if config.ATTEMPT_COOLOFF_TIME:
pipe.expire(key, config.ATTEMPT_COOLOFF_TIME)
new_value = pipe.execute()[0]
return new_value
@ -204,6 +204,13 @@ def get_user_attempts(request, get_username=get_username_from_request, username=
# return the larger of the two.
return max(ip_count, username_count)
def get_lockout_cooloff_time(ip_address=None, username=None):
index = min(
len(config.LOCKOUT_COOLOFF_TIMES) - 1,
get_approx_account_lockouts_from_login_attempts(ip_address, username)
)
return config.LOCKOUT_COOLOFF_TIMES[index]
def block_ip(ip_address):
""" given the ip, block it """
@ -215,8 +222,9 @@ def block_ip(ip_address):
return
already_blocked = is_source_ip_already_locked(ip_address)
key = get_ip_blocked_cache_key(ip_address)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, "blocked", config.COOLOFF_TIME)
cooloff_time = get_lockout_cooloff_time(ip_address=ip_address)
if cooloff_time:
REDIS_SERVER.set(key, "blocked", cooloff_time)
else:
REDIS_SERVER.set(key, "blocked")
if not already_blocked:
@ -233,8 +241,9 @@ def block_username(username):
return
already_blocked = is_user_already_locked(username)
key = get_username_blocked_cache_key(username)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, "blocked", config.COOLOFF_TIME)
cooloff_time = get_lockout_cooloff_time(username=username)
if cooloff_time:
REDIS_SERVER.set(key, "blocked", cooloff_time)
else:
REDIS_SERVER.set(key, "blocked")
if not already_blocked:
@ -332,9 +341,10 @@ def reset_failed_attempts(ip_address=None, username=None):
def lockout_response(request):
""" if we are locked out, here is the response """
if config.LOCKOUT_TEMPLATE:
cooloff_time = get_lockout_cooloff_time(ip_address=get_ip(request), username=get_username_from_request(request))
context = {
"cooloff_time_seconds": config.COOLOFF_TIME,
"cooloff_time_minutes": config.COOLOFF_TIME / 60,
"cooloff_time_seconds": cooloff_time,
"cooloff_time_minutes": cooloff_time / 60,
"failure_limit": config.FAILURE_LIMIT,
}
return render(request, config.LOCKOUT_TEMPLATE, context)
@ -342,7 +352,7 @@ def lockout_response(request):
if config.LOCKOUT_URL:
return HttpResponseRedirect(config.LOCKOUT_URL)
if config.COOLOFF_TIME:
if get_lockout_cooloff_time():
return HttpResponse(
"Account locked: too many login attempts. " "Please try again later."
)