mirror of
https://github.com/jazzband/django-ddp.git
synced 2026-03-16 22:40:24 +00:00
Refactored server.py intoA dddp management command, added pub/sub capability (realtime), intial result sets... woo!
This commit is contained in:
parent
17490d8caf
commit
88e6799ae2
6 changed files with 546 additions and 435 deletions
25
dddp/apps.py
25
dddp/apps.py
|
|
@ -9,6 +9,9 @@ from django.core import serializers
|
|||
from django.db import connections
|
||||
from django.db.models import signals
|
||||
|
||||
from dddp.msg import obj_change_as_msg
|
||||
|
||||
|
||||
class DjangoDDPConfig(AppConfig):
|
||||
name = 'dddp'
|
||||
verbose_name = 'Django DDP'
|
||||
|
|
@ -38,28 +41,8 @@ class DjangoDDPConfig(AppConfig):
|
|||
)
|
||||
|
||||
def send_notify(self, model, obj, msg, using):
|
||||
data = self.serializer.serialize([obj])[0]
|
||||
name = data['model']
|
||||
print(name)
|
||||
#name = '%s.%s' % (
|
||||
# model._meta.app_label,
|
||||
# model._meta.object_name,
|
||||
#)
|
||||
|
||||
# always use UUID as ID
|
||||
if isinstance(data['pk'], int):
|
||||
#data['pk'] = uuid.uuid3(uuid.NAMESPACE_URL, '%d' % data['pk']).hex
|
||||
data['pk'] = '%d' % data['pk']
|
||||
|
||||
payload = {
|
||||
'msg': msg,
|
||||
'collection': name,
|
||||
'id': data['pk'],
|
||||
}
|
||||
if msg != 'removed':
|
||||
payload['fields'] = data['fields']
|
||||
name, payload = obj_change_as_msg(obj, msg)
|
||||
cursor = connections[using].cursor()
|
||||
print(name)
|
||||
cursor.execute(
|
||||
'NOTIFY "%s", %%s' % name,
|
||||
[
|
||||
|
|
|
|||
0
dddp/management/__init__.py
Normal file
0
dddp/management/__init__.py
Normal file
0
dddp/management/commands/__init__.py
Normal file
0
dddp/management/commands/__init__.py
Normal file
517
dddp/management/commands/dddp.py
Normal file
517
dddp/management/commands/dddp.py
Normal file
|
|
@ -0,0 +1,517 @@
|
|||
"""Django DDP WebSocket service."""
|
||||
|
||||
from __future__ import print_function, absolute_import
|
||||
|
||||
from django.db import connection, close_old_connections
|
||||
close_old_connections()
|
||||
|
||||
import collections
|
||||
import gevent.monkey
|
||||
import psycogreen.gevent
|
||||
gevent.monkey.patch_all()
|
||||
psycogreen.gevent.patch_psycopg()
|
||||
|
||||
import inspect
|
||||
import optparse
|
||||
import random
|
||||
import signal
|
||||
import socket # green
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
import ejson
|
||||
import gevent
|
||||
import gevent.queue
|
||||
import gevent.select
|
||||
import geventwebsocket
|
||||
import psycopg2 # green
|
||||
import psycopg2.extras
|
||||
from django.core.handlers.base import BaseHandler
|
||||
from django.core.handlers.wsgi import WSGIRequest
|
||||
from django.core.management.base import BaseCommand
|
||||
from django.db.models.loading import get_model
|
||||
|
||||
from dddp.msg import obj_change_as_msg
|
||||
|
||||
|
||||
BASE_HANDLER = BaseHandler()
|
||||
|
||||
|
||||
class PostgresGreenlet(gevent.Greenlet):
|
||||
|
||||
"""Greenlet for multiplexing database operations."""
|
||||
|
||||
def __init__(self, conn):
|
||||
"""Prepare async connection."""
|
||||
# greenify!
|
||||
super(PostgresGreenlet, self).__init__()
|
||||
|
||||
# queues for processing incoming sub/unsub requests and processing
|
||||
self.subs = gevent.queue.Queue()
|
||||
self.unsubs = gevent.queue.Queue()
|
||||
self.proc_queue = gevent.queue.Queue()
|
||||
self._stop_event = gevent.event.Event()
|
||||
|
||||
# dict of name: subscribers
|
||||
self.all_subs = collections.defaultdict(dict)
|
||||
self._sub_lock = gevent.lock.RLock()
|
||||
|
||||
# connect to DB in async mode
|
||||
conn.allow_thread_sharing = True
|
||||
self.connection = conn
|
||||
self.conn_params = conn.get_connection_params()
|
||||
self.conn_params['async'] = True
|
||||
self.conn = conn.get_new_connection(self.conn_params)
|
||||
self.poll() # wait for conneciton to start
|
||||
self.cur = self.conn.cursor()
|
||||
|
||||
def _run(self): # pylint: disable=method-hidden
|
||||
"""Spawn sub tasks, wait for stop signal."""
|
||||
gevent.spawn(self.process_conn)
|
||||
gevent.spawn(self.process_subs)
|
||||
gevent.spawn(self.process_unsubs)
|
||||
self._stop_event.wait()
|
||||
|
||||
def stop(self):
|
||||
"""Stop subtasks and let run() finish."""
|
||||
self._stop_event.set()
|
||||
|
||||
def subscribe(self, func, obj, id_, name, params):
|
||||
"""Register callback `func` to be called after NOTIFY for `name`."""
|
||||
self.subs.put((func, obj, id_, name, params))
|
||||
|
||||
def unsubscribe(self, func, obj, id_):
|
||||
"""Un-register callback `func` to be called after NOTIFY for `name`."""
|
||||
self.unsubs.put((func, obj, id_))
|
||||
|
||||
def process_conn(self):
|
||||
"""Subtask to process NOTIFY async events from DB connection."""
|
||||
while not self._stop_event.is_set():
|
||||
# TODO: change timeout so self._stop_event is periodically checked?
|
||||
gevent.select.select([self.conn], [], [], timeout=None)
|
||||
self.poll()
|
||||
|
||||
def poll(self):
|
||||
"""Poll DB socket and process async tasks."""
|
||||
while 1:
|
||||
state = self.conn.poll()
|
||||
if state == psycopg2.extensions.POLL_OK:
|
||||
while self.conn.notifies:
|
||||
notify = self.conn.notifies.pop()
|
||||
name = notify.channel
|
||||
print("Got NOTIFY:", notify.pid, name, notify.payload)
|
||||
try:
|
||||
self._sub_lock.acquire()
|
||||
subs = self.all_subs[name]
|
||||
data = ejson.loads(notify.payload)
|
||||
for (_, id_), (func, params) in subs.items():
|
||||
gevent.spawn(func, id_, name, params, data)
|
||||
finally:
|
||||
self._sub_lock.release()
|
||||
break
|
||||
elif state == psycopg2.extensions.POLL_WRITE:
|
||||
gevent.select.select([], [self.conn.fileno()], [])
|
||||
elif state == psycopg2.extensions.POLL_READ:
|
||||
gevent.select.select([self.conn.fileno()], [], [])
|
||||
else:
|
||||
print('POLL_ERR: %s' % state)
|
||||
|
||||
def process_subs(self):
|
||||
"""Subtask to process `sub` requests from `self.subs` queue."""
|
||||
while not self._stop_event.is_set():
|
||||
func, obj, id_, name, params = self.subs.get()
|
||||
try:
|
||||
self._sub_lock.acquire()
|
||||
subs = self.all_subs[name]
|
||||
if len(subs) == 0:
|
||||
print('LISTEN "%s";' % name)
|
||||
self.poll()
|
||||
self.cur.execute('LISTEN "%s";' % name)
|
||||
self.poll()
|
||||
subs[(obj, id_)] = (func, params)
|
||||
finally:
|
||||
self._sub_lock.release()
|
||||
|
||||
def process_unsubs(self):
|
||||
"""Subtask to process `unsub` requests from `self.unsubs` queue."""
|
||||
while not self._stop_event.is_set():
|
||||
func, obj, id_ = self.unsubs.get()
|
||||
try:
|
||||
self._sub_lock.acquire()
|
||||
for name, subs in self.all_subs.items():
|
||||
subs.pop((obj, id_), None)
|
||||
if len(subs) == 0:
|
||||
print('UNLISTEN "%s";' % name)
|
||||
self.cur.execute('UNLISTEN "%s";' % name)
|
||||
self.poll()
|
||||
del self.all_subs[name]
|
||||
finally:
|
||||
self._sub_lock.release()
|
||||
gevent.spawn(func, id_)
|
||||
|
||||
|
||||
class MeteorError(Exception):
|
||||
|
||||
"""MeteorError."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def validate_kwargs(func, kwargs, func_name=None):
|
||||
"""Validate arguments to be supplied to func."""
|
||||
if func_name is None:
|
||||
func_name = func.__name__
|
||||
argspec = inspect.getargspec(func)
|
||||
args = argspec.args[:]
|
||||
|
||||
# ignore implicit 'self' argument
|
||||
if inspect.ismethod(func) and args[0] == 'self':
|
||||
args[:1] = []
|
||||
|
||||
# translate 'foo_' to avoid reserved names like 'id'
|
||||
trans = {
|
||||
arg: arg.endswith('_') and arg[:-1] or arg
|
||||
for arg
|
||||
in args
|
||||
}
|
||||
for key in list(kwargs):
|
||||
key_adj = '%s_' % key
|
||||
if key_adj in args:
|
||||
kwargs[key_adj] = kwargs.pop(key)
|
||||
|
||||
required = args[:-len(argspec.defaults)]
|
||||
supplied = sorted(kwargs)
|
||||
missing = [
|
||||
trans.get(arg, arg) for arg in required
|
||||
if arg not in supplied
|
||||
]
|
||||
if missing:
|
||||
raise MeteorError(
|
||||
'Missing required arguments to %s: %s' % (
|
||||
func_name,
|
||||
' '.join(missing),
|
||||
),
|
||||
)
|
||||
extra = [
|
||||
arg for arg in supplied
|
||||
if arg not in args
|
||||
]
|
||||
if extra:
|
||||
raise MeteorError(
|
||||
'Unknown arguments to %s: %s' % (
|
||||
func_name,
|
||||
' '.join(extra),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class DDPApplication(geventwebsocket.WebSocketApplication):
|
||||
|
||||
"""Django DDP WebSocket application."""
|
||||
|
||||
methods = {}
|
||||
versions = [ # first item is preferred version
|
||||
'1',
|
||||
'pre1',
|
||||
'pre2',
|
||||
]
|
||||
pgworker = None
|
||||
remote_addr = None
|
||||
version = None
|
||||
support = None
|
||||
session = None
|
||||
subs = None
|
||||
request = None
|
||||
|
||||
def on_open(self):
|
||||
"""Handle new websocket connection."""
|
||||
self.request = WSGIRequest(self.ws.environ)
|
||||
# Apply request middleware (so we get request.user and other attrs)
|
||||
# pylint: disable=protected-access
|
||||
for middleware_method in BASE_HANDLER._request_middleware:
|
||||
response = middleware_method(self.request)
|
||||
if response:
|
||||
raise ValueError(response)
|
||||
|
||||
self.remote_addr = '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format(
|
||||
self.ws.environ,
|
||||
)
|
||||
self.subs = {}
|
||||
print('+ %s OPEN %s' % (self, self.request.user))
|
||||
self.send('o')
|
||||
self.send('a["{\\"server_id\\":\\"0\\"}"]')
|
||||
|
||||
def __str__(self):
|
||||
"""Show remote address that connected to us."""
|
||||
return self.remote_addr
|
||||
|
||||
def on_close(self, reason):
|
||||
"""Handle closing of websocket connection."""
|
||||
print('- %s %s' % (self, reason or 'CLOSE'))
|
||||
|
||||
def on_message(self, message):
|
||||
"""Process a message received from remote."""
|
||||
if self.ws.closed:
|
||||
return None
|
||||
try:
|
||||
print('< %s %r' % (self, message))
|
||||
|
||||
# parse message set
|
||||
try:
|
||||
msgs = ejson.loads(message)
|
||||
except ValueError, err:
|
||||
raise MeteorError(400, 'Data is not valid EJSON')
|
||||
if not isinstance(msgs, list):
|
||||
raise MeteorError(400, 'Invalid EJSON messages')
|
||||
|
||||
# process individual messages
|
||||
while msgs:
|
||||
# parse message payload
|
||||
raw = msgs.pop(0)
|
||||
try:
|
||||
try:
|
||||
data = ejson.loads(raw)
|
||||
except ValueError, err:
|
||||
raise MeteorError(400, 'Data is not valid EJSON')
|
||||
if not isinstance(data, dict):
|
||||
self.error(400, 'Invalid EJSON message payload', raw)
|
||||
continue
|
||||
try:
|
||||
msg = data.pop('msg')
|
||||
except KeyError:
|
||||
raise MeteorError(400, 'Missing `msg` parameter', raw)
|
||||
# dispatch message
|
||||
self.dispatch(msg, data)
|
||||
except MeteorError, err:
|
||||
self.error(err)
|
||||
except Exception, err:
|
||||
traceback.print_exc()
|
||||
self.error(err)
|
||||
except geventwebsocket.WebSocketError, err:
|
||||
self.ws.close()
|
||||
except MeteorError, err:
|
||||
self.error(err)
|
||||
|
||||
def dispatch(self, msg, kwargs):
|
||||
"""Dispatch msg to appropriate recv_foo handler."""
|
||||
# enforce calling 'connect' first
|
||||
if not self.session and msg != 'connect':
|
||||
raise MeteorError(400, 'Session not establised - try `connect`.')
|
||||
|
||||
# lookup method handler
|
||||
try:
|
||||
handler = getattr(self, 'recv_%s' % msg)
|
||||
except (AttributeError, UnicodeEncodeError):
|
||||
raise MeteorError(404, 'Method not found')
|
||||
|
||||
# validate handler arguments
|
||||
validate_kwargs(handler, kwargs)
|
||||
|
||||
# dispatch to handler
|
||||
try:
|
||||
handler(**kwargs)
|
||||
except Exception, err:
|
||||
raise MeteorError(500, 'Internal server error', err)
|
||||
|
||||
def send(self, data):
|
||||
"""Send raw `data` to WebSocket client."""
|
||||
print('> %s %r' % (self, data))
|
||||
try:
|
||||
self.ws.send(data)
|
||||
except geventwebsocket.WebSocketError, err:
|
||||
self.ws.close()
|
||||
|
||||
def reply(self, msg, **kwargs):
|
||||
"""Send EJSON reply to remote."""
|
||||
kwargs['msg'] = msg
|
||||
data = ejson.dumps([ejson.dumps(kwargs)])
|
||||
self.send('a%s' % data)
|
||||
|
||||
def error(self, err, reason=None, detail=None):
|
||||
"""Send EJSON error to remote."""
|
||||
if isinstance(err, MeteorError):
|
||||
(err, reason, detail) = (err.args[:] + (None, None, None))[:3]
|
||||
data = {
|
||||
'error': '%s' % (err or '<UNKNOWN_ERROR>'),
|
||||
}
|
||||
if reason:
|
||||
data['reason'] = reason
|
||||
if detail:
|
||||
data['detail'] = detail
|
||||
print('! %s %r' % (self, data))
|
||||
self.reply('error', **data)
|
||||
|
||||
def recv_connect(self, version, support, session=None):
|
||||
"""DDP connect handler."""
|
||||
if self.session:
|
||||
self.error(
|
||||
'Session already established.',
|
||||
reason='Current session in detail.',
|
||||
detail=self.session,
|
||||
)
|
||||
elif version not in self.versions:
|
||||
self.reply('failed', version=self.versions[0])
|
||||
elif version not in support:
|
||||
self.error('Client version/support mismatch.')
|
||||
else:
|
||||
if not session:
|
||||
session = uuid.uuid4().hex
|
||||
self.version = version
|
||||
self.support = support
|
||||
self.session = session
|
||||
self.reply('connected', session=self.session)
|
||||
|
||||
def recv_ping(self, id_=None):
|
||||
"""DDP ping handler."""
|
||||
if id_ is None:
|
||||
self.reply('pong')
|
||||
else:
|
||||
self.reply('pong', id=id_)
|
||||
|
||||
def sub_notify(self, id_, name, params, data):
|
||||
"""Send added/changed/removed msg due to receiving NOTIFY."""
|
||||
self.reply(**data)
|
||||
|
||||
def recv_sub(self, id_, name, params=None):
|
||||
"""DDP sub handler."""
|
||||
self.pgworker.subscribe(self.sub_notify, self, id_, name, params)
|
||||
|
||||
model = get_model(name)
|
||||
for obj in model.objects.all():
|
||||
_, payload = obj_change_as_msg(obj, 'added')
|
||||
self.sub_notify(id_, name, params, payload)
|
||||
|
||||
self.reply('ready', subs=[id_])
|
||||
|
||||
def sub_unnotify(self, id_):
|
||||
"""Send added/changed/removed msg due to receiving NOTIFY."""
|
||||
pass # TODO: find out if we're meant to send anything to the client
|
||||
|
||||
def recv_unsub(self, id_):
|
||||
"""DDP unsub handler."""
|
||||
self.pgworker.unsubscribe(self.sub_unnotify, self, id_)
|
||||
|
||||
def recv_method(self, method, params, id_, randomSeed=None):
|
||||
"""DDP method handler."""
|
||||
try:
|
||||
func = self.methods[method]
|
||||
except KeyError:
|
||||
self.reply('result', id=id_, error=u'Unknown method: %s' % method)
|
||||
else:
|
||||
try:
|
||||
self.reply('result', id=id_, result=func(**params))
|
||||
except Exception, err:
|
||||
self.reply('result', id=id_, error='%s' % err)
|
||||
finally:
|
||||
pass
|
||||
|
||||
|
||||
def ddpp_sockjs_xhr(environ, start_response):
|
||||
"""Dummy method that doesn't handle XHR requests."""
|
||||
start_response(
|
||||
'404 Not found',
|
||||
[
|
||||
('Content-Type', 'text/plain; charset=UTF-8'),
|
||||
(
|
||||
'Access-Control-Allow-Origin',
|
||||
'/'.join(environ['HTTP_REFERER'].split('/')[:3]),
|
||||
),
|
||||
('Access-Control-Allow-Credentials', 'true'),
|
||||
# ('access-control-allow-credentials', 'true'),
|
||||
('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
|
||||
('Connection', 'keep-alive'),
|
||||
('Vary', 'Origin'),
|
||||
],
|
||||
)
|
||||
yield 'No.'
|
||||
|
||||
|
||||
def ddpp_sockjs_info(environ, start_response):
|
||||
"""Inform client that WebSocket service is available."""
|
||||
start_response(
|
||||
'200 OK',
|
||||
[
|
||||
('Content-Type', 'application/json; charset=UTF-8'),
|
||||
(
|
||||
'Access-Control-Allow-Origin',
|
||||
'/'.join(environ['HTTP_REFERER'].split('/')[:3]),
|
||||
),
|
||||
('Access-Control-Allow-Credentials', 'true'),
|
||||
# ('access-control-allow-credentials', 'true'),
|
||||
('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
|
||||
('Connection', 'keep-alive'),
|
||||
('Vary', 'Origin'),
|
||||
],
|
||||
)
|
||||
yield ejson.dumps(collections.OrderedDict([
|
||||
('websocket', True),
|
||||
('origins', [
|
||||
'*:*',
|
||||
]),
|
||||
('cookie_needed', False),
|
||||
('entropy', random.getrandbits(32)),
|
||||
]))
|
||||
|
||||
|
||||
from meerqat import wsgi
|
||||
resource = geventwebsocket.Resource({
|
||||
r'/websocket': DDPApplication,
|
||||
r'^/sockjs/\d+/\w+/websocket$': DDPApplication,
|
||||
r'^/sockjs/\d+/\w+/xhr$': ddpp_sockjs_xhr,
|
||||
r'^/sockjs/info$': ddpp_sockjs_info,
|
||||
r'^/(?!(websocket|sockjs)/)': wsgi.application,
|
||||
})
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
|
||||
"""Command to run DDP web service."""
|
||||
|
||||
args = 'HOST PORT'
|
||||
help = 'Run DDP service'
|
||||
requires_system_checks = False
|
||||
|
||||
option_list = BaseCommand.option_list + (
|
||||
optparse.make_option(
|
||||
'-H', '--host', dest="host", metavar='HOST',
|
||||
help='TCP listening host (default: localhost)', default='localhost',
|
||||
),
|
||||
optparse.make_option(
|
||||
'-p', '--port', dest="port", metavar='PORT',
|
||||
help='TCP listening port (default: 3000)', default='3000',
|
||||
),
|
||||
)
|
||||
|
||||
def handle(self, *args, **options):
|
||||
"""Spawn greenlets for handling websockets and PostgreSQL calls."""
|
||||
# setup PostgresGreenlet to multiplex DB calls
|
||||
postgres = PostgresGreenlet(connection)
|
||||
DDPApplication.pgworker = postgres
|
||||
|
||||
# setup WebSocketServer to dispatch web requests
|
||||
BASE_HANDLER.load_middleware()
|
||||
host = options['host']
|
||||
port = options['port']
|
||||
if port.isdigit():
|
||||
port = int(port)
|
||||
else:
|
||||
port = socket.getservbyname(port)
|
||||
webserver = geventwebsocket.WebSocketServer(
|
||||
(host, port),
|
||||
resource,
|
||||
debug=int(options['verbosity']) > 1,
|
||||
)
|
||||
|
||||
def killall(*args, **kwargs):
|
||||
"""Kill all green threads."""
|
||||
postgres.stop()
|
||||
webserver.stop()
|
||||
|
||||
# die gracefully with SIGINT or SIGQUIT
|
||||
gevent.signal(signal.SIGINT, killall)
|
||||
gevent.signal(signal.SIGQUIT, killall)
|
||||
|
||||
# start greenlets
|
||||
print('Running as ws://%s:%d/websocket' % (host, port))
|
||||
postgres.start()
|
||||
webserver.serve_forever()
|
||||
25
dddp/msg.py
Normal file
25
dddp/msg.py
Normal file
|
|
@ -0,0 +1,25 @@
|
|||
"""Django DDP utils for DDP messaging."""
|
||||
|
||||
from django.core import serializers
|
||||
|
||||
SERIALIZER = serializers.get_serializer('python')()
|
||||
|
||||
|
||||
def obj_change_as_msg(obj, msg):
|
||||
"""Generate a DDP msg for obj with specified msg type."""
|
||||
data = SERIALIZER.serialize([obj])[0]
|
||||
name = data['model']
|
||||
|
||||
# cast ID as string
|
||||
if not isinstance(data['pk'], basestring):
|
||||
data['pk'] = '%d' % data['pk']
|
||||
|
||||
payload = {
|
||||
'msg': msg,
|
||||
'collection': name,
|
||||
'id': data['pk'],
|
||||
}
|
||||
if msg != 'removed':
|
||||
payload['fields'] = data['fields']
|
||||
|
||||
return (name, payload)
|
||||
414
dddp/server.py
414
dddp/server.py
|
|
@ -1,414 +0,0 @@
|
|||
"""Django DDP WebSocket service."""
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
import collections
|
||||
import gevent.monkey
|
||||
import psycogreen.gevent
|
||||
gevent.monkey.patch_all()
|
||||
psycogreen.gevent.patch_psycopg()
|
||||
|
||||
import inspect
|
||||
import random
|
||||
import signal
|
||||
import socket # green
|
||||
import sys
|
||||
import traceback
|
||||
import uuid
|
||||
|
||||
import ejson
|
||||
import gevent
|
||||
import gevent.queue
|
||||
import gevent.select
|
||||
import geventwebsocket
|
||||
import psycopg2 # green
|
||||
import psycopg2.extras
|
||||
from django.db import connection
|
||||
|
||||
|
||||
sub_queue = gevent.queue.Queue()
|
||||
unsub_queue = gevent.queue.Queue()
|
||||
all_subs = collections.defaultdict(list)
|
||||
|
||||
|
||||
def subscribe(func, name, params):
|
||||
sub_queue.put((func, name, params))
|
||||
|
||||
|
||||
def unsubscribe(func, name, params):
|
||||
unsub_queue.put((func, name, params))
|
||||
|
||||
|
||||
def pg_loop(conn):
|
||||
if gevent.select.select([conn], [], [], 5) == ([], [], []):
|
||||
print("Timeout")
|
||||
else:
|
||||
pg_wait(conn)
|
||||
|
||||
|
||||
def pg_wait(conn):
|
||||
while True:
|
||||
state = conn.poll()
|
||||
if state == psycopg2.extensions.POLL_OK:
|
||||
while conn.notifies:
|
||||
notify = conn.notifies.pop()
|
||||
name = notify.channel
|
||||
print("Got NOTIFY:", notify.pid, name, notify.payload)
|
||||
subs = all_subs[name]
|
||||
data = ejson.loads(notify.payload)
|
||||
for (func, params) in subs:
|
||||
try:
|
||||
gevent.spawn(func, name, params, data)
|
||||
#func(name, params, data)
|
||||
except Exception, err:
|
||||
print("Callback error: %s" % err)
|
||||
traceback.print_exc()
|
||||
break
|
||||
elif state == psycopg2.extensions.POLL_WRITE:
|
||||
print('POLL_WRITE')
|
||||
gevent.select.select([], [conn.fileno()], [])
|
||||
elif state == psycopg2.extensions.POLL_READ:
|
||||
print('POLL_READ')
|
||||
gevent.select.select([conn.fileno()], [], [])
|
||||
else:
|
||||
print('POLL_ERR: %s' % state)
|
||||
raise psycopg2.OperationalError("poll() returned %s" % state)
|
||||
|
||||
|
||||
def pg_listen():
|
||||
conn_params = connection.get_connection_params()
|
||||
conn_params['async'] = True
|
||||
#psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
|
||||
#pg_wait = psycopg2.extras.wait_select
|
||||
conn = connection.get_new_connection(conn_params)
|
||||
#pg_wait(conn)
|
||||
pg_wait(conn)
|
||||
#psycopg2.extras.wait_select(conn)
|
||||
cur = conn.cursor()
|
||||
#cur.execute('LISTEN "survey.response";')
|
||||
print('LISTEN')
|
||||
while 1:
|
||||
try:
|
||||
func, name, params = sub_queue.get_nowait()
|
||||
subs = all_subs[name]
|
||||
if len(subs) == 0:
|
||||
print('LISTEN "%s";' % name)
|
||||
cur.execute('LISTEN "%s";' % name)
|
||||
subs.append((func, params))
|
||||
except gevent.queue.Empty:
|
||||
pass
|
||||
try:
|
||||
func, name, params = unsub_queue.get_nowait()
|
||||
subs = all_subs[name]
|
||||
subs.remove(func, params)
|
||||
if len(subs) == 0:
|
||||
print('UNLISTEN "%s";' % name)
|
||||
cur.execute('UNLISTEN "%s";' % name)
|
||||
except gevent.queue.Empty:
|
||||
pass
|
||||
|
||||
pg_loop(conn)
|
||||
#psycopg2.extras.wait_select(conn)
|
||||
|
||||
|
||||
class MeteorError(Exception):
|
||||
|
||||
"""MeteorError."""
|
||||
|
||||
pass
|
||||
|
||||
|
||||
def validate_kwargs(func, kwargs, func_name=None):
|
||||
"""Validate arguments to be supplied to func."""
|
||||
if func_name is None:
|
||||
func_name = func.__name__
|
||||
argspec = inspect.getargspec(func)
|
||||
args = argspec.args[:]
|
||||
|
||||
# ignore implicit 'self' argument
|
||||
if inspect.ismethod(func):
|
||||
args[:1] = []
|
||||
|
||||
# translate 'foo' args into 'foo_' to avoid crap argument names
|
||||
for key in list(kwargs):
|
||||
key_adj = '%s_' % key
|
||||
if key_adj in args:
|
||||
kwargs[key_adj] = kwargs.pop(key)
|
||||
|
||||
required = args[:-len(argspec.defaults)]
|
||||
supplied = sorted(kwargs)
|
||||
missing = [
|
||||
arg for arg in required
|
||||
if arg not in supplied
|
||||
]
|
||||
if missing:
|
||||
raise MeteorError(
|
||||
'Missing required arguments to %s: %s' % (
|
||||
func_name,
|
||||
' '.join(missing),
|
||||
),
|
||||
)
|
||||
extra = [
|
||||
arg for arg in supplied
|
||||
if arg not in args
|
||||
]
|
||||
if extra:
|
||||
raise MeteorError(
|
||||
'Unknown arguments to %s: %s' % (
|
||||
func_name,
|
||||
' '.join(extra),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class DDPApplication(geventwebsocket.WebSocketApplication):
|
||||
|
||||
"""Django DDP WebSocket application."""
|
||||
|
||||
methods = {}
|
||||
versions = [ # first item is preferred version
|
||||
'1',
|
||||
'pre1',
|
||||
'pre2',
|
||||
]
|
||||
remote_addr = None
|
||||
version = None
|
||||
support = None
|
||||
session = None
|
||||
subs = None
|
||||
|
||||
#def __init__(self, environ, start_response, *args, **kwargs):
|
||||
# print('INIT: %r %r' % (args, kwargs))
|
||||
# super_init = super(DDPApplication, self).__init__
|
||||
# print(inspect.getargspec(super_init))
|
||||
# return super_init(environ, start_response, *args, **kwargs)
|
||||
|
||||
def on_open(self):
|
||||
"""Handle new websocket connection."""
|
||||
self.remote_addr = '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format(
|
||||
self.ws.environ,
|
||||
)
|
||||
self.subs = {}
|
||||
print('+ %s OPEN' % self)
|
||||
self.send('o')
|
||||
self.send('a["{\\"server_id\\":\\"0\\"}"]')
|
||||
|
||||
def __str__(self):
|
||||
"""Show remote address that connected to us."""
|
||||
return self.remote_addr
|
||||
|
||||
def on_close(self, reason):
|
||||
"""Handle closing of websocket connection."""
|
||||
print('- %s %s' % (self, reason or 'CLOSE'))
|
||||
|
||||
def on_message(self, raw):
|
||||
"""Process a message received from remote."""
|
||||
if self.ws.closed:
|
||||
return None
|
||||
try:
|
||||
print('< %s %r' % (self, raw))
|
||||
data = ejson.loads(raw)
|
||||
data = ejson.loads(data[0])
|
||||
try:
|
||||
msg = data.pop('msg')
|
||||
except (AttributeError, TypeError, IndexError):
|
||||
raise MeteorError('Invalid EJSON message type.')
|
||||
except KeyError:
|
||||
raise MeteorError('Missing required field: msg')
|
||||
else:
|
||||
self.dispatch(msg, data)
|
||||
except geventwebsocket.WebSocketError, err:
|
||||
self.on_close(err)
|
||||
except Exception, err:
|
||||
self.error(err)
|
||||
|
||||
def dispatch(self, msg, kwargs):
|
||||
"""Dispatch msg to appropriate recv_foo handler."""
|
||||
# enforce calling 'connect' first
|
||||
if not self.session and msg != 'connect':
|
||||
raise MeteorError("Session not establised - try 'connect'.")
|
||||
|
||||
# lookup method handler
|
||||
try:
|
||||
handler = getattr(self, 'recv_%s' % msg)
|
||||
except (AttributeError, UnicodeEncodeError):
|
||||
raise MeteorError(404, 'Method not found')
|
||||
|
||||
# validate handler arguments
|
||||
validate_kwargs(handler, kwargs)
|
||||
|
||||
# dispatch to handler
|
||||
try:
|
||||
handler(**kwargs)
|
||||
except Exception, err:
|
||||
raise MeteorError(500, 'Internal server error', err)
|
||||
|
||||
def send(self, data):
|
||||
print('> %s %r' % (self, data))
|
||||
self.ws.send(data)
|
||||
|
||||
def reply(self, msg, **kwargs):
|
||||
"""Send EJSON reply to remote."""
|
||||
kwargs['msg'] = msg
|
||||
data = ejson.dumps([ejson.dumps(kwargs)])
|
||||
self.send('a%s' % data)
|
||||
|
||||
def error(self, err, reason=None, detail=None):
|
||||
"""Send EJSON error to remote."""
|
||||
if isinstance(err, MeteorError):
|
||||
(err, reason, detail) = (err.args[:] + (None, None, None))[:3]
|
||||
if not err:
|
||||
err = '<UNKNOWN_ERROR>'
|
||||
data = {
|
||||
'error': err,
|
||||
}
|
||||
if reason:
|
||||
data['reason'] = reason
|
||||
if detail:
|
||||
data['detail'] = detail
|
||||
print('! %s %r' % (self, data))
|
||||
self.reply('error', **data)
|
||||
|
||||
def recv_connect(self, version, support, session=None):
|
||||
"""DDP connect handler."""
|
||||
if self.session:
|
||||
self.error(
|
||||
'Session already established.',
|
||||
reason='Current session in detail.',
|
||||
detail=self.session,
|
||||
)
|
||||
elif version not in self.versions:
|
||||
self.reply('failed', version=self.versions[0])
|
||||
elif version not in support:
|
||||
self.error('Client version/support mismatch.')
|
||||
else:
|
||||
if not session:
|
||||
session = uuid.uuid4().hex
|
||||
self.version = version
|
||||
self.support = support
|
||||
self.session = session
|
||||
self.reply('connected', session=self.session)
|
||||
|
||||
def recv_ping(self, id_=None):
|
||||
"""DDP ping handler."""
|
||||
if id_ is None:
|
||||
self.reply('pong')
|
||||
else:
|
||||
self.reply('pong', id=id_)
|
||||
|
||||
def sub_notify(self, name, params, data):
|
||||
self.reply(**data)
|
||||
|
||||
def recv_sub(self, id_, name, params=None):
|
||||
"""DDP sub handler."""
|
||||
print('recv_sub: %r %r %r' % (id_, name, params))
|
||||
subscribe(self.sub_notify, name, params)
|
||||
self.reply('ready', subs=[id_])
|
||||
|
||||
def recv_unsub(self, id_):
|
||||
"""DDP unsub handler."""
|
||||
unsubscribe(self.sub_notify, name, params)
|
||||
|
||||
def recv_method(self, method, params, id_, randomSeed=None):
|
||||
"""DDP method handler."""
|
||||
try:
|
||||
func = self.methods[method]
|
||||
except KeyError:
|
||||
self.reply('result', id=id_, error=u'Unknown method: %s' % method)
|
||||
else:
|
||||
try:
|
||||
self.reply('result', id=id_, result=func(**params))
|
||||
except Exception, err:
|
||||
self.reply('result', id=id_, error='%s' % err)
|
||||
finally:
|
||||
pass
|
||||
|
||||
def ddpp_sockjs_xhr(environ, start_response):
|
||||
start_response(
|
||||
'404 Not found',
|
||||
[
|
||||
('Content-Type', 'text/plain; charset=UTF-8'),
|
||||
(
|
||||
'Access-Control-Allow-Origin',
|
||||
'%s://%s:3000' % (
|
||||
environ['wsgi.url_scheme'],
|
||||
environ['HTTP_HOST'].split(':')[0],
|
||||
),
|
||||
),
|
||||
('Access-Control-Allow-Credentials', 'true'),
|
||||
# ('access-control-allow-credentials', 'true'),
|
||||
('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
|
||||
('Connection', 'keep-alive'),
|
||||
('Vary', 'Origin'),
|
||||
],
|
||||
)
|
||||
yield 'No.'
|
||||
|
||||
def ddpp_sockjs_info(environ, start_response):
|
||||
print(environ)
|
||||
host = environ['HTTP_HOST'].split(':')[0]
|
||||
start_response(
|
||||
'200 OK',
|
||||
[
|
||||
('Content-Type', 'application/json; charset=UTF-8'),
|
||||
(
|
||||
'Access-Control-Allow-Origin',
|
||||
'%s://%s:3000' % (
|
||||
environ['wsgi.url_scheme'],
|
||||
environ['HTTP_HOST'].split(':')[0],
|
||||
),
|
||||
),
|
||||
('Access-Control-Allow-Credentials', 'true'),
|
||||
# ('access-control-allow-credentials', 'true'),
|
||||
('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
|
||||
('Connection', 'keep-alive'),
|
||||
('Vary', 'Origin'),
|
||||
],
|
||||
)
|
||||
yield ejson.dumps(collections.OrderedDict([
|
||||
('websocket', True),
|
||||
('origins', [
|
||||
'*:*',
|
||||
]),
|
||||
('cookie_needed', False),
|
||||
('entropy', random.getrandbits(32)),
|
||||
]))
|
||||
|
||||
class DDPServer(geventwebsocket.WebSocketServer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
print('INIT: %r %r' % (args, kwargs))
|
||||
return super(DDPServer, self).__init__(*args, **kwargs)
|
||||
|
||||
resource = geventwebsocket.Resource({
|
||||
r'/websocket': DDPApplication,
|
||||
r'^/sockjs/\d+/\w+/websocket$': DDPApplication,
|
||||
r'^/sockjs/\d+/\w+/xhr$': ddpp_sockjs_xhr,
|
||||
r'^/sockjs/info$': ddpp_sockjs_info,
|
||||
})
|
||||
|
||||
if __name__ == '__main__':
|
||||
addr = (sys.argv[1:] + [':8001'])[0]
|
||||
(host, port) = (addr.rsplit(':', 1)[:2] + [':8001'])[:2]
|
||||
if port.isdigit():
|
||||
port = int(port)
|
||||
else:
|
||||
port = socket.getservbyname(port)
|
||||
server = geventwebsocket.WebSocketServer(
|
||||
(host, port),
|
||||
resource,
|
||||
debug='-v' in sys.argv,
|
||||
)
|
||||
print('Running as ws://%s:%d/websocket' % (host, port))
|
||||
threads = [
|
||||
gevent.spawn(server.serve_forever),
|
||||
gevent.spawn(pg_listen),
|
||||
]
|
||||
|
||||
def killall():
|
||||
"""Kill all green threads."""
|
||||
for thread in threads:
|
||||
thread.kill()
|
||||
|
||||
gevent.signal(signal.SIGINT, killall)
|
||||
gevent.signal(signal.SIGQUIT, killall)
|
||||
gevent.joinall(threads)
|
||||
Loading…
Reference in a new issue