diff --git a/dddp/main.py b/dddp/main.py index e8ab34a..7fa9416 100644 --- a/dddp/main.py +++ b/dddp/main.py @@ -1,10 +1,18 @@ """Django DDP WebSocket service.""" -from __future__ import print_function, absolute_import +from __future__ import absolute_import, print_function +import argparse import collections import os -import sys +import signal + +import gevent +from gevent.backdoor import BackdoorServer +import gevent.event +import gevent.pywsgi +import geventwebsocket +import geventwebsocket.handler Addr = collections.namedtuple('Addr', ['host', 'port']) @@ -60,113 +68,164 @@ def ddpp_sockjs_info(environ, start_response): ])) -def serve(listen, debug=False, verbosity=1, debug_port=0, **ssl_args): - """Spawn greenlets for handling websockets and PostgreSQL calls.""" - import signal - from django.apps import apps - from django.db import connection, close_old_connections - from django.utils.module_loading import import_string - from dddp.postgres import PostgresGreenlet - from dddp.websocket import DDPWebSocketApplication - import gevent - import geventwebsocket +class DDPLauncher(object): - # shutdown existing connections - close_old_connections() + """Orchestrate the setup and launching of DDP greenlets in a sane manner.""" - # setup PostgresGreenlet to multiplex DB calls - pgworker = PostgresGreenlet(connection, debug=debug) - DDPWebSocketApplication.pgworker = pgworker + pgworker = None - # use settings.WSGI_APPLICATION or fallback to default Django WSGI app - from django.conf import settings - if hasattr(settings, 'WSGI_APPLICATION'): - wsgi_name = settings.WSGI_APPLICATION - wsgi_app = import_string(wsgi_name) - else: - from django.core.wsgi import get_wsgi_application - wsgi_app = get_wsgi_application() - wsgi_name = str(wsgi_app.__class__) + def __init__(self, debug=False, verbosity=1): + """ + Spawn greenlets for handling websockets and PostgreSQL calls. - resource = geventwebsocket.Resource( - collections.OrderedDict([ - (r'/websocket', DDPWebSocketApplication), - (r'^/sockjs/\d+/\w+/websocket$', DDPWebSocketApplication), - (r'^/sockjs/\d+/\w+/xhr$', ddpp_sockjs_xhr), - (r'^/sockjs/info$', ddpp_sockjs_info), - (r'^/(?!(websocket|sockjs)/)', wsgi_app), - ]), - ) + Only one pgworker should be started for any process, and ideally only + one instance per node as well (ie: use IPC to communicate events between + processes on the same host). This class handles the former (per + process), per host IPC is an excercise left to you, dear reader - pull + requests are of course welcome. ;) - # setup WebSocketServer to dispatch web requests - servers = [ - geventwebsocket.WebSocketServer( - (host, port), - resource, - debug=debug, - **{key:val for key, val in ssl_args.items() if val is not None} + For the record, there's no technical reason that you can't have multiple + pgworker instances running on one host (or even one process), it's just + inefficient to send exactly the same traffic multiple times between your + web servers and your database host. + """ + import logging + from django.apps import apps + from django.utils.module_loading import import_string + from dddp.websocket import DDPWebSocketApplication + + self.verbosity = verbosity + self._stop_event = gevent.event.Event() + self._stop_event.set() # start in stopped state. + self.logger = logging.getLogger('dddp.launcher') + + if DDPLauncher.pgworker is None: + from django.db import connection, close_old_connections + from dddp.postgres import PostgresGreenlet + # shutdown existing connections + close_old_connections() + + DDPLauncher.pgworker = PostgresGreenlet( + connection, debug=debug, + ) + + # use settings.WSGI_APPLICATION or fallback to default Django WSGI app + from django.conf import settings + if hasattr(settings, 'WSGI_APPLICATION'): + self.wsgi_name = settings.WSGI_APPLICATION + self.wsgi_app = import_string(self.wsgi_name) + else: + from django.core.wsgi import get_wsgi_application + self.wsgi_app = get_wsgi_application() + self.wsgi_name = str(self.wsgi_app.__class__) + + self.api = apps.get_app_config('dddp').api + self.api.pgworker = DDPLauncher.pgworker + DDPWebSocketApplication.api = self.api + + # setup PostgresGreenlet to multiplex DB calls + DDPWebSocketApplication.pgworker = self.pgworker + + self.resource = geventwebsocket.Resource( + collections.OrderedDict([ + (r'/websocket', DDPWebSocketApplication), + (r'^/sockjs/\d+/\w+/websocket$', DDPWebSocketApplication), + (r'^/sockjs/\d+/\w+/xhr$', ddpp_sockjs_xhr), + (r'^/sockjs/info$', ddpp_sockjs_info), + (r'^/(?!(websocket|sockjs)/)', self.wsgi_app), + ]), ) - for host, port - in listen - ] - def killall(*args, **kwargs): - """Kill all green threads.""" - pgworker.stop() - for server in servers: + self.servers = [] + self.threads = [] + + def print(self, msg, *args, **kwargs): + """Print formatted msg if verbosity set at 1 or above.""" + if self.verbosity >= 1: + print(msg, *args, **kwargs) + + def add_web_servers(self, listen_addrs, debug=False, **ssl_args): + """Add WebSocketServer for each (host, port) in listen_addrs.""" + self.servers.extend( + self.get_web_server(listen_addr, debug=debug, **ssl_args) + for listen_addr in listen_addrs + ) + + def get_web_server(self, listen_addr, debug=False, **ssl_args): + """Setup WebSocketServer on listen_addr (host, port).""" + return geventwebsocket.WebSocketServer( + listen_addr, + self.resource, + debug=debug, + **{key: val for key, val in ssl_args.items() if val is not None} + ) + + def get_backdoor_server(self, listen_addr, **context): + """Add a backdoor (debug) server.""" + from django.conf import settings + local_vars = { + 'launcher': self, + 'servers': self.servers, + 'pgworker': self.pgworker, + 'stop': self.stop, + 'api': self.api, + 'resource': self.resource, + 'settings': settings, + 'wsgi_app': self.wsgi_app, + 'wsgi_name': self.wsgi_name, + } + local_vars.update(context) + return BackdoorServer( + listen_addr, banner='Django DDP', locals=local_vars, + ) + + def stop(self): + """Stop all green threads.""" + self.logger.debug('PostgresGreenlet stop') + self._stop_event.set() + # ask all threads to stop. + for server in self.servers + [DDPLauncher.pgworker]: + self.logger.debug('Stopping %s', server) server.stop() - # die gracefully with SIGINT or SIGQUIT - gevent.signal(signal.SIGINT, killall) - gevent.signal(signal.SIGQUIT, killall) + def start(self): + """Run PostgresGreenlet and web/debug servers.""" + self.logger.debug('PostgresGreenlet start') + self._stop_event.clear() + self.print('=> Discovering DDP endpoints...') + for api_path in sorted(self.api.api_path_map()): + self.logger.debug(' %s', api_path) - print('=> Discovering DDP endpoints...') - api = apps.get_app_config('dddp').api - api.pgworker = pgworker - DDPWebSocketApplication.api = api - print( - '\n'.join( - ' %s' % api_path - for api_path - in sorted(api.api_path_map()) - ), - ) + # start greenlets + self.pgworker.start() + self.print('=> Started PostgresGreenlet.') + for server in self.servers: + thread = gevent.spawn(server.serve_forever) + self.threads.append(thread) + if isinstance(server, geventwebsocket.WebSocketServer): + self.print( + '=> App running at: %s://%s:%d/' % ( + 'https' if server.ssl_enabled else 'http', + server.server_host, + server.server_port, + ), + ) + elif isinstance(server, gevent.backdoor.BackdoorServer): + self.print( + '=> Debug service running at: telnet://%s:%d/' % ( + server.server_host, + server.server_port, + ), + ) + self.print('=> Started your app (%s).' % self.wsgi_name) - # start greenlets - if debug_port: - from gevent.backdoor import BackdoorServer - servers.append( - BackdoorServer( - ('127.0.0.1', debug_port), - banner='Django DDP', - locals={ - 'servers': servers, - 'pgworker': pgworker, - 'killall': killall, - 'api': api, - 'resource': resource, - 'settings': settings, - 'wsgi_app': wsgi_app, - 'wsgi_name': wsgi_name, - }, - ) - ) - - pgworker.start() - print('=> Started PostgresGreenlet.') - threads = [ - gevent.spawn(server.serve_forever) - for server - in servers - ] - print('=> Started DDPWebSocketApplication.') - print('=> Started your app (%s).' % wsgi_name) - print('') - for host, port in listen: - print('=> App running at: http://%s:%d/' % (host, port)) - gevent.joinall(threads) - pgworker.stop() - gevent.joinall([pgworker]) + def run(self): + """Run DDP greenlets.""" + self.logger.debug('PostgresGreenlet run') + self.start() + self._stop_event.wait() + # wait for all threads to stop. + gevent.joinall(self.threads + [DDPLauncher.pgworker]) def addr(val, default_port=8000, defualt_host='localhost'): @@ -205,8 +264,37 @@ def addr(val, default_port=8000, defualt_host='localhost'): return Addr(host, port) +def serve(listen, verbosity=1, debug_port=0, **ssl_args): + """Spawn greenlets for handling websockets and PostgreSQL calls.""" + launcher = DDPLauncher(debug=verbosity == 3, verbosity=verbosity) + if debug_port: + launcher.servers.append( + launcher.get_backdoor_server('localhost:%d' % debug_port) + ) + launcher.add_web_servers(listen, **ssl_args) + # die gracefully with SIGINT or SIGQUIT + sigmap = { + val: name + for name, val + in vars(signal).items() + if name.startswith('SIG') + } + + def sighandler(signum=None, frame=None): + """Signal handler""" + launcher.logger.info( + 'Received signal %s in frame %r', + sigmap.get(signum, signum), + frame, + ) + launcher.stop() + for signum in [signal.SIGINT, signal.SIGQUIT]: + gevent.signal(signum, sighandler) + launcher.run() + + def main(): - import argparse + """Main entry point for `dddp` command.""" parser = argparse.ArgumentParser(description=__doc__) django = parser.add_argument_group('Django Options') django.add_argument(