diff --git a/CHANGES.rst b/CHANGES.rst index c07e2a6..cf0a14a 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -13,6 +13,8 @@ develop Github organisation). * Started work on documentation refresh. * Fail on start if any child threads can't start (eg: port in use). +* Rudimentary support for dynamic publications which may now refer to `this.user`. +* Correctly close DB connections during shutdown, useful for test suite. * Add missing versions and dates to the change log, and note on Semantic Versioning. * Emit `django.core.signals.request_finished` to close DB connection and diff --git a/dddp/__init__.py b/dddp/__init__.py index 834f6c7..8270b31 100644 --- a/dddp/__init__.py +++ b/dddp/__init__.py @@ -121,6 +121,9 @@ THREAD_LOCAL_FACTORIES = { 'alea_random': alea.Alea, 'random_streams': RandomStreams, 'serializer': serializer_factory, + 'user_id': lambda: None, + 'user_ddp_id': lambda: None, + 'user': lambda: None, } THREAD_LOCAL = ThreadLocal() METEOR_ID_CHARS = u'23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz' diff --git a/dddp/api.py b/dddp/api.py index b638c06..0e77f47 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -11,7 +11,7 @@ import uuid import dbarray from django.conf import settings import django.contrib.postgres.fields -from django.db import connections, router +from django.db import connections, router, transaction from django.db.models import aggregates, Q try: # pylint: disable=E0611 @@ -538,15 +538,40 @@ class Publication(APIMixin): name = None queries = None - def get_queries(self, *params): - """DDP get_queries - must override if using params.""" - if params: - raise NotImplementedError( - 'Publication params not implemented on %r publication.' % ( - self.name, - ), - ) - return self.queries[:] + def user_queries(self, user, *params): + """Return queries for this publication as seen by `user`.""" + try: + get_queries = self.get_queries + except AttributeError: + # statically defined queries + if self.queries is None: + raise NotImplementedError( + 'Must set either queries or implement get_queries method.', + ) + if params: + raise NotImplementedError( + 'Publication params not implemented on %r publication.' % ( + self.name, + ), + ) + return self.queries[:] + + if user is False: + # no need to play with `this.user_id` or `this.user_ddp_id`. + return get_queries(*params) + + # stash the old user details + old_user_id = this.user_id + old_user_ddp_id = this.user_ddp_id + # apply the desired user details + this.user_id = None if user is None else user.pk + this.user_ddp_id = None if user is None else get_meteor_id(user) + try: + return get_queries(*params) + finally: + # restore the old user details + this.user_id = old_user_id + this.user_ddp_id = old_user_ddp_id @api_endpoint def collections(self, *params): @@ -555,7 +580,7 @@ class Publication(APIMixin): set( hasattr(qs, 'model') and model_name(qs.model) or qs[1] for qs - in self.get_queries(*params) + in self.get_queries(False, *params) ) ) @@ -611,7 +636,7 @@ class DDP(APIMixin): (col, qs) for (qs, col) in ( self.qs_and_collection(qs) for qs - in pub.get_queries(*params) + in pub.user_queries(obj.user, *params) ) ) # mergebox via MVCC! For details on how this is possible, read this: @@ -635,7 +660,7 @@ class DDP(APIMixin): pk=obj.pk, ).order_by('pk').distinct(): other_pub = self.get_pub_by_name(other.publication) - for qs in other_pub.get_queries(*other.params): + for qs in other_pub.user_queries(other.user, *other.params): qs, col = self.qs_and_collection(qs) if col not in to_send: continue @@ -652,8 +677,21 @@ class DDP(APIMixin): @api_endpoint def sub(self, id_, name, *params): """Create subscription, send matched objects that haven't been sent.""" - return self.do_sub(id_, name, False, *params) + try: + return self.do_sub(id_, name, False, *params) + except Exception as err: + this.send({ + 'msg': 'nosub', + 'id': id_, + 'error': { + 'error': 500, + 'errorType': 'Meteor.Error', + 'message': '%s' % err, + 'reason': 'Subscription failed', + }, + }) + @transaction.atomic def do_sub(self, id_, name, silent, *params): """Subscribe the current thread to the specified publication.""" try: @@ -662,6 +700,7 @@ class DDP(APIMixin): if not silent: this.send({ 'msg': 'nosub', + 'id': id_, 'error': { 'error': 404, 'errorType': 'Meteor.Error', @@ -922,10 +961,14 @@ class DDP(APIMixin): collections__model_name=model_name(model), ).prefetch_related('collections'): pub = self.get_pub_by_name(sub.publication) + try: + queries = list(pub.user_queries(sub.user, *sub.params)) + except Exception: + queries = [] for qs, col in ( self.qs_and_collection(qs) for qs - in pub.get_queries(*sub.params) + in queries ): # check if obj is an instance of the model for the queryset if qs.model is not model: