mirror of
https://github.com/jazzband/django-axes.git
synced 2026-05-08 15:44:46 +00:00
163 lines
5.7 KiB
Python
163 lines
5.7 KiB
Python
from socket import inet_pton, AF_INET6, error
|
|
|
|
from django.core.cache import cache
|
|
from django.utils import six
|
|
|
|
from axes.conf import settings
|
|
from axes.models import AccessAttempt
|
|
|
|
|
|
def query2str(items, max_length=1024):
|
|
"""Turns a dictionary into an easy-to-read list of key-value pairs.
|
|
|
|
If there's a field called "password" it will be excluded from the output.
|
|
|
|
The length of the output is limited to max_length to avoid a DoS attack
|
|
via excessively large payloads.
|
|
"""
|
|
return '\n'.join([
|
|
'%s=%s' % (k, v) for k, v in six.iteritems(items)
|
|
if k != settings.AXES_PASSWORD_FORM_FIELD
|
|
][:int(max_length / 2)])[:max_length]
|
|
|
|
|
|
def get_client_str(username, ip_address, user_agent=None, path_info=None):
|
|
if settings.AXES_VERBOSE:
|
|
if isinstance(path_info, tuple):
|
|
path_info = path_info[0]
|
|
details = "{{user: '{0}', ip: '{1}', user-agent: '{2}', path: '{3}'}}"
|
|
return details.format(username, ip_address, user_agent, path_info)
|
|
|
|
if settings.AXES_ONLY_USER_FAILURES:
|
|
client = username
|
|
elif settings.AXES_LOCK_OUT_BY_COMBINATION_USER_AND_IP:
|
|
client = '{0} from {1}'.format(username, ip_address)
|
|
else:
|
|
client = ip_address
|
|
|
|
if settings.AXES_USE_USER_AGENT:
|
|
client += '(user-agent={0})'.format(user_agent)
|
|
|
|
return client
|
|
|
|
|
|
def is_ipv6(ip):
|
|
try:
|
|
inet_pton(AF_INET6, ip)
|
|
except (OSError, error):
|
|
return False
|
|
return True
|
|
|
|
|
|
def get_ip(request):
|
|
"""Parse IP address from REMOTE_ADDR or
|
|
AXES_REVERSE_PROXY_HEADER if AXES_BEHIND_REVERSE_PROXY is set."""
|
|
if settings.AXES_BEHIND_REVERSE_PROXY:
|
|
# For requests originating from behind a reverse proxy,
|
|
# resolve the IP address from the given AXES_REVERSE_PROXY_HEADER.
|
|
# AXES_REVERSE_PROXY_HEADER defaults to HTTP_X_FORWARDED_FOR,
|
|
# which is the Django name for the HTTP X-Forwarder-For header.
|
|
# Please see RFC7239 for additional information:
|
|
# https://tools.ietf.org/html/rfc7239#section-5
|
|
|
|
# The REVERSE_PROXY_HEADER HTTP header is a list
|
|
# of potentionally unsecure IPs, for example:
|
|
# X-Forwarded-For: 1.1.1.1, 11.11.11.11:8080, 111.111.111.111
|
|
ip_str = request.META.get(settings.AXES_REVERSE_PROXY_HEADER, '')
|
|
|
|
# We need to know the number of proxies present in the request chain
|
|
# in order to securely calculate the one IP that is the real client IP.
|
|
#
|
|
# This is because IP headers can have multiple IPs in different
|
|
# configurations, with e.g. the X-Forwarded-For header containing
|
|
# the originating client IP, proxies and possibly spoofed values.
|
|
#
|
|
# If you are using a special header for client calculation such as the
|
|
# X-Real-IP or the like with nginx, please check this configuration.
|
|
#
|
|
# Please see discussion for more information:
|
|
# https://github.com/jazzband/django-axes/issues/224
|
|
ip_list = [ip.strip() for ip in ip_str.split(',')]
|
|
|
|
# Pick the nth last IP in the given list of addresses after parsing
|
|
if len(ip_list) >= settings.AXES_NUM_PROXIES:
|
|
ip = ip_list[-settings.AXES_NUM_PROXIES]
|
|
|
|
# Fix IIS adding client port number to the
|
|
# 'X-Forwarded-For' header (strip port)
|
|
if not is_ipv6(ip):
|
|
ip = ip.split(':', 1)[0]
|
|
|
|
# If nth last is not found, default to no IP and raise a warning
|
|
else:
|
|
ip = ''
|
|
raise Warning(
|
|
'AXES: Axes is configured for operation behind a '
|
|
'reverse proxy but received too few IPs in the HTTP '
|
|
'AXES_REVERSE_PROXY_HEADER. Check your '
|
|
'AXES_NUM_PROXIES configuration. '
|
|
'Header name: {0}, value: {1}'.format(
|
|
settings.AXES_REVERSE_PROXY_HEADER, ip_str
|
|
)
|
|
)
|
|
|
|
if not ip:
|
|
raise Warning(
|
|
'AXES: Axes is configured for operation behind a reverse '
|
|
'proxy but could not find a suitable IP in the specified '
|
|
'HTTP header. Check your proxy server settings to make '
|
|
'sure correct headers are being passed to Django in '
|
|
'AXES_REVERSE_PROXY_HEADER. '
|
|
'Header name: {0}, value: {1}'.format(
|
|
settings.AXES_REVERSE_PROXY_HEADER, ip_str
|
|
)
|
|
)
|
|
|
|
return ip
|
|
|
|
return request.META.get('REMOTE_ADDR', '')
|
|
|
|
|
|
def reset(ip=None, username=None):
|
|
"""Reset records that match ip or username, and
|
|
return the count of removed attempts.
|
|
"""
|
|
count = 0
|
|
|
|
attempts = AccessAttempt.objects.all()
|
|
if ip:
|
|
attempts = attempts.filter(ip_address=ip)
|
|
if username:
|
|
attempts = attempts.filter(username=username)
|
|
|
|
if attempts:
|
|
count = attempts.count()
|
|
from axes.decorators import get_cache_key
|
|
for attempt in attempts:
|
|
cache_hash_key = get_cache_key(attempt)
|
|
if cache.get(cache_hash_key):
|
|
cache.delete(cache_hash_key)
|
|
|
|
attempts.delete()
|
|
return count
|
|
|
|
|
|
def iso8601(timestamp):
|
|
"""Returns datetime.timedelta translated to ISO 8601 formatted duration.
|
|
"""
|
|
seconds = timestamp.total_seconds()
|
|
minutes, seconds = divmod(seconds, 60)
|
|
hours, minutes = divmod(minutes, 60)
|
|
days, hours = divmod(hours, 24)
|
|
|
|
date = '{:.0f}D'.format(days) if days else ''
|
|
|
|
time_values = hours, minutes, seconds
|
|
time_designators = 'H', 'M', 'S'
|
|
|
|
time = ''.join([
|
|
('{:.0f}'.format(value) + designator)
|
|
for value, designator in zip(time_values, time_designators)
|
|
if value]
|
|
)
|
|
return 'P' + date + ('T' + time if time else '')
|