more test fixes

This commit is contained in:
Ken Cochrane 2023-02-22 21:30:49 -05:00
parent fccdf4ae46
commit f6635ab940
2 changed files with 14 additions and 34 deletions

View file

@ -60,26 +60,26 @@ def parse_redis_url(url, password_quote=None):
if not url:
return redis_config
url = urlparse.urlparse(url)
print(url)
purl = urlparse.urlparse(url)
# Remove query strings.
path = url.path[1:]
path = purl.path[1:]
path = path.split("?", 2)[0]
if path:
redis_config.update({"DB": int(path)})
if url.password:
password = url.password
if purl.password:
password = purl.password
if password_quote:
password = urlparse.unquote(password)
redis_config.update({"PASSWORD": password})
if url.hostname:
redis_config.update({"HOST": url.hostname})
if url.username:
redis_config.update({"USERNAME": url.username})
if url.port:
redis_config.update({"PORT": int(url.port)})
if url.scheme in ["https", "rediss"]:
if purl.hostname:
redis_config.update({"HOST": purl.hostname})
if purl.username:
redis_config.update({"USERNAME": purl.username})
if purl.port:
redis_config.update({"PORT": int(purl.port)})
if purl.scheme in ["https", "rediss"]:
redis_config.update({"SSL": True})
return redis_config

View file

@ -486,6 +486,7 @@ class AccessAttemptTest(DefenderTestCase):
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "password")
self.assertEqual(conf.get("PORT"), 1234)
self.assertEqual(conf.get("USERNAME"), "user")
# full non local
conf = parse_redis_url(
@ -494,6 +495,7 @@ class AccessAttemptTest(DefenderTestCase):
self.assertEqual(conf.get("DB"), 2)
self.assertEqual(conf.get("PASSWORD"), "pass")
self.assertEqual(conf.get("PORT"), 1234)
self.assertEqual(conf.get("USERNAME"), "user")
# no user name
conf = parse_redis_url("redis://password@localhost2:1234/2", False)
@ -1218,32 +1220,10 @@ class TestRedisConnection(TestCase):
# clean up
redis_client.config_set('requirepass', '')
@patch("defender.config.DEFENDER_REDIS_URL", REDIS_URL_NAME_PASS)
@patch("defender.config.MOCK_REDIS", False)
def test_get_redis_connection_with_name_password(self):
""" get redis connection with name and password """
connection = redis.Redis()
if connection.info().get('redis_version') < '6':
# redis versions before 6 don't support name, so skip.
return
connection.config_set('requirepass', 'mypass2')
redis_client = get_redis_connection()
self.assertIsInstance(redis_client, Redis)
redis_client.set('test2', 0)
result = int(redis_client.get('test2'))
self.assertEqual(result, 0)
redis_client.delete('test2')
# clean up
redis_client.config_set('requirepass', '')
@patch("defender.config.DEFENDER_REDIS_URL", REDIS_URL_NAME_PASS)
@patch("defender.config.MOCK_REDIS", False)
def test_get_redis_connection_with_acl(self):
""" get redis connection with password and name ACL """
print(self.REDIS_URL_NAME_PASS)
connection = redis.Redis()
if connection.info().get('redis_version') < '6':