Use redis parse_url method instead of a custom one (#234)

* Use redis parse_url method instead of a custom one

The custom method defined here has no real advantage

- the redis lib implements it better and have support for many use cases
- maintaining this implementation is error-prone and unnecessary work for overworked open-source contributors :)

Especially, when you want to pass query parameters here, they are not supported (for eg a custom certificate authority)

* remove test about url parsing
* remove unused imports
This commit is contained in:
dkr-sahar 2023-10-12 19:20:58 +02:00 committed by GitHub
parent f6c73e093b
commit ba548fa9c3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 3 additions and 124 deletions

View file

@ -31,55 +31,5 @@ def get_redis_connection():
except AttributeError:
# django_redis.cache.RedisCache case (django-redis package)
return cache.client.get_client(True)
else: # pragma: no cover
redis_config = parse_redis_url(
config.DEFENDER_REDIS_URL, config.DEFENDER_REDIS_PASSWORD_QUOTE)
return redis.StrictRedis(
host=redis_config.get("HOST"),
port=redis_config.get("PORT"),
db=redis_config.get("DB"),
username=redis_config.get("USERNAME"),
password=redis_config.get("PASSWORD"),
ssl=redis_config.get("SSL"),
)
def parse_redis_url(url, password_quote=None):
"""Parses a redis URL."""
# create config with some sane defaults
redis_config = {
"DB": 0,
"PASSWORD": None,
"HOST": "localhost",
"PORT": 6379,
"SSL": False,
}
if not url:
return redis_config
purl = urlparse.urlparse(url)
# Remove query strings.
path = purl.path[1:]
path = path.split("?", 2)[0]
if path:
redis_config.update({"DB": int(path)})
if purl.password:
password = purl.password
if password_quote:
password = urlparse.unquote(password)
redis_config.update({"PASSWORD": password})
if purl.hostname:
redis_config.update({"HOST": purl.hostname})
if purl.username:
redis_config.update({"USERNAME": purl.username})
if purl.port:
redis_config.update({"PORT": int(purl.port)})
if purl.scheme in ["https", "rediss"]:
redis_config.update({"SSL": True})
return redis_config
else: # pragma: no cover)
return redis.StrictRedis.from_url(config.DEFENDER_REDIS_URL)

View file

@ -3,11 +3,8 @@ import string
import time
from unittest.mock import patch
from datetime import datetime, timedelta
from django.contrib.auth.models import AnonymousUser, User
from django.contrib.sessions.backends.db import SessionStore
from django.db.models import Q
from django.http import HttpRequest, HttpResponse
from django.test.client import RequestFactory
from django.test.testcases import TestCase
@ -26,7 +23,7 @@ from .signals import (
username_block as username_block_signal,
username_unblock as username_unblock_signal,
)
from .connection import parse_redis_url, get_redis_connection
from .connection import get_redis_connection
from .decorators import watch_login
from .models import AccessAttempt
from .test import DefenderTestCase, DefenderTransactionTestCase
@ -478,74 +475,6 @@ class AccessAttemptTest(DefenderTestCase):
self.assertEqual(utils.is_valid_ip("::ffff:192.0.2.128"), True)
self.assertEqual(utils.is_valid_ip("::ffff:8.8.8.8"), True)
def test_parse_redis_url(self):
""" test the parse_redis_url method """
# full regular
conf = parse_redis_url("redis://user:password@localhost2:1234/2", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
self.assertEqual(conf.get("USERNAME"), "user")
# full non local
conf = parse_redis_url(
"redis://user:pass@www.localhost.com:1234/2", False)
self.assertEqual(conf.get("HOST"), "www.localhost.com")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "pass")
self.assertEqual(conf.get("PORT"), 1234)
self.assertEqual(conf.get("USERNAME"), "user")
# no user name
conf = parse_redis_url("redis://password@localhost2:1234/2", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 1234)
# no user name 2 with colon
conf = parse_redis_url("redis://:password@localhost2:1234/2", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# Empty
conf = parse_redis_url(None, False)
self.assertEqual(conf.get("HOST"), "localhost")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 6379)
# no db
conf = parse_redis_url("redis://:password@localhost2:1234", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
# no password
conf = parse_redis_url("redis://localhost2:1234/0", False)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), None)
self.assertEqual(conf.get("PORT"), 1234)
# password with special character and set the password_quote = True
conf = parse_redis_url("redis://:calmkart%23%40%21@localhost:6379/0", True)
self.assertEqual(conf.get("HOST"), "localhost")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "calmkart#@!")
self.assertEqual(conf.get("PORT"), 6379)
# password without special character and set the password_quote = True
conf = parse_redis_url("redis://:password@localhost2:1234", True)
self.assertEqual(conf.get("HOST"), "localhost2")
self.assertEqual(conf.get("DB"), 0)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
@patch("defender.config.DEFENDER_REDIS_NAME", "default")
def test_get_redis_connection_django_conf(self):
""" get the redis connection """