Fix unsubscribe from publications.

This commit is contained in:
Tyson Clugg 2015-04-23 09:50:45 +10:00
parent 5768d1ebcf
commit 59cc3f0ad1
2 changed files with 7 additions and 8 deletions

View file

@ -473,7 +473,7 @@ class DDP(APIMixin):
this.send_msg(payload)
this.send_msg({'msg': 'ready', 'subs': [id_]})
def unsub_notify(self, id_, names):
def unsub_notify(self, id_):
"""Dispatch DDP updates to connections."""
(ws, _) = self._subs.pop(id_, (None, []))
if ws is not None:

View file

@ -19,7 +19,6 @@ class PostgresGreenlet(gevent.Greenlet):
def __init__(self, conn, debug=False):
"""Prepare async connection."""
# greenify!
super(PostgresGreenlet, self).__init__()
self.logger = create_logger(__name__, debug=debug)
@ -60,9 +59,9 @@ class PostgresGreenlet(gevent.Greenlet):
"""Register callback `func` to be called after NOTIFY for `names`."""
self.subs.put((func, id_, names))
def unsubscribe(self, func, id_, names):
"""Un-register callback `func` to be called after NOTIFY for `names`."""
self.unsubs.put((func, id_, names))
def unsubscribe(self, func, id_):
"""Un-register callback `func` to be called after NOTIFY for `id`."""
self.unsubs.put((func, id_))
def process_conn(self):
"""Subtask to process NOTIFY async events from DB connection."""
@ -125,10 +124,10 @@ class PostgresGreenlet(gevent.Greenlet):
def process_unsubs(self):
"""Subtask to process `unsub` requests from `self.unsubs` queue."""
while not self._stop_event.is_set():
func, id_, names = self.unsubs.get()
func, id_ = self.unsubs.get()
try:
self._sub_lock.acquire()
for name in names:
for name in list(self.all_subs):
subs = self.all_subs[name]
subs.pop(id_, None)
if len(subs) == 0:
@ -138,4 +137,4 @@ class PostgresGreenlet(gevent.Greenlet):
del self.all_subs[name]
finally:
self._sub_lock.release()
gevent.spawn(func, id_, names)
gevent.spawn(func, id_)