Check order of added/changed when emitting WebSocket frames, don't try to emit removed when client doesn't have the item.

This commit is contained in:
Tyson Clugg 2015-05-27 14:28:30 +10:00
parent fa51d89575
commit f5f127e9ba
2 changed files with 76 additions and 61 deletions

View file

@ -433,7 +433,6 @@ class DDP(APIMixin):
def __init__(self):
"""DDP API init."""
self._registry = {}
self._subs = {}
self._ddp_subscribers = {}
def get_collection(self, model):
@ -459,6 +458,57 @@ class DDP(APIMixin):
else:
raise TypeError('Invalid query spec: %r' % qs)
def sub_unique_objects(self, obj, params=None, pub=None, *args, **kwargs):
"""Return objects that are only visible through given subscription."""
if params is None:
params = ejson.loads(obj.params_ejson)
if pub is None:
pub = self._registry[pub_path(obj.publication)]
queries = collections.OrderedDict(
(col.name, (col, qs))
for (qs, col)
in (
self.qs_and_collection(qs)
for qs
in pub.get_queries(*params)
)
)
# mergebox via MVCC! For details on how this is possible, read this:
# https://devcenter.heroku.com/articles/postgresql-concurrency
to_send = collections.OrderedDict(
(
name,
col.objects_for_user(
user=obj.user_id,
qs=qs,
*args, **kwargs
),
)
for name, (col, qs)
in queries.items()
)
for other in Subscription.objects.filter(
connection=obj.connection_id,
collections__collection_name__in=queries.keys(),
).exclude(
pk=obj.pk,
).order_by('pk').distinct():
other_pub = self._registry[pub_path(other.publication)]
for qs in other_pub.get_queries(*other.params):
qs, col = self.qs_and_collection(qs)
if col.name not in to_send:
continue
to_send[col.name] = to_send[col.name].exclude(
pk__in=col.objects_for_user(
user=other.user_id,
qs=qs,
*args, **kwargs
).values('pk'),
)
for collection_name, qs in to_send.items():
col = self.get_col_by_name(collection_name)
yield col, qs.distinct()
@api_endpoint
def sub(self, id_, name, *params):
"""Create subscription, send matched objects that haven't been sent."""
@ -475,7 +525,7 @@ class DDP(APIMixin):
},
})
return
obj, created = Subscription.objects.get_or_create(
sub, created = Subscription.objects.get_or_create(
connection_id=this.ws.connection.pk,
sub_id=id_,
user_id=this.request.user.pk,
@ -488,56 +538,14 @@ class DDP(APIMixin):
this.send({'msg': 'ready', 'subs': [id_]})
return
# re-read from DB so we can get transaction ID (xmin)
obj = Subscription.objects.extra(**XMIN).get(pk=obj.pk)
queries = collections.OrderedDict(
(col.name, (col, qs))
for (qs, col)
in (
self.qs_and_collection(qs)
for qs
in pub.get_queries(*params)
)
)
self._subs[id_] = (this.ws, sorted(queries))
# mergebox via MVCC! For details on how this is possible, read this:
# https://devcenter.heroku.com/articles/postgresql-concurrency
to_send = collections.OrderedDict(
(
name,
col.objects_for_user(
user=this.request.user.pk,
qs=qs,
xmin__lte=obj.xmin,
),
)
for name, (col, qs)
in queries.items()
)
for name, (col, qs) in queries.items():
obj.collections.create(
sub = Subscription.objects.extra(**XMIN).get(pk=sub.pk)
for col, qs in self.sub_unique_objects(
sub, params, pub, xmin__lte=sub.xmin,
):
sub.collections.create(
model_name=model_name(qs.model),
collection_name=name,
collection_name=col.name,
)
for other in Subscription.objects.filter(
connection=this.ws.connection,
collections__collection_name__in=queries.keys(),
).exclude(
pk=obj.pk,
).order_by('pk').distinct():
other_pub = self._registry[pub_path(other.publication)]
for qs in other_pub.get_queries(*other.params):
qs, col = self.qs_and_collection(qs)
if col not in to_send:
continue
to_send[col] = to_send[col.name].exclude(
pk__in=col.objects_for_user(
user=this.request.user.pk,
qs=qs,
xmin__lte=obj.xmin,
).values('pk'),
)
for collection_name, qs in to_send.items():
col = self.get_col_by_name(collection_name)
for obj in qs:
payload = col.obj_change_as_msg(obj, ADDED)
this.send(payload)
@ -546,11 +554,15 @@ class DDP(APIMixin):
@api_endpoint
def unsub(self, id_):
"""Remove a subscription."""
Subscription.objects.filter(
connection=this.ws.connection,
sub_id=id_,
).delete()
this.ws.send({'msg': 'nosub', 'id': id_})
sub = Subscription.objects.get(
connection=this.ws.connection, sub_id=id_,
)
for col, qs in self.sub_unique_objects(sub):
for obj in qs:
payload = col.obj_change_as_msg(obj, REMOVED)
this.send(payload)
sub.delete()
this.send({'msg': 'nosub', 'id': id_})
@api_endpoint
def method(self, method, params, id_):

View file

@ -226,7 +226,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
if self._tx_buffer:
self.logger.debug(
'TX received %d, waiting for %d, have %r.',
tx_id, self._tx_next_id, sorted(self._tx_buffer),
tx_id, self._tx_next_id, self._tx_buffer,
)
self._tx_buffer[tx_id] = data
@ -234,6 +234,10 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
while self._tx_next_id in self._tx_buffer:
# pull next message from buffer
data = self._tx_buffer.pop(self._tx_next_id)
if self._tx_buffer:
self.logger.debug('TX found %d', self._tx_next_id)
# advance next message ID
self._tx_next_id = next(self._tx_next_id_gen)
if not isinstance(data, basestring):
# ejson payload
msg = data.get('msg', None)
@ -251,12 +255,11 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
msg = data['msg'] = ADDED
ids.add(meteor_id)
elif msg == REMOVED:
ids.remove(meteor_id)
try:
ids.remove(meteor_id)
except KeyError:
continue # client doesn't have this, don't send.
data = 'a%s' % ejson.dumps([ejson.dumps(data)])
if self._tx_buffer:
self.logger.debug('TX found %d', self._tx_next_id)
# advance next message ID
self._tx_next_id = next(self._tx_next_id_gen)
# send message
self.logger.debug('> %s %r', self, data)
try: