Merge pull request #183 from amureki/issues/182/separate_backend_escaping

Fixed #182 -- split query escaping into separate per-backend character sets
This commit is contained in:
Dave Hall 2016-11-01 12:22:37 +00:00 committed by GitHub
commit 5c939a19d3
3 changed files with 63 additions and 59 deletions

1
.gitignore vendored
View file

@ -12,3 +12,4 @@ Thumbs.db
/dist
*.sqlite3
*.egg-info
build

View file

@ -10,6 +10,8 @@ these tests have been amended to 'fooo' and 'baar'. Ho hum.
from __future__ import unicode_literals
import json
import re
try:
from unittest import skipUnless
except:
@ -29,8 +31,8 @@ from watson.backends import escape_query
from test_watson.models import WatsonTestModel1, WatsonTestModel2
from test_watson import admin # Force early registration of all admin models.
class RegistrationTest(TestCase):
class RegistrationTest(TestCase):
def testRegistration(self):
# Register the model and test.
watson.register(WatsonTestModel1)
@ -49,17 +51,18 @@ class RegistrationTest(TestCase):
class EscapingTest(TestCase):
def testEscaping(self):
# Test query escaping.
self.assertEqual(escape_query(""), "")
self.assertEqual(escape_query("abcd"), "abcd")
self.assertEqual(escape_query("abcd efgh"), "abcd efgh")
self.assertEqual(escape_query("abcd efgh"), "abcd efgh")
self.assertEqual(escape_query("&&abcd&"), "abcd")
re_escape_chars = re.compile(r'[&:"(|)!><~*+-]', re.UNICODE)
self.assertEqual(escape_query("", re_escape_chars), "")
self.assertEqual(escape_query("abcd", re_escape_chars), "abcd")
self.assertEqual(escape_query("abcd efgh", re_escape_chars), "abcd efgh")
self.assertEqual(escape_query("abcd efgh", re_escape_chars), "abcd efgh")
self.assertEqual(escape_query("&&abcd&", re_escape_chars), "abcd")
# check if we leave good characters
good_chars = "'$@#$^=_.,"
for char in good_chars:
self.assertEqual(
escape_query("abcd{}efgh".format(char)),
escape_query("abcd{}efgh".format(char), re_escape_chars),
"abcd{}efgh".format(char)
)
@ -67,7 +70,7 @@ class EscapingTest(TestCase):
bad_chars = '&:"(|)!><~*+-'
for char in bad_chars:
self.assertEqual(
escape_query("abcd{}efgh".format(char)), "abcd efgh"
escape_query("abcd{}efgh".format(char), re_escape_chars), "abcd efgh"
)

View file

@ -21,27 +21,27 @@ def regex_from_word(word):
)
# PostgreSQL to_tsquery operators: ! & : ( ) |
RE_POSTGRES_ESCAPE_CHARS = re.compile(r'[&:(|)!]', re.UNICODE)
# MySQL boolean full-text search operators: > < ( ) " ~ * + -
RE_MYSQL_ESCAPE_CHARS = re.compile(r'["()><~*+-]', re.UNICODE)
RE_SPACE = re.compile(r"[\s]+", re.UNICODE)
# PostgreSQL to_tsquery operators: ! & : ( ) |
# MySQL boolean full-text search operators: > < ( ) " ~ * + -
RE_NON_WORD = re.compile(r'[&:"(|)!><~*+-]', re.UNICODE)
def escape_query(text):
def escape_query(text, re_escape_chars):
"""
normalizes the query text to a format that can be consumed
by the backend database
"""
text = force_text(text)
text = RE_SPACE.sub(" ", text) # Standardize spacing.
text = RE_NON_WORD.sub(" ", text) # Replace harmful characters with space.
text = re_escape_chars.sub(" ", text) # Replace harmful characters with space.
text = text.strip()
return text
class SearchBackend(six.with_metaclass(abc.ABCMeta)):
"""Base class for all search backends."""
def is_installed(self):
@ -65,7 +65,7 @@ class SearchBackend(six.with_metaclass(abc.ABCMeta)):
def do_search_ranking(self, engine_slug, queryset, search_text):
"""Ranks the given queryset according to the relevance of the given search text."""
return queryset.extra(
select = {
select={
"watson_rank": "1",
},
)
@ -78,7 +78,7 @@ class SearchBackend(six.with_metaclass(abc.ABCMeta)):
def do_filter_ranking(self, engine_slug, queryset, search_text):
"""Ranks the given queryset according to the relevance of the given search text."""
return queryset.extra(
select = {
select={
"watson_rank": "1",
},
)
@ -154,9 +154,9 @@ class RegexSearchMixin(six.with_metaclass(abc.ABCMeta)):
# Compile the query.
full_word_query = " AND ".join(word_query).format(**word_kwargs)
return queryset.extra(
tables = (db_table,),
where = (full_word_query,),
params = word_args,
tables=(db_table,),
where=(full_word_query,),
params=word_args,
)
@ -177,7 +177,7 @@ class PostgresSearchBackend(SearchBackend):
return " & ".join(
"$${0}$$:*".format(word)
for word
in escape_query(text).split()
in escape_query(text, RE_POSTGRES_ESCAPE_CHARS).split()
)
def is_installed(self):
@ -226,7 +226,7 @@ class PostgresSearchBackend(SearchBackend):
CREATE TRIGGER watson_searchentry_trigger BEFORE INSERT OR UPDATE
ON watson_searchentry FOR EACH ROW EXECUTE PROCEDURE watson_searchentry_trigger_handler();
""".format(
search_config = self.search_config
search_config=self.search_config
))
@transaction.atomic()
@ -249,22 +249,22 @@ class PostgresSearchBackend(SearchBackend):
def do_search(self, engine_slug, queryset, search_text):
"""Performs the full text search."""
return queryset.extra(
where = ("search_tsv @@ to_tsquery('{search_config}', %s)".format(
search_config = self.search_config
where=("search_tsv @@ to_tsquery('{search_config}', %s)".format(
search_config=self.search_config
),),
params = (self.escape_postgres_query(search_text),),
params=(self.escape_postgres_query(search_text),),
)
def do_search_ranking(self, engine_slug, queryset, search_text):
"""Performs full text ranking."""
return queryset.extra(
select = {
select={
"watson_rank": "ts_rank_cd(watson_searchentry.search_tsv, to_tsquery('{search_config}', %s))".format(
search_config = self.search_config
search_config=self.search_config
),
},
select_params = (self.escape_postgres_query(search_text),),
order_by = ("-watson_rank",),
select_params=(self.escape_postgres_query(search_text),),
order_by=("-watson_rank",),
)
def do_filter(self, engine_slug, queryset, search_text):
@ -280,33 +280,33 @@ class PostgresSearchBackend(SearchBackend):
# Cast to text to make join work with uuid columns
ref_name_typecast = "::text"
return queryset.extra(
tables = ("watson_searchentry",),
where = (
tables=("watson_searchentry",),
where=(
"watson_searchentry.engine_slug = %s",
"watson_searchentry.search_tsv @@ to_tsquery('{search_config}', %s)".format(
search_config = self.search_config
search_config=self.search_config
),
"watson_searchentry.{ref_name} = {table_name}.{pk_name}{ref_name_typecast}".format(
ref_name = ref_name,
table_name = connection.ops.quote_name(model._meta.db_table),
pk_name = connection.ops.quote_name(pk.db_column or pk.attname),
ref_name_typecast = ref_name_typecast
ref_name=ref_name,
table_name=connection.ops.quote_name(model._meta.db_table),
pk_name=connection.ops.quote_name(pk.db_column or pk.attname),
ref_name_typecast=ref_name_typecast
),
"watson_searchentry.content_type_id = %s"
),
params = (engine_slug, self.escape_postgres_query(search_text), content_type.id),
params=(engine_slug, self.escape_postgres_query(search_text), content_type.id),
)
def do_filter_ranking(self, engine_slug, queryset, search_text):
"""Performs the full text ranking."""
return queryset.extra(
select = {
select={
"watson_rank": "ts_rank_cd(watson_searchentry.search_tsv, to_tsquery('{search_config}', %s))".format(
search_config = self.search_config
search_config=self.search_config
),
},
select_params = (self.escape_postgres_query(search_text),),
order_by = ("-watson_rank",),
select_params=(self.escape_postgres_query(search_text),),
order_by=("-watson_rank",),
)
@ -325,7 +325,7 @@ class PostgresLegacySearchBackend(PostgresSearchBackend):
return " & ".join(
"$${0}$$".format(word)
for word
in escape_query(text).split()
in escape_query(text, RE_POSTGRES_ESCAPE_CHARS).split()
)
@ -343,9 +343,9 @@ class PostgresPrefixLegacySearchBackend(RegexSearchMixin, PostgresLegacySearchBa
def escape_mysql_boolean_query(search_text):
return " ".join(
'+{word}*'.format(
word = word,
word=word,
)
for word in escape_query(search_text).split()
for word in escape_query(search_text, RE_MYSQL_ESCAPE_CHARS).split()
)
@ -364,7 +364,7 @@ class MySQLSearchBackend(SearchBackend):
cursor.execute("SELECT CONSTRAINT_NAME FROM information_schema.TABLE_CONSTRAINTS WHERE CONSTRAINT_SCHEMA = DATABASE() AND TABLE_NAME = 'watson_searchentry' AND CONSTRAINT_TYPE = 'FOREIGN KEY'")
for constraint_name, in cursor.fetchall():
cursor.execute("ALTER TABLE watson_searchentry DROP FOREIGN KEY {constraint_name}".format(
constraint_name = constraint_name,
constraint_name=constraint_name,
))
# Change the storage engine to MyISAM.
cursor.execute("ALTER TABLE watson_searchentry ENGINE = MyISAM")
@ -395,23 +395,23 @@ class MySQLSearchBackend(SearchBackend):
def do_search(self, engine_slug, queryset, search_text):
"""Performs the full text search."""
return queryset.extra(
where = ("MATCH (title, description, content) AGAINST (%s IN BOOLEAN MODE)",),
params = (self._format_query(search_text),),
where=("MATCH (title, description, content) AGAINST (%s IN BOOLEAN MODE)",),
params=(self._format_query(search_text),),
)
def do_search_ranking(self, engine_slug, queryset, search_text):
"""Performs full text ranking."""
search_text = self._format_query(search_text)
return queryset.extra(
select = {
select={
"watson_rank": """
((MATCH (title) AGAINST (%s IN BOOLEAN MODE)) * 3) +
((MATCH (description) AGAINST (%s IN BOOLEAN MODE)) * 2) +
((MATCH (content) AGAINST (%s IN BOOLEAN MODE)) * 1)
""",
},
select_params = (search_text, search_text, search_text,),
order_by = ("-watson_rank",),
select_params=(search_text, search_text, search_text,),
order_by=("-watson_rank",),
)
def do_filter(self, engine_slug, queryset, search_text):
@ -424,33 +424,33 @@ class MySQLSearchBackend(SearchBackend):
else:
ref_name = "object_id"
return queryset.extra(
tables = ("watson_searchentry",),
where = (
tables=("watson_searchentry",),
where=(
"watson_searchentry.engine_slug = %s",
"MATCH (watson_searchentry.title, watson_searchentry.description, watson_searchentry.content) AGAINST (%s IN BOOLEAN MODE)",
"watson_searchentry.{ref_name} = {table_name}.{pk_name}".format(
ref_name = ref_name,
table_name = connection.ops.quote_name(model._meta.db_table),
pk_name = connection.ops.quote_name(pk.db_column or pk.attname),
ref_name=ref_name,
table_name=connection.ops.quote_name(model._meta.db_table),
pk_name=connection.ops.quote_name(pk.db_column or pk.attname),
),
"watson_searchentry.content_type_id = %s",
),
params = (engine_slug, self._format_query(search_text), content_type.id),
params=(engine_slug, self._format_query(search_text), content_type.id),
)
def do_filter_ranking(self, engine_slug, queryset, search_text):
"""Performs the full text ranking."""
search_text = self._format_query(search_text)
return queryset.extra(
select = {
select={
"watson_rank": """
((MATCH (watson_searchentry.title) AGAINST (%s IN BOOLEAN MODE)) * 3) +
((MATCH (watson_searchentry.description) AGAINST (%s IN BOOLEAN MODE)) * 2) +
((MATCH (watson_searchentry.content) AGAINST (%s IN BOOLEAN MODE)) * 1)
""",
},
select_params = (search_text, search_text, search_text,),
order_by = ("-watson_rank",),
select_params=(search_text, search_text, search_text,),
order_by=("-watson_rank",),
)