cleaned up some landscape.io warnings

This commit is contained in:
Ken Cochrane 2015-03-20 11:09:45 -04:00
parent 8ef0420b23
commit fb095e4ca9
6 changed files with 64 additions and 63 deletions

View file

@ -11,13 +11,13 @@ from . import config
urlparse.uses_netloc.append("redis")
mocked_redis = mockredis.mock_strict_redis_client()
MOCKED_REDIS = mockredis.mock_strict_redis_client()
def get_redis_connection():
""" Get the redis connection if not using mock """
if config.MOCK_REDIS: # pragma: no cover
return mocked_redis # pragma: no cover
return MOCKED_REDIS # pragma: no cover
else: # pragma: no cover
redis_config = parse_redis_url(config.DEFENDER_REDIS_URL)
return redis.StrictRedis(
@ -31,7 +31,7 @@ def parse_redis_url(url):
"""Parses a redis URL."""
# create config with some sane defaults
config = {
redis_config = {
"DB": 0,
"PASSWORD": None,
"HOST": "localhost",
@ -39,7 +39,7 @@ def parse_redis_url(url):
}
if not url:
return config
return redis_config
url = urlparse.urlparse(url)
# Remove query strings.
@ -47,12 +47,12 @@ def parse_redis_url(url):
path = path.split('?', 2)[0]
if path:
config.update({"DB": int(path)})
redis_config.update({"DB": int(path)})
if url.password:
config.update({"PASSWORD": url.password})
redis_config.update({"PASSWORD": url.password})
if url.hostname:
config.update({"HOST": url.hostname})
redis_config.update({"HOST": url.hostname})
if url.port:
config.update({"PORT": int(url.port)})
redis_config.update({"PORT": int(url.port)})
return config
return redis_config

View file

@ -1,3 +1,4 @@
from __future__ import print_function
from datetime import timedelta
from django.core.management.base import BaseCommand

View file

@ -253,7 +253,7 @@ class AccessAttemptTest(DefenderTestCase):
self.test_failure_limit_by_ip_once()
# Reset the ip so we can try again
utils.reset_failed_attempts(ip='127.0.0.1')
utils.reset_failed_attempts(ip_address='127.0.0.1')
# Make a login attempt again
self.test_valid_login()
@ -550,13 +550,13 @@ class DefenderTestCaseTest(DefenderTestCase):
key = 'test_key'
def test_first_incr(self):
utils.redis_server.incr(self.key)
result = int(utils.redis_server.get(self.key))
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
def test_second_incr(self):
utils.redis_server.incr(self.key)
result = int(utils.redis_server.get(self.key))
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
@ -565,11 +565,11 @@ class DefenderTransactionTestCaseTest(DefenderTransactionTestCase):
key = 'test_key'
def test_first_incr(self):
utils.redis_server.incr(self.key)
result = int(utils.redis_server.get(self.key))
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)
def test_second_incr(self):
utils.redis_server.incr(self.key)
result = int(utils.redis_server.get(self.key))
utils.REDIS_SERVER.incr(self.key)
result = int(utils.REDIS_SERVER.get(self.key))
self.assertEqual(result, 1)

View file

@ -1,11 +1,11 @@
from django.conf.urls import patterns, url
from views import block_view, unblock_ip_view, unblock_username_view
from .views import block_view, unblock_ip_view, unblock_username_view
urlpatterns = patterns(
'',
url(r'^blocks/$', block_view,
name="defender_blocks_view"),
url(r'^blocks/ip/(?P<ip>[a-z0-9-._]+)/unblock$', unblock_ip_view,
url(r'^blocks/ip/(?P<ip_address>[a-z0-9-._]+)/unblock$', unblock_ip_view,
name="defender_unblock_ip_view"),
url(r'^blocks/username/(?P<username>[a-z0-9-._@]+)/unblock$',
unblock_username_view,

View file

@ -11,9 +11,9 @@ from .connection import get_redis_connection
from . import config
from .data import store_login_attempt
redis_server = get_redis_connection()
REDIS_SERVER = get_redis_connection()
log = logging.getLogger(__name__)
LOG = logging.getLogger(__name__)
def is_valid_ip(ip_address):
@ -33,25 +33,25 @@ def get_ip_address_from_request(request):
the loopback """
remote_addr = request.META.get('REMOTE_ADDR', '')
if remote_addr and is_valid_ip(remote_addr):
return remote_addr.strip()
return remote_addr.strip()
return '127.0.0.1'
def get_ip(request):
""" get the ip address from the request """
if config.BEHIND_REVERSE_PROXY:
ip = request.META.get(config.REVERSE_PROXY_HEADER, '')
ip = ip.split(",", 1)[0].strip()
if ip == '':
ip = get_ip_address_from_request(request)
ip_address = request.META.get(config.REVERSE_PROXY_HEADER, '')
ip_address = ip_address.split(",", 1)[0].strip()
if ip_address == '':
ip_address = get_ip_address_from_request(request)
else:
ip = get_ip_address_from_request(request)
return ip
ip_address = get_ip_address_from_request(request)
return ip_address
def get_ip_attempt_cache_key(ip):
def get_ip_attempt_cache_key(ip_address):
""" get the cache key by ip """
return "{0}:failed:ip:{1}".format(config.CACHE_PREFIX, ip)
return "{0}:failed:ip:{1}".format(config.CACHE_PREFIX, ip_address)
def get_username_attempt_cache_key(username):
@ -59,9 +59,9 @@ def get_username_attempt_cache_key(username):
return "{0}:failed:username:{1}".format(config.CACHE_PREFIX, username)
def get_ip_blocked_cache_key(ip):
def get_ip_blocked_cache_key(ip_address):
""" get the cache key by ip """
return "{0}:blocked:ip:{1}".format(config.CACHE_PREFIX, ip)
return "{0}:blocked:ip:{1}".format(config.CACHE_PREFIX, ip_address)
def get_username_blocked_cache_key(username):
@ -88,20 +88,20 @@ def strip_keys(key_list):
def get_blocked_ips():
""" get a list of blocked ips from redis """
key = get_ip_blocked_cache_key("*")
key_list = redis_server.keys(key)
key_list = REDIS_SERVER.keys(key)
return strip_keys(key_list)
def get_blocked_usernames():
""" get a list of blocked usernames from redis """
key = get_username_blocked_cache_key("*")
key_list = redis_server.keys(key)
key_list = REDIS_SERVER.keys(key)
return strip_keys(key_list)
def increment_key(key):
""" given a key increment the value """
pipe = redis_server.pipeline()
pipe = REDIS_SERVER.pipeline()
pipe.incr(key, 1)
if config.COOLOFF_TIME:
pipe.expire(key, config.COOLOFF_TIME)
@ -112,18 +112,18 @@ def increment_key(key):
def get_user_attempts(request):
""" Returns number of access attempts for this ip, username
"""
ip = get_ip(request)
ip_address = get_ip(request)
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
# get by IP
ip_count = redis_server.get(get_ip_attempt_cache_key(ip))
ip_count = REDIS_SERVER.get(get_ip_attempt_cache_key(ip_address))
if not ip_count:
ip_count = 0
ip_count = int(ip_count)
# get by username
username_count = redis_server.get(get_username_attempt_cache_key(username))
username_count = REDIS_SERVER.get(get_username_attempt_cache_key(username))
if not username_count:
username_count = 0
username_count = int(username_count)
@ -132,16 +132,16 @@ def get_user_attempts(request):
return max(ip_count, username_count)
def block_ip(ip):
def block_ip(ip_address):
""" given the ip, block it """
if not ip:
if not ip_address:
# no reason to continue when there is no ip
return
key = get_ip_blocked_cache_key(ip)
key = get_ip_blocked_cache_key(ip_address)
if config.COOLOFF_TIME:
redis_server.set(key, 'blocked', config.COOLOFF_TIME)
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
else:
redis_server.set(key, 'blocked')
REDIS_SERVER.set(key, 'blocked')
def block_username(username):
@ -151,23 +151,23 @@ def block_username(username):
return
key = get_username_blocked_cache_key(username)
if config.COOLOFF_TIME:
redis_server.set(key, 'blocked', config.COOLOFF_TIME)
REDIS_SERVER.set(key, 'blocked', config.COOLOFF_TIME)
else:
redis_server.set(key, 'blocked')
REDIS_SERVER.set(key, 'blocked')
def record_failed_attempt(ip, username):
def record_failed_attempt(ip_address, username):
""" record the failed login attempt, if over limit return False,
if not over limit return True """
# increment the failed count, and get current number
ip_count = increment_key(get_ip_attempt_cache_key(ip))
ip_count = increment_key(get_ip_attempt_cache_key(ip_address))
user_count = increment_key(get_username_attempt_cache_key(username))
ip_block = False
user_block = False
# if either are over the limit, add to block
if ip_count > config.FAILURE_LIMIT:
block_ip(ip)
block_ip(ip_address)
ip_block = True
if user_count > config.FAILURE_LIMIT:
block_username(username)
@ -176,15 +176,15 @@ def record_failed_attempt(ip, username):
return not (ip_block or user_block)
def unblock_ip(ip, pipe=None):
def unblock_ip(ip_address, pipe=None):
""" unblock the given IP """
do_commit = False
if not pipe:
pipe = redis_server.pipeline()
pipe = REDIS_SERVER.pipeline()
do_commit = True
if ip:
pipe.delete(get_ip_attempt_cache_key(ip))
pipe.delete(get_ip_blocked_cache_key(ip))
if ip_address:
pipe.delete(get_ip_attempt_cache_key(ip_address))
pipe.delete(get_ip_blocked_cache_key(ip_address))
if do_commit:
pipe.execute()
@ -193,7 +193,7 @@ def unblock_username(username, pipe=None):
""" unblock the given Username """
do_commit = False
if not pipe:
pipe = redis_server.pipeline()
pipe = REDIS_SERVER.pipeline()
do_commit = True
if username:
pipe.delete(get_username_attempt_cache_key(username))
@ -202,12 +202,12 @@ def unblock_username(username, pipe=None):
pipe.execute()
def reset_failed_attempts(ip=None, username=None):
def reset_failed_attempts(ip_address=None, username=None):
""" reset the failed attempts for these ip's and usernames
"""
pipe = redis_server.pipeline()
pipe = REDIS_SERVER.pipeline()
unblock_ip(ip, pipe=pipe)
unblock_ip(ip_address, pipe=pipe)
unblock_username(username, pipe=pipe)
pipe.execute()
@ -241,7 +241,7 @@ def is_already_locked(request):
username = request.POST.get(config.USERNAME_FORM_FIELD, None)
# ip blocked?
ip_blocked = redis_server.get(get_ip_blocked_cache_key(ip_address))
ip_blocked = REDIS_SERVER.get(get_ip_blocked_cache_key(ip_address))
if not ip_blocked:
ip_blocked = False
@ -250,7 +250,7 @@ def is_already_locked(request):
return True
# username blocked?
user_blocked = redis_server.get(get_username_blocked_cache_key(username))
user_blocked = REDIS_SERVER.get(get_username_blocked_cache_key(username))
if user_blocked:
return True
@ -265,7 +265,7 @@ def check_request(request, login_unsuccessful):
if not login_unsuccessful:
# user logged in -- forget the failed attempts
reset_failed_attempts(ip=ip_address, username=username)
reset_failed_attempts(ip_address=ip_address, username=username)
return True
else:
# add a failed attempt for this user

View file

@ -22,10 +22,10 @@ def block_view(request):
@staff_member_required
def unblock_ip_view(request, ip):
def unblock_ip_view(request, ip_address):
""" upblock the given ip """
if request.method == 'POST':
unblock_ip(ip)
unblock_ip(ip_address)
return HttpResponseRedirect(reverse("defender_blocks_view"))