mirror of
https://github.com/jazzband/django-ddp.git
synced 2026-03-16 22:40:24 +00:00
Merge branch 'release/0.11.0'
This commit is contained in:
commit
4bf0acd68c
4 changed files with 56 additions and 8 deletions
|
|
@ -1,6 +1,11 @@
|
|||
Change Log
|
||||
==========
|
||||
|
||||
0.11.0
|
||||
------
|
||||
* Support more than 8KB of change data by splitting large payloads into
|
||||
multiple chunks.
|
||||
|
||||
0.10.2
|
||||
------
|
||||
* Add `Logs` publication that can be configured to emit logs via DDP
|
||||
|
|
|
|||
32
dddp/api.py
32
dddp/api.py
|
|
@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals, print_function
|
|||
import collections
|
||||
from copy import deepcopy
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
# requirements
|
||||
import dbarray
|
||||
|
|
@ -888,13 +889,32 @@ class DDP(APIMixin):
|
|||
if my_connection_id in connection_ids:
|
||||
# msg must go to connection that initiated the change
|
||||
payload['_tx_id'] = this.ws.get_tx_id()
|
||||
# header is sent in every payload
|
||||
header = {
|
||||
'uuid': uuid.uuid1().int, # UUID1 should be unique
|
||||
'seq': 1, # increments for each 8KB chunk
|
||||
'fin': 0, # zero if more chunks expected, 1 if last chunk.
|
||||
}
|
||||
data = ejson.dumps(payload)
|
||||
cursor = connections[using].cursor()
|
||||
cursor.execute(
|
||||
'NOTIFY "ddp", %s',
|
||||
[
|
||||
ejson.dumps(payload),
|
||||
],
|
||||
)
|
||||
while data:
|
||||
hdr = ejson.dumps(header)
|
||||
# use all available payload space for chunk
|
||||
max_len = 8000 - len(hdr) - 100
|
||||
# take a chunk from data
|
||||
chunk, data = data[:max_len], data[max_len:]
|
||||
if not data:
|
||||
# last chunk, set fin=1.
|
||||
header['fin'] = 1
|
||||
hdr = ejson.dumps(header)
|
||||
# print('NOTIFY: %s' % hdr)
|
||||
cursor.execute(
|
||||
'NOTIFY "ddp", %s',
|
||||
[
|
||||
'%s|%s' % (hdr, chunk), # pipe separates hdr|chunk.
|
||||
],
|
||||
)
|
||||
header['seq'] += 1 # increment sequence.
|
||||
|
||||
|
||||
API = DDP()
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@ class PostgresGreenlet(gevent.Greenlet):
|
|||
|
||||
# queues for processing incoming sub/unsub requests and processing
|
||||
self.connections = {}
|
||||
self.chunks = {}
|
||||
self._stop_event = gevent.event.Event()
|
||||
|
||||
# connect to DB in async mode
|
||||
|
|
@ -62,7 +63,29 @@ class PostgresGreenlet(gevent.Greenlet):
|
|||
"Got NOTIFY (pid=%d, payload=%r)",
|
||||
notify.pid, notify.payload,
|
||||
)
|
||||
data = ejson.loads(notify.payload)
|
||||
|
||||
# read the header and check seq/fin.
|
||||
hdr, chunk = notify.payload.split('|', 1)
|
||||
# print('RECEIVE: %s' % hdr)
|
||||
header = ejson.loads(hdr)
|
||||
uuid = header['uuid']
|
||||
size, chunks = self.chunks.setdefault(uuid, [0, {}])
|
||||
if header['fin']:
|
||||
size = self.chunks[uuid][0] = header['seq']
|
||||
|
||||
# stash the chunk
|
||||
chunks[header['seq']] = chunk
|
||||
|
||||
if len(chunks) != size:
|
||||
# haven't got all the chunks yet
|
||||
continue # process next NOTIFY in loop
|
||||
|
||||
# got the last chunk -> process it.
|
||||
data = ''.join(
|
||||
chunk for _, chunk in sorted(chunks.items())
|
||||
)
|
||||
del self.chunks[uuid] # don't forget to cleanup!
|
||||
data = ejson.loads(data)
|
||||
sender = data.pop('_sender', None)
|
||||
tx_id = data.pop('_tx_id', None)
|
||||
for connection_id in data.pop('_connection_ids'):
|
||||
|
|
|
|||
2
setup.py
2
setup.py
|
|
@ -5,7 +5,7 @@ from setuptools import setup, find_packages
|
|||
|
||||
setup(
|
||||
name='django-ddp',
|
||||
version='0.10.2',
|
||||
version='0.11.0',
|
||||
description=__doc__,
|
||||
long_description=open('README.rst').read(),
|
||||
author='Tyson Clugg',
|
||||
|
|
|
|||
Loading…
Reference in a new issue