diff --git a/dddp/apps.py b/dddp/apps.py index a458de7..641f5e4 100644 --- a/dddp/apps.py +++ b/dddp/apps.py @@ -9,6 +9,9 @@ from django.core import serializers from django.db import connections from django.db.models import signals +from dddp.msg import obj_change_as_msg + + class DjangoDDPConfig(AppConfig): name = 'dddp' verbose_name = 'Django DDP' @@ -38,28 +41,8 @@ class DjangoDDPConfig(AppConfig): ) def send_notify(self, model, obj, msg, using): - data = self.serializer.serialize([obj])[0] - name = data['model'] - print(name) - #name = '%s.%s' % ( - # model._meta.app_label, - # model._meta.object_name, - #) - - # always use UUID as ID - if isinstance(data['pk'], int): - #data['pk'] = uuid.uuid3(uuid.NAMESPACE_URL, '%d' % data['pk']).hex - data['pk'] = '%d' % data['pk'] - - payload = { - 'msg': msg, - 'collection': name, - 'id': data['pk'], - } - if msg != 'removed': - payload['fields'] = data['fields'] + name, payload = obj_change_as_msg(obj, msg) cursor = connections[using].cursor() - print(name) cursor.execute( 'NOTIFY "%s", %%s' % name, [ diff --git a/dddp/management/__init__.py b/dddp/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dddp/management/commands/__init__.py b/dddp/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/dddp/management/commands/dddp.py b/dddp/management/commands/dddp.py new file mode 100644 index 0000000..27d0b26 --- /dev/null +++ b/dddp/management/commands/dddp.py @@ -0,0 +1,517 @@ +"""Django DDP WebSocket service.""" + +from __future__ import print_function, absolute_import + +from django.db import connection, close_old_connections +close_old_connections() + +import collections +import gevent.monkey +import psycogreen.gevent +gevent.monkey.patch_all() +psycogreen.gevent.patch_psycopg() + +import inspect +import optparse +import random +import signal +import socket # green +import traceback +import uuid + +import ejson +import gevent +import gevent.queue +import gevent.select +import geventwebsocket +import psycopg2 # green +import psycopg2.extras +from django.core.handlers.base import BaseHandler +from django.core.handlers.wsgi import WSGIRequest +from django.core.management.base import BaseCommand +from django.db.models.loading import get_model + +from dddp.msg import obj_change_as_msg + + +BASE_HANDLER = BaseHandler() + + +class PostgresGreenlet(gevent.Greenlet): + + """Greenlet for multiplexing database operations.""" + + def __init__(self, conn): + """Prepare async connection.""" + # greenify! + super(PostgresGreenlet, self).__init__() + + # queues for processing incoming sub/unsub requests and processing + self.subs = gevent.queue.Queue() + self.unsubs = gevent.queue.Queue() + self.proc_queue = gevent.queue.Queue() + self._stop_event = gevent.event.Event() + + # dict of name: subscribers + self.all_subs = collections.defaultdict(dict) + self._sub_lock = gevent.lock.RLock() + + # connect to DB in async mode + conn.allow_thread_sharing = True + self.connection = conn + self.conn_params = conn.get_connection_params() + self.conn_params['async'] = True + self.conn = conn.get_new_connection(self.conn_params) + self.poll() # wait for conneciton to start + self.cur = self.conn.cursor() + + def _run(self): # pylint: disable=method-hidden + """Spawn sub tasks, wait for stop signal.""" + gevent.spawn(self.process_conn) + gevent.spawn(self.process_subs) + gevent.spawn(self.process_unsubs) + self._stop_event.wait() + + def stop(self): + """Stop subtasks and let run() finish.""" + self._stop_event.set() + + def subscribe(self, func, obj, id_, name, params): + """Register callback `func` to be called after NOTIFY for `name`.""" + self.subs.put((func, obj, id_, name, params)) + + def unsubscribe(self, func, obj, id_): + """Un-register callback `func` to be called after NOTIFY for `name`.""" + self.unsubs.put((func, obj, id_)) + + def process_conn(self): + """Subtask to process NOTIFY async events from DB connection.""" + while not self._stop_event.is_set(): + # TODO: change timeout so self._stop_event is periodically checked? + gevent.select.select([self.conn], [], [], timeout=None) + self.poll() + + def poll(self): + """Poll DB socket and process async tasks.""" + while 1: + state = self.conn.poll() + if state == psycopg2.extensions.POLL_OK: + while self.conn.notifies: + notify = self.conn.notifies.pop() + name = notify.channel + print("Got NOTIFY:", notify.pid, name, notify.payload) + try: + self._sub_lock.acquire() + subs = self.all_subs[name] + data = ejson.loads(notify.payload) + for (_, id_), (func, params) in subs.items(): + gevent.spawn(func, id_, name, params, data) + finally: + self._sub_lock.release() + break + elif state == psycopg2.extensions.POLL_WRITE: + gevent.select.select([], [self.conn.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + gevent.select.select([self.conn.fileno()], [], []) + else: + print('POLL_ERR: %s' % state) + + def process_subs(self): + """Subtask to process `sub` requests from `self.subs` queue.""" + while not self._stop_event.is_set(): + func, obj, id_, name, params = self.subs.get() + try: + self._sub_lock.acquire() + subs = self.all_subs[name] + if len(subs) == 0: + print('LISTEN "%s";' % name) + self.poll() + self.cur.execute('LISTEN "%s";' % name) + self.poll() + subs[(obj, id_)] = (func, params) + finally: + self._sub_lock.release() + + def process_unsubs(self): + """Subtask to process `unsub` requests from `self.unsubs` queue.""" + while not self._stop_event.is_set(): + func, obj, id_ = self.unsubs.get() + try: + self._sub_lock.acquire() + for name, subs in self.all_subs.items(): + subs.pop((obj, id_), None) + if len(subs) == 0: + print('UNLISTEN "%s";' % name) + self.cur.execute('UNLISTEN "%s";' % name) + self.poll() + del self.all_subs[name] + finally: + self._sub_lock.release() + gevent.spawn(func, id_) + + +class MeteorError(Exception): + + """MeteorError.""" + + pass + + +def validate_kwargs(func, kwargs, func_name=None): + """Validate arguments to be supplied to func.""" + if func_name is None: + func_name = func.__name__ + argspec = inspect.getargspec(func) + args = argspec.args[:] + + # ignore implicit 'self' argument + if inspect.ismethod(func) and args[0] == 'self': + args[:1] = [] + + # translate 'foo_' to avoid reserved names like 'id' + trans = { + arg: arg.endswith('_') and arg[:-1] or arg + for arg + in args + } + for key in list(kwargs): + key_adj = '%s_' % key + if key_adj in args: + kwargs[key_adj] = kwargs.pop(key) + + required = args[:-len(argspec.defaults)] + supplied = sorted(kwargs) + missing = [ + trans.get(arg, arg) for arg in required + if arg not in supplied + ] + if missing: + raise MeteorError( + 'Missing required arguments to %s: %s' % ( + func_name, + ' '.join(missing), + ), + ) + extra = [ + arg for arg in supplied + if arg not in args + ] + if extra: + raise MeteorError( + 'Unknown arguments to %s: %s' % ( + func_name, + ' '.join(extra), + ), + ) + + +class DDPApplication(geventwebsocket.WebSocketApplication): + + """Django DDP WebSocket application.""" + + methods = {} + versions = [ # first item is preferred version + '1', + 'pre1', + 'pre2', + ] + pgworker = None + remote_addr = None + version = None + support = None + session = None + subs = None + request = None + + def on_open(self): + """Handle new websocket connection.""" + self.request = WSGIRequest(self.ws.environ) + # Apply request middleware (so we get request.user and other attrs) + # pylint: disable=protected-access + for middleware_method in BASE_HANDLER._request_middleware: + response = middleware_method(self.request) + if response: + raise ValueError(response) + + self.remote_addr = '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( + self.ws.environ, + ) + self.subs = {} + print('+ %s OPEN %s' % (self, self.request.user)) + self.send('o') + self.send('a["{\\"server_id\\":\\"0\\"}"]') + + def __str__(self): + """Show remote address that connected to us.""" + return self.remote_addr + + def on_close(self, reason): + """Handle closing of websocket connection.""" + print('- %s %s' % (self, reason or 'CLOSE')) + + def on_message(self, message): + """Process a message received from remote.""" + if self.ws.closed: + return None + try: + print('< %s %r' % (self, message)) + + # parse message set + try: + msgs = ejson.loads(message) + except ValueError, err: + raise MeteorError(400, 'Data is not valid EJSON') + if not isinstance(msgs, list): + raise MeteorError(400, 'Invalid EJSON messages') + + # process individual messages + while msgs: + # parse message payload + raw = msgs.pop(0) + try: + try: + data = ejson.loads(raw) + except ValueError, err: + raise MeteorError(400, 'Data is not valid EJSON') + if not isinstance(data, dict): + self.error(400, 'Invalid EJSON message payload', raw) + continue + try: + msg = data.pop('msg') + except KeyError: + raise MeteorError(400, 'Missing `msg` parameter', raw) + # dispatch message + self.dispatch(msg, data) + except MeteorError, err: + self.error(err) + except Exception, err: + traceback.print_exc() + self.error(err) + except geventwebsocket.WebSocketError, err: + self.ws.close() + except MeteorError, err: + self.error(err) + + def dispatch(self, msg, kwargs): + """Dispatch msg to appropriate recv_foo handler.""" + # enforce calling 'connect' first + if not self.session and msg != 'connect': + raise MeteorError(400, 'Session not establised - try `connect`.') + + # lookup method handler + try: + handler = getattr(self, 'recv_%s' % msg) + except (AttributeError, UnicodeEncodeError): + raise MeteorError(404, 'Method not found') + + # validate handler arguments + validate_kwargs(handler, kwargs) + + # dispatch to handler + try: + handler(**kwargs) + except Exception, err: + raise MeteorError(500, 'Internal server error', err) + + def send(self, data): + """Send raw `data` to WebSocket client.""" + print('> %s %r' % (self, data)) + try: + self.ws.send(data) + except geventwebsocket.WebSocketError, err: + self.ws.close() + + def reply(self, msg, **kwargs): + """Send EJSON reply to remote.""" + kwargs['msg'] = msg + data = ejson.dumps([ejson.dumps(kwargs)]) + self.send('a%s' % data) + + def error(self, err, reason=None, detail=None): + """Send EJSON error to remote.""" + if isinstance(err, MeteorError): + (err, reason, detail) = (err.args[:] + (None, None, None))[:3] + data = { + 'error': '%s' % (err or ''), + } + if reason: + data['reason'] = reason + if detail: + data['detail'] = detail + print('! %s %r' % (self, data)) + self.reply('error', **data) + + def recv_connect(self, version, support, session=None): + """DDP connect handler.""" + if self.session: + self.error( + 'Session already established.', + reason='Current session in detail.', + detail=self.session, + ) + elif version not in self.versions: + self.reply('failed', version=self.versions[0]) + elif version not in support: + self.error('Client version/support mismatch.') + else: + if not session: + session = uuid.uuid4().hex + self.version = version + self.support = support + self.session = session + self.reply('connected', session=self.session) + + def recv_ping(self, id_=None): + """DDP ping handler.""" + if id_ is None: + self.reply('pong') + else: + self.reply('pong', id=id_) + + def sub_notify(self, id_, name, params, data): + """Send added/changed/removed msg due to receiving NOTIFY.""" + self.reply(**data) + + def recv_sub(self, id_, name, params=None): + """DDP sub handler.""" + self.pgworker.subscribe(self.sub_notify, self, id_, name, params) + + model = get_model(name) + for obj in model.objects.all(): + _, payload = obj_change_as_msg(obj, 'added') + self.sub_notify(id_, name, params, payload) + + self.reply('ready', subs=[id_]) + + def sub_unnotify(self, id_): + """Send added/changed/removed msg due to receiving NOTIFY.""" + pass # TODO: find out if we're meant to send anything to the client + + def recv_unsub(self, id_): + """DDP unsub handler.""" + self.pgworker.unsubscribe(self.sub_unnotify, self, id_) + + def recv_method(self, method, params, id_, randomSeed=None): + """DDP method handler.""" + try: + func = self.methods[method] + except KeyError: + self.reply('result', id=id_, error=u'Unknown method: %s' % method) + else: + try: + self.reply('result', id=id_, result=func(**params)) + except Exception, err: + self.reply('result', id=id_, error='%s' % err) + finally: + pass + + +def ddpp_sockjs_xhr(environ, start_response): + """Dummy method that doesn't handle XHR requests.""" + start_response( + '404 Not found', + [ + ('Content-Type', 'text/plain; charset=UTF-8'), + ( + 'Access-Control-Allow-Origin', + '/'.join(environ['HTTP_REFERER'].split('/')[:3]), + ), + ('Access-Control-Allow-Credentials', 'true'), + # ('access-control-allow-credentials', 'true'), + ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'), + ('Connection', 'keep-alive'), + ('Vary', 'Origin'), + ], + ) + yield 'No.' + + +def ddpp_sockjs_info(environ, start_response): + """Inform client that WebSocket service is available.""" + start_response( + '200 OK', + [ + ('Content-Type', 'application/json; charset=UTF-8'), + ( + 'Access-Control-Allow-Origin', + '/'.join(environ['HTTP_REFERER'].split('/')[:3]), + ), + ('Access-Control-Allow-Credentials', 'true'), + # ('access-control-allow-credentials', 'true'), + ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'), + ('Connection', 'keep-alive'), + ('Vary', 'Origin'), + ], + ) + yield ejson.dumps(collections.OrderedDict([ + ('websocket', True), + ('origins', [ + '*:*', + ]), + ('cookie_needed', False), + ('entropy', random.getrandbits(32)), + ])) + + +from meerqat import wsgi +resource = geventwebsocket.Resource({ + r'/websocket': DDPApplication, + r'^/sockjs/\d+/\w+/websocket$': DDPApplication, + r'^/sockjs/\d+/\w+/xhr$': ddpp_sockjs_xhr, + r'^/sockjs/info$': ddpp_sockjs_info, + r'^/(?!(websocket|sockjs)/)': wsgi.application, +}) + + +class Command(BaseCommand): + + """Command to run DDP web service.""" + + args = 'HOST PORT' + help = 'Run DDP service' + requires_system_checks = False + + option_list = BaseCommand.option_list + ( + optparse.make_option( + '-H', '--host', dest="host", metavar='HOST', + help='TCP listening host (default: localhost)', default='localhost', + ), + optparse.make_option( + '-p', '--port', dest="port", metavar='PORT', + help='TCP listening port (default: 3000)', default='3000', + ), + ) + + def handle(self, *args, **options): + """Spawn greenlets for handling websockets and PostgreSQL calls.""" + # setup PostgresGreenlet to multiplex DB calls + postgres = PostgresGreenlet(connection) + DDPApplication.pgworker = postgres + + # setup WebSocketServer to dispatch web requests + BASE_HANDLER.load_middleware() + host = options['host'] + port = options['port'] + if port.isdigit(): + port = int(port) + else: + port = socket.getservbyname(port) + webserver = geventwebsocket.WebSocketServer( + (host, port), + resource, + debug=int(options['verbosity']) > 1, + ) + + def killall(*args, **kwargs): + """Kill all green threads.""" + postgres.stop() + webserver.stop() + + # die gracefully with SIGINT or SIGQUIT + gevent.signal(signal.SIGINT, killall) + gevent.signal(signal.SIGQUIT, killall) + + # start greenlets + print('Running as ws://%s:%d/websocket' % (host, port)) + postgres.start() + webserver.serve_forever() diff --git a/dddp/msg.py b/dddp/msg.py new file mode 100644 index 0000000..11e7b16 --- /dev/null +++ b/dddp/msg.py @@ -0,0 +1,25 @@ +"""Django DDP utils for DDP messaging.""" + +from django.core import serializers + +SERIALIZER = serializers.get_serializer('python')() + + +def obj_change_as_msg(obj, msg): + """Generate a DDP msg for obj with specified msg type.""" + data = SERIALIZER.serialize([obj])[0] + name = data['model'] + + # cast ID as string + if not isinstance(data['pk'], basestring): + data['pk'] = '%d' % data['pk'] + + payload = { + 'msg': msg, + 'collection': name, + 'id': data['pk'], + } + if msg != 'removed': + payload['fields'] = data['fields'] + + return (name, payload) diff --git a/dddp/server.py b/dddp/server.py deleted file mode 100644 index aabc0f4..0000000 --- a/dddp/server.py +++ /dev/null @@ -1,414 +0,0 @@ -"""Django DDP WebSocket service.""" - -from __future__ import print_function - -import collections -import gevent.monkey -import psycogreen.gevent -gevent.monkey.patch_all() -psycogreen.gevent.patch_psycopg() - -import inspect -import random -import signal -import socket # green -import sys -import traceback -import uuid - -import ejson -import gevent -import gevent.queue -import gevent.select -import geventwebsocket -import psycopg2 # green -import psycopg2.extras -from django.db import connection - - -sub_queue = gevent.queue.Queue() -unsub_queue = gevent.queue.Queue() -all_subs = collections.defaultdict(list) - - -def subscribe(func, name, params): - sub_queue.put((func, name, params)) - - -def unsubscribe(func, name, params): - unsub_queue.put((func, name, params)) - - -def pg_loop(conn): - if gevent.select.select([conn], [], [], 5) == ([], [], []): - print("Timeout") - else: - pg_wait(conn) - - -def pg_wait(conn): - while True: - state = conn.poll() - if state == psycopg2.extensions.POLL_OK: - while conn.notifies: - notify = conn.notifies.pop() - name = notify.channel - print("Got NOTIFY:", notify.pid, name, notify.payload) - subs = all_subs[name] - data = ejson.loads(notify.payload) - for (func, params) in subs: - try: - gevent.spawn(func, name, params, data) - #func(name, params, data) - except Exception, err: - print("Callback error: %s" % err) - traceback.print_exc() - break - elif state == psycopg2.extensions.POLL_WRITE: - print('POLL_WRITE') - gevent.select.select([], [conn.fileno()], []) - elif state == psycopg2.extensions.POLL_READ: - print('POLL_READ') - gevent.select.select([conn.fileno()], [], []) - else: - print('POLL_ERR: %s' % state) - raise psycopg2.OperationalError("poll() returned %s" % state) - - -def pg_listen(): - conn_params = connection.get_connection_params() - conn_params['async'] = True - #psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) - #pg_wait = psycopg2.extras.wait_select - conn = connection.get_new_connection(conn_params) - #pg_wait(conn) - pg_wait(conn) - #psycopg2.extras.wait_select(conn) - cur = conn.cursor() - #cur.execute('LISTEN "survey.response";') - print('LISTEN') - while 1: - try: - func, name, params = sub_queue.get_nowait() - subs = all_subs[name] - if len(subs) == 0: - print('LISTEN "%s";' % name) - cur.execute('LISTEN "%s";' % name) - subs.append((func, params)) - except gevent.queue.Empty: - pass - try: - func, name, params = unsub_queue.get_nowait() - subs = all_subs[name] - subs.remove(func, params) - if len(subs) == 0: - print('UNLISTEN "%s";' % name) - cur.execute('UNLISTEN "%s";' % name) - except gevent.queue.Empty: - pass - - pg_loop(conn) - #psycopg2.extras.wait_select(conn) - - -class MeteorError(Exception): - - """MeteorError.""" - - pass - - -def validate_kwargs(func, kwargs, func_name=None): - """Validate arguments to be supplied to func.""" - if func_name is None: - func_name = func.__name__ - argspec = inspect.getargspec(func) - args = argspec.args[:] - - # ignore implicit 'self' argument - if inspect.ismethod(func): - args[:1] = [] - - # translate 'foo' args into 'foo_' to avoid crap argument names - for key in list(kwargs): - key_adj = '%s_' % key - if key_adj in args: - kwargs[key_adj] = kwargs.pop(key) - - required = args[:-len(argspec.defaults)] - supplied = sorted(kwargs) - missing = [ - arg for arg in required - if arg not in supplied - ] - if missing: - raise MeteorError( - 'Missing required arguments to %s: %s' % ( - func_name, - ' '.join(missing), - ), - ) - extra = [ - arg for arg in supplied - if arg not in args - ] - if extra: - raise MeteorError( - 'Unknown arguments to %s: %s' % ( - func_name, - ' '.join(extra), - ), - ) - - -class DDPApplication(geventwebsocket.WebSocketApplication): - - """Django DDP WebSocket application.""" - - methods = {} - versions = [ # first item is preferred version - '1', - 'pre1', - 'pre2', - ] - remote_addr = None - version = None - support = None - session = None - subs = None - - #def __init__(self, environ, start_response, *args, **kwargs): - # print('INIT: %r %r' % (args, kwargs)) - # super_init = super(DDPApplication, self).__init__ - # print(inspect.getargspec(super_init)) - # return super_init(environ, start_response, *args, **kwargs) - - def on_open(self): - """Handle new websocket connection.""" - self.remote_addr = '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( - self.ws.environ, - ) - self.subs = {} - print('+ %s OPEN' % self) - self.send('o') - self.send('a["{\\"server_id\\":\\"0\\"}"]') - - def __str__(self): - """Show remote address that connected to us.""" - return self.remote_addr - - def on_close(self, reason): - """Handle closing of websocket connection.""" - print('- %s %s' % (self, reason or 'CLOSE')) - - def on_message(self, raw): - """Process a message received from remote.""" - if self.ws.closed: - return None - try: - print('< %s %r' % (self, raw)) - data = ejson.loads(raw) - data = ejson.loads(data[0]) - try: - msg = data.pop('msg') - except (AttributeError, TypeError, IndexError): - raise MeteorError('Invalid EJSON message type.') - except KeyError: - raise MeteorError('Missing required field: msg') - else: - self.dispatch(msg, data) - except geventwebsocket.WebSocketError, err: - self.on_close(err) - except Exception, err: - self.error(err) - - def dispatch(self, msg, kwargs): - """Dispatch msg to appropriate recv_foo handler.""" - # enforce calling 'connect' first - if not self.session and msg != 'connect': - raise MeteorError("Session not establised - try 'connect'.") - - # lookup method handler - try: - handler = getattr(self, 'recv_%s' % msg) - except (AttributeError, UnicodeEncodeError): - raise MeteorError(404, 'Method not found') - - # validate handler arguments - validate_kwargs(handler, kwargs) - - # dispatch to handler - try: - handler(**kwargs) - except Exception, err: - raise MeteorError(500, 'Internal server error', err) - - def send(self, data): - print('> %s %r' % (self, data)) - self.ws.send(data) - - def reply(self, msg, **kwargs): - """Send EJSON reply to remote.""" - kwargs['msg'] = msg - data = ejson.dumps([ejson.dumps(kwargs)]) - self.send('a%s' % data) - - def error(self, err, reason=None, detail=None): - """Send EJSON error to remote.""" - if isinstance(err, MeteorError): - (err, reason, detail) = (err.args[:] + (None, None, None))[:3] - if not err: - err = '' - data = { - 'error': err, - } - if reason: - data['reason'] = reason - if detail: - data['detail'] = detail - print('! %s %r' % (self, data)) - self.reply('error', **data) - - def recv_connect(self, version, support, session=None): - """DDP connect handler.""" - if self.session: - self.error( - 'Session already established.', - reason='Current session in detail.', - detail=self.session, - ) - elif version not in self.versions: - self.reply('failed', version=self.versions[0]) - elif version not in support: - self.error('Client version/support mismatch.') - else: - if not session: - session = uuid.uuid4().hex - self.version = version - self.support = support - self.session = session - self.reply('connected', session=self.session) - - def recv_ping(self, id_=None): - """DDP ping handler.""" - if id_ is None: - self.reply('pong') - else: - self.reply('pong', id=id_) - - def sub_notify(self, name, params, data): - self.reply(**data) - - def recv_sub(self, id_, name, params=None): - """DDP sub handler.""" - print('recv_sub: %r %r %r' % (id_, name, params)) - subscribe(self.sub_notify, name, params) - self.reply('ready', subs=[id_]) - - def recv_unsub(self, id_): - """DDP unsub handler.""" - unsubscribe(self.sub_notify, name, params) - - def recv_method(self, method, params, id_, randomSeed=None): - """DDP method handler.""" - try: - func = self.methods[method] - except KeyError: - self.reply('result', id=id_, error=u'Unknown method: %s' % method) - else: - try: - self.reply('result', id=id_, result=func(**params)) - except Exception, err: - self.reply('result', id=id_, error='%s' % err) - finally: - pass - -def ddpp_sockjs_xhr(environ, start_response): - start_response( - '404 Not found', - [ - ('Content-Type', 'text/plain; charset=UTF-8'), - ( - 'Access-Control-Allow-Origin', - '%s://%s:3000' % ( - environ['wsgi.url_scheme'], - environ['HTTP_HOST'].split(':')[0], - ), - ), - ('Access-Control-Allow-Credentials', 'true'), - # ('access-control-allow-credentials', 'true'), - ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'), - ('Connection', 'keep-alive'), - ('Vary', 'Origin'), - ], - ) - yield 'No.' - -def ddpp_sockjs_info(environ, start_response): - print(environ) - host = environ['HTTP_HOST'].split(':')[0] - start_response( - '200 OK', - [ - ('Content-Type', 'application/json; charset=UTF-8'), - ( - 'Access-Control-Allow-Origin', - '%s://%s:3000' % ( - environ['wsgi.url_scheme'], - environ['HTTP_HOST'].split(':')[0], - ), - ), - ('Access-Control-Allow-Credentials', 'true'), - # ('access-control-allow-credentials', 'true'), - ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'), - ('Connection', 'keep-alive'), - ('Vary', 'Origin'), - ], - ) - yield ejson.dumps(collections.OrderedDict([ - ('websocket', True), - ('origins', [ - '*:*', - ]), - ('cookie_needed', False), - ('entropy', random.getrandbits(32)), - ])) - -class DDPServer(geventwebsocket.WebSocketServer): - def __init__(self, *args, **kwargs): - print('INIT: %r %r' % (args, kwargs)) - return super(DDPServer, self).__init__(*args, **kwargs) - -resource = geventwebsocket.Resource({ - r'/websocket': DDPApplication, - r'^/sockjs/\d+/\w+/websocket$': DDPApplication, - r'^/sockjs/\d+/\w+/xhr$': ddpp_sockjs_xhr, - r'^/sockjs/info$': ddpp_sockjs_info, -}) - -if __name__ == '__main__': - addr = (sys.argv[1:] + [':8001'])[0] - (host, port) = (addr.rsplit(':', 1)[:2] + [':8001'])[:2] - if port.isdigit(): - port = int(port) - else: - port = socket.getservbyname(port) - server = geventwebsocket.WebSocketServer( - (host, port), - resource, - debug='-v' in sys.argv, - ) - print('Running as ws://%s:%d/websocket' % (host, port)) - threads = [ - gevent.spawn(server.serve_forever), - gevent.spawn(pg_listen), - ] - - def killall(): - """Kill all green threads.""" - for thread in threads: - thread.kill() - - gevent.signal(signal.SIGINT, killall) - gevent.signal(signal.SIGQUIT, killall) - gevent.joinall(threads)