django-cachalot/cachalot/tests/postgres.py
Andrew-Chen-Wang d699c5b8c3 Squashed commit of the following:
commit 4a33e7b68196bf6e0ee0b7f942a992532495b616
Author: Andrew-Chen-Wang <acwangpython@gmail.com>
Date:   Mon Aug 10 16:04:26 2020 -0400

    Replace f-strings with join for Python 3.5

commit dcb33232d605c01025469e776c4ed8eb6ae0a326
Author: Andrew-Chen-Wang <acwangpython@gmail.com>
Date:   Mon Aug 10 15:55:09 2020 -0400

    Fix sql_flush for dj versions below 3.1
    * Removed any other instances of JSONField for Django 3.1 removed many things like JsonAdapter in favor of regular json module

commit 74195e9ff5a52dba2449a55e543a27ebd99e4fc9
Author: Andrew-Chen-Wang <acwangpython@gmail.com>
Date:   Mon Aug 10 09:17:03 2020 -0400

    Add Django 3.1 to Travis
    * Specify Django version when checking if JSONField exists

commit da5c1fa4c8b2f2efba0b12d7b27460c544e2473a
Author: Andrew-Chen-Wang <acwangpython@gmail.com>
Date:   Wed Aug 5 17:39:58 2020 -0400

    Added support for Django 3.1
    * Some fields were removed, others were deprecated. They are now in a list of deprecation to follow. Some new fields were added like PositiveBigIntegerField that won't be tested
    * monkey_patch.py has a try/except import for EmptyResultSet that was from archaic Django. Removed apparently due to compatibility issues
2020-08-10 16:49:46 -04:00

433 lines
17 KiB
Python

from datetime import date, datetime
from decimal import Decimal
from unittest import skipUnless
from django.contrib.postgres.functions import TransactionNow
from django.db import connection
from django.test import TransactionTestCase, override_settings
from psycopg2.extras import NumericRange, DateRange, DateTimeTZRange
from pytz import timezone
from ..utils import UncachableQuery
from .api import invalidate
from .models import PostgresModel, Test
from .test_utils import TestUtilsMixin
# FIXME: Add tests for aggregations.
def is_pg_field_available(name):
fields = []
try:
from django.contrib.postgres.fields import FloatRangeField
fields.append("FloatRangeField")
except ImportError:
pass
try:
from django.contrib.postgres.fields import DecimalRangeField
fields.append("DecimalRangeField")
except ImportError:
pass
try:
from django import VERSION
from django.contrib.postgres.fields import JSONField
if VERSION[0] < 4:
fields.append("JSONField")
except ImportError:
pass
return name in fields
@skipUnless(connection.vendor == 'postgresql',
'This test is only for PostgreSQL')
@override_settings(USE_TZ=True)
class PostgresReadTestCase(TestUtilsMixin, TransactionTestCase):
def setUp(self):
self.obj1 = PostgresModel(
int_array=[1, 2, 3],
hstore={'a': 'b', 'c': None},
int_range=[1900, 2000],
date_range=['1678-03-04', '1741-07-28'],
datetime_range=[
datetime(1989, 1, 30, 12, 20,
tzinfo=timezone('Europe/Paris')), None
]
)
self.obj2 = PostgresModel(
int_array=[4, None, 6],
hstore={'a': '1', 'b': '2'},
int_range=[1989, None],
date_range=['1989-01-30', None],
datetime_range=[None, None])
if is_pg_field_available("JSONField"):
self.obj1.json = {'a': 1, 'b': 2}
self.obj2.json = [
'something',
{
'a': 1,
'b': None,
'c': 123.456,
'd': True,
'e': {
'another': 'dict',
'and yet': {
'another': 'one',
'with a list': [],
},
},
},
]
if is_pg_field_available("FloatRangeField"):
self.obj1.float_range = [-1e3, 9.87654321]
self.obj2.float_range = [0.0, None]
if is_pg_field_available("DecimalRangeField"):
self.obj1.decimal_range = [-1e3, 9.87654321]
self.obj2.decimal_range = [0.0, None]
self.obj1.save()
self.obj2.save()
def test_unaccent(self):
Test.objects.create(name='Clémentine')
Test.objects.create(name='Clementine')
qs = (Test.objects.filter(name__unaccent='Clémentine')
.values_list('name', flat=True))
self.assert_tables(qs, Test)
self.assert_query_cached(qs, ['Clementine', 'Clémentine'])
def test_int_array(self):
with self.assertNumQueries(1):
data1 = [o.int_array for o in PostgresModel.objects.all()]
with self.assertNumQueries(1):
data2 = list(PostgresModel.objects
.values_list('int_array', flat=True))
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [[1, 2, 3], [4, None, 6]])
invalidate(PostgresModel)
qs = PostgresModel.objects.values_list('int_array', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[1, 2, 3], [4, None, 6]])
qs = (PostgresModel.objects.filter(int_array__contains=[3])
.values_list('int_array', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[1, 2, 3]])
qs = (PostgresModel.objects
.filter(int_array__contained_by=[1, 2, 3, 4, 5, 6])
.values_list('int_array', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[1, 2, 3]])
qs = (PostgresModel.objects.filter(int_array__overlap=[3, 4])
.values_list('int_array', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[1, 2, 3], [4, None, 6]])
qs = (PostgresModel.objects.filter(int_array__len__in=(2, 3))
.values_list('int_array', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[1, 2, 3], [4, None, 6]])
qs = (PostgresModel.objects.filter(int_array__2=6)
.values_list('int_array', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[4, None, 6]])
qs = (PostgresModel.objects.filter(int_array__0_2=(1, 2))
.values_list('int_array', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [[1, 2, 3]])
def test_hstore(self):
with self.assertNumQueries(1):
data1 = [o.hstore for o in PostgresModel.objects.all()]
with self.assertNumQueries(1):
data2 = list(PostgresModel.objects
.values_list('hstore', flat=True))
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [{'a': 'b', 'c': None},
{'a': '1', 'b': '2'}])
invalidate(PostgresModel)
qs = PostgresModel.objects.values_list('hstore', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': 'b', 'c': None},
{'a': '1', 'b': '2'}])
qs = (PostgresModel.objects.filter(hstore__a='1')
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': '1', 'b': '2'}])
qs = (PostgresModel.objects.filter(hstore__contains={'a': 'b'})
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': 'b', 'c': None}])
qs = (PostgresModel.objects
.filter(hstore__contained_by={'a': 'b', 'c': None, 'b': '2'})
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': 'b', 'c': None}])
qs = (PostgresModel.objects.filter(hstore__has_key='c')
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': 'b', 'c': None}])
qs = (PostgresModel.objects.filter(hstore__has_keys=['a', 'b'])
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': '1', 'b': '2'}])
qs = (PostgresModel.objects.filter(hstore__keys=['a', 'b'])
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': '1', 'b': '2'}])
qs = (PostgresModel.objects.filter(hstore__values=['1', '2'])
.values_list('hstore', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [{'a': '1', 'b': '2'}])
@skipUnless(is_pg_field_available("JSONField"),
"JSONField was removed in Dj 4.0")
def test_json(self):
with self.assertNumQueries(1):
data1 = [o.json for o in PostgresModel.objects.all()]
with self.assertNumQueries(1):
data2 = list(PostgresModel.objects.values_list('json', flat=True))
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [self.obj1.json, self.obj2.json])
invalidate(PostgresModel)
qs = PostgresModel.objects.values_list('json', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj1.json, self.obj2.json])
# Tests an index.
qs = (PostgresModel.objects.filter(json__0='something')
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj2.json])
qs = (PostgresModel.objects
.filter(json__0__nonexistent_key='something')
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [])
# Tests a path with spaces.
qs = (PostgresModel.objects
.filter(**{'json__1__e__and yet__another': 'one'})
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj2.json])
qs = (PostgresModel.objects.filter(json__contains=['something'])
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj2.json])
qs = (PostgresModel.objects
.filter(json__contained_by={'a': 1, 'b': 2, 'any': 'thing'})
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj1.json])
qs = (PostgresModel.objects.filter(json__has_key='a')
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj1.json])
qs = (PostgresModel.objects.filter(json__has_any_keys=['a', 'b', 'c'])
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj1.json])
qs = (PostgresModel.objects.filter(json__has_keys=['a', 'b'])
.values_list('json', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [self.obj1.json])
@skipUnless(is_pg_field_available("JSONField"),
"JSONField was removed in Dj 4.0")
def test_mutable_result_change(self):
"""
Checks that changing a mutable returned by a query has no effect
on other executions of the query.
"""
qs = PostgresModel.objects.values_list('int_array', flat=True)
data = list(qs.all())
self.assertListEqual(data, [[1, 2, 3], [4, None, 6]])
data[0].append(4)
data[1].remove(4)
data[1][0] = 5
self.assertListEqual(data, [[1, 2, 3, 4], [5, 6]])
self.assertListEqual(list(qs.all()), [[1, 2, 3], [4, None, 6]])
qs = PostgresModel.objects.values_list('json', flat=True)
data = list(qs.all())
self.assertListEqual(data, [self.obj1.json, self.obj2.json])
data[0]['c'] = 3
del data[0]['b']
data[1].pop(0)
data[1][0]['e']['and yet']['some other'] = True
data[1][0]['f'] = 6
json1 = {'a': 1, 'c': 3}
json2 = [
{
'a': 1,
'b': None,
'c': 123.456,
'd': True,
'e': {
'another': 'dict',
'and yet': {
'another': 'one',
'with a list': [],
'some other': True
},
},
'f': 6
},
]
self.assertListEqual(data, [json1, json2])
self.assertListEqual(list(qs.all()),
[self.obj1.json, self.obj2.json])
def test_int_range(self):
with self.assertNumQueries(1):
data1 = [o.int_range for o in PostgresModel.objects.all()]
with self.assertNumQueries(1):
data2 = list(PostgresModel.objects
.values_list('int_range', flat=True))
self.assertListEqual(data2, data1)
self.assertListEqual(data2, [NumericRange(1900, 2000),
NumericRange(1989)])
invalidate(PostgresModel)
qs = PostgresModel.objects.values_list('int_range', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1900, 2000),
NumericRange(1989)])
qs = (PostgresModel.objects.filter(int_range__contains=2015)
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1989)])
qs = (PostgresModel.objects
.filter(int_range__contains=NumericRange(1950, 1990))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1900, 2000)])
qs = (PostgresModel.objects
.filter(int_range__contained_by=NumericRange(0, 2050))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1900, 2000)])
qs = (PostgresModel.objects.filter(int_range__fully_lt=(2015, None))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1900, 2000)])
qs = (PostgresModel.objects.filter(int_range__fully_gt=(1970, 1980))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1989)])
qs = (PostgresModel.objects.filter(int_range__not_lt=(1970, 1980))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1989)])
qs = (PostgresModel.objects.filter(int_range__not_gt=(1970, 1980))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [])
qs = (PostgresModel.objects.filter(int_range__adjacent_to=(1900, 1989))
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1989)])
qs = (PostgresModel.objects.filter(int_range__startswith=1900)
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1900, 2000)])
qs = (PostgresModel.objects.filter(int_range__endswith=2000)
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(1900, 2000)])
PostgresModel.objects.create(int_range=[1900, 1900])
qs = (PostgresModel.objects.filter(int_range__isempty=True)
.values_list('int_range', flat=True))
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [NumericRange(empty=True)])
@skipUnless(is_pg_field_available("FloatRangeField"),
"FloatRangeField was removed in Dj 3.1")
def test_float_range(self):
qs = PostgresModel.objects.values_list('float_range', flat=True)
self.assert_tables(qs, PostgresModel)
# For a strange reason, probably a misconception in psycopg2
# or a bad name in django.contrib.postgres (less probable),
# FloatRange returns decimals instead of floats.
# Note from ACW: crisis averted, renamed to DecimalRangeField
self.assert_query_cached(qs, [
NumericRange(Decimal('-1000.0'), Decimal('9.87654321')),
NumericRange(Decimal('0.0'))])
@skipUnless(is_pg_field_available("DecimalRangeField"),
"DecimalRangeField was added in Dj 2.2")
def test_decimal_range(self):
qs = PostgresModel.objects.values_list('decimal_range', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [
NumericRange(Decimal('-1000.0'), Decimal('9.87654321')),
NumericRange(Decimal('0.0'))])
def test_date_range(self):
qs = PostgresModel.objects.values_list('date_range', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [
DateRange(date(1678, 3, 4), date(1741, 7, 28)),
DateRange(date(1989, 1, 30))])
def test_datetime_range(self):
qs = PostgresModel.objects.values_list('datetime_range', flat=True)
self.assert_tables(qs, PostgresModel)
self.assert_query_cached(qs, [
DateTimeTZRange(datetime(1989, 1, 30, 12, 20,
tzinfo=timezone('Europe/Paris'))),
DateTimeTZRange(bounds='()')])
def test_transaction_now(self):
"""
Checks that queries with a TransactionNow() parameter are not cached.
"""
obj = Test.objects.create(datetime='1992-07-02T12:00:00')
qs = Test.objects.filter(datetime__lte=TransactionNow())
with self.assertRaises(UncachableQuery):
self.assert_tables(qs, Test)
self.assert_query_cached(qs, [obj], after=1)