mirror of
https://github.com/Hopiu/django-cachalot.git
synced 2026-05-09 21:24:44 +00:00
* Remove system check for Django version * Closes #175 * Bump version and amend CHANGELOG * Update with GitHub action CI * Update README with Python and Django versions * Limit Django version to 3.2, inclusively. * Add note on Django constraints to README * Bump minor version * Justified by dropping support for dependency versions * Drop support for Django 2.0-2.1, Python 3.5 * Change CI badge in README to GitHub actions * Add support for Pymemcache for Django 3.2+ * Add Django 3.2 to test matrix * Fix MySQL test_explain * Allow filebased delta leniency in tests * Allow Subquery in finding more Subqueries (Fixes #156) * Reverts #157 with proper fix. The initial problem was due to `django.db.models.expressions.Subquery` allowing both QuerySet and sql.Query to be used. * Fixes Django 3.2 test_subquery and test_invalidate_subquery testing by also checking the lhs * Fix Django 2.2 Subquery having no query attr * Fix filebased test time delta * Fix test_invalidate_having due to new inner_query * The new inner_query replaces subquery for Django 3.2 where subquery is now a boolean. That's why I initially used that TypeError in _get_tables_from_sql. inner_query looks to be a sql.Query * Add PyMemcacheCache to supported cache backends Co-authored-by: Dominik George <nik@naturalnet.de>
63 lines
2.5 KiB
Python
63 lines
2.5 KiB
Python
from django import VERSION as DJANGO_VERSION
|
|
from django.core.management.color import no_style
|
|
from django.db import connection, transaction
|
|
|
|
from .models import PostgresModel
|
|
from ..utils import _get_tables
|
|
|
|
|
|
class TestUtilsMixin:
|
|
def setUp(self):
|
|
self.is_sqlite = connection.vendor == 'sqlite'
|
|
self.is_mysql = connection.vendor == 'mysql'
|
|
self.is_postgresql = connection.vendor == 'postgresql'
|
|
self.django_version = DJANGO_VERSION
|
|
self.force_reopen_connection()
|
|
|
|
# TODO: Remove this workaround when this issue is fixed:
|
|
# https://code.djangoproject.com/ticket/29494
|
|
def tearDown(self):
|
|
if connection.vendor == 'postgresql':
|
|
flush_args = [no_style(), (PostgresModel._meta.db_table,),]
|
|
if float(".".join(map(str, DJANGO_VERSION[:2]))) < 3.1:
|
|
flush_args.append(())
|
|
flush_sql_list = connection.ops.sql_flush(*flush_args)
|
|
with transaction.atomic():
|
|
for sql in flush_sql_list:
|
|
with connection.cursor() as cursor:
|
|
cursor.execute(sql)
|
|
|
|
def force_reopen_connection(self):
|
|
if connection.vendor in ('mysql', 'postgresql'):
|
|
# We need to reopen the connection or Django
|
|
# will execute an extra SQL request below.
|
|
connection.cursor()
|
|
|
|
def assert_tables(self, queryset, *tables):
|
|
tables = {table if isinstance(table, str)
|
|
else table._meta.db_table for table in tables}
|
|
self.assertSetEqual(_get_tables(queryset.db, queryset.query), tables)
|
|
|
|
def assert_query_cached(self, queryset, result=None, result_type=None,
|
|
compare_results=True, before=1, after=0):
|
|
if result_type is None:
|
|
result_type = list if result is None else type(result)
|
|
with self.assertNumQueries(before):
|
|
data1 = queryset.all()
|
|
if result_type is list:
|
|
data1 = list(data1)
|
|
with self.assertNumQueries(after):
|
|
data2 = queryset.all()
|
|
if result_type is list:
|
|
data2 = list(data2)
|
|
if not compare_results:
|
|
return
|
|
assert_functions = {
|
|
list: self.assertListEqual,
|
|
set: self.assertSetEqual,
|
|
dict: self.assertDictEqual,
|
|
}
|
|
assert_function = assert_functions.get(result_type, self.assertEqual)
|
|
assert_function(data2, data1)
|
|
if result is not None:
|
|
assert_function(data2, result)
|