mirror of
https://github.com/jazzband/django-ddp.git
synced 2026-03-17 15:00:24 +00:00
106 lines
3.9 KiB
Python
106 lines
3.9 KiB
Python
"""Django DDP PostgreSQL Greenlet."""
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import ejson
|
|
import gevent
|
|
import gevent.queue
|
|
import gevent.select
|
|
import psycopg2 # green
|
|
import psycopg2.extensions
|
|
from geventwebsocket.logging import create_logger
|
|
|
|
|
|
class PostgresGreenlet(gevent.Greenlet):
|
|
|
|
"""Greenlet for multiplexing database operations."""
|
|
|
|
def __init__(self, conn, debug=False):
|
|
"""Prepare async connection."""
|
|
super(PostgresGreenlet, self).__init__()
|
|
self.logger = create_logger(__name__, debug=debug)
|
|
|
|
# queues for processing incoming sub/unsub requests and processing
|
|
self.connections = {}
|
|
self.chunks = {}
|
|
self._stop_event = gevent.event.Event()
|
|
|
|
# connect to DB in async mode
|
|
conn.allow_thread_sharing = True
|
|
self.connection = conn
|
|
self.conn_params = conn.get_connection_params()
|
|
self.conn_params.update(
|
|
async=True,
|
|
)
|
|
self.conn = psycopg2.connect(**self.conn_params)
|
|
self.poll() # wait for conneciton to start
|
|
self.cur = self.conn.cursor()
|
|
|
|
def _run(self): # pylint: disable=method-hidden
|
|
"""Spawn sub tasks, wait for stop signal."""
|
|
gevent.spawn(self.process_conn)
|
|
self._stop_event.wait()
|
|
|
|
def stop(self):
|
|
"""Stop subtasks and let run() finish."""
|
|
self._stop_event.set()
|
|
|
|
def process_conn(self):
|
|
"""Subtask to process NOTIFY async events from DB connection."""
|
|
self.cur.execute('LISTEN "ddp";')
|
|
while not self._stop_event.is_set():
|
|
gevent.select.select([self.conn], [], [], timeout=None)
|
|
self.poll()
|
|
|
|
def poll(self):
|
|
"""Poll DB socket and process async tasks."""
|
|
while 1:
|
|
state = self.conn.poll()
|
|
if state == psycopg2.extensions.POLL_OK:
|
|
while self.conn.notifies:
|
|
notify = self.conn.notifies.pop()
|
|
self.logger.info(
|
|
"Got NOTIFY (pid=%d, payload=%r)",
|
|
notify.pid, notify.payload,
|
|
)
|
|
|
|
# read the header and check seq/fin.
|
|
hdr, chunk = notify.payload.split('|', 1)
|
|
# print('RECEIVE: %s' % hdr)
|
|
header = ejson.loads(hdr)
|
|
uuid = header['uuid']
|
|
size, chunks = self.chunks.setdefault(uuid, [0, {}])
|
|
if header['fin']:
|
|
size = self.chunks[uuid][0] = header['seq']
|
|
|
|
# stash the chunk
|
|
chunks[header['seq']] = chunk
|
|
|
|
if len(chunks) != size:
|
|
# haven't got all the chunks yet
|
|
continue # process next NOTIFY in loop
|
|
|
|
# got the last chunk -> process it.
|
|
data = ''.join(
|
|
chunk for _, chunk in sorted(chunks.items())
|
|
)
|
|
del self.chunks[uuid] # don't forget to cleanup!
|
|
data = ejson.loads(data)
|
|
sender = data.pop('_sender', None)
|
|
tx_id = data.pop('_tx_id', None)
|
|
for connection_id in data.pop('_connection_ids'):
|
|
try:
|
|
ws = self.connections[connection_id]
|
|
except KeyError:
|
|
continue # connection not in this process
|
|
if connection_id == sender:
|
|
ws.send(data, tx_id=tx_id)
|
|
else:
|
|
ws.send(data)
|
|
break
|
|
elif state == psycopg2.extensions.POLL_WRITE:
|
|
gevent.select.select([], [self.conn.fileno()], [])
|
|
elif state == psycopg2.extensions.POLL_READ:
|
|
gevent.select.select([self.conn.fileno()], [], [])
|
|
else:
|
|
self.logger.warn('POLL_ERR: %s', state)
|