diff --git a/dddp/api.py b/dddp/api.py index c13e784..4864de3 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -433,7 +433,6 @@ class DDP(APIMixin): def __init__(self): """DDP API init.""" self._registry = {} - self._subs = {} self._ddp_subscribers = {} def get_collection(self, model): @@ -459,6 +458,57 @@ class DDP(APIMixin): else: raise TypeError('Invalid query spec: %r' % qs) + def sub_unique_objects(self, obj, params=None, pub=None, *args, **kwargs): + """Return objects that are only visible through given subscription.""" + if params is None: + params = ejson.loads(obj.params_ejson) + if pub is None: + pub = self._registry[pub_path(obj.publication)] + queries = collections.OrderedDict( + (col.name, (col, qs)) + for (qs, col) + in ( + self.qs_and_collection(qs) + for qs + in pub.get_queries(*params) + ) + ) + # mergebox via MVCC! For details on how this is possible, read this: + # https://devcenter.heroku.com/articles/postgresql-concurrency + to_send = collections.OrderedDict( + ( + name, + col.objects_for_user( + user=obj.user_id, + qs=qs, + *args, **kwargs + ), + ) + for name, (col, qs) + in queries.items() + ) + for other in Subscription.objects.filter( + connection=obj.connection_id, + collections__collection_name__in=queries.keys(), + ).exclude( + pk=obj.pk, + ).order_by('pk').distinct(): + other_pub = self._registry[pub_path(other.publication)] + for qs in other_pub.get_queries(*other.params): + qs, col = self.qs_and_collection(qs) + if col.name not in to_send: + continue + to_send[col.name] = to_send[col.name].exclude( + pk__in=col.objects_for_user( + user=other.user_id, + qs=qs, + *args, **kwargs + ).values('pk'), + ) + for collection_name, qs in to_send.items(): + col = self.get_col_by_name(collection_name) + yield col, qs.distinct() + @api_endpoint def sub(self, id_, name, *params): """Create subscription, send matched objects that haven't been sent.""" @@ -475,7 +525,7 @@ class DDP(APIMixin): }, }) return - obj, created = Subscription.objects.get_or_create( + sub, created = Subscription.objects.get_or_create( connection_id=this.ws.connection.pk, sub_id=id_, user_id=this.request.user.pk, @@ -488,56 +538,14 @@ class DDP(APIMixin): this.send({'msg': 'ready', 'subs': [id_]}) return # re-read from DB so we can get transaction ID (xmin) - obj = Subscription.objects.extra(**XMIN).get(pk=obj.pk) - queries = collections.OrderedDict( - (col.name, (col, qs)) - for (qs, col) - in ( - self.qs_and_collection(qs) - for qs - in pub.get_queries(*params) - ) - ) - self._subs[id_] = (this.ws, sorted(queries)) - # mergebox via MVCC! For details on how this is possible, read this: - # https://devcenter.heroku.com/articles/postgresql-concurrency - to_send = collections.OrderedDict( - ( - name, - col.objects_for_user( - user=this.request.user.pk, - qs=qs, - xmin__lte=obj.xmin, - ), - ) - for name, (col, qs) - in queries.items() - ) - for name, (col, qs) in queries.items(): - obj.collections.create( + sub = Subscription.objects.extra(**XMIN).get(pk=sub.pk) + for col, qs in self.sub_unique_objects( + sub, params, pub, xmin__lte=sub.xmin, + ): + sub.collections.create( model_name=model_name(qs.model), - collection_name=name, + collection_name=col.name, ) - for other in Subscription.objects.filter( - connection=this.ws.connection, - collections__collection_name__in=queries.keys(), - ).exclude( - pk=obj.pk, - ).order_by('pk').distinct(): - other_pub = self._registry[pub_path(other.publication)] - for qs in other_pub.get_queries(*other.params): - qs, col = self.qs_and_collection(qs) - if col not in to_send: - continue - to_send[col] = to_send[col.name].exclude( - pk__in=col.objects_for_user( - user=this.request.user.pk, - qs=qs, - xmin__lte=obj.xmin, - ).values('pk'), - ) - for collection_name, qs in to_send.items(): - col = self.get_col_by_name(collection_name) for obj in qs: payload = col.obj_change_as_msg(obj, ADDED) this.send(payload) @@ -546,11 +554,15 @@ class DDP(APIMixin): @api_endpoint def unsub(self, id_): """Remove a subscription.""" - Subscription.objects.filter( - connection=this.ws.connection, - sub_id=id_, - ).delete() - this.ws.send({'msg': 'nosub', 'id': id_}) + sub = Subscription.objects.get( + connection=this.ws.connection, sub_id=id_, + ) + for col, qs in self.sub_unique_objects(sub): + for obj in qs: + payload = col.obj_change_as_msg(obj, REMOVED) + this.send(payload) + sub.delete() + this.send({'msg': 'nosub', 'id': id_}) @api_endpoint def method(self, method, params, id_): diff --git a/dddp/websocket.py b/dddp/websocket.py index 37e7406..be4f507 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -226,7 +226,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): if self._tx_buffer: self.logger.debug( 'TX received %d, waiting for %d, have %r.', - tx_id, self._tx_next_id, sorted(self._tx_buffer), + tx_id, self._tx_next_id, self._tx_buffer, ) self._tx_buffer[tx_id] = data @@ -234,6 +234,10 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): while self._tx_next_id in self._tx_buffer: # pull next message from buffer data = self._tx_buffer.pop(self._tx_next_id) + if self._tx_buffer: + self.logger.debug('TX found %d', self._tx_next_id) + # advance next message ID + self._tx_next_id = next(self._tx_next_id_gen) if not isinstance(data, basestring): # ejson payload msg = data.get('msg', None) @@ -251,12 +255,11 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): msg = data['msg'] = ADDED ids.add(meteor_id) elif msg == REMOVED: - ids.remove(meteor_id) + try: + ids.remove(meteor_id) + except KeyError: + continue # client doesn't have this, don't send. data = 'a%s' % ejson.dumps([ejson.dumps(data)]) - if self._tx_buffer: - self.logger.debug('TX found %d', self._tx_next_id) - # advance next message ID - self._tx_next_id = next(self._tx_next_id_gen) # send message self.logger.debug('> %s %r', self, data) try: