mirror of
https://github.com/jazzband/django-constance.git
synced 2026-04-25 17:24:51 +00:00
Added tests and fixed async deadlock on aset
This commit is contained in:
parent
f8fa58cd75
commit
5a9f4162d3
6 changed files with 454 additions and 13 deletions
|
|
@ -83,16 +83,21 @@ class RedisBackend(Backend):
|
|||
self._rd.set(self.add_prefix(key), dumps(value))
|
||||
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
|
||||
|
||||
async def aset(self, key, value):
|
||||
# We need the old value for the signal.
|
||||
# Signals are synchronous in Django, but we can't easily change that here.
|
||||
old_value = await self.aget(key)
|
||||
async def _aset_internal(self, key, value, old_value):
|
||||
"""
|
||||
Internal set operation. Separated to allow subclasses to provide old_value
|
||||
without going through self.aget() which may have locking behavior.
|
||||
"""
|
||||
if hasattr(self._ard, "aset"):
|
||||
await self._ard.aset(self.add_prefix(key), dumps(value))
|
||||
else:
|
||||
await asyncio.to_thread(self._rd.set, self.add_prefix(key), dumps(value))
|
||||
signals.config_updated.send(sender=config, key=key, old_value=old_value, new_value=value)
|
||||
|
||||
async def aset(self, key, value):
|
||||
old_value = await self.aget(key)
|
||||
await self._aset_internal(key, value, old_value)
|
||||
|
||||
|
||||
class CachingRedisBackend(RedisBackend):
|
||||
_sentinel = object()
|
||||
|
|
@ -128,18 +133,25 @@ class CachingRedisBackend(RedisBackend):
|
|||
|
||||
return value[1]
|
||||
|
||||
async def _aget_unlocked(self, key):
|
||||
"""
|
||||
Get value with cache support but without acquiring lock.
|
||||
Caller must already hold the lock.
|
||||
"""
|
||||
value = self._cache.get(key, self._sentinel)
|
||||
if value is self._sentinel or self._has_expired(value):
|
||||
new_value = await super().aget(key)
|
||||
self._cache_value(key, new_value)
|
||||
return new_value
|
||||
return value[1]
|
||||
|
||||
async def aget(self, key):
|
||||
value = self._cache.get(key, self._sentinel)
|
||||
|
||||
if value is self._sentinel or self._has_expired(value):
|
||||
async with self._get_async_lock():
|
||||
# Double-check after acquiring lock
|
||||
value = self._cache.get(key, self._sentinel)
|
||||
if value is self._sentinel or self._has_expired(value):
|
||||
new_value = await super().aget(key)
|
||||
self._cache_value(key, new_value)
|
||||
return new_value
|
||||
return value[1]
|
||||
# Double-check after acquiring lock, then delegate to unlocked version
|
||||
return await self._aget_unlocked(key)
|
||||
|
||||
return value[1]
|
||||
|
||||
|
|
@ -150,7 +162,10 @@ class CachingRedisBackend(RedisBackend):
|
|||
|
||||
async def aset(self, key, value):
|
||||
async with self._get_async_lock():
|
||||
await super().aset(key, value)
|
||||
# Use unlocked version since we already hold the lock
|
||||
old_value = await self._aget_unlocked(key)
|
||||
# Use internal method to avoid lock recursion (super().aset calls self.aget)
|
||||
await self._aset_internal(key, value, old_value)
|
||||
self._cache_value(key, value)
|
||||
|
||||
def mget(self, keys):
|
||||
|
|
|
|||
|
|
@ -127,7 +127,8 @@ class Config:
|
|||
|
||||
def __setattr__(self, key, value):
|
||||
if key == "_backend":
|
||||
return super().__setattr__(key, value)
|
||||
super().__setattr__(key, value)
|
||||
return
|
||||
if key not in settings.CONFIG:
|
||||
raise AttributeError(key)
|
||||
self._backend.set(key, value)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
from django.test import TestCase
|
||||
from django.test import TransactionTestCase
|
||||
|
||||
from constance import settings
|
||||
from constance.base import Config
|
||||
from tests.storage import StorageTestsMixin
|
||||
|
||||
|
||||
|
|
@ -52,3 +54,95 @@ class TestDatabaseWithCache(StorageTestsMixin, TestCase):
|
|||
def tearDown(self):
|
||||
settings.BACKEND = self.old_backend
|
||||
settings.DATABASE_CACHE_BACKEND = self.old_cache_backend
|
||||
|
||||
|
||||
class TestDatabaseAsync(TransactionTestCase):
|
||||
def setUp(self):
|
||||
self.old_backend = settings.BACKEND
|
||||
settings.BACKEND = "constance.backends.database.DatabaseBackend"
|
||||
self.config = Config()
|
||||
|
||||
def tearDown(self):
|
||||
settings.BACKEND = self.old_backend
|
||||
|
||||
async def test_aget_returns_none_for_missing_key(self):
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertIsNone(result)
|
||||
|
||||
async def test_aget_returns_value(self):
|
||||
await self.config._backend.aset("INT_VALUE", 42)
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
async def test_aset_stores_value(self):
|
||||
await self.config._backend.aset("INT_VALUE", 99)
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 99)
|
||||
|
||||
async def test_amget_returns_empty_for_no_keys(self):
|
||||
result = await self.config._backend.amget([])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
async def test_amget_returns_values(self):
|
||||
await self.config._backend.aset("INT_VALUE", 10)
|
||||
await self.config._backend.aset("BOOL_VALUE", True)
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
|
||||
|
||||
|
||||
class TestDatabaseWithCacheAsync(TransactionTestCase):
|
||||
def setUp(self):
|
||||
self.old_backend = settings.BACKEND
|
||||
settings.BACKEND = "constance.backends.database.DatabaseBackend"
|
||||
self.old_cache_backend = settings.DATABASE_CACHE_BACKEND
|
||||
settings.DATABASE_CACHE_BACKEND = "default"
|
||||
self.config = Config()
|
||||
self.config._backend._cache.clear()
|
||||
|
||||
def tearDown(self):
|
||||
settings.BACKEND = self.old_backend
|
||||
settings.DATABASE_CACHE_BACKEND = self.old_cache_backend
|
||||
|
||||
async def test_aget_returns_none_for_missing_key(self):
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertIsNone(result)
|
||||
|
||||
async def test_aget_returns_value_from_cache(self):
|
||||
# First set a value using async
|
||||
await self.config._backend.aset("INT_VALUE", 42)
|
||||
# Clear cache and re-fetch to test aget path
|
||||
self.config._backend._cache.clear()
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
async def test_aget_populates_cache(self):
|
||||
await self.config._backend.aset("INT_VALUE", 42)
|
||||
self.config._backend._cache.clear()
|
||||
|
||||
# aget should populate the cache
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
async def test_aset_stores_value(self):
|
||||
await self.config._backend.aset("INT_VALUE", 99)
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 99)
|
||||
|
||||
async def test_amget_returns_empty_for_no_keys(self):
|
||||
result = await self.config._backend.amget([])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
async def test_amget_returns_values(self):
|
||||
await self.config._backend.aset("INT_VALUE", 10)
|
||||
await self.config._backend.aset("BOOL_VALUE", True)
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
|
||||
|
||||
async def test_amget_uses_cache(self):
|
||||
# Set values using async and ensure they're cached
|
||||
await self.config._backend.aset("INT_VALUE", 10)
|
||||
await self.config._backend.aset("BOOL_VALUE", True)
|
||||
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result["INT_VALUE"], 10)
|
||||
self.assertEqual(result["BOOL_VALUE"], True)
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
from django.test import TestCase
|
||||
from django.test import TransactionTestCase
|
||||
|
||||
from constance import settings
|
||||
from constance.base import Config
|
||||
from tests.storage import StorageTestsMixin
|
||||
|
||||
|
||||
|
|
@ -14,3 +16,43 @@ class TestMemory(StorageTestsMixin, TestCase):
|
|||
def tearDown(self):
|
||||
self.config._backend._storage = {}
|
||||
settings.BACKEND = self.old_backend
|
||||
|
||||
def test_mget_empty_keys(self):
|
||||
result = self.config._backend.mget([])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestMemoryAsync(TransactionTestCase):
|
||||
def setUp(self):
|
||||
self.old_backend = settings.BACKEND
|
||||
settings.BACKEND = "constance.backends.memory.MemoryBackend"
|
||||
self.config = Config()
|
||||
self.config._backend._storage = {}
|
||||
|
||||
def tearDown(self):
|
||||
self.config._backend._storage = {}
|
||||
settings.BACKEND = self.old_backend
|
||||
|
||||
async def test_aget_returns_none_for_missing_key(self):
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertIsNone(result)
|
||||
|
||||
async def test_aget_returns_value(self):
|
||||
self.config._backend.set("INT_VALUE", 42)
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
async def test_aset_stores_value(self):
|
||||
await self.config._backend.aset("INT_VALUE", 99)
|
||||
result = self.config._backend.get("INT_VALUE")
|
||||
self.assertEqual(result, 99)
|
||||
|
||||
async def test_amget_returns_empty_for_no_keys(self):
|
||||
result = await self.config._backend.amget([])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
async def test_amget_returns_values(self):
|
||||
self.config._backend.set("INT_VALUE", 10)
|
||||
self.config._backend.set("BOOL_VALUE", True)
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
|
||||
|
|
|
|||
|
|
@ -1,6 +1,8 @@
|
|||
from django.test import TestCase
|
||||
from django.test import TransactionTestCase
|
||||
|
||||
from constance import settings
|
||||
from constance.base import Config
|
||||
from tests.storage import StorageTestsMixin
|
||||
|
||||
|
||||
|
|
@ -17,6 +19,162 @@ class TestRedis(StorageTestsMixin, TestCase):
|
|||
self.config._backend._rd.clear()
|
||||
settings.BACKEND = self.old_backend
|
||||
|
||||
def test_mget_empty_keys(self):
|
||||
# Test that mget returns None for empty keys
|
||||
result = list(self.config._backend.mget([]) or [])
|
||||
self.assertEqual(result, [])
|
||||
|
||||
|
||||
class TestCachingRedis(TestRedis):
|
||||
_BACKEND = "constance.backends.redisd.CachingRedisBackend"
|
||||
|
||||
def test_mget_empty_keys(self):
|
||||
# Test that mget returns None for empty keys
|
||||
result = list(self.config._backend.mget([]) or [])
|
||||
self.assertEqual(result, [])
|
||||
|
||||
|
||||
class TestRedisAsync(TransactionTestCase):
|
||||
_BACKEND = "constance.backends.redisd.RedisBackend"
|
||||
|
||||
def setUp(self):
|
||||
self.old_backend = settings.BACKEND
|
||||
settings.BACKEND = self._BACKEND
|
||||
self.config = Config()
|
||||
self.config._backend._rd.clear()
|
||||
|
||||
def tearDown(self):
|
||||
self.config._backend._rd.clear()
|
||||
settings.BACKEND = self.old_backend
|
||||
|
||||
async def test_aget_returns_none_for_missing_key(self):
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertIsNone(result)
|
||||
|
||||
async def test_aget_returns_value(self):
|
||||
self.config._backend.set("INT_VALUE", 42)
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
async def test_aset_stores_value(self):
|
||||
await self.config._backend.aset("INT_VALUE", 99)
|
||||
result = self.config._backend.get("INT_VALUE")
|
||||
self.assertEqual(result, 99)
|
||||
|
||||
async def test_amget_returns_empty_for_no_keys(self):
|
||||
result = await self.config._backend.amget([])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
async def test_amget_returns_values(self):
|
||||
self.config._backend.set("INT_VALUE", 10)
|
||||
self.config._backend.set("BOOL_VALUE", True)
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
|
||||
|
||||
async def test_amget_skips_missing_keys(self):
|
||||
self.config._backend.set("INT_VALUE", 10)
|
||||
result = await self.config._backend.amget(["INT_VALUE", "MISSING_KEY"])
|
||||
self.assertEqual(result, {"INT_VALUE": 10})
|
||||
|
||||
|
||||
class TestCachingRedisAsync(TransactionTestCase):
|
||||
_BACKEND = "constance.backends.redisd.CachingRedisBackend"
|
||||
|
||||
def setUp(self):
|
||||
self.old_backend = settings.BACKEND
|
||||
settings.BACKEND = self._BACKEND
|
||||
self.config = Config()
|
||||
self.config._backend._rd.clear()
|
||||
self.config._backend._cache.clear()
|
||||
|
||||
def tearDown(self):
|
||||
self.config._backend._rd.clear()
|
||||
self.config._backend._cache.clear()
|
||||
settings.BACKEND = self.old_backend
|
||||
|
||||
async def test_aget_caches_value(self):
|
||||
# First set a value via sync
|
||||
self.config._backend.set("INT_VALUE", 42)
|
||||
# Clear the in-memory cache
|
||||
self.config._backend._cache.clear()
|
||||
|
||||
# Async get should fetch and cache
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 42)
|
||||
|
||||
# Verify it's cached
|
||||
self.assertIn("INT_VALUE", self.config._backend._cache)
|
||||
|
||||
async def test_aget_returns_cached_value(self):
|
||||
# Manually set cache
|
||||
from time import monotonic
|
||||
|
||||
timeout = self.config._backend._timeout
|
||||
self.config._backend._cache["INT_VALUE"] = (monotonic() + timeout, 100)
|
||||
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 100)
|
||||
|
||||
async def test_aget_refreshes_expired_cache(self):
|
||||
from time import monotonic
|
||||
|
||||
# Set expired cache
|
||||
self.config._backend._cache["INT_VALUE"] = (monotonic() - 10, 100)
|
||||
# Set different value in redis using proper codec format
|
||||
self.config._backend._rd.set(
|
||||
self.config._backend.add_prefix("INT_VALUE"),
|
||||
b'{"__type__": "default", "__value__": 200}',
|
||||
)
|
||||
|
||||
result = await self.config._backend.aget("INT_VALUE")
|
||||
self.assertEqual(result, 200)
|
||||
|
||||
async def test_aset_updates_cache(self):
|
||||
await self.config._backend.aset("INT_VALUE", 55)
|
||||
|
||||
# Verify cache is updated
|
||||
self.assertIn("INT_VALUE", self.config._backend._cache)
|
||||
self.assertEqual(self.config._backend._cache["INT_VALUE"][1], 55)
|
||||
|
||||
async def test_amget_returns_empty_for_no_keys(self):
|
||||
result = await self.config._backend.amget([])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
async def test_amget_returns_cached_values(self):
|
||||
from time import monotonic
|
||||
|
||||
timeout = self.config._backend._timeout
|
||||
self.config._backend._cache["INT_VALUE"] = (monotonic() + timeout, 10)
|
||||
self.config._backend._cache["BOOL_VALUE"] = (monotonic() + timeout, True)
|
||||
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result, {"INT_VALUE": 10, "BOOL_VALUE": True})
|
||||
|
||||
async def test_amget_fetches_missing_keys(self):
|
||||
from time import monotonic
|
||||
|
||||
timeout = self.config._backend._timeout
|
||||
# One key cached, one in Redis only
|
||||
self.config._backend._cache["INT_VALUE"] = (monotonic() + timeout, 10)
|
||||
self.config._backend._rd.set(
|
||||
self.config._backend.add_prefix("BOOL_VALUE"),
|
||||
b'{"__type__": "default", "__value__": true}',
|
||||
)
|
||||
|
||||
result = await self.config._backend.amget(["INT_VALUE", "BOOL_VALUE"])
|
||||
self.assertEqual(result["INT_VALUE"], 10)
|
||||
self.assertEqual(result["BOOL_VALUE"], True)
|
||||
|
||||
async def test_amget_refreshes_expired_keys(self):
|
||||
from time import monotonic
|
||||
|
||||
# Set expired cache
|
||||
self.config._backend._cache["INT_VALUE"] = (monotonic() - 10, 100)
|
||||
# Set different value in redis using proper codec format
|
||||
self.config._backend._rd.set(
|
||||
self.config._backend.add_prefix("INT_VALUE"),
|
||||
b'{"__type__": "default", "__value__": 200}',
|
||||
)
|
||||
|
||||
result = await self.config._backend.amget(["INT_VALUE"])
|
||||
self.assertEqual(result["INT_VALUE"], 200)
|
||||
|
|
|
|||
|
|
@ -68,3 +68,134 @@ class AsyncTestCase(TransactionTestCase):
|
|||
self.assertEqual(len(config.LIST_VALUE), 3)
|
||||
self.assertIn(1, config.LIST_VALUE)
|
||||
self.assertEqual(list(config.LIST_VALUE)[0], 1)
|
||||
|
||||
|
||||
class AsyncValueProxyTestCase(TransactionTestCase):
|
||||
"""Tests for AsyncValueProxy dunder methods in async context."""
|
||||
|
||||
async def test_str_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = str(config.STRING_VALUE)
|
||||
self.assertEqual(result, "Hello world")
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_repr_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = repr(config.STRING_VALUE)
|
||||
self.assertEqual(result, "'Hello world'")
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_float_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = float(config.FLOAT_VALUE)
|
||||
self.assertAlmostEqual(result, 3.1415926536)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_eq_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE == 1
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_ne_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE != 2
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_lt_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE < 10
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_le_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE <= 1
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_gt_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE > 0
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_ge_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE >= 1
|
||||
self.assertTrue(result)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_hash_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = hash(config.INT_VALUE)
|
||||
self.assertEqual(result, hash(1))
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_sub_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE - 1
|
||||
self.assertEqual(result, 0)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_mul_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE * 5
|
||||
self.assertEqual(result, 5)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_truediv_proxy(self):
|
||||
with warnings.catch_warnings(record=True) as w:
|
||||
warnings.simplefilter("always")
|
||||
result = config.INT_VALUE / 1
|
||||
self.assertEqual(result, 1.0)
|
||||
self.assertTrue(any("Synchronous access" in str(warn.message) for warn in w))
|
||||
|
||||
async def test_aset_invalid_key(self):
|
||||
with self.assertRaises(AttributeError):
|
||||
await config.aset("INVALID_KEY", 42)
|
||||
|
||||
|
||||
class AsyncUtilsTestCase(TransactionTestCase):
|
||||
"""Tests for async utility functions."""
|
||||
|
||||
async def test_aget_values_for_keys_invalid_type(self):
|
||||
with self.assertRaises(TypeError):
|
||||
await utils.aget_values_for_keys("key1")
|
||||
|
||||
async def test_aget_values_for_keys_missing_key(self):
|
||||
with self.assertRaises(AttributeError) as ctx:
|
||||
await utils.aget_values_for_keys(["INVALID_KEY"])
|
||||
self.assertIn("INVALID_KEY", str(ctx.exception))
|
||||
|
||||
async def test_aget_values_for_keys_empty(self):
|
||||
result = await utils.aget_values_for_keys([])
|
||||
self.assertEqual(result, {})
|
||||
|
||||
|
||||
class ConfigBaseTestCase(TransactionTestCase):
|
||||
"""Tests for Config class edge cases."""
|
||||
|
||||
def test_config_dir(self):
|
||||
# Test __dir__ method
|
||||
keys = dir(config)
|
||||
self.assertIn("INT_VALUE", keys)
|
||||
self.assertIn("BOOL_VALUE", keys)
|
||||
|
||||
def test_access_backend_attribute(self):
|
||||
# Test accessing _backend attribute in sync context
|
||||
backend = config._backend
|
||||
self.assertIsNotNone(backend)
|
||||
|
|
|
|||
Loading…
Reference in a new issue