diff --git a/.travis.yml b/.travis.yml index 3e0a4bb98..8764f2859 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ services: # Package installation install: - python setup.py install - - pip install psycopg2 pyelasticsearch elasticutils==0.8.2 wand embedly + - pip install psycopg2 elasticsearch wand embedly - pip install coveralls # Pre-test configuration before_script: diff --git a/runtests.py b/runtests.py index 96653b40d..efa66910f 100755 --- a/runtests.py +++ b/runtests.py @@ -13,7 +13,7 @@ MEDIA_ROOT = os.path.join(WAGTAIL_ROOT, 'test-media') if not settings.configured: try: - import elasticutils + import elasticsearch has_elasticsearch = True except ImportError: has_elasticsearch = False diff --git a/wagtail/wagtailsearch/backends/elasticsearch.py b/wagtail/wagtailsearch/backends/elasticsearch.py index d2b58d564..d626d1d1c 100644 --- a/wagtail/wagtailsearch/backends/elasticsearch.py +++ b/wagtail/wagtailsearch/backends/elasticsearch.py @@ -1,6 +1,9 @@ +from __future__ import absolute_import + from django.db import models -from elasticutils import get_es, S +from elasticsearch import Elasticsearch, NotFoundError, RequestError +from elasticsearch.helpers import bulk from wagtail.wagtailsearch.backends.base import BaseSearch from wagtail.wagtailsearch.indexed import Indexed @@ -9,16 +12,141 @@ import string class ElasticSearchResults(object): - def __init__(self, model, query, prefetch_related=[]): + def __init__(self, backend, model, query_string, fields=None, filters={}, prefetch_related=[]): + self.backend = backend self.model = model - self.query = query - self.count = query.count() + self.query_string = query_string + self.fields = fields + self.filters = filters self.prefetch_related = prefetch_related + def _get_filters(self): + # Filters + filters = [] + + # Filter by content type + filters.append({ + 'prefix': { + 'content_type': self.model.indexed_get_content_type() + } + }) + + # Extra filters + if self.filters: + for key, value in self.filters.items(): + if '__' in key: + field, lookup = key.split('__') + else: + field = key + lookup = None + + if lookup is None: + if value is None: + filters.append({ + 'missing': { + 'field': field, + } + }) + else: + filters.append({ + 'term': { + field: value + } + }) + + if lookup in ['startswith', 'prefix']: + filters.append({ + 'prefix': { + field: value + } + }) + + if lookup in ['gt', 'gte', 'lt', 'lte']: + filters.append({ + 'range': { + field: { + lookup: value, + } + } + }) + + if lookup == 'range': + lower, upper = value + filters.append({ + 'range': { + field: { + 'gte': lower, + 'lte': upper, + } + } + }) + + return filters + + def _get_query(self): + # Query + query = { + 'query_string': { + 'query': self.query_string, + } + } + + # Fields + if self.fields: + query['query_string']['fields'] = self.fields + + # Filters + filters = self._get_filters() + + return { + 'filtered': { + 'query': query, + 'filter': { + 'and': filters, + } + } + } + + def _get_results_pks(self, offset=0, limit=None): + query = self._get_query() + query['from'] = offset + if limit is not None: + query['size'] = limit + + hits = self.backend.es.search( + index=self.backend.es_index, + body=dict(query=query), + _source=False, + fields='pk', + ) + + pks = [hit['fields']['pk'] for hit in hits['hits']['hits']] + + # ElasticSearch 1.x likes to pack pks into lists, unpack them if this has happened + return [pk[0] if isinstance(pk, list) else pk for pk in pks] + + def _get_count(self): + query = self._get_query() + + # Elasticsearch 1.x + count = self.backend.es.count( + index=self.backend.es_index, + body=dict(query=query), + ) + + # ElasticSearch 0.90.x fallback + if not count['_shards']['successful'] and "No query registered for [query]]" in count['_shards']['failures'][0]['reason']: + count = self.backend.es.count( + index=self.backend.es_index, + body=query, + ) + + return count['count'] + def __getitem__(self, key): if isinstance(key, slice): # Get primary keys - pk_list_unclean = [result._source["pk"] for result in self.query[key]] + pk_list_unclean = self._get_results_pks(key.start, key.stop - key.start) # Remove duplicate keys (and preserve order) seen_pks = set() @@ -45,11 +173,11 @@ class ElasticSearchResults(object): return results_sorted else: # Return a single item - pk = self.query[key]._source["pk"] + pk = self._get_results_pks(key, key + 1)[0] return self.model.objects.get(pk=pk) def __len__(self): - return self.count + return self._get_count() class ElasticSearch(BaseSearch): @@ -64,22 +192,17 @@ class ElasticSearch(BaseSearch): # Get ElasticSearch interface # Any remaining params are passed into the ElasticSearch constructor - self.es = get_es( + self.es = Elasticsearch( urls=self.es_urls, timeout=self.es_timeout, force_new=self.es_force_new, **params) - self.s = S().es( - urls=self.es_urls, - timeout=self.es_timeout, - force_new=self.es_force_new, - **params).indexes(self.es_index) def reset_index(self): # Delete old index try: - self.es.delete_index(self.es_index) - except: + self.es.indices.delete(self.es_index) + except NotFoundError: pass # Settings @@ -128,7 +251,7 @@ class ElasticSearch(BaseSearch): } # Create new index - self.es.create_index(self.es_index, INDEX_SETTINGS) + self.es.indices.create(self.es_index, INDEX_SETTINGS) def add_type(self, model): # Get type name @@ -144,14 +267,14 @@ class ElasticSearch(BaseSearch): }.items() + indexed_fields.items()) # Put mapping - self.es.put_mapping(self.es_index, content_type, { + self.es.indices.put_mapping(index=self.es_index, doc_type=content_type, body={ content_type: { "properties": fields, } }) def refresh_index(self): - self.es.refresh(self.es_index) + self.es.indices.refresh(self.es_index) def add(self, obj): # Make sure the object can be indexed @@ -183,24 +306,33 @@ class ElasticSearch(BaseSearch): type_set[obj_type].append(obj.indexed_build_document()) # Loop through each type and bulk add them - results = [] for type_name, type_objects in type_set.items(): - results.append((type_name, len(type_objects))) - self.es.bulk_index(self.es_index, type_name, type_objects) - return results + # Get list of actions + actions = [] + for obj in type_objects: + action = { + '_index': self.es_index, + '_type': type_name, + '_id': obj['id'], + } + action.update(obj) + actions.append(action) + + bulk(self.es, actions) def delete(self, obj): # Object must be a decendant of Indexed and be a django model if not isinstance(obj, Indexed) or not isinstance(obj, models.Model): return - # Get ID for document - doc_id = obj.indexed_get_document_id() - # Delete document try: - self.es.delete(self.es_index, obj.indexed_get_content_type(), doc_id) - except: + self.es.delete( + self.es_index, + obj.indexed_get_content_type(), + obj.indexed_get_document_id(), + ) + except NotFoundError: pass # Document doesn't exist, ignore this exception def search(self, query_string, model, fields=None, filters={}, prefetch_related=[]): @@ -215,27 +347,5 @@ class ElasticSearch(BaseSearch): if not query_string: return [] - # Query - if fields: - query = self.s.query_raw({ - "query_string": { - "query": query_string, - "fields": fields, - } - }) - else: - query = self.s.query_raw({ - "query_string": { - "query": query_string, - } - }) - - # Filter results by this content type - query = query.filter(content_type__prefix=model.indexed_get_content_type()) - - # Extra filters - if filters: - query = query.filter(**filters) - # Return search results - return ElasticSearchResults(model, query, prefetch_related=prefetch_related) + return ElasticSearchResults(self, model, query_string, fields=fields, filters=filters, prefetch_related=prefetch_related)