diff --git a/.gitignore b/.gitignore index 75eb18b..10ba773 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,4 @@ Thumbs.db /dist *.sqlite3 *.egg-info +build \ No newline at end of file diff --git a/src/tests/test_watson/tests.py b/src/tests/test_watson/tests.py index 54085d6..bc2d58b 100644 --- a/src/tests/test_watson/tests.py +++ b/src/tests/test_watson/tests.py @@ -10,6 +10,8 @@ these tests have been amended to 'fooo' and 'baar'. Ho hum. from __future__ import unicode_literals import json +import re + try: from unittest import skipUnless except: @@ -29,8 +31,8 @@ from watson.backends import escape_query from test_watson.models import WatsonTestModel1, WatsonTestModel2 from test_watson import admin # Force early registration of all admin models. -class RegistrationTest(TestCase): +class RegistrationTest(TestCase): def testRegistration(self): # Register the model and test. watson.register(WatsonTestModel1) @@ -49,17 +51,18 @@ class RegistrationTest(TestCase): class EscapingTest(TestCase): def testEscaping(self): # Test query escaping. - self.assertEqual(escape_query(""), "") - self.assertEqual(escape_query("abcd"), "abcd") - self.assertEqual(escape_query("abcd efgh"), "abcd efgh") - self.assertEqual(escape_query("abcd efgh"), "abcd efgh") - self.assertEqual(escape_query("&&abcd&"), "abcd") + re_escape_chars = re.compile(r'[&:"(|)!><~*+-]', re.UNICODE) + self.assertEqual(escape_query("", re_escape_chars), "") + self.assertEqual(escape_query("abcd", re_escape_chars), "abcd") + self.assertEqual(escape_query("abcd efgh", re_escape_chars), "abcd efgh") + self.assertEqual(escape_query("abcd efgh", re_escape_chars), "abcd efgh") + self.assertEqual(escape_query("&&abcd&", re_escape_chars), "abcd") # check if we leave good characters good_chars = "'$@#$^=_.," for char in good_chars: self.assertEqual( - escape_query("abcd{}efgh".format(char)), + escape_query("abcd{}efgh".format(char), re_escape_chars), "abcd{}efgh".format(char) ) @@ -67,7 +70,7 @@ class EscapingTest(TestCase): bad_chars = '&:"(|)!><~*+-' for char in bad_chars: self.assertEqual( - escape_query("abcd{}efgh".format(char)), "abcd efgh" + escape_query("abcd{}efgh".format(char), re_escape_chars), "abcd efgh" ) diff --git a/src/watson/backends.py b/src/watson/backends.py index df148aa..8ec1c4a 100644 --- a/src/watson/backends.py +++ b/src/watson/backends.py @@ -21,27 +21,27 @@ def regex_from_word(word): ) +# PostgreSQL to_tsquery operators: ! & : ( ) | +RE_POSTGRES_ESCAPE_CHARS = re.compile(r'[&:(|)!]', re.UNICODE) +# MySQL boolean full-text search operators: > < ( ) " ~ * + - +RE_MYSQL_ESCAPE_CHARS = re.compile(r'["()><~*+-]', re.UNICODE) + RE_SPACE = re.compile(r"[\s]+", re.UNICODE) -# PostgreSQL to_tsquery operators: ! & : ( ) | -# MySQL boolean full-text search operators: > < ( ) " ~ * + - -RE_NON_WORD = re.compile(r'[&:"(|)!><~*+-]', re.UNICODE) - -def escape_query(text): +def escape_query(text, re_escape_chars): """ normalizes the query text to a format that can be consumed by the backend database """ text = force_text(text) text = RE_SPACE.sub(" ", text) # Standardize spacing. - text = RE_NON_WORD.sub(" ", text) # Replace harmful characters with space. + text = re_escape_chars.sub(" ", text) # Replace harmful characters with space. text = text.strip() return text class SearchBackend(six.with_metaclass(abc.ABCMeta)): - """Base class for all search backends.""" def is_installed(self): @@ -65,7 +65,7 @@ class SearchBackend(six.with_metaclass(abc.ABCMeta)): def do_search_ranking(self, engine_slug, queryset, search_text): """Ranks the given queryset according to the relevance of the given search text.""" return queryset.extra( - select = { + select={ "watson_rank": "1", }, ) @@ -78,7 +78,7 @@ class SearchBackend(six.with_metaclass(abc.ABCMeta)): def do_filter_ranking(self, engine_slug, queryset, search_text): """Ranks the given queryset according to the relevance of the given search text.""" return queryset.extra( - select = { + select={ "watson_rank": "1", }, ) @@ -154,9 +154,9 @@ class RegexSearchMixin(six.with_metaclass(abc.ABCMeta)): # Compile the query. full_word_query = " AND ".join(word_query).format(**word_kwargs) return queryset.extra( - tables = (db_table,), - where = (full_word_query,), - params = word_args, + tables=(db_table,), + where=(full_word_query,), + params=word_args, ) @@ -177,7 +177,7 @@ class PostgresSearchBackend(SearchBackend): return " & ".join( "$${0}$$:*".format(word) for word - in escape_query(text).split() + in escape_query(text, RE_POSTGRES_ESCAPE_CHARS).split() ) def is_installed(self): @@ -226,7 +226,7 @@ class PostgresSearchBackend(SearchBackend): CREATE TRIGGER watson_searchentry_trigger BEFORE INSERT OR UPDATE ON watson_searchentry FOR EACH ROW EXECUTE PROCEDURE watson_searchentry_trigger_handler(); """.format( - search_config = self.search_config + search_config=self.search_config )) @transaction.atomic() @@ -249,22 +249,22 @@ class PostgresSearchBackend(SearchBackend): def do_search(self, engine_slug, queryset, search_text): """Performs the full text search.""" return queryset.extra( - where = ("search_tsv @@ to_tsquery('{search_config}', %s)".format( - search_config = self.search_config + where=("search_tsv @@ to_tsquery('{search_config}', %s)".format( + search_config=self.search_config ),), - params = (self.escape_postgres_query(search_text),), + params=(self.escape_postgres_query(search_text),), ) def do_search_ranking(self, engine_slug, queryset, search_text): """Performs full text ranking.""" return queryset.extra( - select = { + select={ "watson_rank": "ts_rank_cd(watson_searchentry.search_tsv, to_tsquery('{search_config}', %s))".format( - search_config = self.search_config + search_config=self.search_config ), }, - select_params = (self.escape_postgres_query(search_text),), - order_by = ("-watson_rank",), + select_params=(self.escape_postgres_query(search_text),), + order_by=("-watson_rank",), ) def do_filter(self, engine_slug, queryset, search_text): @@ -280,33 +280,33 @@ class PostgresSearchBackend(SearchBackend): # Cast to text to make join work with uuid columns ref_name_typecast = "::text" return queryset.extra( - tables = ("watson_searchentry",), - where = ( + tables=("watson_searchentry",), + where=( "watson_searchentry.engine_slug = %s", "watson_searchentry.search_tsv @@ to_tsquery('{search_config}', %s)".format( - search_config = self.search_config + search_config=self.search_config ), "watson_searchentry.{ref_name} = {table_name}.{pk_name}{ref_name_typecast}".format( - ref_name = ref_name, - table_name = connection.ops.quote_name(model._meta.db_table), - pk_name = connection.ops.quote_name(pk.db_column or pk.attname), - ref_name_typecast = ref_name_typecast + ref_name=ref_name, + table_name=connection.ops.quote_name(model._meta.db_table), + pk_name=connection.ops.quote_name(pk.db_column or pk.attname), + ref_name_typecast=ref_name_typecast ), "watson_searchentry.content_type_id = %s" ), - params = (engine_slug, self.escape_postgres_query(search_text), content_type.id), + params=(engine_slug, self.escape_postgres_query(search_text), content_type.id), ) def do_filter_ranking(self, engine_slug, queryset, search_text): """Performs the full text ranking.""" return queryset.extra( - select = { + select={ "watson_rank": "ts_rank_cd(watson_searchentry.search_tsv, to_tsquery('{search_config}', %s))".format( - search_config = self.search_config + search_config=self.search_config ), }, - select_params = (self.escape_postgres_query(search_text),), - order_by = ("-watson_rank",), + select_params=(self.escape_postgres_query(search_text),), + order_by=("-watson_rank",), ) @@ -325,7 +325,7 @@ class PostgresLegacySearchBackend(PostgresSearchBackend): return " & ".join( "$${0}$$".format(word) for word - in escape_query(text).split() + in escape_query(text, RE_POSTGRES_ESCAPE_CHARS).split() ) @@ -343,9 +343,9 @@ class PostgresPrefixLegacySearchBackend(RegexSearchMixin, PostgresLegacySearchBa def escape_mysql_boolean_query(search_text): return " ".join( '+{word}*'.format( - word = word, + word=word, ) - for word in escape_query(search_text).split() + for word in escape_query(search_text, RE_MYSQL_ESCAPE_CHARS).split() ) @@ -364,7 +364,7 @@ class MySQLSearchBackend(SearchBackend): cursor.execute("SELECT CONSTRAINT_NAME FROM information_schema.TABLE_CONSTRAINTS WHERE CONSTRAINT_SCHEMA = DATABASE() AND TABLE_NAME = 'watson_searchentry' AND CONSTRAINT_TYPE = 'FOREIGN KEY'") for constraint_name, in cursor.fetchall(): cursor.execute("ALTER TABLE watson_searchentry DROP FOREIGN KEY {constraint_name}".format( - constraint_name = constraint_name, + constraint_name=constraint_name, )) # Change the storage engine to MyISAM. cursor.execute("ALTER TABLE watson_searchentry ENGINE = MyISAM") @@ -395,23 +395,23 @@ class MySQLSearchBackend(SearchBackend): def do_search(self, engine_slug, queryset, search_text): """Performs the full text search.""" return queryset.extra( - where = ("MATCH (title, description, content) AGAINST (%s IN BOOLEAN MODE)",), - params = (self._format_query(search_text),), + where=("MATCH (title, description, content) AGAINST (%s IN BOOLEAN MODE)",), + params=(self._format_query(search_text),), ) def do_search_ranking(self, engine_slug, queryset, search_text): """Performs full text ranking.""" search_text = self._format_query(search_text) return queryset.extra( - select = { + select={ "watson_rank": """ ((MATCH (title) AGAINST (%s IN BOOLEAN MODE)) * 3) + ((MATCH (description) AGAINST (%s IN BOOLEAN MODE)) * 2) + ((MATCH (content) AGAINST (%s IN BOOLEAN MODE)) * 1) """, }, - select_params = (search_text, search_text, search_text,), - order_by = ("-watson_rank",), + select_params=(search_text, search_text, search_text,), + order_by=("-watson_rank",), ) def do_filter(self, engine_slug, queryset, search_text): @@ -424,33 +424,33 @@ class MySQLSearchBackend(SearchBackend): else: ref_name = "object_id" return queryset.extra( - tables = ("watson_searchentry",), - where = ( + tables=("watson_searchentry",), + where=( "watson_searchentry.engine_slug = %s", "MATCH (watson_searchentry.title, watson_searchentry.description, watson_searchentry.content) AGAINST (%s IN BOOLEAN MODE)", "watson_searchentry.{ref_name} = {table_name}.{pk_name}".format( - ref_name = ref_name, - table_name = connection.ops.quote_name(model._meta.db_table), - pk_name = connection.ops.quote_name(pk.db_column or pk.attname), + ref_name=ref_name, + table_name=connection.ops.quote_name(model._meta.db_table), + pk_name=connection.ops.quote_name(pk.db_column or pk.attname), ), "watson_searchentry.content_type_id = %s", ), - params = (engine_slug, self._format_query(search_text), content_type.id), + params=(engine_slug, self._format_query(search_text), content_type.id), ) def do_filter_ranking(self, engine_slug, queryset, search_text): """Performs the full text ranking.""" search_text = self._format_query(search_text) return queryset.extra( - select = { + select={ "watson_rank": """ ((MATCH (watson_searchentry.title) AGAINST (%s IN BOOLEAN MODE)) * 3) + ((MATCH (watson_searchentry.description) AGAINST (%s IN BOOLEAN MODE)) * 2) + ((MATCH (watson_searchentry.content) AGAINST (%s IN BOOLEAN MODE)) * 1) """, }, - select_params = (search_text, search_text, search_text,), - order_by = ("-watson_rank",), + select_params=(search_text, search_text, search_text,), + order_by=("-watson_rank",), )