🔥 Clean weird logic when processing proxy ips

This commit is contained in:
Camilo Nova 2016-07-20 11:38:37 -05:00
parent ea9a87b7de
commit 0239e173e0
2 changed files with 30 additions and 83 deletions

View file

@ -6,8 +6,6 @@ from datetime import timedelta
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth import logout
from django.core.exceptions import ValidationError
from django.core.validators import validate_ipv46_address
from django.http import HttpResponse
from django.http import HttpResponseRedirect
from django.shortcuts import render
@ -38,10 +36,6 @@ PASSWORD_FORM_FIELD = getattr(settings, 'AXES_PASSWORD_FORM_FIELD', 'password')
# see if the django app is sitting behind a reverse proxy
BEHIND_REVERSE_PROXY = getattr(settings, 'AXES_BEHIND_REVERSE_PROXY', False)
# see if the django app is sitting behind a reverse proxy but can be accessed directly
BEHIND_REVERSE_PROXY_WITH_DIRECT_ACCESS = \
getattr(settings, 'AXES_BEHIND_REVERSE_PROXY_WITH_DIRECT_ACCESS', False)
# if the django app is behind a reverse proxy, look for the ip address using this HTTP header value
REVERSE_PROXY_HEADER = \
getattr(settings, 'AXES_REVERSE_PROXY_HEADER', 'HTTP_X_FORWARDED_FOR')
@ -85,77 +79,20 @@ if BEHIND_REVERSE_PROXY:
log.debug('Looking for header value %s', REVERSE_PROXY_HEADER)
def is_valid_ip(ip_address):
"""Returns whether IP address is both valid AND, per RFC 1918, not reserved as
private"""
try:
validate_ipv46_address(ip_address)
except ValidationError:
return False
PRIVATE_IPS_PREFIX = (
'10.',
'172.16.', '172.17.', '172.18.', '172.19.', '172.20.', '172.21.',
'172.22.', '172.23.', '172.24.', '172.25.', '172.26.', '172.27.',
'172.28.', '172.29.', '172.30.', '172.31.',
'192.168.',
'127.',
)
return not ip_address.startswith(PRIVATE_IPS_PREFIX)
def get_ip_address_from_request(request):
"""
Makes the best attempt to get the client's real IP or return the loopback
"""
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR', '')
x_real_ip = request.META.get('HTTP_X_REAL_IP', '')
remote_addr = request.META.get('REMOTE_ADDR', '')
ip_address = None
if x_forwarded_for and ',' not in x_forwarded_for:
if is_valid_ip(x_forwarded_for):
ip_address = x_forwarded_for.strip()
else:
for ip_raw in x_forwarded_for.split(','):
ip = ip_raw.strip()
if is_valid_ip(ip):
ip_address = ip
break
if not ip_address:
if x_real_ip and is_valid_ip(x_real_ip):
ip_address = x_real_ip.strip()
elif remote_addr and is_valid_ip(remote_addr):
ip_address = remote_addr.strip()
else:
ip_address = '127.0.0.1'
return ip_address
def get_ip(request):
if not BEHIND_REVERSE_PROXY:
return get_ip_address_from_request(request)
ip = request.META.get('REMOTE_ADDR', '')
ip = request.META.get(REVERSE_PROXY_HEADER, '')
ip = ip.split(',', 1)[0].strip()
if ip == '':
if BEHIND_REVERSE_PROXY_WITH_DIRECT_ACCESS:
ip = request.META.get('REMOTE_ADDR', '')
if not ip_in_whitelist(ip):
raise Warning(
'Axes is configured for operation behind a reverse proxy '
'and to allow some IP addresses to have direct access. '
'{0} is not on the white list'.format(ip)
)
else:
if BEHIND_REVERSE_PROXY:
ip = request.META.get(REVERSE_PROXY_HEADER, '').split(',', 1)[0]
ip = ip.strip()
if not ip:
raise Warning(
'Axes is configured for operation behind a reverse proxy '
'but could not find an HTTP header value. Check your proxy '
'server settings to make sure this header value is being '
'passed. Header value {0}'.format(REVERSE_PROXY_HEADER)
)
return ip
@ -164,11 +101,13 @@ def query2str(items, max_length=1024):
If there's a field called "password" it will be excluded from the output.
The length of the output is limited to max_length to avoid a DoS attack via excessively large payloads.
The length of the output is limited to max_length to avoid a DoS attack
via excessively large payloads.
"""
return '\n'.join(['%s=%s' % (k, v) for k, v in six.iteritems(items)
if k != PASSWORD_FORM_FIELD][:int(max_length/2)])[:max_length]
return '\n'.join([
'%s=%s' % (k, v) for k, v in six.iteritems(items)
if k != PASSWORD_FORM_FIELD
][:int(max_length / 2)])[:max_length]
def ip_in_whitelist(ip):
@ -312,12 +251,15 @@ def watch_login(func):
response.status_code != 302
)
user_agent = request.META.get('HTTP_USER_AGENT', '<unknown>')[:255]
http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
path_info = request.META.get('PATH_INFO', '<unknown>')[:255]
AccessLog.objects.create(
user_agent=request.META.get('HTTP_USER_AGENT', '<unknown>')[:255],
user_agent=user_agent,
ip_address=get_ip(request),
username=request.POST.get(USERNAME_FORM_FIELD, None),
http_accept=request.META.get('HTTP_ACCEPT', '<unknown>'),
path_info=request.META.get('PATH_INFO', '<unknown>'),
http_accept=http_accept,
path_info=path_info,
trusted=not login_unsuccessful,
)
if check_request(request, login_unsuccessful):
@ -412,14 +354,17 @@ def check_request(request, login_unsuccessful):
attempt.post_data,
query2str(request.POST)
)
attempt.http_accept = request.META.get('HTTP_ACCEPT', '<unknown>')
attempt.path_info = request.META.get('PATH_INFO', '<unknown>')
attempt.http_accept = \
request.META.get('HTTP_ACCEPT', '<unknown>')[:1025]
attempt.path_info = \
request.META.get('PATH_INFO', '<unknown>')[:255]
attempt.failures_since_start = failures
attempt.attempt_time = datetime.now()
attempt.save()
log.info('AXES: Repeated login failure by %s. Updating access '
'record. Count = %s' %
(attempt.ip_address, failures))
log.info(
'AXES: Repeated login failure by %s. Updating access '
'record. Count = %s' % (attempt.ip_address, failures)
)
else:
create_new_failure_records(request, failures)
else:
@ -449,7 +394,7 @@ def check_request(request, login_unsuccessful):
if hasattr(request, 'user') and request.user.is_authenticated():
logout(request)
log.warn(
'AXES: locked out %s after repeated login attempts.' % (ip_address,)
'AXES: locked out %s after repeated login attempts.' % ip_address
)
# send signal when someone is locked out.
user_locked_out.send(

View file

@ -75,4 +75,6 @@ class AccessLog(CommonAccess):
)
def __unicode__(self):
return six.u('Access Log for %s @ %s') % (self.username, self.attempt_time)
return six.u('Access Log for %s @ %s') % (
self.username, self.attempt_time
)