django-ddp/dddp/postgres.py
2015-04-08 16:14:22 +10:00

145 lines
5.4 KiB
Python

"""Django DDP PostgreSQL Greenlet."""
from __future__ import absolute_import
import collections
import gevent.monkey
import psycogreen.gevent
gevent.monkey.patch_all()
psycogreen.gevent.patch_psycopg()
import ejson
import gevent
import gevent.queue
import gevent.select
import psycopg2 # green
import psycopg2.extensions
from geventwebsocket.logging import create_logger
class PostgresGreenlet(gevent.Greenlet):

    """Greenlet for multiplexing database operations.

    Holds one asynchronous psycopg2 connection, issues LISTEN/UNLISTEN for
    the channel names callers subscribe to, and fans incoming NOTIFY
    payloads out to the registered callbacks.
    """

    def __init__(self, conn, debug=False):
        """Prepare async connection.

        Args:
            conn: database connection wrapper providing
                `get_connection_params()` (presumably a Django DB
                connection -- confirm against the caller).  Its parameters
                are reused to open a second, asynchronous connection.
            debug: forwarded to `create_logger`.
        """
        # greenify!
        super(PostgresGreenlet, self).__init__()
        self.logger = create_logger(__name__, debug=debug)

        # queues for processing incoming sub/unsub requests and processing
        self.subs = gevent.queue.Queue()
        self.unsubs = gevent.queue.Queue()
        self.proc_queue = gevent.queue.Queue()
        self._stop_event = gevent.event.Event()

        # dict of name: subscribers
        # eg: {'bookstore.book': {'tpozNWMPphaJ2n8bj': <function at ...>}}
        self.all_subs = collections.defaultdict(dict)
        self._sub_lock = gevent.lock.RLock()

        # connect to DB in async mode
        conn.allow_thread_sharing = True
        self.connection = conn
        self.conn_params = conn.get_connection_params()
        # `async` is a reserved word from Python 3.7 onwards, so it cannot be
        # written as a keyword argument; set it through the dict key instead.
        self.conn_params['async'] = True
        self.conn = psycopg2.connect(**self.conn_params)
        self.poll()  # wait for connection to start
        self.cur = self.conn.cursor()

    def _run(self):  # pylint: disable=method-hidden
        """Spawn sub tasks, wait for stop signal, then stop the sub tasks."""
        tasks = [
            gevent.spawn(self.process_conn),
            gevent.spawn(self.process_subs),
            gevent.spawn(self.process_unsubs),
        ]
        try:
            self._stop_event.wait()
        finally:
            # The subtasks block indefinitely (select / Queue.get), so they
            # would never notice the stop event on their own; kill them
            # explicitly rather than leaking them.
            gevent.killall(tasks)

    def stop(self):
        """Stop subtasks and let run() finish."""
        self._stop_event.set()

    def subscribe(self, func, id_, names):
        """Register callback `func` to be called after NOTIFY for `names`.

        The request is queued and handled by `process_subs`; `func` is later
        invoked as `func(id_, name, data)` for matching notifications.
        """
        self.subs.put((func, id_, names))

    def unsubscribe(self, func, id_, names):
        """Un-register subscription `id_` from `names`.

        The request is queued and handled by `process_unsubs`, which calls
        `func(id_, names)` once the subscription has been removed.
        """
        self.unsubs.put((func, id_, names))

    def process_conn(self):
        """Subtask to process NOTIFY async events from DB connection."""
        while not self._stop_event.is_set():
            # Blocks until the socket is readable; `_run` kills this subtask
            # on stop, so no timeout is needed to re-check the stop event.
            gevent.select.select([self.conn], [], [], timeout=None)
            self.poll()

    def poll(self):
        """Poll DB socket and process async tasks.

        Blocks (cooperatively) until the connection reports POLL_OK, then
        dispatches any pending notifications.

        Raises:
            psycopg2.OperationalError: if the connection reports a poll state
                other than POLL_OK, POLL_READ or POLL_WRITE.
        """
        while True:
            state = self.conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                self._dispatch_notifies()
                return
            elif state == psycopg2.extensions.POLL_WRITE:
                gevent.select.select([], [self.conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                gevent.select.select([self.conn.fileno()], [], [])
            else:
                # Looping again on an unknown state would spin forever.
                self.logger.warning('POLL_ERR: %s', state)
                raise psycopg2.OperationalError(
                    'poll() returned %s' % state
                )

    def _dispatch_notifies(self):
        """Hand each pending notification to its matching subscribers."""
        while self.conn.notifies:
            # pop(0): oldest first, so callbacks see events in arrival order.
            notify = self.conn.notifies.pop(0)
            name = notify.channel
            self.logger.info(
                "Got NOTIFY (pid=%d, name=%r, payload=%r)",
                notify.pid, name, notify.payload,
            )
            with self._sub_lock:
                self.logger.info(self.all_subs)
                subs = self.all_subs[name]
                data = ejson.loads(notify.payload)
                # The payload names the subscription ids it is meant for.
                sub_ids = data.pop('_sub_ids')
                self.logger.info('Subscribers: %r', sub_ids)
                self.logger.info(subs)
                for id_, func in subs.items():
                    if id_ not in sub_ids:
                        continue  # not for this subscription
                    gevent.spawn(func, id_, name, data)

    @staticmethod
    def _quote_ident(name):
        """Return `name` as a double-quoted SQL identifier."""
        # Embedded double quotes are escaped by doubling them.
        return '"%s"' % name.replace('"', '""')

    def process_subs(self):
        """Subtask to process `sub` requests from `self.subs` queue."""
        while not self._stop_event.is_set():
            func, id_, names = self.subs.get()
            with self._sub_lock:
                for name in names:
                    subs = self.all_subs[name]
                    if not subs:
                        # First subscriber for this channel: start listening.
                        self.logger.debug('LISTEN "%s";', name)
                        self.poll()  # let any in-flight command finish first
                        self.cur.execute(
                            'LISTEN %s;' % self._quote_ident(name)
                        )
                        self.poll()
                    subs[id_] = func

    def process_unsubs(self):
        """Subtask to process `unsub` requests from `self.unsubs` queue."""
        while not self._stop_event.is_set():
            func, id_, names = self.unsubs.get()
            with self._sub_lock:
                for name in names:
                    subs = self.all_subs[name]
                    subs.pop(id_, None)
                    if not subs:
                        # Last subscriber gone: stop listening on the channel.
                        self.logger.info('UNLISTEN "%s";', name)
                        self.poll()  # let any in-flight command finish first
                        self.cur.execute(
                            'UNLISTEN %s;' % self._quote_ident(name)
                        )
                        self.poll()
                        del self.all_subs[name]
            # Tell the caller the unsubscription has been processed.
            gevent.spawn(func, id_, names)