Merge pull request #342 from kaedroho/searchchanges/ditch-elasticutils

Search Changes 1 - Ditch ElasticUtils
This commit is contained in:
Matt Westcott 2014-06-23 14:11:24 +01:00
commit 766f14e709
3 changed files with 162 additions and 52 deletions

View file

@ -12,7 +12,7 @@ services:
# Package installation
install:
- python setup.py install
- pip install psycopg2 pyelasticsearch elasticutils==0.8.2 wand embedly
- pip install psycopg2 elasticsearch wand embedly
- pip install coveralls
# Pre-test configuration
before_script:

View file

@ -13,7 +13,7 @@ MEDIA_ROOT = os.path.join(WAGTAIL_ROOT, 'test-media')
if not settings.configured:
try:
import elasticutils
import elasticsearch
has_elasticsearch = True
except ImportError:
has_elasticsearch = False

View file

@ -1,6 +1,9 @@
from __future__ import absolute_import
from django.db import models
from elasticutils import get_es, S
from elasticsearch import Elasticsearch, NotFoundError, RequestError
from elasticsearch.helpers import bulk
from wagtail.wagtailsearch.backends.base import BaseSearch
from wagtail.wagtailsearch.indexed import Indexed
@ -9,16 +12,141 @@ import string
class ElasticSearchResults(object):
def __init__(self, model, query, prefetch_related=[]):
def __init__(self, backend, model, query_string, fields=None, filters={}, prefetch_related=[]):
self.backend = backend
self.model = model
self.query = query
self.count = query.count()
self.query_string = query_string
self.fields = fields
self.filters = filters
self.prefetch_related = prefetch_related
def _get_filters(self):
# Filters
filters = []
# Filter by content type
filters.append({
'prefix': {
'content_type': self.model.indexed_get_content_type()
}
})
# Extra filters
if self.filters:
for key, value in self.filters.items():
if '__' in key:
field, lookup = key.split('__')
else:
field = key
lookup = None
if lookup is None:
if value is None:
filters.append({
'missing': {
'field': field,
}
})
else:
filters.append({
'term': {
field: value
}
})
if lookup in ['startswith', 'prefix']:
filters.append({
'prefix': {
field: value
}
})
if lookup in ['gt', 'gte', 'lt', 'lte']:
filters.append({
'range': {
field: {
lookup: value,
}
}
})
if lookup == 'range':
lower, upper = value
filters.append({
'range': {
field: {
'gte': lower,
'lte': upper,
}
}
})
return filters
def _get_query(self):
# Query
query = {
'query_string': {
'query': self.query_string,
}
}
# Fields
if self.fields:
query['query_string']['fields'] = self.fields
# Filters
filters = self._get_filters()
return {
'filtered': {
'query': query,
'filter': {
'and': filters,
}
}
}
def _get_results_pks(self, offset=0, limit=None):
query = self._get_query()
query['from'] = offset
if limit is not None:
query['size'] = limit
hits = self.backend.es.search(
index=self.backend.es_index,
body=dict(query=query),
_source=False,
fields='pk',
)
pks = [hit['fields']['pk'] for hit in hits['hits']['hits']]
# ElasticSearch 1.x likes to pack pks into lists, unpack them if this has happened
return [pk[0] if isinstance(pk, list) else pk for pk in pks]
def _get_count(self):
query = self._get_query()
# Elasticsearch 1.x
count = self.backend.es.count(
index=self.backend.es_index,
body=dict(query=query),
)
# ElasticSearch 0.90.x fallback
if not count['_shards']['successful'] and "No query registered for [query]]" in count['_shards']['failures'][0]['reason']:
count = self.backend.es.count(
index=self.backend.es_index,
body=query,
)
return count['count']
def __getitem__(self, key):
if isinstance(key, slice):
# Get primary keys
pk_list_unclean = [result._source["pk"] for result in self.query[key]]
pk_list_unclean = self._get_results_pks(key.start, key.stop - key.start)
# Remove duplicate keys (and preserve order)
seen_pks = set()
@ -45,11 +173,11 @@ class ElasticSearchResults(object):
return results_sorted
else:
# Return a single item
pk = self.query[key]._source["pk"]
pk = self._get_results_pks(key, key + 1)[0]
return self.model.objects.get(pk=pk)
def __len__(self):
return self.count
return self._get_count()
class ElasticSearch(BaseSearch):
@ -64,22 +192,17 @@ class ElasticSearch(BaseSearch):
# Get ElasticSearch interface
# Any remaining params are passed into the ElasticSearch constructor
self.es = get_es(
self.es = Elasticsearch(
urls=self.es_urls,
timeout=self.es_timeout,
force_new=self.es_force_new,
**params)
self.s = S().es(
urls=self.es_urls,
timeout=self.es_timeout,
force_new=self.es_force_new,
**params).indexes(self.es_index)
def reset_index(self):
# Delete old index
try:
self.es.delete_index(self.es_index)
except:
self.es.indices.delete(self.es_index)
except NotFoundError:
pass
# Settings
@ -128,7 +251,7 @@ class ElasticSearch(BaseSearch):
}
# Create new index
self.es.create_index(self.es_index, INDEX_SETTINGS)
self.es.indices.create(self.es_index, INDEX_SETTINGS)
def add_type(self, model):
# Get type name
@ -144,14 +267,14 @@ class ElasticSearch(BaseSearch):
}.items() + indexed_fields.items())
# Put mapping
self.es.put_mapping(self.es_index, content_type, {
self.es.indices.put_mapping(index=self.es_index, doc_type=content_type, body={
content_type: {
"properties": fields,
}
})
def refresh_index(self):
self.es.refresh(self.es_index)
self.es.indices.refresh(self.es_index)
def add(self, obj):
# Make sure the object can be indexed
@ -183,24 +306,33 @@ class ElasticSearch(BaseSearch):
type_set[obj_type].append(obj.indexed_build_document())
# Loop through each type and bulk add them
results = []
for type_name, type_objects in type_set.items():
results.append((type_name, len(type_objects)))
self.es.bulk_index(self.es_index, type_name, type_objects)
return results
# Get list of actions
actions = []
for obj in type_objects:
action = {
'_index': self.es_index,
'_type': type_name,
'_id': obj['id'],
}
action.update(obj)
actions.append(action)
bulk(self.es, actions)
def delete(self, obj):
# Object must be a descendant of Indexed and be a Django model
if not isinstance(obj, Indexed) or not isinstance(obj, models.Model):
return
# Get ID for document
doc_id = obj.indexed_get_document_id()
# Delete document
try:
self.es.delete(self.es_index, obj.indexed_get_content_type(), doc_id)
except:
self.es.delete(
self.es_index,
obj.indexed_get_content_type(),
obj.indexed_get_document_id(),
)
except NotFoundError:
pass # Document doesn't exist, ignore this exception
def search(self, query_string, model, fields=None, filters={}, prefetch_related=[]):
@ -215,27 +347,5 @@ class ElasticSearch(BaseSearch):
if not query_string:
return []
# Query
if fields:
query = self.s.query_raw({
"query_string": {
"query": query_string,
"fields": fields,
}
})
else:
query = self.s.query_raw({
"query_string": {
"query": query_string,
}
})
# Filter results by this content type
query = query.filter(content_type__prefix=model.indexed_get_content_type())
# Extra filters
if filters:
query = query.filter(**filters)
# Return search results
return ElasticSearchResults(model, query, prefetch_related=prefetch_related)
return ElasticSearchResults(self, model, query_string, fields=fields, filters=filters, prefetch_related=prefetch_related)