django-ddp/dddp/postgres.py

118 lines
4.2 KiB
Python

"""Django DDP PostgreSQL Greenlet."""
from __future__ import absolute_import
import ejson
import gevent
import gevent.queue
import gevent.select
import psycopg2 # green
import psycopg2.extensions
class PostgresGreenlet(gevent.Greenlet):
"""Greenlet for multiplexing database operations."""
def __init__(self, conn, debug=False):
"""Prepare async connection."""
super(PostgresGreenlet, self).__init__()
import logging
self.logger = logging.getLogger('dddp.postgres')
# queues for processing incoming sub/unsub requests and processing
self.connections = {}
self.chunks = {}
self._stop_event = gevent.event.Event()
# connect to DB in async mode
conn.allow_thread_sharing = True
self.connection = conn
self.select_greenlet = None
def _run(self): # pylint: disable=method-hidden
"""Spawn sub tasks, wait for stop signal."""
conn_params = self.connection.get_connection_params()
conn_params.update(
async=True,
)
conn = psycopg2.connect(**conn_params)
self.poll(conn) # wait for conneciton to start
cur = conn.cursor()
import logging
logging.getLogger('dddp').info('=> Started PostgresGreenlet.')
cur.execute('LISTEN "ddp";')
while not self._stop_event.is_set():
try:
self.select_greenlet = gevent.spawn(
gevent.select.select,
[conn], [], [], timeout=None,
)
self.select_greenlet.join()
except gevent.GreenletExit:
self._stop_event.set()
finally:
self.select_greenlet = None
self.poll(conn)
cur.close()
conn.close()
def stop(self):
"""Stop subtasks and let run() finish."""
self._stop_event.set()
if self.select_greenlet is not None:
self.select_greenlet.kill()
def poll(self, conn):
"""Poll DB socket and process async tasks."""
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
while conn.notifies:
notify = conn.notifies.pop()
self.logger.info(
"Got NOTIFY (pid=%d, payload=%r)",
notify.pid, notify.payload,
)
# read the header and check seq/fin.
hdr, chunk = notify.payload.split('|', 1)
# print('RECEIVE: %s' % hdr)
header = ejson.loads(hdr)
uuid = header['uuid']
size, chunks = self.chunks.setdefault(uuid, [0, {}])
if header['fin']:
size = self.chunks[uuid][0] = header['seq']
# stash the chunk
chunks[header['seq']] = chunk
if len(chunks) != size:
# haven't got all the chunks yet
continue # process next NOTIFY in loop
# got the last chunk -> process it.
data = ''.join(
chunk for _, chunk in sorted(chunks.items())
)
del self.chunks[uuid] # don't forget to cleanup!
data = ejson.loads(data)
sender = data.pop('_sender', None)
tx_id = data.pop('_tx_id', None)
for connection_id in data.pop('_connection_ids'):
try:
websocket = self.connections[connection_id]
except KeyError:
continue # connection not in this process
if connection_id == sender:
websocket.send(data, tx_id=tx_id)
else:
websocket.send(data)
break
elif state == psycopg2.extensions.POLL_WRITE:
gevent.select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
gevent.select.select([conn.fileno()], [], [])
else:
self.logger.warn('POLL_ERR: %s', state)