diff --git a/README.rst b/README.rst index 82d1543..13229f6 100644 --- a/README.rst +++ b/README.rst @@ -1,4 +1,50 @@ django-ddp ========== -Django/PostgreSQL implementation of the Meteor DDP service. +Django/PostgreSQL implementation of the Meteor DDP service, allowing Meteor to subsribe to changes on Django models. + +Example usage +------------- + +.. code:: python + + # bookstore/ddp.py + + from dddp.api import API, Collection, Publication + from bookstore import models + + class Book(Collection): + model = models.Book + + + class Author(Collection): + model = models.Author + + + class AllBooks(Publication): + queries = [ + models.Author.objects.all(), + models.Book.objects.all(), + ] + + + class BooksByAuthorEmail(Publication): + def get_queries(self, author_email): + return [ + models.Author.objects.filter( + email=author_email, + ), + models.Book.objects.filter( + author__email=author_email, + ), + ] + + + API.register( + [Book, Author, AllBooks, BooksByAuthorEmail] + ) + +.. code:: sh + + # start DDP service using default port (8000) + $ manage.py dddp diff --git a/dddp/__init__.py b/dddp/__init__.py index 0c1bc4b..5bdea45 100644 --- a/dddp/__init__.py +++ b/dddp/__init__.py @@ -1,8 +1,9 @@ """Django/PostgreSQL implementation of the Meteor DDP service.""" +from __future__ import unicode_literals import os.path from pkg_resources import get_distribution, DistributionNotFound -from django.utils.module_loading import autodiscover_modules from gevent.local import local +from django.utils.module_loading import autodiscover_modules from dddp import alea try: @@ -17,27 +18,53 @@ else: default_app_config = 'dddp.apps.DjangoDDPConfig' + +class AlreadyRegistered(Exception): + + """Raised when registering over the top of an existing registration.""" + + pass + + class ThreadLocal(local): + + """Thread local storage for greenlet state.""" + _init_done = False def __init__(self, **default_factories): + """Create new thread storage instance.""" if self._init_done: raise SystemError('__init__ called too many times') self._init_done = True self._default_factories = default_factories def __getattr__(self, name): + """Create missing attributes using default factories.""" try: factory = self._default_factories[name] except KeyError: - raise AttributeError + raise AttributeError(name) obj = factory() setattr(self, name, obj) return obj def get(self, name, factory, *factory_args, **factory_kwargs): + """Get attribute, creating if required using specified factory.""" if not hasattr(self, name): return setattr(self, name, factory(*factory_args, **factory_kwargs)) return getattr(self, name) + THREAD_LOCAL = ThreadLocal(alea_random=alea.Alea) +METEOR_ID_CHARS = u'23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz' + + +def meteor_random_id(): + return THREAD_LOCAL.alea_random.random_string(17, METEOR_ID_CHARS) + + +def autodiscover(): + from dddp.api import API + autodiscover_modules('ddp', register_to=API) + return API diff --git a/dddp/admin.py b/dddp/admin.py new file mode 100644 index 0000000..96c098a --- /dev/null +++ b/dddp/admin.py @@ -0,0 +1,107 @@ +from django.contrib import admin +from django.core.urlresolvers import reverse, NoReverseMatch +from django.utils.html import format_html +from dddp import models + + +def object_admin_link(obj): + kwargs = { + 'format_string': '{app_label}.{model} {object_id}: {object}', + 'app_label': obj.content_type.app_label, + 'model': obj.content_type.model, + 'object_id': obj.object_id, + 'object': obj.content_type.model_class().objects.get(pk=obj.object_id), + } + try: + kwargs.update( + url=reverse( + 'admin:%s_%s_change' % ( + obj.content_type.app_label, + obj.content_type.model, + ), + args=(obj.object_id,), + ), + format_string='%s' % kwargs['format_string'], + ) + except NoReverseMatch: + pass + return format_html(**kwargs) +object_admin_link.short_description = 'Object' +object_admin_link.allow_tags = True + + +class ObjectMapping(admin.ModelAdmin): + search_fields = [ + 'meteor_id', + ] + list_display = [ + 'meteor_id', + object_admin_link, + 'content_type', + 'object_id', + ] + list_filter = [ + 'content_type__app_label', + 'content_type', + ] + list_select_related = [ + 'content_type', + ] + + +class SubscriptionCollectionInline(admin.TabularInline): + model = models.SubscriptionCollection + fields = [ + 'collection_class', + ] + readonly_fields = [ + 'name', + 'collection_class', + ] + max_num = 0 + + +class Subscription(admin.ModelAdmin): + search_fields = [ + 'sub_id', + 'publication', + 'publication_class', + 'params_ejson', + ] + list_display = [ + 'sub_id', + 'connection', + 'user', + 'publication', + 'params_ejson', + ] + list_filter = [ + 'user__email', + 'publication', + ] + inlines = [ + SubscriptionCollectionInline, + ] + + +class SubscriptionCollection(admin.ModelAdmin): + search_fields = [ + 'name', + 'collection_class', + ] + list_display = [ + '__str__', + 'subscription', + 'name', + 'collection_class', + ] + list_filter = [ + 'name', + ] + + +for name, attr in vars(models).items(): + if hasattr(attr, '_meta'): + model_admin = locals().get(name, None) + if model_admin is not False: + admin.site.register(attr, model_admin) diff --git a/dddp/api.py b/dddp/api.py new file mode 100644 index 0000000..ce8bd49 --- /dev/null +++ b/dddp/api.py @@ -0,0 +1,540 @@ +"""Django DDP API, Collections, Cursors and Publications.""" +from __future__ import absolute_import +import collections +import traceback +import dbarray +from django.conf import settings +from django.db import connection, transaction +from django.db.models import aggregates, Q +from django.db.models.sql import aggregates as sql_aggregates +from django.utils.encoding import force_text +from dddp import AlreadyRegistered, THREAD_LOCAL as this +from dddp.models import Subscription +from dddp.msg import obj_change_as_msg +import ejson + + +XMIN = {'select': {'xmin': "'xmin'"}} + + +class Sql(object): + + """Extensions to django.db.models.sql.aggregates module.""" + + class Array(sql_aggregates.Aggregate): + + """Array SQL aggregate extension.""" + + lookup_name = 'array' + sql_function = 'array_agg' + +sql_aggregates.Array = Sql.Array + + +class Array(aggregates.Aggregate): + + """Array aggregate function.""" + + func = 'ARRAY' + name = 'Array' + + def add_to_query(self, query, alias, col, source, is_summary): + """Override source field internal type so the raw array is returned.""" + class ArrayField(dbarray.ArrayFieldBase, source.__class__): + + """ArrayField for override.""" + + __metaclass__ = dbarray.ArrayFieldMetaclass + + @staticmethod + def get_internal_type(): + """Return ficticious type so Django doesn't cast as int.""" + return 'ArrayType' + + new_source = ArrayField() + super(Array, self).add_to_query( + query, alias, col, new_source, is_summary, + ) + + +def api_endpoint(path_or_func): + """Decorator to mark a method as an API endpoint for later registration.""" + if callable(path_or_func): + path_or_func.api_path = path_or_func.__name__ + return path_or_func + else: + def _api_endpoint(func): + """Decorator inner.""" + func.api_path = path_or_func + return func + return _api_endpoint + + +def api_endpoints(obj): + """Iterator over all API endpoint names and callbacks.""" + for name in dir(obj): + attr = getattr(obj, name) + api_path = getattr(attr, 'api_path', None) + if api_path: + yield ( + '%s%s' % (obj.api_path_prefix, api_path), + attr, + ) + for api_provider in obj.api_providers: + for api_path, attr in api_endpoints(api_provider): + yield (api_path, attr) + + +class APIMeta(type): + + """DDP API metaclass.""" + + def __new__(mcs, name, bases, attrs): + """Create a new APIMixin class.""" + attrs['name'] = attrs.pop('name', None) or name + name_format = attrs.get('name_format', None) + if name_format: + attrs['name'] = name_format.format(**attrs) + api_path_prefix_format = attrs.get('api_path_prefix_format', None) + if api_path_prefix_format is not None: + attrs['api_path_prefix'] = api_path_prefix_format.format(**attrs) + return super(APIMeta, mcs).__new__(mcs, name, bases, attrs) + + +class APIMixin(object): + + """Mixin to support finding API endpoints for class instances.""" + + api_path_prefix_format = None + + api_providers = [] + api_path_prefix = '/' + _api_path_cache = None + + def api_path_map(self): + """Cached dict of api_path: func.""" + if self._api_path_cache is None: + self._api_path_cache = { + api_path: func + for api_path, func + in api_endpoints(self) + } + return self._api_path_cache + + def clear_api_path_map_cache(self): + """Clear out cache for api_path_map.""" + self._api_path_cache = None + for api_provider in self.api_providers: + if api_provider.clear_api_path_map_cache.im_self is not None: + api_provider.clear_api_path_map_cache() + + def api_endpoint(self, api_path): + """Return API endpoint for given api_path.""" + return self.api_path_map()[api_path] + + +def collection_name(model): + """Return collection name given model class.""" + # Django supports model._meta -> pylint: disable=W0212 + return force_text(model._meta) + + +COLLECTION_PATH_FORMAT = '/{name}/' + + +class CollectionMeta(APIMeta): + + """DDP Collection metaclass.""" + + def __new__(mcs, name, bases, attrs): + """Create a new Collection class.""" + attrs.update( + api_path_prefix_format=COLLECTION_PATH_FORMAT, + ) + model = attrs.get('model', None) + if model is not None: + attrs.update( + name=collection_name(model), + ) + return super(CollectionMeta, mcs).__new__(mcs, name, bases, attrs) + + +class Collection(APIMixin): + + """DDP Model Collection.""" + + __metaclass__ = CollectionMeta + + name = None + model = None + qs_filter = None + order_by = None + user_rel = None + + def get_queryset(self, base_qs=None): + """Return a filtered, ordered queryset for this collection.""" + qs = self.model.objects.all() if base_qs is None else base_qs + # enforce ordering so later use of distinct() works as expected. + if not qs.query.order_by: + if self.order_by is None: + qs = qs.order_by('pk') + else: + qs = qs.order_by(*self.order_by) + if self.qs_filter: + qs = qs.filter(self.qs_filter) + return qs + + queryset = property(get_queryset) + + def objects_for_user(self, user, qs=None, xmin__lte=None): + """Find objects in queryset related to specified user.""" + qs = self.get_queryset(qs) + user_rels = self.user_rel + if user_rels: + if isinstance(user_rels, basestring): + user_rels = [user_rels] + user_filter = None + for user_rel in user_rels: + filter_obj = Q(**{user_rel: user}) + if user_filter is None: + user_filter = filter_obj + else: + user_filter |= filter_obj + qs = qs.filter(user_filter).distinct() + if xmin__lte is not None: + qs = qs.extra( + where=["'xmin' <= %s"], + params=[xmin__lte], + ) + return qs + + def user_ids_for_object(self, obj, base_qs=None): + """Find user IDs related to object/pk in queryset.""" + qs = base_qs or self.queryset + if self.user_rel: + user_rels = self.user_rel + if isinstance(user_rels, basestring): + user_rels = [user_rels] + user_rel_map = { + '_user_rel_%d' % index: Array(user_rel) + for index, user_rel + in enumerate(user_rels) + } + user_ids = set() + for rel_user_ids in qs.filter( + pk=hasattr(obj, 'pk') and obj.pk or obj, + ).annotate(**user_rel_map).values_list(*user_rel_map.keys()).get(): + user_ids.update(rel_user_ids) + return sorted(user_ids.difference([None])) + else: + return None + + def field_schema(self): + """Generate schema for consumption by clients.""" + type_map = { + 'AutoField': 'String', + 'BooleanField': 'Boolean', + 'CharField': 'String', + 'DateTimeField': 'Date', + 'DecimalField': 'Number', + 'FloatField': 'Number', + 'ForeignKey': 'String', + 'PositiveIntegerField': 'Number', + 'TextField': 'String', + } + db_type_map = { + 'serial': 'Number', + 'text': 'String', + 'boolean': 'Boolean', + 'integer': 'Number', + } + # Django supports model._meta -> pylint: disable=W0212 + meta = self.model._meta + for field in meta.local_fields: + int_type = field.get_internal_type() + schema = { + 'type': ( + type_map.get(int_type, None) + ) or ( + db_type_map.get(field.db_type(connection), 'String') + ) + } + + rel = getattr(field, 'rel', None) + if rel: + schema['type'] = 'String' + schema['relation'] = { + 'name': field.name, + 'collection': collection_name(rel.to), + } + + choices = getattr(field, 'choices', None) + if choices: + schema['allowedValues'] = [val for val, _ in choices] + + blank = getattr(field, 'blank', None) + if blank: + schema['optional'] = True + + formfield = field.formfield() + if formfield: + schema['label'] = force_text(formfield.label) + + max_length = getattr(field, 'max_length', None) + if max_length is not None: + schema['max'] = max_length + + if int_type == 'PositiveIntegerField': + schema['min'] = 0 + if int_type in ('DecimalField', 'FloatField'): + schema['decimal'] = True + yield field.column, schema + for field in meta.local_many_to_many: + yield '%s_ids' % field.column, { + 'type': '[String]', + 'relation': { + 'name': field.name, + 'collection': collection_name(field.rel.to), + }, + } + + @api_endpoint + def schema(self): + """Return a representation of the schema for this collection.""" + return { + name: schema + for name, schema + in self.field_schema() + } + + +class PublicationMeta(APIMeta): + + """DDP Publication metaclass.""" + + def __new__(mcs, name, bases, attrs): + """Create a new Publication class.""" + attrs.update( + api_path_prefix_format='publications/{name}/', + ) + return super(PublicationMeta, mcs).__new__(mcs, name, bases, attrs) + + +class Publication(APIMixin): + + """DDP Publication (a set of queries).""" + + __metaclass__ = PublicationMeta + + name = None + queries = None + + def get_queries(self, *params): + """DDP get_queries - must override if using params.""" + if params: + raise NotImplementedError( + 'Publication params not implemented on %r publication.' % ( + self.name, + ), + ) + return self.queries[:] or [] + + @api_endpoint + def collections(self, *params): + """Return list of collections for this publication.""" + return sorted( + set( + collection_name(qs.model) + for qs + in self.get_queries(*params) + ) + ) + + +def pub_path(publication_name): + """Return api_path for a publication.""" + return Publication.api_path_prefix_format.format(name=publication_name) + + +class DDP(APIMixin): + + """Django DDP API.""" + + __metaclass__ = APIMeta + + pgworker = None + + def __init__(self): + """DDP API init.""" + self._registry = {} + self._subs = {} + + def get_collection(self, model): + """Return collection instance for given model.""" + name = collection_name(model) + path = COLLECTION_PATH_FORMAT.format(name=name) + return self._registry[path] + + @property + def api_providers(self): + """Return an iterable of API providers.""" + return self._registry.values() + + def sub_notify(self, id_, names, data): + """Dispatch DDP updates to connections.""" + ws, _ = self._subs[id_] + ws.send_msg(data) + + @api_endpoint + @transaction.atomic + def sub(self, id_, name, *params): + """Create subscription, send matched objects that haven't been sent.""" + try: + pub = self._registry[pub_path(name)] + except KeyError: + this.error('Invalid publication name: %r' % name) + return + obj, created = Subscription.objects.get_or_create( + connection=this.ws.connection, + sub_id=id_, + user=this.request.user, + defaults={ + 'publication': pub.name, + 'publication_class': '%s.%s' % ( + pub.__class__.__module__, + pub.__class__.__name__, + ), + 'params_ejson': ejson.dumps(params), + }, + ) + if not created: + this.send_msg({'msg': 'ready', 'subs': [id_]}) + return + # re-read from DB so we can get transaction ID (xmin) + obj = Subscription.objects.extra(**XMIN).get(pk=obj.pk) + queries = { + collection_name(collection.model): (collection, qs) + for (qs, collection) + in ( + (qs, self.get_collection(qs.model)) + for qs + in pub.get_queries(*params) + ) + } + self._subs[id_] = (this.ws, sorted(queries)) + self.pgworker.subscribe(self.sub_notify, id_, sorted(queries)) + # mergebox via MVCC! For details on how this is possible, read this: + # https://devcenter.heroku.com/articles/postgresql-concurrency + to_send = collections.OrderedDict( + ( + name, + collection.objects_for_user( + user=this.request.user.pk, + qs=qs, + xmin__lte=obj.xmin, + ), + ) + for name, (collection, qs) + in queries.items() + ) + for name, (collection, qs) in queries.items(): + obj.collections.create( + name=name, + collection_class='%s.%s' % ( + collection.__class__.__module__, + collection.__class__.__name__, + ), + ) + for other in Subscription.objects.filter( + connection=this.ws.connection, + collections__name__in=queries.keys(), + ).exclude( + pk=obj.pk, + ).order_by('pk').distinct(): + other_pub = self._registry[pub_path(other.publication)] + for qs in other_pub.get_queries(*other.params): + collection = self.get_collection(qs.model) + if collection.name not in to_send: + continue + to_send[collection.name] = to_send[collection.name].exclude( + pk__in=collection.objects_for_user( + user=this.request.user.pk, + qs=qs, + xmin__lte=obj.xmin, + ).values('pk'), + ) + for qs in to_send.values(): + for obj in qs: + name, payload = obj_change_as_msg(obj, 'added') + this.send_msg(payload) + this.send_msg({'msg': 'ready', 'subs': [id_]}) + + def unsub_notify(self, id_, names): + """Dispatch DDP updates to connections.""" + (ws, _) = self._subs.pop(id_, (None, [])) + if ws is not None: + Subscription.objects.filter( + connection=ws.connection, + sub_id=id_, + ).delete() + ws.send_msg({'msg': 'nosub', 'id': id_}) + + @api_endpoint + def unsub(self, id_): + """Remove a subscription.""" + self.pgworker.unsubscribe(self.unsub_notify, id_) + + @api_endpoint + def method(self, method, params, id_): + """Invoke a method.""" + try: + handler = self.api_path_map()[method] + except KeyError: + this.error('Unknown method: %s' % method) + return + try: + result = handler(*params) + this.send_msg({'msg': 'result', 'id': id_, 'result': result}) + except Exception, err: # log error+stack trace -> pylint: disable=W0703 + details = traceback.format_exc() + this.ws.logger.error(err, exc_info=True) + msg = { + 'msg': 'result', + 'id': id_, + 'error': { + 'error': 500, + 'reason': str(err), + }, + } + if settings.DEBUG: + msg['error']['details'] = details + this.send_msg(msg) + + def register(self, api_or_iterable): + """Register an API endpoint.""" + if hasattr(api_or_iterable, 'api_path_prefix'): + api_or_iterable = [api_or_iterable] + for api in api_or_iterable: + api = api() + if api.api_path_prefix in self._registry: + raise AlreadyRegistered( + 'API with prefix %r is already registered to %r' % ( + api.api_path_prefix, + self._registry[api.api_path_prefix], + ), + ) + self._registry[api.api_path_prefix] = api + self.clear_api_path_map_cache() + + @api_endpoint + def schema(self): + """Return schema for all registered collections.""" + res = {} + for api_provider in self.api_providers: + if issubclass(api_provider, Collection): + collection = api_provider() + res[collection_name(collection.model)] = collection.schema() + return res + + +API = DDP() diff --git a/dddp/apps.py b/dddp/apps.py index 2d3b9e7..daf6424 100644 --- a/dddp/apps.py +++ b/dddp/apps.py @@ -5,13 +5,21 @@ from __future__ import print_function from django.apps import AppConfig from django.core import serializers from django.conf import settings, ImproperlyConfigured -from django.db.models import signals +from django.db import DatabaseError +from django.db.models import signals, Field +from dddp.api import collection_name from dddp.notify import send_notify +from dddp.models import Connection + + +IN_MIGRATION = False def on_save(sender, **kwargs): """Post-save signal handler.""" + global IN_MIGRATION + if IN_MIGRATION: return send_notify( model=sender, obj=kwargs['instance'], @@ -22,6 +30,8 @@ def on_save(sender, **kwargs): def on_delete(sender, **kwargs): """Post-delete signal handler.""" + global IN_MIGRATION + if IN_MIGRATION: return send_notify( model=sender, obj=kwargs['instance'], @@ -32,6 +42,8 @@ def on_delete(sender, **kwargs): def on_m2m_changed(sender, **kwargs): """M2M-changed signal handler.""" + global IN_MIGRATION + if IN_MIGRATION: return # See https://docs.djangoproject.com/en/1.7/ref/signals/#m2m-changed if kwargs['action'] in ( 'post_add', @@ -54,16 +66,31 @@ def on_m2m_changed(sender, **kwargs): ) +def on_pre_migrate(sender, **kwargs): + global IN_MIGRATION + IN_MIGRATION = True + + +def on_post_migrate(sender, **kwargs): + """Post-migrate signal handler.""" + global IN_MIGRATION + IN_MIGRATION = False + try: + Connection.objects.all().delete() + except DatabaseError: + pass + + class DjangoDDPConfig(AppConfig): """Django app config for django-ddp.""" name = 'dddp' verbose_name = 'Django DDP' - serializer = serializers.get_serializer('python')() def ready(self): - """Initialisation for django-ddp (setup signal handlers).""" + """Initialisation for django-ddp (setup lookups and signal handlers).""" + serializers.register_serializer('ddp', 'dddp.serializer') if not settings.DATABASES: raise ImproperlyConfigured('No databases configured.') for (alias, conf) in settings.DATABASES.items(): @@ -76,3 +103,4 @@ class DjangoDDPConfig(AppConfig): signals.post_save.connect(on_save) signals.post_delete.connect(on_delete) signals.m2m_changed.connect(on_m2m_changed) + signals.post_migrate.connect(on_post_migrate) diff --git a/dddp/management/commands/dddp.py b/dddp/management/commands/dddp.py index c283b92..f04e601 100644 --- a/dddp/management/commands/dddp.py +++ b/dddp/management/commands/dddp.py @@ -3,6 +3,7 @@ from __future__ import print_function, absolute_import import collections +import inspect import optparse import random import signal @@ -19,6 +20,7 @@ import gevent.select import geventwebsocket import psycogreen.gevent +from dddp import autodiscover from dddp.postgres import PostgresGreenlet from dddp.websocket import DDPWebSocketApplication @@ -142,6 +144,17 @@ class Command(BaseCommand): gevent.signal(signal.SIGINT, killall) gevent.signal(signal.SIGQUIT, killall) + print('=> Discovering DDP endpoints...') + ddp = autodiscover() + ddp.pgworker = postgres + print( + '\n'.join( + ' %s' % api_path + for api_path + in sorted(ddp.api_path_map()) + ), + ) + # start greenlets postgres.start() print('=> Started PostgresGreenlet.') diff --git a/dddp/migrations/0002_auto_20150408_0321.py b/dddp/migrations/0002_auto_20150408_0321.py new file mode 100644 index 0000000..62c4959 --- /dev/null +++ b/dddp/migrations/0002_auto_20150408_0321.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations +from django.conf import settings +import dddp +import dddp.models +from dddp.migrations import TruncateOperation + + +class Migration(migrations.Migration): + + dependencies = [ + ('sessions', '0001_initial'), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ('dddp', '0001_initial'), + ] + + operations = [ + TruncateOperation(forwards=['subscription']), + migrations.CreateModel( + name='Connection', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('connection_id', dddp.models.AleaIdField(default=dddp.meteor_random_id, max_length=17)), + ('remote_addr', models.CharField(max_length=255)), + ('version', models.CharField(max_length=255)), + ('session', models.ForeignKey(to='sessions.Session')), + ], + options={ + }, + bases=(models.Model, object), + ), + migrations.CreateModel( + name='SubscriptionCollection', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('name', models.CharField(max_length=255)), + ('collection_class', models.CharField(max_length=255)), + ('subscription', models.ForeignKey(related_name='collections', to='dddp.Subscription')), + ], + options={ + }, + bases=(models.Model,), + ), + migrations.AlterUniqueTogether( + name='connection', + unique_together=set([('connection_id', 'session')]), + ), + migrations.AddField( + model_name='subscription', + name='connection', + field=models.ForeignKey(to='dddp.Connection'), + preserve_default=False, + ), + migrations.AddField( + model_name='subscription', + name='publication_class', + field=models.CharField(max_length=255), + preserve_default=False, + ), + migrations.AddField( + model_name='subscription', + name='sub_id', + field=models.CharField(max_length=17), + preserve_default=False, + ), + migrations.AddField( + model_name='subscription', + name='user', + field=models.ForeignKey(to=settings.AUTH_USER_MODEL), + preserve_default=False, + ), + migrations.AlterUniqueTogether( + name='subscription', + unique_together=set([('connection', 'sub_id')]), + ), + migrations.RemoveField( + model_name='subscription', + name='session', + ), + TruncateOperation(backwards=['subscription']) + ] diff --git a/dddp/migrations/__init__.py b/dddp/migrations/__init__.py index e69de29..29844c7 100644 --- a/dddp/migrations/__init__.py +++ b/dddp/migrations/__init__.py @@ -0,0 +1,37 @@ +from django.db.migrations.operations.base import Operation + + +class TruncateOperation(Operation): + + """Truncate (delete all rows) from the models specified.""" + + def __init__(self, forwards=None, backwards=None): + """Accept model names which are to be migrated.""" + self.truncate_forwards = forwards or [] + self.truncate_backwards = backwards or [] + + def truncate(self, app_label, schema_editor, models): + """Truncate tables.""" + for model_name in models: + model = '%s_%s' % (app_label, model_name) + schema_editor.execute( + 'TRUNCATE TABLE %s RESTART IDENTITY CASCADE' % ( + model.lower(), + ), + ) + + def state_forwards(self, app_label, state): + """Mutate state to match schema changes.""" + pass # Truncate doesn't change schema. + + def database_forwards(self, app_label, schema_editor, from_state, to_state): + """Use schema_editor to apply any forward changes.""" + self.truncate(app_label, schema_editor, self.truncate_forwards) + + def database_backwards(self, app_label, schema_editor, from_state, to_state): + """Use schema_editor to apply any reverse changes.""" + self.truncate(app_label, schema_editor, self.truncate_backwards) + + def describe(self): + """Describe what the operation does in console output.""" + return "Truncate tables" diff --git a/dddp/models.py b/dddp/models.py index 8ccca40..46b4e4f 100644 --- a/dddp/models.py +++ b/dddp/models.py @@ -1,13 +1,34 @@ """Django DDP models.""" -from django.db import models -from django.contrib.contenttypes.fields import GenericForeignKey +from django.db import models, transaction +from django.conf import settings from django.contrib.contenttypes.models import ContentType -from django.utils.module_loading import import_string +from django.utils.encoding import python_2_unicode_compatible import ejson -from dddp import THREAD_LOCAL +from dddp import meteor_random_id -METEOR_ID_CHARS = '23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz' + +@transaction.atomic +def get_meteor_id(obj): + """Return an Alea ID for the given object.""" + # Django model._meta is now public API -> pylint: disable=W0212 + content_type = ContentType.objects.get_for_model(obj._meta.model) + mapping, _ = ObjectMapping.objects.get_or_create( + content_type=content_type, + object_id=obj.pk, + ) + return mapping.meteor_id + + +@transaction.atomic +def get_object_id(model, meteor_id): + """Return an object ID for the given meteor_id.""" + # Django model._meta is now public API -> pylint: disable=W0212 + content_type = ContentType.objects.get_for_model(model) + return ObjectMapping.objects.filter( + content_type=content_type, + meteor_id=meteor_id, + ).values_list('object_id', flat=True).get() class AleaIdField(models.CharField): @@ -16,21 +37,14 @@ class AleaIdField(models.CharField): def __init__(self, *args, **kwargs): """Assume max_length of 17 to match Meteor implementation.""" - kwargs.setdefault('max_length', 17) + kwargs.update( + default=meteor_random_id, + max_length=17, + ) super(AleaIdField, self).__init__(*args, **kwargs) - def pre_save(self, model_instance, add): - """Generate value if not set during INSERT.""" - if add and not getattr(model_instance, self.attname): - value = THREAD_LOCAL.alea_random.random_string( - self.max_length, METEOR_ID_CHARS, - ) - setattr(model_instance, self.attname, value) - return value - else: - return super(AleaIdField, self).pre_save(self, model_instance, add) - +@python_2_unicode_compatible class ObjectMapping(models.Model): """Mapping from regular Django model primary keys to Meteor object IDs.""" @@ -38,7 +52,13 @@ class ObjectMapping(models.Model): meteor_id = AleaIdField() content_type = models.ForeignKey(ContentType, db_index=True) object_id = models.PositiveIntegerField() - content_object = GenericForeignKey('content_type', 'object_id') + # content_object = GenericForeignKey('content_type', 'object_id') + + def __str__(self): + """Text representation of a mapping.""" + return '%s: %s[%s]' % ( + self.meteor_id, self.content_type, self.object_id, + ) class Meta(object): @@ -53,23 +73,63 @@ class ObjectMapping(models.Model): ] -class SubscriptionManager(models.Manager): - def get_queryset(self): - return super(SubscriptionManager, self).get_queryset().extra( - select={'xmin': 'xmin', 'xmax': 'xmax'}, +@python_2_unicode_compatible +class Connection(models.Model, object): + + """Django DDP connection instance.""" + + session = models.ForeignKey('sessions.Session') + connection_id = AleaIdField() + remote_addr = models.CharField(max_length=255) + version = models.CharField(max_length=255) + + class Meta(object): + + """Connection model meta.""" + + unique_together = [ + ['connection_id', 'session'], + ] + + def __str__(self): + """Text representation of subscription.""" + return u'%s/\u200b%s/\u200b%s' % ( + self.session_id, + self.connection_id, + self.remote_addr, ) -class Subscription(models.Model): +@python_2_unicode_compatible +class Subscription(models.Model, object): """Session subscription to a publication with params.""" _publication_cache = {} - session = models.ForeignKey('sessions.Session') + connection = models.ForeignKey(Connection) + sub_id = models.CharField(max_length=17) + user = models.ForeignKey(settings.AUTH_USER_MODEL) publication = models.CharField(max_length=255) + publication_class = models.CharField(max_length=255) params_ejson = models.TextField(default='{}') - objects = SubscriptionManager() + class Meta(object): + + """Subscription model meta.""" + + unique_together = [ + ['connection', 'sub_id'], + ] + + def __str__(self): + """Text representation of subscription.""" + return u'%s/\u200b%s/\u200b%s: %s%s' % ( + self.user, + self.connection_id, + self.sub_id, + self.publication, + self.params_ejson, + ) def get_params(self): """Get params dict.""" @@ -81,19 +141,17 @@ class Subscription(models.Model): params = property(get_params, set_params) - def get_publication_class(self): - """Get publication class (cached).""" - try: - return Subscription._publication_cache[self.publication] - except KeyError: - pub_cls = import_string(self.publication) - Subscription._publication_cache[self.publication] = pub_cls - return pub_cls - def get_publication(self): - """Get publication instance (with params).""" - return self.get_publication_class()(self.params) +@python_2_unicode_compatible +class SubscriptionCollection(models.Model): - def get_queryset(self): - pub = self.get_publication() - return pub.get_queryset() + """Collections for a subscription.""" + + subscription = models.ForeignKey(Subscription, related_name='collections') + name = models.CharField(max_length=255) + collection_class = models.CharField(max_length=255) + + def __str__(self): + return '%s' % ( + self.name, + ) diff --git a/dddp/msg.py b/dddp/msg.py index 11e7b16..3b942d6 100644 --- a/dddp/msg.py +++ b/dddp/msg.py @@ -1,13 +1,15 @@ """Django DDP utils for DDP messaging.""" +import collections +from django.core.serializers import get_serializer -from django.core import serializers - -SERIALIZER = serializers.get_serializer('python')() - +_SERIALIZER = None def obj_change_as_msg(obj, msg): """Generate a DDP msg for obj with specified msg type.""" - data = SERIALIZER.serialize([obj])[0] + global _SERIALIZER + if _SERIALIZER is None: + _SERIALIZER = get_serializer('ddp')() + data = _SERIALIZER.serialize([obj])[0] name = data['model'] # cast ID as string diff --git a/dddp/notify.py b/dddp/notify.py index 11d3cf8..e979a7d 100644 --- a/dddp/notify.py +++ b/dddp/notify.py @@ -1,14 +1,58 @@ """Django DDP notification support.""" +from __future__ import absolute_import + +import collections import ejson from django.db import connections +from django.utils.module_loading import import_string +from dddp.api import collection_name +from dddp.models import Subscription from dddp.msg import obj_change_as_msg +class ImportCache(collections.defaultdict): + @staticmethod + def __missing__(key): + return import_string(key) + +_CLS_CACHE = ImportCache() + + def send_notify(model, obj, msg, using): """Dispatch PostgreSQL async NOTIFY.""" + col_name = collection_name(model) + if col_name == 'migrations.migration': + return # never send migration models. + if col_name.startswith('dddp.'): + return # don't send DDP internal models. + sub_ids = set() + for sub in Subscription.objects.filter( + collections__name=col_name, + ): + pub = _CLS_CACHE[sub.publication_class]() + pub_queries = { + collection_name(qs.model): qs + for qs + in pub.get_queries(*sub.params) + if qs.model is model + } + for sub_col in sub.collections.filter( + name=col_name, + ): + qs = pub_queries[sub_col.name] + col = _CLS_CACHE[sub_col.collection_class]() + # filter qs using user_rel paths on collection + qs = col.get_queryset(qs) + if qs.filter(pk=obj.pk).exists(): + sub_ids.add(sub.sub_id) + + if not sub_ids: + return # no subscribers for this object, nothing more to do. + name, payload = obj_change_as_msg(obj, msg) + payload['_sub_ids'] = sorted(sub_ids) cursor = connections[using].cursor() cursor.execute( 'NOTIFY "%s", %%s' % name, diff --git a/dddp/postgres.py b/dddp/postgres.py index 1309e69..b87dd1d 100644 --- a/dddp/postgres.py +++ b/dddp/postgres.py @@ -1,6 +1,6 @@ """Django DDP PostgreSQL Greenlet.""" -from __future__ import print_function, absolute_import +from __future__ import absolute_import import collections import gevent.monkey @@ -13,9 +13,8 @@ import gevent import gevent.queue import gevent.select import psycopg2 # green -from geventwebsocket.logging import create_logger -import psycopg2 import psycopg2.extensions +from geventwebsocket.logging import create_logger class PostgresGreenlet(gevent.Greenlet): @@ -35,6 +34,7 @@ class PostgresGreenlet(gevent.Greenlet): self._stop_event = gevent.event.Event() # dict of name: subscribers + # eg: {'bookstore.book': {'tpozNWMPphaJ2n8bj': }} self.all_subs = collections.defaultdict(dict) self._sub_lock = gevent.lock.RLock() @@ -60,13 +60,13 @@ class PostgresGreenlet(gevent.Greenlet): """Stop subtasks and let run() finish.""" self._stop_event.set() - def subscribe(self, func, obj, id_, name, params): - """Register callback `func` to be called after NOTIFY for `name`.""" - self.subs.put((func, obj, id_, name, params)) + def subscribe(self, func, id_, names): + """Register callback `func` to be called after NOTIFY for `names`.""" + self.subs.put((func, id_, names)) - def unsubscribe(self, func, obj, id_): - """Un-register callback `func` to be called after NOTIFY for `name`.""" - self.unsubs.put((func, obj, id_)) + def unsubscribe(self, func, id_, names): + """Un-register callback `func` to be called after NOTIFY for `names`.""" + self.unsubs.put((func, id_, names)) def process_conn(self): """Subtask to process NOTIFY async events from DB connection.""" @@ -83,13 +83,22 @@ class PostgresGreenlet(gevent.Greenlet): while self.conn.notifies: notify = self.conn.notifies.pop() name = notify.channel - self.logger.info("Got NOTIFY (pid=%d, name=%r, payload=%r)", notify.pid, name, notify.payload) + self.logger.info( + "Got NOTIFY (pid=%d, name=%r, payload=%r)", + notify.pid, name, notify.payload, + ) try: self._sub_lock.acquire() + self.logger.info(self.all_subs) subs = self.all_subs[name] data = ejson.loads(notify.payload) - for (_, id_), (func, params) in subs.items(): - gevent.spawn(func, id_, name, params, data) + sub_ids = data.pop('_sub_ids') + self.logger.info('Subscribers: %r', sub_ids) + self.logger.info(subs) + for id_, func in subs.items(): + if id_ not in sub_ids: + continue # not for this subscription + gevent.spawn(func, id_, name, data) finally: self._sub_lock.release() break @@ -98,32 +107,34 @@ class PostgresGreenlet(gevent.Greenlet): elif state == psycopg2.extensions.POLL_READ: gevent.select.select([self.conn.fileno()], [], []) else: - self.logger.warn('POLL_ERR: %s' % state) + self.logger.warn('POLL_ERR: %s', state) def process_subs(self): """Subtask to process `sub` requests from `self.subs` queue.""" while not self._stop_event.is_set(): - func, obj, id_, name, params = self.subs.get() + func, id_, names = self.subs.get() try: self._sub_lock.acquire() - subs = self.all_subs[name] - if len(subs) == 0: - self.logger.debug('LISTEN "%s";', name) - self.poll() - self.cur.execute('LISTEN "%s";' % name) - self.poll() - subs[(obj, id_)] = (func, params) + for name in names: + subs = self.all_subs[name] + if len(subs) == 0: + self.logger.debug('LISTEN "%s";', name) + self.poll() + self.cur.execute('LISTEN "%s";' % name) + self.poll() + subs[id_] = func finally: self._sub_lock.release() def process_unsubs(self): """Subtask to process `unsub` requests from `self.unsubs` queue.""" while not self._stop_event.is_set(): - func, obj, id_ = self.unsubs.get() + func, id_, names = self.unsubs.get() try: self._sub_lock.acquire() - for name, subs in self.all_subs.items(): - subs.pop((obj, id_), None) + for name in names: + subs = self.all_subs[name] + subs.pop(id_, None) if len(subs) == 0: self.logger.info('UNLISTEN "%s";', name) self.cur.execute('UNLISTEN "%s";' % name) @@ -131,4 +142,4 @@ class PostgresGreenlet(gevent.Greenlet): del self.all_subs[name] finally: self._sub_lock.release() - gevent.spawn(func, id_) + gevent.spawn(func, id_, names) diff --git a/dddp/publisher.py b/dddp/publisher.py deleted file mode 100644 index dedead0..0000000 --- a/dddp/publisher.py +++ /dev/null @@ -1,35 +0,0 @@ -"""Django DDP publisher.""" - -class Publisher(object): - - """ - Django DDP publisher class. - - >>> all_books = Publisher('all_books', Book.objects.all()) - >>> my_books = Publisher( - ... 'my_books', - ... Book.objects.all(), - ... lambda request, qs: qs.filter(author=req.user) - ... ) - >>> books_by_author_email = Publisher( - ... 'books_by_author_email', - ... Book.objects.all(), - ... lambda request, qs, email: qs.filter(author__email=email) - ... ) - """ - - registry = {} - - def __init__(self, name, qs, func=None, register=True): - self.name = name - self.qs = qs - self.model = qs.query.model - self.func = func - if register: - self.register() - - def __contains__(self, (collection, pk)): - pass - - def register(self): - self.registry[self.name or self.model] = self diff --git a/dddp/serializer.py b/dddp/serializer.py new file mode 100644 index 0000000..0f4146b --- /dev/null +++ b/dddp/serializer.py @@ -0,0 +1,173 @@ +""" +A Python "serializer". Doesn't do much serializing per se -- just converts to +and from basic Python data types (lists, dicts, strings, etc.). Useful as a basis for +other serializers. +""" +from __future__ import unicode_literals + +from django.apps import apps +from django.conf import settings +from django.core.serializers import base +from django.db import DEFAULT_DB_ALIAS, models +from django.utils import six +from django.utils.encoding import force_text, is_protected_type +from dddp.models import get_meteor_id + + +class Serializer(base.Serializer): + """ + Serializes a QuerySet to basic Python objects. + """ + + internal_use_only = True + + def start_serialization(self): + self._current = None + self.objects = [] + + def end_serialization(self): + pass + + def start_object(self, obj): + self._current = {} + + def end_object(self, obj): + self.objects.append(self.get_dump_object(obj)) + self._current = None + + def get_dump_object(self, obj): + data = { + "model": force_text(obj._meta), + "fields": self._current, + } + if not self.use_natural_primary_keys or not hasattr(obj, 'natural_key'): + data["pk"] = get_meteor_id(obj) + + return data + + def handle_field(self, obj, field): + value = field._get_val_from_obj(obj) + # Protected types (i.e., primitives like None, numbers, dates, + # and Decimals) are passed through as is. All other values are + # converted to string first. + if is_protected_type(value): + self._current[field.column] = value + else: + self._current[field.column] = field.value_to_string(obj) + + def handle_fk_field(self, obj, field): + if self.use_natural_foreign_keys and hasattr(field.rel.to, 'natural_key'): + related = getattr(obj, field.name) + if related: + value = related.natural_key() + else: + value = None + else: + value = getattr(obj, field.get_attname()) + value = get_meteor_id(obj) + self._current[field.column] = value + + def handle_m2m_field(self, obj, field): + if field.rel.through._meta.auto_created: + if self.use_natural_foreign_keys and hasattr(field.rel.to, 'natural_key'): + m2m_value = lambda value: value.natural_key() + else: + m2m_value = lambda value: get_meteor_id(value) + self._current['%s_ids' % field.name] = [m2m_value(related) + for related in getattr(obj, field.name).iterator()] + + def getvalue(self): + return self.objects + + +def Deserializer(object_list, **options): + """ + Deserialize simple Python objects back into Django ORM instances. + + It's expected that you pass the Python objects themselves (instead of a + stream or a string) to the constructor + """ + db = options.pop('using', DEFAULT_DB_ALIAS) + ignore = options.pop('ignorenonexistent', False) + + for d in object_list: + # Look up the model and starting build a dict of data for it. + try: + Model = _get_model(d["model"]) + except base.DeserializationError: + if ignore: + continue + else: + raise + data = {} + if 'pk' in d: + data[Model._meta.pk.attname] = Model._meta.pk.to_python(d.get("pk", None)) + m2m_data = {} + field_names = {f.name for f in Model._meta.get_fields()} + field_name_map = { + f.column: f.name + for f in Model._meta.get_fields() + } + + # Handle each field + for (field_column, field_value) in six.iteritems(d["fields"]): + field_name = field_name_map.get(field_column, None) + + if ignore and field_name not in field_names: + # skip fields no longer on model + continue + + if isinstance(field_value, str): + field_value = force_text( + field_value, options.get("encoding", settings.DEFAULT_CHARSET), strings_only=True + ) + + field = Model._meta.get_field(field_name) + + # Handle M2M relations + if field.rel and isinstance(field.rel, models.ManyToManyRel): + if hasattr(field.rel.to._default_manager, 'get_by_natural_key'): + def m2m_convert(value): + if hasattr(value, '__iter__') and not isinstance(value, six.text_type): + return field.rel.to._default_manager.db_manager(db).get_by_natural_key(*value).pk + else: + return force_text(field.rel.to._meta.pk.to_python(value), strings_only=True) + else: + m2m_convert = lambda v: force_text(field.rel.to._meta.pk.to_python(v), strings_only=True) + m2m_data[field.name] = [m2m_convert(pk) for pk in field_value] + + # Handle FK fields + elif field.rel and isinstance(field.rel, models.ManyToOneRel): + if field_value is not None: + if hasattr(field.rel.to._default_manager, 'get_by_natural_key'): + if hasattr(field_value, '__iter__') and not isinstance(field_value, six.text_type): + obj = field.rel.to._default_manager.db_manager(db).get_by_natural_key(*field_value) + value = getattr(obj, field.rel.field_name) + # If this is a natural foreign key to an object that + # has a FK/O2O as the foreign key, use the FK value + if field.rel.to._meta.pk.rel: + value = value.pk + else: + value = field.rel.to._meta.get_field(field.rel.field_name).to_python(field_value) + data[field.attname] = value + else: + data[field.attname] = field.rel.to._meta.get_field(field.rel.field_name).to_python(field_value) + else: + data[field.attname] = None + + # Handle all other fields + else: + data[field.name] = field.to_python(field_value) + + obj = base.build_instance(Model, data, db) + yield base.DeserializedObject(obj, m2m_data) + + +def _get_model(model_identifier): + """ + Helper to look up a model from an "app_label.model_name" string. + """ + try: + return apps.get_model(model_identifier) + except (LookupError, TypeError): + raise base.DeserializationError("Invalid model identifier: '%s'" % model_identifier) diff --git a/dddp/site.py b/dddp/site.py new file mode 100644 index 0000000..1b9037f --- /dev/null +++ b/dddp/site.py @@ -0,0 +1,41 @@ +"""Django DDP sites.""" +from dddp import AlreadyRegistered +from dddp.api import PubSubAPI + + +class Site(object): + + """Django DDP site class.""" + + def __init__(self): + """Django DDP site init.""" + self._registry = {} + + def register(self, endpoint_or_iterable): + """Register an API endpoint.""" + if not hasattr(endpoint_or_iterable, 'api_path'): + endpoint_or_iterable = [endpoint_or_iterable] + for endpoint in endpoint_or_iterable: + if endpoint.api_path in self._registry: + raise AlreadyRegistered( + 'API endpoint with path %r already registerd to %r' % ( + endpoint.api_path, + self._registry, + ), + ) + self._registry[endpoint.api_path] = endpoint + + def unregister(self, endpoint_or_path_or_iterable): + """Un-register an API endpoint.""" + if not hasattr(endpoint_or_iterable, 'api_path'): + endpoint_or_iterable = [endpoint_or_iterable] + for endpoint in endpoint_or_iterable: + if isinstance(endpoint, basestring): + del self._registry[endpoint.api_path] = endpoint + else: + del self._registry[endpoint.api_path] = endpoint + +site = Site() + +publications = PubSubAPI() +site.register(publications.api_endpoints) diff --git a/dddp/websocket.py b/dddp/websocket.py index ac5b226..ea00796 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -1,18 +1,18 @@ """Django DDP WebSocket service.""" -from __future__ import print_function, absolute_import +from __future__ import absolute_import import inspect import traceback -import uuid import ejson import geventwebsocket from django.core.handlers.base import BaseHandler from django.core.handlers.wsgi import WSGIRequest -from django.db.models.loading import get_model +from django.db import transaction -from dddp.msg import obj_change_as_msg +from dddp import THREAD_LOCAL as this, alea +from dddp.api import API class MeteorError(Exception): @@ -44,7 +44,7 @@ def validate_kwargs(func, kwargs, func_name=None): if key_adj in args: kwargs[key_adj] = kwargs.pop(key) - required = args[:-len(argspec.defaults)] + required = args[:-len(argspec.defaults or [])] supplied = sorted(kwargs) missing = [ trans.get(arg, arg) for arg in required @@ -85,7 +85,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): remote_addr = None version = None support = None - session = None + connection = None subs = None request = None base_handler = BaseHandler() @@ -102,12 +102,20 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): response = middleware_method(self.request) if response: raise ValueError(response) + this.ws = self + this.request = self.request + this.send = self.send + this.send_msg = self.send_msg + this.reply = self.reply + this.error = self.error + this.session_key = this.request.session.session_key - self.remote_addr = '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( - self.ws.environ, - ) + this.remote_addr = self.remote_addr = \ + '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( + self.ws.environ, + ) self.subs = {} - self.logger.info('+ %s OPEN %s', self, self.request.user) + self.logger.info('+ %s OPEN %s', self, this.request.user) self.send('o') self.send('a["{\\"server_id\\":\\"0\\"}"]') @@ -117,8 +125,12 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): def on_close(self, reason): """Handle closing of websocket connection.""" + if self.connection is not None: + self.connection.delete() + self.connection = None self.logger.info('- %s %s', self, reason or 'CLOSE') + @transaction.atomic def on_message(self, message): """Process a message received from remote.""" if self.ws.closed: @@ -149,10 +161,13 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): try: msg = data.pop('msg') except KeyError: - raise MeteorError(400, 'Missing `msg` parameter', raw) + raise MeteorError( + 400, 'Bad request', None, {'offendingMessage': data} + ) # dispatch message self.dispatch(msg, data) except MeteorError, err: + traceback.print_exc() self.error(err) except Exception, err: traceback.print_exc() @@ -165,7 +180,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): def dispatch(self, msg, kwargs): """Dispatch msg to appropriate recv_foo handler.""" # enforce calling 'connect' first - if not self.session and msg != 'connect': + if self.connection is None and msg != 'connect': raise MeteorError(400, 'Session not establised - try `connect`.') # lookup method handler @@ -180,8 +195,9 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): # dispatch to handler try: handler(**kwargs) - except Exception, err: - raise MeteorError(500, 'Internal server error', err) + except Exception, err: # print stack trace --> pylint: disable=W0703 + traceback.print_exc() + self.error(MeteorError(500, 'Internal server error', err)) def send(self, data): """Send raw `data` to WebSocket client.""" @@ -191,45 +207,64 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): except geventwebsocket.WebSocketError: self.ws.close() + def send_msg(self, payload): + """Send EJSON payload to remote.""" + data = ejson.dumps([ejson.dumps(payload)]) + self.send('a%s' % data) + def reply(self, msg, **kwargs): """Send EJSON reply to remote.""" kwargs['msg'] = msg - data = ejson.dumps([ejson.dumps(kwargs)]) - self.send('a%s' % data) + self.send_msg(kwargs) - def error(self, err, reason=None, detail=None): + def error(self, err, reason=None, detail=None, **kwargs): """Send EJSON error to remote.""" if isinstance(err, MeteorError): - (err, reason, detail) = (err.args[:] + (None, None, None))[:3] + ( + err, reason, detail, kwargs, + ) = ( + err.args[:] + (None, None, None, None) + )[:4] + elif isinstance(err, Exception): + reason = str(err) data = { 'error': '%s' % (err or ''), } if reason: + if reason is Exception: + reason = str(reason) data['reason'] = reason if detail: + if isinstance(detail, Exception): + detail = str(detail) data['detail'] = detail + if kwargs: + data.update(kwargs) self.logger.error('! %s %r', self, data) self.reply('error', **data) def recv_connect(self, version, support, session=None): """DDP connect handler.""" - if self.session: + if self.connection is not None: self.error( 'Session already established.', reason='Current session in detail.', - detail=self.session, + detail=self.connection.connection_id, ) elif version not in self.versions: self.reply('failed', version=self.versions[0]) elif version not in support: self.error('Client version/support mismatch.') else: - if not session: - session = uuid.uuid4().hex - self.version = version - self.support = support - self.session = session - self.reply('connected', session=self.session) + from dddp.models import Connection + this.version = version + this.support = support + self.connection = Connection.objects.create( + session_id=this.session_key, + remote_addr=self.remote_addr, + version=version, + ) + self.reply('connected', session=self.connection.connection_id) def recv_ping(self, id_=None): """DDP ping handler.""" @@ -242,33 +277,17 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): """Send added/changed/removed msg due to receiving NOTIFY.""" self.reply(**data) - def recv_sub(self, id_, name, params=None): + def recv_sub(self, id_, name, params): """DDP sub handler.""" - self.pgworker.subscribe(self.sub_notify, self, id_, name, params) - - model = get_model(name) - for obj in model.objects.all(): - _, payload = obj_change_as_msg(obj, 'added') - self.sub_notify(id_, name, params, payload) - - self.reply('ready', subs=[id_]) - - def sub_unnotify(self, id_): - """Send added/changed/removed msg due to receiving NOTIFY.""" - pass # TODO: find out if we're meant to send anything to the client + API.sub(id_, name, *params) def recv_unsub(self, id_): """DDP unsub handler.""" - self.pgworker.unsubscribe(self.sub_unnotify, self, id_) + API.unsub(id_) def recv_method(self, method, params, id_, randomSeed=None): """DDP method handler.""" - try: - func = self.methods[method] - except KeyError: - self.reply('result', id=id_, error=u'Unknown method: %s' % method) - else: - try: - self.reply('result', id=id_, result=func(**params)) - except Exception, err: # pylint: disable=W0703 - self.reply('result', id=id_, error='%s' % err) + if randomSeed is not None: + this.alea_random = alea.Alea(randomSeed) + API.method(method, params, id_) + self.reply('updated', methods=[id_]) diff --git a/setup.py b/setup.py index e37f9e4..d52d52b 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import setup, find_packages setup( name='django-ddp', - version='0.1.1', + version='0.2.0', description=__doc__, long_description=open('README.rst').read(), author='Tyson Clugg', @@ -20,6 +20,7 @@ setup( 'gevent-websocket>=0.9', 'meteor-ejson>=1.0', 'psycogreen>=1.0', + 'django-dbarray>=0.2', ], classifiers=[ "Programming Language :: Python :: 2", diff --git a/test_project/django_todos/ddp.py b/test_project/django_todos/ddp.py new file mode 100644 index 0000000..e5976e5 --- /dev/null +++ b/test_project/django_todos/ddp.py @@ -0,0 +1,17 @@ +from dddp.api import API, Collection, Publication +from django_todos import models + + +class Task(Collection): + model = models.Task + + +class Tasks(Publication): + queries = [ + models.Task.objects.all(), + ] + +API.register([ + Task, + Tasks, +]) diff --git a/test_project/meteor_todos/.meteor/.finished-upgraders b/test_project/meteor_todos/.meteor/.finished-upgraders index 68df3d8..8a76103 100644 --- a/test_project/meteor_todos/.meteor/.finished-upgraders +++ b/test_project/meteor_todos/.meteor/.finished-upgraders @@ -5,3 +5,4 @@ notices-for-0.9.0 notices-for-0.9.1 0.9.4-platform-file +notices-for-facebook-graph-api-2 diff --git a/test_project/meteor_todos/.meteor/release b/test_project/meteor_todos/.meteor/release index 74a74cb..dab6b55 100644 --- a/test_project/meteor_todos/.meteor/release +++ b/test_project/meteor_todos/.meteor/release @@ -1 +1 @@ -METEOR@1.0.3.2 +METEOR@1.1.0.2 diff --git a/test_project/meteor_todos/.meteor/versions b/test_project/meteor_todos/.meteor/versions index ba4125b..410e1d9 100644 --- a/test_project/meteor_todos/.meteor/versions +++ b/test_project/meteor_todos/.meteor/versions @@ -1,50 +1,48 @@ -application-configuration@1.0.4 -autopublish@1.0.2 -autoupdate@1.1.5 -base64@1.0.2 -binary-heap@1.0.2 -blaze@2.0.4 -blaze-tools@1.0.2 -boilerplate-generator@1.0.2 -callback-hook@1.0.2 -check@1.0.4 -ddp@1.0.14 -deps@1.0.6 -ejson@1.0.5 -fastclick@1.0.2 -follower-livedata@1.0.3 -geojson-utils@1.0.2 -html-tools@1.0.3 -htmljs@1.0.3 -http@1.0.10 -id-map@1.0.2 -insecure@1.0.2 -jquery@1.11.3 -json@1.0.2 -launch-screen@1.0.1 -livedata@1.0.12 -logging@1.0.6 -meteor@1.1.4 -meteor-platform@1.2.1 -minifiers@1.1.3 -minimongo@1.0.6 -mobile-status-bar@1.0.2 -mongo@1.0.11 -observe-sequence@1.0.4 -ordered-dict@1.0.2 -random@1.0.2 -reactive-dict@1.0.5 -reactive-var@1.0.4 -reload@1.1.2 -retry@1.0.2 -routepolicy@1.0.4 -session@1.0.5 -spacebars@1.0.5 -spacebars-compiler@1.0.4 -templating@1.0.11 -tracker@1.0.5 -ui@1.0.5 -underscore@1.0.2 -url@1.0.3 -webapp@1.1.6 -webapp-hashing@1.0.2 +autopublish@1.0.3 +autoupdate@1.2.1 +base64@1.0.3 +binary-heap@1.0.3 +blaze@2.1.2 +blaze-tools@1.0.3 +boilerplate-generator@1.0.3 +callback-hook@1.0.3 +check@1.0.5 +ddp@1.1.0 +deps@1.0.7 +ejson@1.0.6 +fastclick@1.0.3 +geojson-utils@1.0.3 +html-tools@1.0.4 +htmljs@1.0.4 +http@1.1.0 +id-map@1.0.3 +insecure@1.0.3 +jquery@1.11.3_2 +json@1.0.3 +launch-screen@1.0.2 +livedata@1.0.13 +logging@1.0.7 +meteor@1.1.6 +meteor-platform@1.2.2 +minifiers@1.1.5 +minimongo@1.0.8 +mobile-status-bar@1.0.3 +mongo@1.1.0 +observe-sequence@1.0.6 +ordered-dict@1.0.3 +random@1.0.3 +reactive-dict@1.1.0 +reactive-var@1.0.5 +reload@1.1.3 +retry@1.0.3 +routepolicy@1.0.5 +session@1.1.0 +spacebars@1.0.6 +spacebars-compiler@1.0.6 +templating@1.1.1 +tracker@1.0.7 +ui@1.0.6 +underscore@1.0.3 +url@1.0.4 +webapp@1.2.0 +webapp-hashing@1.0.3 diff --git a/test_project/meteor_todos/meteor_todos.js b/test_project/meteor_todos/meteor_todos.js index fc8ed69..7e44016 100644 --- a/test_project/meteor_todos/meteor_todos.js +++ b/test_project/meteor_todos/meteor_todos.js @@ -1,8 +1,8 @@ if (Meteor.isClient) { // This code only runs on the client - Django = DDP.connect('http://localhost:8000/'); + Django = DDP.connect('http://'+window.location.hostname+':8000/'); Tasks = new Mongo.Collection("django_todos.task", {"connection": Django}); - Django.subscribe('django_todos.task'); + Django.subscribe('Tasks'); Template.body.helpers({ tasks: function () { return Tasks.find({});