feat(api/2): Added new module for WagtailAPI v2

This commit is contained in:
Karl Hobley 2015-10-08 20:36:21 +01:00
parent e3504af94c
commit b832672811
15 changed files with 2411 additions and 1 deletions

View file

@ -1,7 +1,18 @@
from django.apps import AppConfig
from django.apps import AppConfig, apps
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
class WagtailAPIAppConfig(AppConfig):
name = 'wagtail.api'
label = 'wagtailapi'
verbose_name = "Wagtail API"
def ready(self):
# Install cache purging signal handlers
if getattr(settings, 'WAGTAILAPI_USE_FRONTENDCACHE', False):
if apps.is_installed('wagtail.contrib.wagtailfrontendcache'):
from wagtail.api.v2.signal_handlers import register_signal_handlers as register_signal_handlers_v2
register_signal_handlers_v2()
else:
raise ImproperlyConfigured("The setting 'WAGTAILAPI_USE_FRONTENDCACHE' is True but 'wagtail.contrib.wagtailfrontendcache' is not in INSTALLED_APPS.")

View file

247
wagtail/api/v2/endpoints.py Normal file
View file

@ -0,0 +1,247 @@
from __future__ import absolute_import
from collections import OrderedDict
from django.conf.urls import url
from django.http import Http404
from django.core.urlresolvers import reverse
from django.apps import apps
from rest_framework import status
from rest_framework.response import Response
from rest_framework.viewsets import GenericViewSet
from rest_framework.renderers import JSONRenderer, BrowsableAPIRenderer
from wagtail.wagtailcore.models import Page
from wagtail.wagtailimages.models import get_image_model
from wagtail.wagtaildocs.models import Document
from wagtail.wagtailcore.utils import resolve_model_string
from .filters import (
FieldsFilter, OrderingFilter, SearchFilter,
ChildOfFilter, DescendantOfFilter
)
from .pagination import WagtailPagination
from .serializers import BaseSerializer, PageSerializer, DocumentSerializer, ImageSerializer, get_serializer_class
from .utils import BadRequestError
class BaseAPIEndpoint(GenericViewSet):
renderer_classes = [JSONRenderer]
# The BrowsableAPIRenderer requires rest_framework to be installed
# Remove this check in Wagtail 1.4 as rest_framework will be required
# RemovedInWagtail14Warning
if apps.is_installed('rest_framework'):
renderer_classes.append(BrowsableAPIRenderer)
pagination_class = WagtailPagination
base_serializer_class = BaseSerializer
filter_backends = []
model = None # Set on subclass
known_query_parameters = frozenset([
'limit',
'offset',
'fields',
'order',
'search',
# Used by jQuery for cache-busting. See #1671
'_',
# Required by BrowsableAPIRenderer
'format',
])
extra_api_fields = []
name = None # Set on subclass.
def get_queryset(self):
return self.model.objects.all().order_by('id')
def listing_view(self, request):
queryset = self.get_queryset()
self.check_query_parameters(queryset)
queryset = self.filter_queryset(queryset)
queryset = self.paginate_queryset(queryset)
serializer = self.get_serializer(queryset, many=True)
return self.get_paginated_response(serializer.data)
def detail_view(self, request, pk):
instance = self.get_object()
serializer = self.get_serializer(instance)
return Response(serializer.data)
def handle_exception(self, exc):
if isinstance(exc, Http404):
data = {'message': str(exc)}
return Response(data, status=status.HTTP_404_NOT_FOUND)
elif isinstance(exc, BadRequestError):
data = {'message': str(exc)}
return Response(data, status=status.HTTP_400_BAD_REQUEST)
return super(BaseAPIEndpoint, self).handle_exception(exc)
def get_api_fields(self, model):
"""
This returns a list of field names that are allowed to
be used in the API (excluding the id field).
"""
api_fields = self.extra_api_fields[:]
if hasattr(model, 'api_fields'):
api_fields.extend(model.api_fields)
return api_fields
def check_query_parameters(self, queryset):
"""
Ensure that only valid query paramters are included in the URL.
"""
query_parameters = set(self.request.GET.keys())
# All query paramters must be either a field or an operation
allowed_query_parameters = set(self.get_api_fields(queryset.model)).union(self.known_query_parameters).union({'id'})
unknown_parameters = query_parameters - allowed_query_parameters
if unknown_parameters:
raise BadRequestError("query parameter is not an operation or a recognised field: %s" % ', '.join(sorted(unknown_parameters)))
def get_serializer_class(self):
request = self.request
# Get model
if self.action == 'listing_view':
model = self.get_queryset().model
else:
model = type(self.get_object())
# Get all available fields
all_fields = self.get_api_fields(model)
all_fields = list(OrderedDict.fromkeys(all_fields)) # Removes any duplicates in case the developer put "title" in api_fields
if self.action == 'listing_view':
# Listing views just show the title field and any other allowed field the user specified
if 'fields' in request.GET:
fields = set(request.GET['fields'].split(','))
else:
fields = {'title'}
unknown_fields = fields - set(all_fields)
if unknown_fields:
raise BadRequestError("unknown fields: %s" % ', '.join(sorted(unknown_fields)))
# Reorder fields so it matches the order of all_fields
fields = [field for field in all_fields if field in fields]
else:
# Detail views show all fields all the time
fields = all_fields
# Always show id and meta first
fields = ['id', 'meta'] + fields
# If showing details, add the parent field
if isinstance(self, PagesAPIEndpoint) and self.get_serializer_context().get('show_details', False):
fields.insert(2, 'parent')
return get_serializer_class(model, fields, base=self.base_serializer_class)
def get_serializer_context(self):
"""
The serialization context differs between listing and detail views.
"""
context = {
'request': self.request,
'view': self,
'router': self.request.wagtailapi_router
}
if self.action == 'detail_view':
context['show_details'] = True
return context
def get_renderer_context(self):
context = super(BaseAPIEndpoint, self).get_renderer_context()
context['indent'] = 4
return context
@classmethod
def get_urlpatterns(cls):
"""
This returns a list of URL patterns for the endpoint
"""
return [
url(r'^$', cls.as_view({'get': 'listing_view'}), name='listing'),
url(r'^(?P<pk>\d+)/$', cls.as_view({'get': 'detail_view'}), name='detail'),
]
@classmethod
def get_object_detail_urlpath(cls, model, pk, namespace=''):
if namespace:
url_name = namespace + ':detail'
else:
url_name = 'detail'
return reverse(url_name, args=(pk, ))
class PagesAPIEndpoint(BaseAPIEndpoint):
base_serializer_class = PageSerializer
filter_backends = [
FieldsFilter,
ChildOfFilter,
DescendantOfFilter,
OrderingFilter,
SearchFilter
]
known_query_parameters = BaseAPIEndpoint.known_query_parameters.union([
'type',
'child_of',
'descendant_of',
])
extra_api_fields = ['title']
name = 'pages'
model = Page
def get_queryset(self):
request = self.request
# Allow pages to be filtered to a specific type
if 'type' not in request.GET:
model = Page
else:
model_name = request.GET['type']
try:
model = resolve_model_string(model_name)
except LookupError:
raise BadRequestError("type doesn't exist")
if not issubclass(model, Page):
raise BadRequestError("type doesn't exist")
# Get live pages that are not in a private section
queryset = model.objects.public().live()
# Filter by site
queryset = queryset.descendant_of(request.site.root_page, inclusive=True)
return queryset
def get_object(self):
base = super(PagesAPIEndpoint, self).get_object()
return base.specific
class ImagesAPIEndpoint(BaseAPIEndpoint):
base_serializer_class = ImageSerializer
filter_backends = [FieldsFilter, OrderingFilter, SearchFilter]
extra_api_fields = ['title', 'tags', 'width', 'height']
name = 'images'
model = get_image_model()
class DocumentsAPIEndpoint(BaseAPIEndpoint):
base_serializer_class = DocumentSerializer
filter_backends = [FieldsFilter, OrderingFilter, SearchFilter]
extra_api_fields = ['title', 'tags']
name = 'documents'
model = Document

150
wagtail/api/v2/filters.py Normal file
View file

@ -0,0 +1,150 @@
from django.conf import settings
from rest_framework.filters import BaseFilterBackend
from taggit.managers import _TaggableManager
from wagtail.wagtailcore.models import Page
from wagtail.wagtailsearch.backends import get_search_backend
from .utils import BadRequestError, pages_for_site
class FieldsFilter(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
"""
This performs field level filtering on the result set
Eg: ?title=James Joyce
"""
fields = set(view.get_api_fields(queryset.model)).union({'id'})
for field_name, value in request.GET.items():
if field_name in fields:
field = getattr(queryset.model, field_name, None)
if isinstance(field, _TaggableManager):
for tag in value.split(','):
queryset = queryset.filter(**{field_name + '__name': tag})
# Stick a message on the queryset to indicate that tag filtering has been performed
# This will let the do_search method know that it must raise an error as searching
# and tag filtering at the same time is not supported
queryset._filtered_by_tag = True
else:
queryset = queryset.filter(**{field_name: value})
return queryset
class OrderingFilter(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
"""
This applies ordering to the result set
Eg: ?order=title
It also supports reverse ordering
Eg: ?order=-title
And random ordering
Eg: ?order=random
"""
if 'order' in request.GET:
# Prevent ordering while searching
if 'search' in request.GET:
raise BadRequestError("ordering with a search query is not supported")
order_by = request.GET['order']
# Random ordering
if order_by == 'random':
# Prevent ordering by random with offset
if 'offset' in request.GET:
raise BadRequestError("random ordering with offset is not supported")
return queryset.order_by('?')
# Check if reverse ordering is set
if order_by.startswith('-'):
reverse_order = True
order_by = order_by[1:]
else:
reverse_order = False
# Add ordering
if order_by == 'id' or order_by in view.get_api_fields(queryset.model):
queryset = queryset.order_by(order_by)
else:
# Unknown field
raise BadRequestError("cannot order by '%s' (unknown field)" % order_by)
# Reverse order
if reverse_order:
queryset = queryset.reverse()
return queryset
class SearchFilter(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
"""
This performs a full-text search on the result set
Eg: ?search=James Joyce
"""
search_enabled = getattr(settings, 'WAGTAILAPI_SEARCH_ENABLED', True)
if 'search' in request.GET:
if not search_enabled:
raise BadRequestError("search is disabled")
# Searching and filtering by tag at the same time is not supported
if getattr(queryset, '_filtered_by_tag', False):
raise BadRequestError("filtering by tag with a search query is not supported")
search_query = request.GET['search']
sb = get_search_backend()
queryset = sb.search(search_query, queryset)
return queryset
class ChildOfFilter(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
if 'child_of' in request.GET:
try:
parent_page_id = int(request.GET['child_of'])
assert parent_page_id >= 0
except (ValueError, AssertionError):
raise BadRequestError("child_of must be a positive integer")
site_pages = pages_for_site(request.site)
try:
parent_page = site_pages.get(id=parent_page_id)
queryset = queryset.child_of(parent_page)
queryset._filtered_by_child_of = True
return queryset
except Page.DoesNotExist:
raise BadRequestError("parent page doesn't exist")
return queryset
class DescendantOfFilter(BaseFilterBackend):
def filter_queryset(self, request, queryset, view):
if 'descendant_of' in request.GET:
if getattr(queryset, '_filtered_by_child_of', False):
raise BadRequestError("filtering by descendant_of with child_of is not supported")
try:
ancestor_page_id = int(request.GET['descendant_of'])
assert ancestor_page_id >= 0
except (ValueError, AssertionError):
raise BadRequestError("descendant_of must be a positive integer")
site_pages = pages_for_site(request.site)
try:
ancestor_page = site_pages.get(id=ancestor_page_id)
return queryset.descendant_of(ancestor_page)
except Page.DoesNotExist:
raise BadRequestError("ancestor page doesn't exist")
return queryset

View file

@ -0,0 +1,45 @@
from collections import OrderedDict
from django.conf import settings
from rest_framework.pagination import BasePagination
from rest_framework.response import Response
from .utils import BadRequestError
class WagtailPagination(BasePagination):
def paginate_queryset(self, queryset, request, view=None):
limit_max = getattr(settings, 'WAGTAILAPI_LIMIT_MAX', 20)
try:
offset = int(request.GET.get('offset', 0))
assert offset >= 0
except (ValueError, AssertionError):
raise BadRequestError("offset must be a positive integer")
try:
limit = int(request.GET.get('limit', min(20, limit_max)))
if limit > limit_max:
raise BadRequestError("limit cannot be higher than %d" % limit_max)
assert limit >= 0
except (ValueError, AssertionError):
raise BadRequestError("limit must be a positive integer")
start = offset
stop = offset + limit
self.view = view
self.total_count = queryset.count()
return queryset[start:stop]
def get_paginated_response(self, data):
data = OrderedDict([
('meta', OrderedDict([
('total_count', self.total_count),
])),
(self.view.name, data),
])
return Response(data)

76
wagtail/api/v2/router.py Normal file
View file

@ -0,0 +1,76 @@
import functools
from django.conf.urls import url, include
from wagtail.utils.urlpatterns import decorate_urlpatterns
class WagtailAPIRouter(object):
"""
A class that provides routing and cross-linking for a collection
of API endpoints
"""
def __init__(self, url_namespace):
self.url_namespace = url_namespace
self._endpoints = {}
def register_endpoint(self, name, class_):
self._endpoints[name] = class_
def get_model_endpoint(self, model):
"""
Finds the endpoint in the API that represents a model
Returns a (name, endpoint_class) tuple. Or None if an
endpoint is not found.
"""
for name, class_ in self._endpoints.items():
if issubclass(model, class_.model):
return name, class_
def get_object_detail_urlpath(self, model, pk):
"""
Returns a URL path (excluding scheme and hostname) to the detail
page of an object.
Returns None if the object is not represented by any endpoints.
"""
endpoint = self.get_model_endpoint(model)
if endpoint:
endpoint_name, endpoint_class = endpoint[0], endpoint[1]
url_namespace = self.url_namespace + ':' + endpoint_name
return endpoint_class.get_object_detail_urlpath(model, pk, namespace=url_namespace)
def wrap_view(self, func):
@functools.wraps(func)
def wrapped(request, *args, **kwargs):
request.wagtailapi_router = self
return func(request, *args, **kwargs)
return wrapped
def get_urlpatterns(self):
urlpatterns = []
for name, class_ in self._endpoints.items():
pattern = url(
r'^{}/'.format(name),
include(class_.get_urlpatterns(), namespace=name)
)
urlpatterns.append(pattern)
decorate_urlpatterns(urlpatterns, self.wrap_view)
return urlpatterns
@property
def urls(self):
"""
A shortcut to allow quick registration of the API in a URLconf.
Use with Django's include() function:
url(r'api/', include(myapi.urls)),
"""
return self.get_urlpatterns(), self.url_namespace, self.url_namespace

View file

@ -0,0 +1,293 @@
from __future__ import absolute_import
from collections import OrderedDict
from modelcluster.models import get_all_child_relations
from taggit.managers import _TaggableManager
from rest_framework import serializers
from rest_framework.fields import Field
from rest_framework import relations
from wagtail.utils.compat import get_related_model
from wagtail.wagtailcore import fields as wagtailcore_fields
from .utils import get_full_url, pages_for_site
def get_object_detail_url(context, model, pk):
url_path = context['router'].get_object_detail_urlpath(model, pk)
if url_path:
return get_full_url(context['request'], url_path)
class MetaField(Field):
"""
Serializes the "meta" section of each object.
This section is used for storing non-field data such as model name, urls, etc.
Example:
"meta": {
"type": "wagtailimages.Image",
"detail_url": "http://api.example.com/v1/images/1/"
}
"""
def get_attribute(self, instance):
return instance
def to_representation(self, obj):
return OrderedDict([
('type', type(obj)._meta.app_label + '.' + type(obj).__name__),
('detail_url', get_object_detail_url(self.context, type(obj), obj.pk)),
])
class PageMetaField(MetaField):
"""
A subclass of MetaField for Page objects.
Changes the "type" field to use the name of the specific model of the page.
Example:
"meta": {
"type": "blog.BlogPage",
"detail_url": "http://api.example.com/v1/pages/1/"
}
"""
def to_representation(self, page):
return OrderedDict([
('type', page.specific_class._meta.app_label + '.' + page.specific_class.__name__),
('detail_url', get_object_detail_url(self.context, type(page), page.pk)),
])
class DocumentMetaField(MetaField):
"""
A subclass of MetaField for Document objects.
Adds a "download_url" field.
"meta": {
"type": "wagtaildocs.Document",
"detail_url": "http://api.example.com/v1/documents/1/",
"download_url": "http://api.example.com/documents/1/my_document.pdf"
}
"""
def to_representation(self, document):
data = OrderedDict([
('type', "wagtaildocs.Document"),
('detail_url', get_object_detail_url(self.context, type(document), document.pk)),
])
# Add download url
if self.context.get('show_details', False):
data['download_url'] = get_full_url(self.context['request'], document.url)
return data
class RelatedField(relations.RelatedField):
"""
Serializes related objects (eg, foreign keys).
Example:
"feed_image": {
"id": 1,
"meta": {
"type": "wagtailimages.Image",
"detail_url": "http://api.example.com/v1/images/1/"
}
}
"""
meta_field_serializer_class = MetaField
def to_representation(self, value):
meta_serializer = self.meta_field_serializer_class()
meta_serializer.bind('meta', self)
return OrderedDict([
('id', value.pk),
('meta', meta_serializer.to_representation(value)),
])
class PageParentField(RelatedField):
"""
Serializes the "parent" field on Page objects.
Pages don't have a "parent" field so some extra logic is needed to find the
parent page. That logic is implemented in this class.
The representation is the same as the RelatedField class.
"""
meta_field_serializer_class = PageMetaField
def get_attribute(self, instance):
parent = instance.get_parent()
site_pages = pages_for_site(self.context['request'].site)
if site_pages.filter(id=parent.id).exists():
return parent
class ChildRelationField(Field):
"""
Serializes child relations.
Child relations are any model that is related to a Page using a ParentalKey.
They are used for repeated fields on a page such as carousel items or related
links.
Child objects are part of the pages content so we nest them. The relation is
represented as a list of objects.
Example:
"carousel_items": [
{
"title": "First carousel item",
"image": {
"id": 1,
"meta": {
"type": "wagtailimages.Image",
"detail_url": "http://api.example.com/v1/images/1/"
}
}
},
"carousel_items": [
{
"title": "Second carousel item (no image)",
"image": null
}
]
"""
def __init__(self, *args, **kwargs):
self.child_fields = kwargs.pop('child_fields')
super(ChildRelationField, self).__init__(*args, **kwargs)
def to_representation(self, value):
serializer_class = get_serializer_class(value.model, self.child_fields)
serializer = serializer_class(context=self.context)
return [
serializer.to_representation(child_object)
for child_object in value.all()
]
class StreamField(Field):
"""
Serializes StreamField values.
Stream fields are stored in JSON format in the database. We reuse that in
the API.
Example:
"body": [
{
"type": "heading",
"value": {
"text": "Hello world!",
"size": "h1"
}
},
{
"type": "paragraph",
"value": "Some content"
}
{
"type": "image",
"value": 1
}
]
Where "heading" is a struct block containing "text" and "size" fields, and
"paragraph" is a simple text block.
Note that foreign keys are represented slightly differently in stream fields
to other parts of the API. In stream fields, a foreign key is represented
by an integer (the ID of the related object) but elsewhere in the API,
foreign objects are nested objects with id and meta as attributes.
"""
def to_representation(self, value):
return value.stream_block.get_prep_value(value)
class TagsField(Field):
"""
Serializes django-taggit TaggableManager fields.
These fields are a common way to link tags to objects in Wagtail. The API
serializes these as a list of strings taken from the name attribute of each
tag.
Example:
"tags": ["bird", "wagtail"]
"""
def to_representation(self, value):
return list(value.all().order_by('name').values_list('name', flat=True))
class BaseSerializer(serializers.ModelSerializer):
# Add StreamField to serializer_field_mapping
serializer_field_mapping = serializers.ModelSerializer.serializer_field_mapping.copy()
serializer_field_mapping.update({
wagtailcore_fields.StreamField: StreamField,
})
serializer_related_field = RelatedField
meta = MetaField()
def build_property_field(self, field_name, model_class):
# TaggableManager is not a Django field so it gets treated as a property
field = getattr(model_class, field_name)
if isinstance(field, _TaggableManager):
return TagsField, {}
return super(BaseSerializer, self).build_property_field(field_name, model_class)
class PageSerializer(BaseSerializer):
meta = PageMetaField()
parent = PageParentField(read_only=True)
def build_relational_field(self, field_name, relation_info):
# Find all relation fields that point to child class and make them use
# the ChildRelationField class.
if relation_info.to_many:
model = getattr(self.Meta, 'model')
child_relations = {
child_relation.field.rel.related_name: get_related_model(child_relation)
for child_relation in get_all_child_relations(model)
}
if field_name in child_relations and hasattr(child_relations[field_name], 'api_fields'):
return ChildRelationField, {'child_fields': child_relations[field_name].api_fields}
return super(BaseSerializer, self).build_relational_field(field_name, relation_info)
class ImageSerializer(BaseSerializer):
pass
class DocumentSerializer(BaseSerializer):
meta = DocumentMetaField()
def get_serializer_class(model_, fields_, base=BaseSerializer):
class Meta:
model = model_
fields = fields_
return type(model_.__name__ + 'Serializer', (base, ), {
'Meta': Meta
})

View file

@ -0,0 +1,54 @@
from django.core.urlresolvers import reverse
from django.db.models.signals import post_save, post_delete
from wagtail.wagtailcore.signals import page_published, page_unpublished
from wagtail.wagtailcore.models import get_page_models
from wagtail.wagtailimages.models import get_image_model
from wagtail.wagtaildocs.models import Document
from wagtail.contrib.wagtailfrontendcache.utils import purge_url_from_cache
from .utils import get_base_url
def purge_page_from_cache(instance, **kwargs):
base_url = get_base_url()
purge_url_from_cache(base_url + reverse('wagtailapi_v2:pages:detail', args=(instance.id, )))
def purge_image_from_cache(instance, **kwargs):
if not kwargs.get('created', False):
base_url = get_base_url()
purge_url_from_cache(base_url + reverse('wagtailapi_v2:images:detail', args=(instance.id, )))
def purge_document_from_cache(instance, **kwargs):
if not kwargs.get('created', False):
base_url = get_base_url()
purge_url_from_cache(base_url + reverse('wagtailapi_v2:documents:detail', args=(instance.id, )))
def register_signal_handlers():
Image = get_image_model()
for model in get_page_models():
page_published.connect(purge_page_from_cache, sender=model)
page_unpublished.connect(purge_page_from_cache, sender=model)
post_save.connect(purge_image_from_cache, sender=Image)
post_delete.connect(purge_image_from_cache, sender=Image)
post_save.connect(purge_document_from_cache, sender=Document)
post_delete.connect(purge_document_from_cache, sender=Document)
def unregister_signal_handlers():
Image = get_image_model()
for model in get_page_models():
page_published.disconnect(purge_page_from_cache, sender=model)
page_unpublished.disconnect(purge_page_from_cache, sender=model)
post_save.disconnect(purge_image_from_cache, sender=Image)
post_delete.disconnect(purge_image_from_cache, sender=Image)
post_save.disconnect(purge_document_from_cache, sender=Document)
post_delete.disconnect(purge_document_from_cache, sender=Document)

View file

View file

@ -0,0 +1,382 @@
import json
import mock
from django.test import TestCase
from django.test.utils import override_settings
from django.core.urlresolvers import reverse
from wagtail.wagtaildocs.models import Document
from wagtail.api.v2 import signal_handlers
class TestDocumentListing(TestCase):
fixtures = ['demosite.json']
def get_response(self, **params):
return self.client.get(reverse('wagtailapi_v2:documents:listing'), params)
def get_document_id_list(self, content):
return [page['id'] for page in content['documents']]
# BASIC TESTS
def test_basic(self):
response = self.get_response()
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-type'], 'application/json')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Check that the meta section is there
self.assertIn('meta', content)
self.assertIsInstance(content['meta'], dict)
# Check that the total count is there and correct
self.assertIn('total_count', content['meta'])
self.assertIsInstance(content['meta']['total_count'], int)
self.assertEqual(content['meta']['total_count'], Document.objects.count())
# Check that the documents section is there
self.assertIn('documents', content)
self.assertIsInstance(content['documents'], list)
# Check that each document has a meta section with type and detail_url attributes
for document in content['documents']:
self.assertIn('meta', document)
self.assertIsInstance(document['meta'], dict)
self.assertEqual(set(document['meta'].keys()), {'type', 'detail_url'})
# Type should always be wagtaildocs.Document
self.assertEqual(document['meta']['type'], 'wagtaildocs.Document')
# Check detail_url
self.assertEqual(document['meta']['detail_url'], 'http://localhost/api/v2beta/documents/%d/' % document['id'])
# EXTRA FIELDS
def test_extra_fields_default(self):
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
for document in content['documents']:
self.assertEqual(set(document.keys()), {'id', 'meta', 'title'})
def test_extra_fields(self):
response = self.get_response(fields='title,tags')
content = json.loads(response.content.decode('UTF-8'))
for document in content['documents']:
self.assertEqual(set(document.keys()), {'id', 'meta', 'title', 'tags'})
def test_extra_fields_tags(self):
response = self.get_response(fields='tags')
content = json.loads(response.content.decode('UTF-8'))
for document in content['documents']:
self.assertIsInstance(document['tags'], list)
def test_extra_fields_which_are_not_in_api_fields_gives_error(self):
response = self.get_response(fields='uploaded_by_user')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: uploaded_by_user"})
def test_extra_fields_unknown_field_gives_error(self):
response = self.get_response(fields='123,title,abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: 123, abc"})
# FILTERING
def test_filtering_exact_filter(self):
response = self.get_response(title='James Joyce')
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list, [2])
def test_filtering_on_id(self):
response = self.get_response(id=10)
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list, [10])
def test_filtering_tags(self):
Document.objects.get(id=3).tags.add('test')
response = self.get_response(tags='test')
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list, [3])
def test_filtering_unknown_field_gives_error(self):
response = self.get_response(not_a_field='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "query parameter is not an operation or a recognised field: not_a_field"})
# ORDERING
def test_ordering_by_title(self):
response = self.get_response(order='title')
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list, [3, 12, 10, 2, 7, 8, 5, 4, 1, 11, 9, 6])
def test_ordering_by_title_backwards(self):
response = self.get_response(order='-title')
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list, [6, 9, 11, 1, 4, 5, 8, 7, 2, 10, 12, 3])
def test_ordering_by_random(self):
response_1 = self.get_response(order='random')
content_1 = json.loads(response_1.content.decode('UTF-8'))
document_id_list_1 = self.get_document_id_list(content_1)
response_2 = self.get_response(order='random')
content_2 = json.loads(response_2.content.decode('UTF-8'))
document_id_list_2 = self.get_document_id_list(content_2)
self.assertNotEqual(document_id_list_1, document_id_list_2)
def test_ordering_by_random_backwards_gives_error(self):
response = self.get_response(order='-random')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "cannot order by 'random' (unknown field)"})
def test_ordering_by_random_with_offset_gives_error(self):
response = self.get_response(order='random', offset=10)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "random ordering with offset is not supported"})
def test_ordering_by_unknown_field_gives_error(self):
response = self.get_response(order='not_a_field')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "cannot order by 'not_a_field' (unknown field)"})
# LIMIT
def test_limit_only_two_results_returned(self):
response = self.get_response(limit=2)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(len(content['documents']), 2)
def test_limit_total_count(self):
response = self.get_response(limit=2)
content = json.loads(response.content.decode('UTF-8'))
# The total count must not be affected by "limit"
self.assertEqual(content['meta']['total_count'], Document.objects.count())
def test_limit_not_integer_gives_error(self):
response = self.get_response(limit='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit must be a positive integer"})
def test_limit_too_high_gives_error(self):
response = self.get_response(limit=1000)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit cannot be higher than 20"})
@override_settings(WAGTAILAPI_LIMIT_MAX=10)
def test_limit_maximum_can_be_changed(self):
response = self.get_response(limit=20)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit cannot be higher than 10"})
@override_settings(WAGTAILAPI_LIMIT_MAX=2)
def test_limit_default_changes_with_max(self):
# The default limit is 20. If WAGTAILAPI_LIMIT_MAX is less than that,
# the default should change accordingly.
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(len(content['documents']), 2)
# OFFSET
def test_offset_5_usually_appears_5th_in_list(self):
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list.index(5), 4)
def test_offset_5_moves_after_offset(self):
response = self.get_response(offset=4)
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(document_id_list.index(5), 0)
def test_offset_total_count(self):
response = self.get_response(offset=10)
content = json.loads(response.content.decode('UTF-8'))
# The total count must not be affected by "offset"
self.assertEqual(content['meta']['total_count'], Document.objects.count())
def test_offset_not_integer_gives_error(self):
response = self.get_response(offset='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "offset must be a positive integer"})
# SEARCH
def test_search_for_james_joyce(self):
response = self.get_response(search='james')
content = json.loads(response.content.decode('UTF-8'))
document_id_list = self.get_document_id_list(content)
self.assertEqual(set(document_id_list), set([2]))
def test_search_when_ordering_gives_error(self):
response = self.get_response(search='james', order='title')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "ordering with a search query is not supported"})
@override_settings(WAGTAILAPI_SEARCH_ENABLED=False)
def test_search_when_disabled_gives_error(self):
response = self.get_response(search='james')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "search is disabled"})
def test_search_when_filtering_by_tag_gives_error(self):
response = self.get_response(search='james', tags='wagtail')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "filtering by tag with a search query is not supported"})
class TestDocumentDetail(TestCase):
fixtures = ['demosite.json']
def get_response(self, image_id, **params):
return self.client.get(reverse('wagtailapi_v2:documents:detail', args=(image_id, )), params)
def test_basic(self):
response = self.get_response(1)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-type'], 'application/json')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Check the id field
self.assertIn('id', content)
self.assertEqual(content['id'], 1)
# Check that the meta section is there
self.assertIn('meta', content)
self.assertIsInstance(content['meta'], dict)
# Check the meta type
self.assertIn('type', content['meta'])
self.assertEqual(content['meta']['type'], 'wagtaildocs.Document')
# Check the meta detail_url
self.assertIn('detail_url', content['meta'])
self.assertEqual(content['meta']['detail_url'], 'http://localhost/api/v2beta/documents/1/')
# Check the meta download_url
self.assertIn('download_url', content['meta'])
self.assertEqual(content['meta']['download_url'], 'http://localhost/documents/1/wagtail_by_markyharky.jpg')
# Check the title field
self.assertIn('title', content)
self.assertEqual(content['title'], "Wagtail by mark Harkin")
# Check the tags field
self.assertIn('tags', content)
self.assertEqual(content['tags'], [])
def test_tags(self):
Document.objects.get(id=1).tags.add('hello')
Document.objects.get(id=1).tags.add('world')
response = self.get_response(1)
content = json.loads(response.content.decode('UTF-8'))
self.assertIn('tags', content)
self.assertEqual(content['tags'], ['hello', 'world'])
@override_settings(WAGTAILAPI_BASE_URL='http://api.example.com/')
def test_download_url_with_custom_base_url(self):
response = self.get_response(1)
content = json.loads(response.content.decode('UTF-8'))
self.assertIn('download_url', content['meta'])
self.assertEqual(content['meta']['download_url'], 'http://api.example.com/documents/1/wagtail_by_markyharky.jpg')
@override_settings(
WAGTAILFRONTENDCACHE={
'varnish': {
'BACKEND': 'wagtail.contrib.wagtailfrontendcache.backends.HTTPBackend',
'LOCATION': 'http://localhost:8000',
},
},
WAGTAILAPI_BASE_URL='http://api.example.com',
)
@mock.patch('wagtail.contrib.wagtailfrontendcache.backends.HTTPBackend.purge')
class TestDocumentCacheInvalidation(TestCase):
fixtures = ['demosite.json']
@classmethod
def setUpClass(cls):
super(TestDocumentCacheInvalidation, cls).setUpClass()
signal_handlers.register_signal_handlers()
@classmethod
def tearDownClass(cls):
super(TestDocumentCacheInvalidation, cls).tearDownClass()
signal_handlers.unregister_signal_handlers()
def test_resave_document_purges(self, purge):
Document.objects.get(id=5).save()
purge.assert_any_call('http://api.example.com/api/v2beta/documents/5/')
def test_delete_document_purges(self, purge):
Document.objects.get(id=5).delete()
purge.assert_any_call('http://api.example.com/api/v2beta/documents/5/')

View file

@ -0,0 +1,379 @@
import json
import mock
from django.test import TestCase
from django.test.utils import override_settings
from django.core.urlresolvers import reverse
from wagtail.wagtailimages.models import get_image_model
from wagtail.api.v2 import signal_handlers
class TestImageListing(TestCase):
fixtures = ['demosite.json']
def get_response(self, **params):
return self.client.get(reverse('wagtailapi_v2:images:listing'), params)
def get_image_id_list(self, content):
return [page['id'] for page in content['images']]
# BASIC TESTS
def test_basic(self):
response = self.get_response()
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-type'], 'application/json')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Check that the meta section is there
self.assertIn('meta', content)
self.assertIsInstance(content['meta'], dict)
# Check that the total count is there and correct
self.assertIn('total_count', content['meta'])
self.assertIsInstance(content['meta']['total_count'], int)
self.assertEqual(content['meta']['total_count'], get_image_model().objects.count())
# Check that the images section is there
self.assertIn('images', content)
self.assertIsInstance(content['images'], list)
# Check that each image has a meta section with type and detail_url attributes
for image in content['images']:
self.assertIn('meta', image)
self.assertIsInstance(image['meta'], dict)
self.assertEqual(set(image['meta'].keys()), {'type', 'detail_url'})
# Type should always be wagtailimages.Image
self.assertEqual(image['meta']['type'], 'wagtailimages.Image')
# Check detail url
self.assertEqual(image['meta']['detail_url'], 'http://localhost/api/v2beta/images/%d/' % image['id'])
# EXTRA FIELDS
def test_extra_fields_default(self):
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
for image in content['images']:
self.assertEqual(set(image.keys()), {'id', 'meta', 'title'})
def test_extra_fields(self):
response = self.get_response(fields='title,width,height')
content = json.loads(response.content.decode('UTF-8'))
for image in content['images']:
self.assertEqual(set(image.keys()), {'id', 'meta', 'title', 'width', 'height'})
def test_extra_fields_tags(self):
response = self.get_response(fields='tags')
content = json.loads(response.content.decode('UTF-8'))
for image in content['images']:
self.assertEqual(set(image.keys()), {'id', 'meta', 'tags'})
self.assertIsInstance(image['tags'], list)
def test_extra_fields_which_are_not_in_api_fields_gives_error(self):
response = self.get_response(fields='uploaded_by_user')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: uploaded_by_user"})
def test_extra_fields_unknown_field_gives_error(self):
response = self.get_response(fields='123,title,abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: 123, abc"})
# FILTERING
def test_filtering_exact_filter(self):
response = self.get_response(title='James Joyce')
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list, [5])
def test_filtering_on_id(self):
response = self.get_response(id=10)
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list, [10])
def test_filtering_tags(self):
get_image_model().objects.get(id=6).tags.add('test')
response = self.get_response(tags='test')
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list, [6])
def test_filtering_unknown_field_gives_error(self):
response = self.get_response(not_a_field='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "query parameter is not an operation or a recognised field: not_a_field"})
# ORDERING
def test_ordering_by_title(self):
response = self.get_response(order='title')
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list, [6, 15, 13, 5, 10, 11, 8, 7, 4, 14, 12, 9])
def test_ordering_by_title_backwards(self):
response = self.get_response(order='-title')
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list, [9, 12, 14, 4, 7, 8, 11, 10, 5, 13, 15, 6])
def test_ordering_by_random(self):
response_1 = self.get_response(order='random')
content_1 = json.loads(response_1.content.decode('UTF-8'))
image_id_list_1 = self.get_image_id_list(content_1)
response_2 = self.get_response(order='random')
content_2 = json.loads(response_2.content.decode('UTF-8'))
image_id_list_2 = self.get_image_id_list(content_2)
self.assertNotEqual(image_id_list_1, image_id_list_2)
def test_ordering_by_random_backwards_gives_error(self):
response = self.get_response(order='-random')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "cannot order by 'random' (unknown field)"})
def test_ordering_by_random_with_offset_gives_error(self):
response = self.get_response(order='random', offset=10)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "random ordering with offset is not supported"})
def test_ordering_by_unknown_field_gives_error(self):
response = self.get_response(order='not_a_field')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "cannot order by 'not_a_field' (unknown field)"})
# LIMIT
def test_limit_only_two_results_returned(self):
response = self.get_response(limit=2)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(len(content['images']), 2)
def test_limit_total_count(self):
response = self.get_response(limit=2)
content = json.loads(response.content.decode('UTF-8'))
# The total count must not be affected by "limit"
self.assertEqual(content['meta']['total_count'], get_image_model().objects.count())
def test_limit_not_integer_gives_error(self):
response = self.get_response(limit='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit must be a positive integer"})
def test_limit_too_high_gives_error(self):
response = self.get_response(limit=1000)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit cannot be higher than 20"})
@override_settings(WAGTAILAPI_LIMIT_MAX=10)
def test_limit_maximum_can_be_changed(self):
response = self.get_response(limit=20)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit cannot be higher than 10"})
@override_settings(WAGTAILAPI_LIMIT_MAX=2)
def test_limit_default_changes_with_max(self):
# The default limit is 20. If WAGTAILAPI_LIMIT_MAX is less than that,
# the default should change accordingly.
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(len(content['images']), 2)
# OFFSET
def test_offset_10_usually_appears_7th_in_list(self):
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list.index(10), 6)
def test_offset_10_moves_after_offset(self):
response = self.get_response(offset=4)
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(image_id_list.index(10), 2)
def test_offset_total_count(self):
response = self.get_response(offset=10)
content = json.loads(response.content.decode('UTF-8'))
# The total count must not be affected by "offset"
self.assertEqual(content['meta']['total_count'], get_image_model().objects.count())
def test_offset_not_integer_gives_error(self):
response = self.get_response(offset='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "offset must be a positive integer"})
# SEARCH
def test_search_for_james_joyce(self):
response = self.get_response(search='james')
content = json.loads(response.content.decode('UTF-8'))
image_id_list = self.get_image_id_list(content)
self.assertEqual(set(image_id_list), set([5]))
def test_search_when_ordering_gives_error(self):
response = self.get_response(search='james', order='title')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "ordering with a search query is not supported"})
@override_settings(WAGTAILAPI_SEARCH_ENABLED=False)
def test_search_when_disabled_gives_error(self):
response = self.get_response(search='james')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "search is disabled"})
def test_search_when_filtering_by_tag_gives_error(self):
response = self.get_response(search='james', tags='wagtail')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "filtering by tag with a search query is not supported"})
class TestImageDetail(TestCase):
fixtures = ['demosite.json']
def get_response(self, image_id, **params):
return self.client.get(reverse('wagtailapi_v2:images:detail', args=(image_id, )), params)
def test_basic(self):
response = self.get_response(5)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-type'], 'application/json')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Check the id field
self.assertIn('id', content)
self.assertEqual(content['id'], 5)
# Check that the meta section is there
self.assertIn('meta', content)
self.assertIsInstance(content['meta'], dict)
# Check the meta type
self.assertIn('type', content['meta'])
self.assertEqual(content['meta']['type'], 'wagtailimages.Image')
# Check the meta detail_url
self.assertIn('detail_url', content['meta'])
self.assertEqual(content['meta']['detail_url'], 'http://localhost/api/v2beta/images/5/')
# Check the title field
self.assertIn('title', content)
self.assertEqual(content['title'], "James Joyce")
# Check the width and height fields
self.assertIn('width', content)
self.assertIn('height', content)
self.assertEqual(content['width'], 500)
self.assertEqual(content['height'], 392)
# Check the tags field
self.assertIn('tags', content)
self.assertEqual(content['tags'], [])
def test_tags(self):
image = get_image_model().objects.get(id=5)
image.tags.add('hello')
image.tags.add('world')
response = self.get_response(5)
content = json.loads(response.content.decode('UTF-8'))
self.assertIn('tags', content)
self.assertEqual(content['tags'], ['hello', 'world'])
@override_settings(
WAGTAILFRONTENDCACHE={
'varnish': {
'BACKEND': 'wagtail.contrib.wagtailfrontendcache.backends.HTTPBackend',
'LOCATION': 'http://localhost:8000',
},
},
WAGTAILAPI_BASE_URL='http://api.example.com',
)
@mock.patch('wagtail.contrib.wagtailfrontendcache.backends.HTTPBackend.purge')
class TestImageCacheInvalidation(TestCase):
fixtures = ['demosite.json']
@classmethod
def setUpClass(cls):
super(TestImageCacheInvalidation, cls).setUpClass()
signal_handlers.register_signal_handlers()
@classmethod
def tearDownClass(cls):
super(TestImageCacheInvalidation, cls).tearDownClass()
signal_handlers.unregister_signal_handlers()
def test_resave_image_purges(self, purge):
get_image_model().objects.get(id=5).save()
purge.assert_any_call('http://api.example.com/api/v2beta/images/5/')
def test_delete_image_purges(self, purge):
get_image_model().objects.get(id=5).delete()
purge.assert_any_call('http://api.example.com/api/v2beta/images/5/')

View file

@ -0,0 +1,726 @@
import json
import mock
import collections
from django.test import TestCase
from django.test.utils import override_settings
from django.core.urlresolvers import reverse
from wagtail.wagtailcore.models import Page
from wagtail.api.v2 import signal_handlers
from wagtail.tests.demosite import models
from wagtail.tests.testapp.models import StreamPage
def get_total_page_count():
# Need to take away 1 as the root page is invisible over the API
return Page.objects.live().public().count() - 1
class TestPageListing(TestCase):
fixtures = ['demosite.json']
def get_response(self, **params):
return self.client.get(reverse('wagtailapi_v2:pages:listing'), params)
def get_page_id_list(self, content):
return [page['id'] for page in content['pages']]
# BASIC TESTS
def test_basic(self):
response = self.get_response()
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-type'], 'application/json')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Check that the meta section is there
self.assertIn('meta', content)
self.assertIsInstance(content['meta'], dict)
# Check that the total count is there and correct
self.assertIn('total_count', content['meta'])
self.assertIsInstance(content['meta']['total_count'], int)
self.assertEqual(content['meta']['total_count'], get_total_page_count())
# Check that the pages section is there
self.assertIn('pages', content)
self.assertIsInstance(content['pages'], list)
# Check that each page has a meta section with type and detail_url attributes
for page in content['pages']:
self.assertIn('meta', page)
self.assertIsInstance(page['meta'], dict)
self.assertEqual(set(page['meta'].keys()), {'type', 'detail_url'})
def test_unpublished_pages_dont_appear_in_list(self):
total_count = get_total_page_count()
page = models.BlogEntryPage.objects.get(id=16)
page.unpublish()
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(content['meta']['total_count'], total_count - 1)
def test_private_pages_dont_appear_in_list(self):
total_count = get_total_page_count()
page = models.BlogIndexPage.objects.get(id=5)
page.view_restrictions.create(password='test')
new_total_count = get_total_page_count()
self.assertNotEqual(total_count, new_total_count)
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(content['meta']['total_count'], new_total_count)
# TYPE FILTER
def test_type_filter_results_are_all_blog_entries(self):
response = self.get_response(type='demosite.BlogEntryPage')
content = json.loads(response.content.decode('UTF-8'))
for page in content['pages']:
self.assertEqual(page['meta']['type'], 'demosite.BlogEntryPage')
def test_type_filter_total_count(self):
response = self.get_response(type='demosite.BlogEntryPage')
content = json.loads(response.content.decode('UTF-8'))
# Total count must be reduced as this filters the results
self.assertEqual(content['meta']['total_count'], 3)
def test_non_existant_type_gives_error(self):
response = self.get_response(type='demosite.IDontExist')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "type doesn't exist"})
def test_non_page_type_gives_error(self):
response = self.get_response(type='auth.User')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "type doesn't exist"})
# EXTRA FIELDS
def test_extra_fields_default(self):
response = self.get_response(type='demosite.BlogEntryPage')
content = json.loads(response.content.decode('UTF-8'))
for page in content['pages']:
self.assertEqual(set(page.keys()), {'id', 'meta', 'title'})
def test_extra_fields(self):
response = self.get_response(type='demosite.BlogEntryPage', fields='title,date,feed_image')
content = json.loads(response.content.decode('UTF-8'))
for page in content['pages']:
self.assertEqual(set(page.keys()), {'id', 'meta', 'title', 'date', 'feed_image'})
def test_extra_fields_child_relation(self):
response = self.get_response(type='demosite.BlogEntryPage', fields='title,related_links')
content = json.loads(response.content.decode('UTF-8'))
for page in content['pages']:
self.assertEqual(set(page.keys()), {'id', 'meta', 'title', 'related_links'})
self.assertIsInstance(page['related_links'], list)
def test_extra_fields_foreign_key(self):
response = self.get_response(type='demosite.BlogEntryPage', fields='title,date,feed_image')
content = json.loads(response.content.decode('UTF-8'))
for page in content['pages']:
feed_image = page['feed_image']
if feed_image is not None:
self.assertIsInstance(feed_image, dict)
self.assertEqual(set(feed_image.keys()), {'id', 'meta'})
self.assertIsInstance(feed_image['id'], int)
self.assertIsInstance(feed_image['meta'], dict)
self.assertEqual(set(feed_image['meta'].keys()), {'type', 'detail_url'})
self.assertEqual(feed_image['meta']['type'], 'wagtailimages.Image')
self.assertEqual(feed_image['meta']['detail_url'], 'http://localhost/api/v2beta/images/%d/' % feed_image['id'])
def test_extra_fields_tags(self):
response = self.get_response(type='demosite.BlogEntryPage', fields='tags')
content = json.loads(response.content.decode('UTF-8'))
for page in content['pages']:
self.assertEqual(set(page.keys()), {'id', 'meta', 'tags'})
self.assertIsInstance(page['tags'], list)
def test_extra_field_ordering(self):
response = self.get_response(type='demosite.BlogEntryPage', fields='date,title,feed_image,related_links')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Test field order
content = json.JSONDecoder(object_pairs_hook=collections.OrderedDict).decode(response.content.decode('UTF-8'))
field_order = [
'id',
'meta',
'title',
'date',
'feed_image',
'related_links',
]
self.assertEqual(list(content['pages'][0].keys()), field_order)
def test_extra_fields_without_type_gives_error(self):
response = self.get_response(fields='title,related_links')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: related_links"})
def test_extra_fields_which_are_not_in_api_fields_gives_error(self):
response = self.get_response(fields='path')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: path"})
def test_extra_fields_unknown_field_gives_error(self):
response = self.get_response(fields='123,title,abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "unknown fields: 123, abc"})
# FILTERING
def test_filtering_exact_filter(self):
response = self.get_response(title='Home page')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [2])
def test_filtering_exact_filter_on_specific_field(self):
response = self.get_response(type='demosite.BlogEntryPage', date='2013-12-02')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16])
def test_filtering_on_id(self):
response = self.get_response(id=16)
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16])
def test_filtering_doesnt_work_on_specific_fields_without_type(self):
response = self.get_response(date='2013-12-02')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "query parameter is not an operation or a recognised field: date"})
def test_filtering_tags(self):
response = self.get_response(type='demosite.BlogEntryPage', tags='wagtail')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16, 18])
def test_filtering_multiple_tags(self):
response = self.get_response(type='demosite.BlogEntryPage', tags='wagtail,bird')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16])
def test_filtering_unknown_field_gives_error(self):
response = self.get_response(not_a_field='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "query parameter is not an operation or a recognised field: not_a_field"})
# CHILD OF FILTER
def test_child_of_filter(self):
response = self.get_response(child_of=5)
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16, 18, 19])
def test_child_of_with_type(self):
response = self.get_response(type='demosite.EventPage', child_of=5)
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [])
def test_child_of_unknown_page_gives_error(self):
response = self.get_response(child_of=1000)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "parent page doesn't exist"})
def test_child_of_not_integer_gives_error(self):
response = self.get_response(child_of='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "child_of must be a positive integer"})
def test_child_of_page_thats_not_in_same_site_gives_error(self):
# Root page is not in any site, so pretend it doesn't exist
response = self.get_response(child_of=1)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "parent page doesn't exist"})
# DESCENDANT OF FILTER
def test_descendant_of_filter(self):
response = self.get_response(descendant_of=6)
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [10, 15, 17, 21, 22, 23])
def test_descendant_of_with_type(self):
response = self.get_response(type='tests.EventPage', descendant_of=6)
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [])
def test_descendant_of_unknown_page_gives_error(self):
response = self.get_response(descendant_of=1000)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "ancestor page doesn't exist"})
def test_descendant_of_not_integer_gives_error(self):
response = self.get_response(descendant_of='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "descendant_of must be a positive integer"})
def test_descendant_of_page_thats_not_in_same_site_gives_error(self):
# Root page is not in any site, so pretend it doesn't exist
response = self.get_response(descendant_of=1)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "ancestor page doesn't exist"})
def test_descendant_of_when_filtering_by_child_of_gives_error(self):
response = self.get_response(descendant_of=6, child_of=5)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "filtering by descendant_of with child_of is not supported"})
# ORDERING
def test_ordering_default(self):
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [2, 4, 8, 9, 5, 16, 18, 19, 6, 10, 15, 17, 21, 22, 23, 20, 13, 14, 12])
def test_ordering_by_title(self):
response = self.get_response(order='title')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [21, 22, 19, 23, 5, 16, 18, 12, 14, 8, 9, 4, 2, 13, 20, 17, 6, 10, 15])
def test_ordering_by_title_backwards(self):
response = self.get_response(order='-title')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [15, 10, 6, 17, 20, 13, 2, 4, 9, 8, 14, 12, 18, 16, 5, 23, 19, 22, 21])
def test_ordering_by_random(self):
response_1 = self.get_response(order='random')
content_1 = json.loads(response_1.content.decode('UTF-8'))
page_id_list_1 = self.get_page_id_list(content_1)
response_2 = self.get_response(order='random')
content_2 = json.loads(response_2.content.decode('UTF-8'))
page_id_list_2 = self.get_page_id_list(content_2)
self.assertNotEqual(page_id_list_1, page_id_list_2)
def test_ordering_by_random_backwards_gives_error(self):
response = self.get_response(order='-random')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "cannot order by 'random' (unknown field)"})
def test_ordering_by_random_with_offset_gives_error(self):
response = self.get_response(order='random', offset=10)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "random ordering with offset is not supported"})
def test_ordering_default_with_type(self):
response = self.get_response(type='demosite.BlogEntryPage')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16, 18, 19])
def test_ordering_by_title_with_type(self):
response = self.get_response(type='demosite.BlogEntryPage', order='title')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [19, 16, 18])
def test_ordering_by_specific_field_with_type(self):
response = self.get_response(type='demosite.BlogEntryPage', order='date')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list, [16, 18, 19])
def test_ordering_by_unknown_field_gives_error(self):
response = self.get_response(order='not_a_field')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "cannot order by 'not_a_field' (unknown field)"})
# LIMIT
def test_limit_only_two_results_returned(self):
response = self.get_response(limit=2)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(len(content['pages']), 2)
def test_limit_total_count(self):
response = self.get_response(limit=2)
content = json.loads(response.content.decode('UTF-8'))
# The total count must not be affected by "limit"
self.assertEqual(content['meta']['total_count'], get_total_page_count())
def test_limit_not_integer_gives_error(self):
response = self.get_response(limit='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit must be a positive integer"})
def test_limit_too_high_gives_error(self):
response = self.get_response(limit=1000)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit cannot be higher than 20"})
@override_settings(WAGTAILAPI_LIMIT_MAX=10)
def test_limit_maximum_can_be_changed(self):
response = self.get_response(limit=20)
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "limit cannot be higher than 10"})
@override_settings(WAGTAILAPI_LIMIT_MAX=2)
def test_limit_default_changes_with_max(self):
# The default limit is 20. If WAGTAILAPI_LIMIT_MAX is less than that,
# the default should change accordingly.
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(len(content['pages']), 2)
# OFFSET
def test_offset_5_usually_appears_5th_in_list(self):
response = self.get_response()
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list.index(5), 4)
def test_offset_5_moves_after_offset(self):
response = self.get_response(offset=4)
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(page_id_list.index(5), 0)
def test_offset_total_count(self):
response = self.get_response(offset=10)
content = json.loads(response.content.decode('UTF-8'))
# The total count must not be affected by "offset"
self.assertEqual(content['meta']['total_count'], get_total_page_count())
def test_offset_not_integer_gives_error(self):
response = self.get_response(offset='abc')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "offset must be a positive integer"})
# SEARCH
def test_search_for_blog(self):
response = self.get_response(search='blog')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
# Check that the results are the blog index and three blog pages
self.assertEqual(set(page_id_list), set([5, 16, 18, 19]))
def test_search_with_type(self):
response = self.get_response(type='demosite.BlogEntryPage', search='blog')
content = json.loads(response.content.decode('UTF-8'))
page_id_list = self.get_page_id_list(content)
self.assertEqual(set(page_id_list), set([16, 18, 19]))
def test_search_when_ordering_gives_error(self):
response = self.get_response(search='blog', order='title')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "ordering with a search query is not supported"})
@override_settings(WAGTAILAPI_SEARCH_ENABLED=False)
def test_search_when_disabled_gives_error(self):
response = self.get_response(search='blog')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "search is disabled"})
def test_search_when_filtering_by_tag_gives_error(self):
response = self.get_response(type='demosite.BlogEntryPage', search='blog', tags='wagtail')
content = json.loads(response.content.decode('UTF-8'))
self.assertEqual(response.status_code, 400)
self.assertEqual(content, {'message': "filtering by tag with a search query is not supported"})
class TestPageDetail(TestCase):
fixtures = ['demosite.json']
def get_response(self, page_id, **params):
return self.client.get(reverse('wagtailapi_v2:pages:detail', args=(page_id, )), params)
def test_basic(self):
response = self.get_response(16)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['Content-type'], 'application/json')
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Check the id field
self.assertIn('id', content)
self.assertEqual(content['id'], 16)
# Check that the meta section is there
self.assertIn('meta', content)
self.assertIsInstance(content['meta'], dict)
# Check the meta type
self.assertIn('type', content['meta'])
self.assertEqual(content['meta']['type'], 'demosite.BlogEntryPage')
# Check the meta detail_url
self.assertIn('detail_url', content['meta'])
self.assertEqual(content['meta']['detail_url'], 'http://localhost/api/v2beta/pages/16/')
# Check the parent field
self.assertIn('parent', content)
self.assertIsInstance(content['parent'], dict)
self.assertEqual(set(content['parent'].keys()), {'id', 'meta'})
self.assertEqual(content['parent']['id'], 5)
self.assertIsInstance(content['parent']['meta'], dict)
self.assertEqual(set(content['parent']['meta'].keys()), {'type', 'detail_url'})
self.assertEqual(content['parent']['meta']['type'], 'demosite.BlogIndexPage')
self.assertEqual(content['parent']['meta']['detail_url'], 'http://localhost/api/v2beta/pages/5/')
# Check that the custom fields are included
self.assertIn('date', content)
self.assertIn('body', content)
self.assertIn('tags', content)
self.assertIn('feed_image', content)
self.assertIn('related_links', content)
self.assertIn('carousel_items', content)
# Check that the date was serialised properly
self.assertEqual(content['date'], '2013-12-02')
# Check that the tags were serialised properly
self.assertEqual(content['tags'], ['bird', 'wagtail'])
# Check that the feed image was serialised properly
self.assertIsInstance(content['feed_image'], dict)
self.assertEqual(set(content['feed_image'].keys()), {'id', 'meta'})
self.assertEqual(content['feed_image']['id'], 7)
self.assertIsInstance(content['feed_image']['meta'], dict)
self.assertEqual(set(content['feed_image']['meta'].keys()), {'type', 'detail_url'})
self.assertEqual(content['feed_image']['meta']['type'], 'wagtailimages.Image')
self.assertEqual(content['feed_image']['meta']['detail_url'], 'http://localhost/api/v2beta/images/7/')
# Check that the child relations were serialised properly
self.assertEqual(content['related_links'], [])
for carousel_item in content['carousel_items']:
self.assertEqual(set(carousel_item.keys()), {'embed_url', 'link', 'caption', 'image'})
def test_meta_parent_id_doesnt_show_root_page(self):
# Root page isn't in the site so don't show it if the user is looking at the home page
response = self.get_response(2)
content = json.loads(response.content.decode('UTF-8'))
self.assertNotIn('parent', content['meta'])
def test_field_ordering(self):
response = self.get_response(16)
# Will crash if the JSON is invalid
content = json.loads(response.content.decode('UTF-8'))
# Test field order
content = json.JSONDecoder(object_pairs_hook=collections.OrderedDict).decode(response.content.decode('UTF-8'))
field_order = [
'id',
'meta',
'parent',
'title',
'body',
'tags',
'date',
'feed_image',
'carousel_items',
'related_links',
]
self.assertEqual(list(content.keys()), field_order)
def test_null_foreign_key(self):
models.BlogEntryPage.objects.filter(id=16).update(feed_image_id=None)
response = self.get_response(16)
content = json.loads(response.content.decode('UTF-8'))
self.assertIn('related_links', content)
self.assertEqual(content['feed_image'], None)
class TestPageDetailWithStreamField(TestCase):
fixtures = ['test.json']
def setUp(self):
self.homepage = Page.objects.get(url_path='/home/')
def make_stream_page(self, body):
stream_page = StreamPage(
title='stream page',
slug='stream-page',
body=body
)
return self.homepage.add_child(instance=stream_page)
def test_can_fetch_streamfield_content(self):
stream_page = self.make_stream_page('[{"type": "text", "value": "foo"}]')
response_url = reverse('wagtailapi_v2:pages:detail', args=(stream_page.id, ))
response = self.client.get(response_url)
self.assertEqual(response.status_code, 200)
self.assertEqual(response['content-type'], 'application/json')
content = json.loads(response.content.decode('utf-8'))
self.assertIn('id', content)
self.assertEqual(content['id'], stream_page.id)
self.assertIn('body', content)
self.assertEqual(content['body'], [{'type': 'text', 'value': 'foo'}])
def test_image_block(self):
stream_page = self.make_stream_page('[{"type": "image", "value": 1}]')
response_url = reverse('wagtailapi_v2:pages:detail', args=(stream_page.id, ))
response = self.client.get(response_url)
content = json.loads(response.content.decode('utf-8'))
# ForeignKeys in a StreamField shouldn't be translated into dictionary representation
self.assertEqual(content['body'], [{'type': 'image', 'value': 1}])
@override_settings(
WAGTAILFRONTENDCACHE={
'varnish': {
'BACKEND': 'wagtail.contrib.wagtailfrontendcache.backends.HTTPBackend',
'LOCATION': 'http://localhost:8000',
},
},
WAGTAILAPI_BASE_URL='http://api.example.com',
)
@mock.patch('wagtail.contrib.wagtailfrontendcache.backends.HTTPBackend.purge')
class TestPageCacheInvalidation(TestCase):
fixtures = ['demosite.json']
@classmethod
def setUpClass(cls):
super(TestPageCacheInvalidation, cls).setUpClass()
signal_handlers.register_signal_handlers()
@classmethod
def tearDownClass(cls):
super(TestPageCacheInvalidation, cls).tearDownClass()
signal_handlers.unregister_signal_handlers()
def test_republish_page_purges(self, purge):
Page.objects.get(id=2).save_revision().publish()
purge.assert_any_call('http://api.example.com/api/v2beta/pages/2/')
def test_unpublish_page_purges(self, purge):
Page.objects.get(id=2).unpublish()
purge.assert_any_call('http://api.example.com/api/v2beta/pages/2/')
def test_delete_page_purges(self, purge):
Page.objects.get(id=16).delete()
purge.assert_any_call('http://api.example.com/api/v2beta/pages/16/')
def test_save_draft_doesnt_purge(self, purge):
Page.objects.get(id=2).save_revision()
purge.assert_not_called()

16
wagtail/api/v2/urls.py Normal file
View file

@ -0,0 +1,16 @@
from __future__ import absolute_import
from django.conf.urls import url
from .endpoints import PagesAPIEndpoint, ImagesAPIEndpoint, DocumentsAPIEndpoint
from .router import WagtailAPIRouter
v2 = WagtailAPIRouter('wagtailapi_v2')
v2.register_endpoint('pages', PagesAPIEndpoint)
v2.register_endpoint('images', ImagesAPIEndpoint)
v2.register_endpoint('documents', DocumentsAPIEndpoint)
urlpatterns = [
url(r'^v2beta/', v2.urls),
]

29
wagtail/api/v2/utils.py Normal file
View file

@ -0,0 +1,29 @@
from django.conf import settings
from django.utils.six.moves.urllib.parse import urlparse
from wagtail.wagtailcore.models import Page
class BadRequestError(Exception):
pass
def get_base_url(request=None):
base_url = getattr(settings, 'WAGTAILAPI_BASE_URL', request.site.root_url if request else None)
if base_url:
# We only want the scheme and netloc
base_url_parsed = urlparse(base_url)
return base_url_parsed.scheme + '://' + base_url_parsed.netloc
def get_full_url(request, path):
base_url = get_base_url(request) or ''
return base_url + path
def pages_for_site(site):
pages = Page.objects.public().live()
pages = pages.descendant_of(site.root_page, inclusive=True)
return pages

View file

@ -7,6 +7,7 @@ from wagtail.wagtailimages import urls as wagtailimages_urls
from wagtail.wagtailsearch import urls as wagtailsearch_urls
from wagtail.contrib.wagtailsitemaps.views import sitemap
from wagtail.contrib.wagtailapi import urls as wagtailapi_urls
from wagtail.api.v2 import urls as wagtailapi2_urls
from wagtail.tests.testapp import urls as testapp_urls
@ -18,6 +19,7 @@ urlpatterns = [
url(r'^images/', include(wagtailimages_urls)),
url(r'^api/', include(wagtailapi_urls)),
url(r'^api/', include(wagtailapi2_urls)),
url(r'^sitemap\.xml$', sitemap),
url(r'^testapp/', include(testapp_urls)),