send user/ip blocked signal only once

This commit is contained in:
horida 2019-09-09 10:04:13 +02:00 committed by Ken Cochrane
parent fcfa88d679
commit ce95906488
2 changed files with 34 additions and 2 deletions

View file

@ -948,6 +948,20 @@ class SignalTest(DefenderTestCase):
utils.unblock_ip('8.8.8.8')
self.assertIsNone(self.blocked_ip)
def test_should_not_send_signal_when_ip_already_blocked(self):
self.blocked_ip = None
def handler(sender, ip_address, **kwargs):
self.blocked_ip = ip_address
ip_block_signal.connect(handler)
key = utils.get_ip_blocked_cache_key('8.8.8.8')
utils.REDIS_SERVER.set(key, 'blocked')
utils.block_ip('8.8.8.8')
self.assertIsNone(self.blocked_ip)
def test_should_send_signal_when_blocking_username(self):
self.blocked_username = None
@ -968,6 +982,20 @@ class SignalTest(DefenderTestCase):
utils.unblock_username('richard_hendricks')
self.assertIsNone(self.blocked_username)
def test_should_not_send_signal_when_username_already_blocked(self):
self.blocked_username = None
def handler(sender, username, **kwargs):
self.blocked_username = username
username_block_signal.connect(handler)
key = utils.get_username_blocked_cache_key('richard hendricks')
utils.REDIS_SERVER.set(key, 'blocked')
utils.block_ip('richard hendricks')
self.assertIsNone(self.blocked_username)
class DefenderTestCaseTest(DefenderTestCase):
""" Make sure that we're cleaning the cache between tests """

View file

@ -174,12 +174,14 @@ def block_ip(ip_address):
if config.DISABLE_IP_LOCKOUT:
# no need to block, we disabled it.
return
already_blocked = is_source_ip_already_locked(ip_address)
key = get_ip_blocked_cache_key(ip_address)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
else:
REDIS_SERVER.set(key, 'blocked')
send_ip_block_signal(ip_address)
if not already_blocked:
send_ip_block_signal(ip_address)
def block_username(username):
@ -190,12 +192,14 @@ def block_username(username):
if config.DISABLE_USERNAME_LOCKOUT:
# no need to block, we disabled it.
return
already_blocked = is_user_already_locked(username)
key = get_username_blocked_cache_key(username)
if config.COOLOFF_TIME:
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
else:
REDIS_SERVER.set(key, 'blocked')
send_username_block_signal(username)
if not already_blocked:
send_username_block_signal(username)
def record_failed_attempt(ip_address, username):