From 7083c5c92f0c8bfd4f00d93e9dc4cc03456dba85 Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Mon, 10 Aug 2015 18:47:04 +1000 Subject: [PATCH] Chunked payload for NOTIFY/LISTEN to get around 8KB limit on payload. --- dddp/api.py | 32 ++++++++++++++++++++++++++------ dddp/postgres.py | 25 ++++++++++++++++++++++++- 2 files changed, 50 insertions(+), 7 deletions(-) diff --git a/dddp/api.py b/dddp/api.py index 1780d27..b3d8d97 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals, print_function import collections from copy import deepcopy import traceback +import uuid # requirements import dbarray @@ -888,13 +889,32 @@ class DDP(APIMixin): if my_connection_id in connection_ids: # msg must go to connection that initiated the change payload['_tx_id'] = this.ws.get_tx_id() + # header is sent in every payload + header = { + 'uuid': uuid.uuid1().int, # UUID1 should be unique + 'seq': 1, # increments for each 8KB chunk + 'fin': 0, # zero if more chunks expected, 1 if last chunk. + } + data = ejson.dumps(payload) cursor = connections[using].cursor() - cursor.execute( - 'NOTIFY "ddp", %s', - [ - ejson.dumps(payload), - ], - ) + while data: + hdr = ejson.dumps(header) + # use all available payload space for chunk + max_len = 8000 - len(hdr) - 100 + # take a chunk from data + chunk, data = data[:max_len], data[max_len:] + if not data: + # last chunk, set fin=1. + header['fin'] = 1 + hdr = ejson.dumps(header) + # print('NOTIFY: %s' % hdr) + cursor.execute( + 'NOTIFY "ddp", %s', + [ + '%s|%s' % (hdr, chunk), # pipe separates hdr|chunk. + ], + ) + header['seq'] += 1 # increment sequence. API = DDP() diff --git a/dddp/postgres.py b/dddp/postgres.py index 2c2404c..51844d0 100644 --- a/dddp/postgres.py +++ b/dddp/postgres.py @@ -22,6 +22,7 @@ class PostgresGreenlet(gevent.Greenlet): # queues for processing incoming sub/unsub requests and processing self.connections = {} + self.chunks = {} self._stop_event = gevent.event.Event() # connect to DB in async mode @@ -62,7 +63,29 @@ class PostgresGreenlet(gevent.Greenlet): "Got NOTIFY (pid=%d, payload=%r)", notify.pid, notify.payload, ) - data = ejson.loads(notify.payload) + + # read the header and check seq/fin. + hdr, chunk = notify.payload.split('|', 1) + # print('RECEIVE: %s' % hdr) + header = ejson.loads(hdr) + uuid = header['uuid'] + size, chunks = self.chunks.setdefault(uuid, [0, {}]) + if header['fin']: + size = self.chunks[uuid][0] = header['seq'] + + # stash the chunk + chunks[header['seq']] = chunk + + if len(chunks) != size: + # haven't got all the chunks yet + continue # process next NOTIFY in loop + + # got the last chunk -> process it. + data = ''.join( + chunk for _, chunk in sorted(chunks.items()) + ) + del self.chunks[uuid] # don't forget to cleanup! + data = ejson.loads(data) sender = data.pop('_sender', None) tx_id = data.pop('_tx_id', None) for connection_id in data.pop('_connection_ids'):