django-cachalot/cachalot/tests/test_utils.py
Andrew Chen Wang 165cdb6a00
Add Django 3.2 Support and drop error dependency check (#181)
* Remove system check for Django version
* Closes #175
* Bump version and amend CHANGELOG
* Update with GitHub action CI
* Update README with Python and Django versions
* Limit Django version to 3.2, inclusively.
* Add note on Django constraints to README
* Bump minor version
* Justified by dropping support for dependency versions
* Drop support for Django 2.0-2.1, Python 3.5
* Change CI badge in README to GitHub actions
* Add support for Pymemcache for Django 3.2+
* Add Django 3.2 to test matrix
* Fix MySQL test_explain
* Allow filebased delta leniency in tests
* Allow Subquery in finding more Subqueries (Fixes #156)
* Reverts #157 with proper fix. The initial problem was due to `django.db.models.expressions.Subquery` allowing both QuerySet and sql.Query to be used.
* Fixes Django 3.2 test_subquery and test_invalidate_subquery testing by also checking the lhs
* Fix Django 2.2 Subquery having no query attr
* Fix filebased test time delta
* Fix test_invalidate_having due to new inner_query
* The new inner_query replaces subquery for Django 3.2 where subquery is now a boolean. That's why I initially used that TypeError in _get_tables_from_sql. inner_query looks to be a sql.Query
* Add PyMemcacheCache to supported cache backends

Co-authored-by: Dominik George <nik@naturalnet.de>
2021-05-13 00:27:14 -04:00

63 lines
2.5 KiB
Python

from django import VERSION as DJANGO_VERSION
from django.core.management.color import no_style
from django.db import connection, transaction
from .models import PostgresModel
from ..utils import _get_tables
class TestUtilsMixin:
    """Shared helpers for the test cases in this package.

    The mixin relies on the host class for ``assertNumQueries``,
    ``assertEqual``, ``assertListEqual``, ``assertSetEqual`` and
    ``assertDictEqual``; it only defines the fixtures and assertions below.
    """

    def setUp(self):
        """Record the database vendor and Django version, then reconnect."""
        self.is_sqlite = connection.vendor == 'sqlite'
        self.is_mysql = connection.vendor == 'mysql'
        self.is_postgresql = connection.vendor == 'postgresql'
        self.django_version = DJANGO_VERSION
        self.force_reopen_connection()

    # TODO: Remove this workaround when this issue is fixed:
    #       https://code.djangoproject.com/ticket/29494
    def tearDown(self):
        """On PostgreSQL, flush the ``PostgresModel`` table by hand.

        The SQL produced by ``connection.ops.sql_flush`` is executed inside
        a single ``transaction.atomic()`` block. Other vendors are left
        untouched.
        """
        if connection.vendor != 'postgresql':
            return

        flush_args = [no_style(), (PostgresModel._meta.db_table,)]
        # Compare version tuples directly: a float round-trip such as
        # float("3.10") == 3.1 cannot tell minor releases 1 and 10 apart.
        if DJANGO_VERSION[:2] < (3, 1):
            # Before Django 3.1, sql_flush() takes one more positional
            # argument (the sequence list of the older signature); an
            # empty tuple is passed for it.
            flush_args.append(())
        flush_sql_list = connection.ops.sql_flush(*flush_args)

        with transaction.atomic():
            for sql in flush_sql_list:
                with connection.cursor() as cursor:
                    cursor.execute(sql)

    def force_reopen_connection(self):
        """Open a cursor on MySQL/PostgreSQL so the connection is live."""
        if connection.vendor in ('mysql', 'postgresql'):
            # We need to reopen the connection or Django
            # will execute an extra SQL request below.
            connection.cursor()

    def assert_tables(self, queryset, *tables):
        """Assert that ``_get_tables`` finds exactly ``tables`` for a queryset.

        :arg queryset: object exposing ``db`` and ``query`` attributes.
        :arg tables: expected tables, given either as table-name strings or
            as objects exposing ``_meta.db_table`` (e.g. model classes).
        """
        expected = {
            table if isinstance(table, str) else table._meta.db_table
            for table in tables
        }
        self.assertSetEqual(
            _get_tables(queryset.db, queryset.query), expected)

    def assert_query_cached(self, queryset, result=None, result_type=None,
                            compare_results=True, before=1, after=0):
        """Evaluate ``queryset`` twice and check the query counts.

        :arg queryset: object whose ``all()`` method yields the data.
        :arg result: optional expected value; when given, the second
            evaluation is also compared against it.
        :arg result_type: type used to pick the comparison assertion.
            Defaults to ``list`` when ``result`` is ``None``, otherwise to
            ``type(result)``.
        :arg compare_results: when false, only the query counts are checked.
        :arg before: number of queries expected for the first evaluation.
        :arg after: number of queries expected for the second evaluation.
        """
        if result_type is None:
            result_type = list if result is None else type(result)

        def evaluate():
            # For list results the data is materialised here, i.e. inside
            # the caller's ``assertNumQueries`` block, so that any query it
            # triggers is counted.
            data = queryset.all()
            return list(data) if result_type is list else data

        with self.assertNumQueries(before):
            data1 = evaluate()
        with self.assertNumQueries(after):
            data2 = evaluate()

        if not compare_results:
            return

        assert_functions = {
            list: self.assertListEqual,
            set: self.assertSetEqual,
            dict: self.assertDictEqual,
        }
        # Any other result type falls back to the generic equality check.
        assert_function = assert_functions.get(result_type, self.assertEqual)
        assert_function(data2, data1)
        if result is not None:
            assert_function(data2, result)