From 59cc3f0ad12131550ad48fabeb9c72ae1c71e37f Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Thu, 23 Apr 2015 09:50:45 +1000 Subject: [PATCH] Fix unsubscribe from publications. --- dddp/api.py | 2 +- dddp/postgres.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/dddp/api.py b/dddp/api.py index 182fc3e..8033556 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -473,7 +473,7 @@ class DDP(APIMixin): this.send_msg(payload) this.send_msg({'msg': 'ready', 'subs': [id_]}) - def unsub_notify(self, id_, names): + def unsub_notify(self, id_): """Dispatch DDP updates to connections.""" (ws, _) = self._subs.pop(id_, (None, [])) if ws is not None: diff --git a/dddp/postgres.py b/dddp/postgres.py index 01c31cb..50b6b8a 100644 --- a/dddp/postgres.py +++ b/dddp/postgres.py @@ -19,7 +19,6 @@ class PostgresGreenlet(gevent.Greenlet): def __init__(self, conn, debug=False): """Prepare async connection.""" - # greenify! super(PostgresGreenlet, self).__init__() self.logger = create_logger(__name__, debug=debug) @@ -60,9 +59,9 @@ class PostgresGreenlet(gevent.Greenlet): """Register callback `func` to be called after NOTIFY for `names`.""" self.subs.put((func, id_, names)) - def unsubscribe(self, func, id_, names): - """Un-register callback `func` to be called after NOTIFY for `names`.""" - self.unsubs.put((func, id_, names)) + def unsubscribe(self, func, id_): + """Un-register callback `func` to be called after NOTIFY for `id`.""" + self.unsubs.put((func, id_)) def process_conn(self): """Subtask to process NOTIFY async events from DB connection.""" @@ -125,10 +124,10 @@ class PostgresGreenlet(gevent.Greenlet): def process_unsubs(self): """Subtask to process `unsub` requests from `self.unsubs` queue.""" while not self._stop_event.is_set(): - func, id_, names = self.unsubs.get() + func, id_ = self.unsubs.get() try: self._sub_lock.acquire() - for name in names: + for name in list(self.all_subs): subs = self.all_subs[name] subs.pop(id_, None) if len(subs) == 0: @@ -138,4 +137,4 @@ class PostgresGreenlet(gevent.Greenlet): del self.all_subs[name] finally: self._sub_lock.release() - gevent.spawn(func, id_, names) + gevent.spawn(func, id_)