From 4081206c77fd5c9bcfc03dfe6ed923dfa4d93f23 Mon Sep 17 00:00:00 2001 From: Karl Hobley Date: Sat, 5 Apr 2014 20:10:58 +0100 Subject: [PATCH 1/4] Scrapped ElasticUtils, use Elasticsearch-py instead Conflicts: wagtail/wagtailsearch/backends/elasticsearch.py wagtail/wagtailsearch/indexed.py --- runtests.py | 2 +- .../wagtailsearch/backends/elasticsearch.py | 187 +++++++++++++----- 2 files changed, 142 insertions(+), 47 deletions(-) diff --git a/runtests.py b/runtests.py index 96653b40d..efa66910f 100755 --- a/runtests.py +++ b/runtests.py @@ -13,7 +13,7 @@ MEDIA_ROOT = os.path.join(WAGTAIL_ROOT, 'test-media') if not settings.configured: try: - import elasticutils + import elasticsearch has_elasticsearch = True except ImportError: has_elasticsearch = False diff --git a/wagtail/wagtailsearch/backends/elasticsearch.py b/wagtail/wagtailsearch/backends/elasticsearch.py index d2b58d564..7571db5ac 100644 --- a/wagtail/wagtailsearch/backends/elasticsearch.py +++ b/wagtail/wagtailsearch/backends/elasticsearch.py @@ -1,6 +1,8 @@ +from __future__ import absolute_import + from django.db import models -from elasticutils import get_es, S +from elasticsearch import Elasticsearch, NotFoundError from wagtail.wagtailsearch.backends.base import BaseSearch from wagtail.wagtailsearch.indexed import Indexed @@ -9,16 +11,131 @@ import string class ElasticSearchResults(object): - def __init__(self, model, query, prefetch_related=[]): + def __init__(self, backend, model, query_string, fields=None, filters={}, prefetch_related=[]): + self.backend = backend self.model = model - self.query = query - self.count = query.count() + self.query_string = query_string + self.fields = fields + self.filters = filters self.prefetch_related = prefetch_related + def _get_filters(self): + # Filters + filters = [] + + # Filter by content type + filters.append({ + 'prefix': { + 'content_type': self.model.indexed_get_content_type() + } + }) + + # Extra filters + if self.filters: + for key, value in self.filters.items(): + if '__' in key: + field, lookup = key.split('__') + else: + field = key + lookup = None + + if lookup is None: + if value is None: + filters.append({ + 'missing': { + 'field': field, + } + }) + else: + filters.append({ + 'term': { + field: value + } + }) + + if lookup in ['startswith', 'prefix']: + filters.append({ + 'prefix': { + field: value + } + }) + + if lookup in ['gt', 'gte', 'lt', 'lte']: + filters.append({ + 'range': { + field: { + lookup: value, + } + } + }) + + if lookup == 'range': + lower, upper = value + filters.append({ + 'range': { + field: { + 'gte': lower, + 'lte': upper, + } + } + }) + + return filters + + def _get_query(self): + # Query + query = { + 'query_string': { + 'query': self.query_string, + } + } + + # Fields + if self.fields: + query['query_string']['fields'] = self.fields + + # Filters + filters = self._get_filters() + + return { + 'query': { + 'filtered': { + 'query': query, + 'filter': { + 'and': filters, + } + } + } + } + + def _get_results_pks(self, offset=0, limit=None): + query = self._get_query() + query['query']['from'] = offset + if limit is not None: + query['query']['size'] = limit + + hits = self.backend.es.search( + index=self.backend.es_index, + body=query, + _source=False, + fields='pk', + ) + + return [hit['fields']['pk'][0] for hit in hits['hits']['hits']] + + def _get_count(self): + query = self._get_query() + count = self.backend.es.count( + index=self.backend.es_index, + body=query, + ) + + return count['count'] + def __getitem__(self, key): if isinstance(key, slice): # Get primary keys - pk_list_unclean = [result._source["pk"] for result in self.query[key]] + pk_list_unclean = self._get_results_pks(key.start, key.stop - key.start) # Remove duplicate keys (and preserve order) seen_pks = set() @@ -45,11 +162,11 @@ class ElasticSearchResults(object): return results_sorted else: # Return a single item - pk = self.query[key]._source["pk"] + pk = self._get_results_pks(key, key + 1)[0] return self.model.objects.get(pk=pk) def __len__(self): - return self.count + return self._get_count() class ElasticSearch(BaseSearch): @@ -64,22 +181,17 @@ class ElasticSearch(BaseSearch): # Get ElasticSearch interface # Any remaining params are passed into the ElasticSearch constructor - self.es = get_es( + self.es = Elasticsearch( urls=self.es_urls, timeout=self.es_timeout, force_new=self.es_force_new, **params) - self.s = S().es( - urls=self.es_urls, - timeout=self.es_timeout, - force_new=self.es_force_new, - **params).indexes(self.es_index) def reset_index(self): # Delete old index try: - self.es.delete_index(self.es_index) - except: + self.es.indices.delete(self.es_index) + except NotFoundError: pass # Settings @@ -128,7 +240,7 @@ class ElasticSearch(BaseSearch): } # Create new index - self.es.create_index(self.es_index, INDEX_SETTINGS) + self.es.indices.create(self.es_index, INDEX_SETTINGS) def add_type(self, model): # Get type name @@ -144,14 +256,14 @@ class ElasticSearch(BaseSearch): }.items() + indexed_fields.items()) # Put mapping - self.es.put_mapping(self.es_index, content_type, { + self.es.indices.put_mapping(index=self.es_index, doc_type=content_type, body={ content_type: { "properties": fields, } }) def refresh_index(self): - self.es.refresh(self.es_index) + self.es.indices.refresh(self.es_index) def add(self, obj): # Make sure the object can be indexed @@ -165,6 +277,10 @@ class ElasticSearch(BaseSearch): self.es.index(self.es_index, obj.indexed_get_content_type(), doc, id=doc["id"]) def add_bulk(self, obj_list): + # TODO: Make this work with new elastic search module + for obj in obj_list: + self.add(obj) + return # Group all objects by their type type_set = {} for obj in obj_list: @@ -194,13 +310,14 @@ class ElasticSearch(BaseSearch): if not isinstance(obj, Indexed) or not isinstance(obj, models.Model): return - # Get ID for document - doc_id = obj.indexed_get_document_id() - # Delete document try: - self.es.delete(self.es_index, obj.indexed_get_content_type(), doc_id) - except: + self.es.delete( + self.es_index, + obj.indexed_get_content_type(), + obj.indexed_get_document_id(), + ) + except NotFoundError: pass # Document doesn't exist, ignore this exception def search(self, query_string, model, fields=None, filters={}, prefetch_related=[]): @@ -215,27 +332,5 @@ class ElasticSearch(BaseSearch): if not query_string: return [] - # Query - if fields: - query = self.s.query_raw({ - "query_string": { - "query": query_string, - "fields": fields, - } - }) - else: - query = self.s.query_raw({ - "query_string": { - "query": query_string, - } - }) - - # Filter results by this content type - query = query.filter(content_type__prefix=model.indexed_get_content_type()) - - # Extra filters - if filters: - query = query.filter(**filters) - # Return search results - return ElasticSearchResults(model, query, prefetch_related=prefetch_related) + return ElasticSearchResults(self, model, query_string, fields=fields, filters=filters, prefetch_related=prefetch_related) From c00c4d24372079e8a07cc4276cfb2568643abd80 Mon Sep 17 00:00:00 2001 From: Karl Hobley Date: Sun, 6 Apr 2014 14:35:57 +0100 Subject: [PATCH 2/4] New ElasticSearch module now supports bulk insert --- .../wagtailsearch/backends/elasticsearch.py | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/wagtail/wagtailsearch/backends/elasticsearch.py b/wagtail/wagtailsearch/backends/elasticsearch.py index 7571db5ac..78de0e94f 100644 --- a/wagtail/wagtailsearch/backends/elasticsearch.py +++ b/wagtail/wagtailsearch/backends/elasticsearch.py @@ -3,6 +3,7 @@ from __future__ import absolute_import from django.db import models from elasticsearch import Elasticsearch, NotFoundError +from elasticsearch.helpers import bulk from wagtail.wagtailsearch.backends.base import BaseSearch from wagtail.wagtailsearch.indexed import Indexed @@ -277,10 +278,6 @@ class ElasticSearch(BaseSearch): self.es.index(self.es_index, obj.indexed_get_content_type(), doc, id=doc["id"]) def add_bulk(self, obj_list): - # TODO: Make this work with new elastic search module - for obj in obj_list: - self.add(obj) - return # Group all objects by their type type_set = {} for obj in obj_list: @@ -299,11 +296,19 @@ class ElasticSearch(BaseSearch): type_set[obj_type].append(obj.indexed_build_document()) # Loop through each type and bulk add them - results = [] for type_name, type_objects in type_set.items(): - results.append((type_name, len(type_objects))) - self.es.bulk_index(self.es_index, type_name, type_objects) - return results + # Get list of actions + actions = [] + for obj in type_objects: + action = { + '_index': self.es_index, + '_type': type_name, + '_id': obj['id'], + } + action.update(obj) + actions.append(action) + + bulk(self.es, actions) def delete(self, obj): # Object must be a decendant of Indexed and be a django model From 598e6193da90908241937fe1a3fe9ab27983df36 Mon Sep 17 00:00:00 2001 From: Karl Hobley Date: Sun, 6 Apr 2014 18:07:53 +0100 Subject: [PATCH 3/4] Install elasticsearch instead of elasticutils on travis Conflicts: .travis.yml --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 3e0a4bb98..8764f2859 100644 --- a/.travis.yml +++ b/.travis.yml @@ -12,7 +12,7 @@ services: # Package installation install: - python setup.py install - - pip install psycopg2 pyelasticsearch elasticutils==0.8.2 wand embedly + - pip install psycopg2 elasticsearch wand embedly - pip install coveralls # Pre-test configuration before_script: From 05cb87e3eb99c07764f176359d3ad488e020bb37 Mon Sep 17 00:00:00 2001 From: Karl Hobley Date: Tue, 8 Apr 2014 10:47:13 +0100 Subject: [PATCH 4/4] Tests now succeed on elasticsearch 0.90.x --- .../wagtailsearch/backends/elasticsearch.py | 34 ++++++++++++------- 1 file changed, 22 insertions(+), 12 deletions(-) diff --git a/wagtail/wagtailsearch/backends/elasticsearch.py b/wagtail/wagtailsearch/backends/elasticsearch.py index 78de0e94f..d626d1d1c 100644 --- a/wagtail/wagtailsearch/backends/elasticsearch.py +++ b/wagtail/wagtailsearch/backends/elasticsearch.py @@ -2,7 +2,7 @@ from __future__ import absolute_import from django.db import models -from elasticsearch import Elasticsearch, NotFoundError +from elasticsearch import Elasticsearch, NotFoundError, RequestError from elasticsearch.helpers import bulk from wagtail.wagtailsearch.backends.base import BaseSearch @@ -99,38 +99,48 @@ class ElasticSearchResults(object): filters = self._get_filters() return { - 'query': { - 'filtered': { - 'query': query, - 'filter': { - 'and': filters, - } + 'filtered': { + 'query': query, + 'filter': { + 'and': filters, } } } def _get_results_pks(self, offset=0, limit=None): query = self._get_query() - query['query']['from'] = offset + query['from'] = offset if limit is not None: - query['query']['size'] = limit + query['size'] = limit hits = self.backend.es.search( index=self.backend.es_index, - body=query, + body=dict(query=query), _source=False, fields='pk', ) - return [hit['fields']['pk'][0] for hit in hits['hits']['hits']] + pks = [hit['fields']['pk'] for hit in hits['hits']['hits']] + + # ElasticSearch 1.x likes to pack pks into lists, unpack them if this has happened + return [pk[0] if isinstance(pk, list) else pk for pk in pks] def _get_count(self): query = self._get_query() + + # Elasticsearch 1.x count = self.backend.es.count( index=self.backend.es_index, - body=query, + body=dict(query=query), ) + # ElasticSearch 0.90.x fallback + if not count['_shards']['successful'] and "No query registered for [query]]" in count['_shards']['failures'][0]['reason']: + count = self.backend.es.count( + index=self.backend.es_index, + body=query, + ) + return count['count'] def __getitem__(self, key):