Chunked payload for NOTIFY/LISTEN to get around 8KB limit on payload.

This commit is contained in:
Tyson Clugg 2015-08-10 18:47:04 +10:00
parent fb36461156
commit 7083c5c92f
2 changed files with 50 additions and 7 deletions

View file

@@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals, print_function
import collections
from copy import deepcopy
import traceback
import uuid
# requirements
import dbarray
@@ -888,13 +889,32 @@ class DDP(APIMixin):
if my_connection_id in connection_ids:
# msg must go to connection that initiated the change
payload['_tx_id'] = this.ws.get_tx_id()
# header is sent in every payload
header = {
'uuid': uuid.uuid1().int, # UUID1 should be unique
'seq': 1, # increments for each 8KB chunk
'fin': 0, # zero if more chunks expected, 1 if last chunk.
}
data = ejson.dumps(payload)
cursor = connections[using].cursor()
cursor.execute(
'NOTIFY "ddp", %s',
[
ejson.dumps(payload),
],
)
while data:
hdr = ejson.dumps(header)
# use all available payload space for chunk
max_len = 8000 - len(hdr) - 100
# take a chunk from data
chunk, data = data[:max_len], data[max_len:]
if not data:
# last chunk, set fin=1.
header['fin'] = 1
hdr = ejson.dumps(header)
# print('NOTIFY: %s' % hdr)
cursor.execute(
'NOTIFY "ddp", %s',
[
'%s|%s' % (hdr, chunk), # pipe separates hdr|chunk.
],
)
header['seq'] += 1 # increment sequence.
API = DDP()

View file

@@ -22,6 +22,7 @@ class PostgresGreenlet(gevent.Greenlet):
# queues for processing incoming sub/unsub requests and processing
self.connections = {}
self.chunks = {}
self._stop_event = gevent.event.Event()
# connect to DB in async mode
@@ -62,7 +63,29 @@ class PostgresGreenlet(gevent.Greenlet):
"Got NOTIFY (pid=%d, payload=%r)",
notify.pid, notify.payload,
)
data = ejson.loads(notify.payload)
# read the header and check seq/fin.
hdr, chunk = notify.payload.split('|', 1)
# print('RECEIVE: %s' % hdr)
header = ejson.loads(hdr)
uuid = header['uuid']
size, chunks = self.chunks.setdefault(uuid, [0, {}])
if header['fin']:
size = self.chunks[uuid][0] = header['seq']
# stash the chunk
chunks[header['seq']] = chunk
if len(chunks) != size:
# haven't got all the chunks yet
continue # process next NOTIFY in loop
# got the last chunk -> process it.
data = ''.join(
chunk for _, chunk in sorted(chunks.items())
)
del self.chunks[uuid] # don't forget to cleanup!
data = ejson.loads(data)
sender = data.pop('_sender', None)
tx_id = data.pop('_tx_id', None)
for connection_id in data.pop('_connection_ids'):