Merge pull request #52 from docker-hub/bc

Add helpers that do not assume how to retrieve `username`
This commit is contained in:
Ken Cochrane 2016-01-25 13:54:54 -05:00
commit adc429ab50
3 changed files with 47 additions and 25 deletions

View file

@ -1,15 +1,14 @@
from . import config
from .data import store_login_attempt
# not sure how to get this to look better. ideally we want to dynamically
# apply the celery decorator based on the USE_CELERY setting.
if config.USE_CELERY:
from celery import shared_task
from celery import shared_task
@shared_task()
def add_login_attempt_task(user_agent, ip_address, username,
http_accept, path_info, login_valid):
""" Create a record for the login attempt """
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)
@shared_task()
def add_login_attempt_task(user_agent, ip_address, username,
http_accept, path_info, login_valid):
""" Create a record for the login attempt """
store_login_attempt(user_agent, ip_address, username,
http_accept, path_info, login_valid)

View file

@ -686,3 +686,21 @@ class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
class TestUtils(DefenderTestCase):
def test_username_blocking(self):
username = 'foo'
self.assertFalse(utils.is_user_already_locked(username))
utils.block_username(username)
self.assertTrue(utils.is_user_already_locked(username))
utils.unblock_username(username)
self.assertFalse(utils.is_user_already_locked(username))
def test_ip_address_blocking(self):
ip = '1.2.3.4'
self.assertFalse(utils.is_source_ip_already_locked(ip))
utils.block_ip(ip)
self.assertTrue(utils.is_source_ip_already_locked(ip))
utils.unblock_ip(ip)
self.assertFalse(utils.is_source_ip_already_locked(ip))

View file

@ -265,25 +265,30 @@ def lockout_response(request):
"Contact an admin to unlock your account.")
def is_user_already_locked(username):
"""Is this username already locked?"""
if username is None:
return False
return REDIS_SERVER.get(get_username_blocked_cache_key(username))
def is_source_ip_already_locked(ip_address):
"""Is this IP already locked?"""
if ip_address is None:
return False
if config.DISABLE_IP_LOCKOUT:
return False
return REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))
def is_already_locked(request):
""" Is this IP/username already locked? """
if not config.DISABLE_IP_LOCKOUT:
# ip blocked?
ip_address = get_ip(request)
ip_blocked = REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))
else:
# we disabled ip lockout, so it will never be blocked.
ip_blocked = False
# username blocked?
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
user_blocked = REDIS_SERVER.get(get_username_blocked_cache_key(username))
"""Parse the username & IP from the request, and see if it's already locked."""
user_blocked = is_user_already_locked(
request.POST.get(config.USERNAME_FORM_FIELD, None))
ip_blocked = is_source_ip_already_locked(get_ip(request))
if config.LOCKOUT_BY_IP_USERNAME:
LOG.info("Block by ip & username")
# if both this IP and this username are present the request is
# blocked
# if both this IP and this username are present the request is blocked
return ip_blocked and user_blocked
return ip_blocked or user_blocked