"""Search backends used by django-watson.""" import re from django.conf import settings from django.contrib.contenttypes.models import ContentType from django.db import connection from django.db.models import Q from watson.models import SearchEntry, has_int_pk def regex_from_word(word): """Generates a regext from the given search word.""" return u"(\s{word})|(^{word})".format( word = re.escape(word), ) class SearchBackend(object): """Base class for all search backends.""" def is_installed(self): """Checks whether django-watson is installed.""" return True def do_install(self): """Executes the SQL needed to install django-watson.""" pass def do_uninstall(self): """Executes the SQL needed to uninstall django-watson.""" pass requires_installation = False supports_ranking = False supports_prefix_matching = True def do_search(self, engine_slug, queryset, search_text): """Filters the given queryset according the the search logic for this backend.""" word_query = Q() for word in search_text.split(): regex = regex_from_word(word) word_query &= (Q(title__iregex=regex) | Q(description__iregex=regex) | Q(content__iregex=regex)) return queryset.filter( word_query ) def do_search_ranking(self, engine_slug, queryset, search_text): """Ranks the given queryset according to the relevance of the given search text.""" return queryset.extra( select = { "watson_rank": "1", }, ) def do_filter(self, engine_slug, queryset, search_text): """Filters the given queryset according the the search logic for this backend.""" model = queryset.model db_table = connection.ops.quote_name(SearchEntry._meta.db_table) model_db_table = connection.ops.quote_name(model._meta.db_table) pk = model._meta.pk id = connection.ops.quote_name(pk.db_column or pk.attname) # Add in basic filters. word_query = [u""" ({db_table}.{engine_slug} = %s) """, """ ({db_table}.{content_type_id} = %s) """] word_kwargs= { u"db_table": db_table, u"model_db_table": model_db_table, u"engine_slug": connection.ops.quote_name(u"engine_slug"), u"title": connection.ops.quote_name(u"title"), u"description": connection.ops.quote_name(u"description"), u"content": connection.ops.quote_name(u"content"), u"content_type_id": connection.ops.quote_name(u"content_type_id"), u"object_id": connection.ops.quote_name(u"object_id"), u"object_id_int": connection.ops.quote_name(u"object_id_int"), u"id": id, } word_args = [ engine_slug, ContentType.objects.get_for_model(model).id, ] # Add in join. if has_int_pk(model): word_query.append(""" ({db_table}.{object_id_int} = {model_db_table}.{id}) """) else: word_query.append(""" ({db_table}.{object_id} = {model_db_table}.{id}) """) # Add in all words. for word in search_text.split(): regex = regex_from_word(word) word_query.append(u""" ({db_table}.{title} REGEXP '(?i)' || %s OR {db_table}.{description} REGEXP '(?i)' || %s OR {db_table}.{content} REGEXP '(?i)' || %s) """) word_args.extend((regex, regex, regex)) # Compile the query. full_word_query = u" AND ".join(word_query).format(**word_kwargs) return queryset.extra( tables = (db_table,), where = (full_word_query,), params = word_args, ) def do_filter_ranking(self, engine_slug, queryset, search_text): """Ranks the given queryset according to the relevance of the given search text.""" return queryset.extra( select = { "watson_rank": "1", }, ) def save_search_entry(self, search_entry, obj, adapter): """Saves the given search entry in the database.""" search_entry.save() class PostgresSearchBackend(SearchBackend): """A search backend that uses native PostgreSQL full text indices.""" def escape_postgres_query(self, text): """Escapes the given text to become a valid ts_query.""" return u" & ".join( u"{}:*".format(word) for word in text.replace(u"(", u"").replace(u")", u"").replace(u":", u"").replace(u"|", u"").replace("!", "").split() ) def is_installed(self): """Checks whether django-watson is installed.""" cursor = connection.cursor() cursor.execute(""" SELECT attname FROM pg_attribute WHERE attrelid = (SELECT oid FROM pg_class WHERE relname = 'watson_searchentry') AND attname = 'search_tsv'; """) return bool(cursor.fetchall()) def do_install(self): """Executes the PostgreSQL specific SQL code to install django-watson.""" connection.cursor().execute(""" -- Ensure that plpgsql is installed. CREATE OR REPLACE FUNCTION make_plpgsql() RETURNS VOID LANGUAGE SQL AS $$ CREATE LANGUAGE plpgsql; $$; SELECT CASE WHEN EXISTS( SELECT 1 FROM pg_catalog.pg_language WHERE lanname='plpgsql' ) THEN NULL ELSE make_plpgsql() END; DROP FUNCTION make_plpgsql(); -- Create the search index. ALTER TABLE watson_searchentry ADD COLUMN search_tsv tsvector NOT NULL; CREATE INDEX watson_searchentry_search_tsv ON watson_searchentry USING gin(search_tsv); -- Create the trigger function. CREATE FUNCTION watson_searchentry_trigger_handler() RETURNS trigger AS $$ begin new.search_tsv := setweight(to_tsvector('pg_catalog.english', coalesce(new.title, '')), 'A') || setweight(to_tsvector('pg_catalog.english', coalesce(new.description, '')), 'C') || setweight(to_tsvector('pg_catalog.english', coalesce(new.content, '')), 'D'); return new; end $$ LANGUAGE plpgsql; CREATE TRIGGER watson_searchentry_trigger BEFORE INSERT OR UPDATE ON watson_searchentry FOR EACH ROW EXECUTE PROCEDURE watson_searchentry_trigger_handler(); """) def do_uninstall(self): """Executes the PostgreSQL specific SQL code to uninstall django-watson.""" connection.cursor().execute(""" ALTER TABLE watson_searchentry DROP COLUMN search_tsv; DROP TRIGGER watson_searchentry_trigger ON watson_searchentry; DROP FUNCTION watson_searchentry_trigger_handler(); """) requires_installation = True supports_ranking = True def do_search(self, engine_slug, queryset, search_text): """Performs the full text search.""" return queryset.extra( where = ("search_tsv @@ to_tsquery(%s)",), params = (self.escape_postgres_query(search_text),), ) def do_search_ranking(self, engine_slug, queryset, search_text): """Performs full text ranking.""" return queryset.extra( select = { "watson_rank": "ts_rank_cd(search_tsv, to_tsquery(%s))", }, select_params = (self.escape_postgres_query(search_text),), order_by = ("-watson_rank",), ) def do_filter(self, engine_slug, queryset, search_text): """Performs the full text filter.""" model = queryset.model content_type = ContentType.objects.get_for_model(model) pk = model._meta.pk if has_int_pk(model): ref_name = "object_id_int" else: ref_name = "object_id" return queryset.extra( tables = ("watson_searchentry",), where = ( "watson_searchentry.engine_slug = %s", "watson_searchentry.search_tsv @@ to_tsquery(%s)", "watson_searchentry.{ref_name} = {table_name}.{pk_name}".format( ref_name = ref_name, table_name = connection.ops.quote_name(model._meta.db_table), pk_name = connection.ops.quote_name(pk.db_column or pk.attname), ), "watson_searchentry.content_type_id = %s" ), params = (engine_slug, self.escape_postgres_query(search_text), content_type.id), ) def do_filter_ranking(self, engine_slug, queryset, search_text): """Performs the full text ranking.""" return queryset.extra( select = { "watson_rank": "ts_rank_cd(watson_searchentry.search_tsv, to_tsquery(%s))", }, select_params = (self.escape_postgres_query(search_text),), order_by = ("-watson_rank",), ) class PostgresLegacySearchBackend(PostgresSearchBackend): """ A search backend that uses native PostgreSQL full text indices. This backend doesn't support prefix matching, and works with PostgreSQL 8.3 and below. """ supports_prefix_matching = False def escape_postgres_query(self, text): """Escapes the given text to become a valid ts_query.""" return u" & ".join(text.replace(u"(", u"").replace(u")", u"").replace(u":", u"").replace(u"|", u"").replace("!", "").split()) def escape_mysql_boolean_query(search_text): return u" ".join( u'+{word}*'.format( word = word, ) for word in search_text.replace(u"+", u"").replace(u"-", u"").replace(u"<", u"").replace(u">", u"").replace(u"(", u"").replace(u")", u"").replace(u"*", u"").replace(u'"', u"").split() ) class MySQLSearchBackend(SearchBackend): def is_installed(self): """Checks whether django-watson is installed.""" cursor = connection.cursor() cursor.execute("SHOW INDEX FROM watson_searchentry WHERE Key_name = 'watson_searchentry_fulltext'"); return bool(cursor.fetchall()) def do_install(self): """Executes the MySQL specific SQL code to install django-watson.""" cursor = connection.cursor() # Drop all foreign keys on the watson_searchentry table. cursor.execute("SELECT CONSTRAINT_NAME FROM information_schema.TABLE_CONSTRAINTS WHERE CONSTRAINT_SCHEMA = DATABASE() AND TABLE_NAME = 'watson_searchentry' AND CONSTRAINT_TYPE = 'FOREIGN KEY'") for constraint_name, in cursor.fetchall(): cursor.execute("ALTER TABLE watson_searchentry DROP FOREIGN KEY {constraint_name}".format( constraint_name = constraint_name, )) # Change the storage engine to MyISAM. cursor.execute("ALTER TABLE watson_searchentry ENGINE = MyISAM") # Change the collaction to a case-insensitive one. cursor.execute("ALTER TABLE watson_searchentry CONVERT TO CHARACTER SET utf8 COLLATE utf8_general_ci") # Add the full text indexes. cursor.execute("CREATE FULLTEXT INDEX watson_searchentry_fulltext ON watson_searchentry (title, description, content)") cursor.execute("CREATE FULLTEXT INDEX watson_searchentry_title ON watson_searchentry (title)") cursor.execute("CREATE FULLTEXT INDEX watson_searchentry_description ON watson_searchentry (description)") cursor.execute("CREATE FULLTEXT INDEX watson_searchentry_content ON watson_searchentry (content)") def do_uninstall(self): """Executes the SQL needed to uninstall django-watson.""" cursor = connection.cursor() # Destroy the full text indexes. cursor.execute("DROP INDEX watson_searchentry_fulltext ON watson_searchentry") cursor.execute("DROP INDEX watson_searchentry_title ON watson_searchentry") cursor.execute("DROP INDEX watson_searchentry_description ON watson_searchentry") cursor.execute("DROP INDEX watson_searchentry_content ON watson_searchentry") requires_installation = True supports_ranking = True def do_search(self, engine_slug, queryset, search_text): """Performs the full text search.""" return queryset.extra( where = ("MATCH (title, description, content) AGAINST (%s IN BOOLEAN MODE)",), params = (escape_mysql_boolean_query(search_text),), ) def do_search_ranking(self, engine_slug, queryset, search_text): """Performs full text ranking.""" search_text = escape_mysql_boolean_query(search_text) return queryset.extra( select = { "watson_rank": """ ((MATCH (title) AGAINST (%s IN BOOLEAN MODE)) * 3) + ((MATCH (description) AGAINST (%s IN BOOLEAN MODE)) * 2) + ((MATCH (content) AGAINST (%s IN BOOLEAN MODE)) * 1) """, }, select_params = (search_text, search_text, search_text,), order_by = ("-watson_rank",), ) def do_filter(self, engine_slug, queryset, search_text): """Performs the full text filter.""" model = queryset.model content_type = ContentType.objects.get_for_model(model) pk = model._meta.pk if has_int_pk(model): ref_name = "object_id_int" else: ref_name = "object_id" return queryset.extra( tables = ("watson_searchentry",), where = ( "watson_searchentry.engine_slug = %s", "MATCH (watson_searchentry.title, watson_searchentry.description, watson_searchentry.content) AGAINST (%s IN BOOLEAN MODE)", "watson_searchentry.{ref_name} = {table_name}.{pk_name}".format( ref_name = ref_name, table_name = connection.ops.quote_name(model._meta.db_table), pk_name = connection.ops.quote_name(pk.db_column or pk.attname), ), "watson_searchentry.content_type_id = %s", ), params = (engine_slug, escape_mysql_boolean_query(search_text), content_type.id), ) def do_filter_ranking(self, engine_slug, queryset, search_text): """Performs the full text ranking.""" search_text = escape_mysql_boolean_query(search_text) return queryset.extra( select = { "watson_rank": """ ((MATCH (watson_searchentry.title) AGAINST (%s IN BOOLEAN MODE)) * 3) + ((MATCH (watson_searchentry.description) AGAINST (%s IN BOOLEAN MODE)) * 2) + ((MATCH (watson_searchentry.content) AGAINST (%s IN BOOLEAN MODE)) * 1) """, }, select_params = (search_text, search_text, search_text,), order_by = ("-watson_rank",), ) class AdaptiveSearchBackend(SearchBackend): """ A search backend that guesses the correct search backend based on the DATABASES["default"] settings. """ def __new__(cls): """Guess the correct search backend and initialize it.""" database_engine = settings.DATABASES["default"]["ENGINE"] if database_engine.endswith("postgresql_psycopg2") or database_engine.endswith("postgresql"): from django.db.backends.postgresql.version import get_version cursor = connection.cursor() version = get_version(cursor) if version > (8, 4, 0): return PostgresSearchBackend() if version > (8, 3, 0): return PostgresLegacySearchBackend() if database_engine.endswith("mysql"): return MySQLSearchBackend() return SearchBackend()