Abstract DDPLauncher out from dddp.main.serve to permit use from other contexts.

This commit is contained in:
Tyson Clugg 2015-09-18 17:45:38 +10:00
parent d03b259226
commit 2264ddbdc2

View file

@ -1,10 +1,18 @@
"""Django DDP WebSocket service."""
from __future__ import print_function, absolute_import
from __future__ import absolute_import, print_function
import argparse
import collections
import os
import sys
import signal
import gevent
from gevent.backdoor import BackdoorServer
import gevent.event
import gevent.pywsgi
import geventwebsocket
import geventwebsocket.handler
Addr = collections.namedtuple('Addr', ['host', 'port'])
@ -60,113 +68,164 @@ def ddpp_sockjs_info(environ, start_response):
]))
def serve(listen, debug=False, verbosity=1, debug_port=0, **ssl_args):
"""Spawn greenlets for handling websockets and PostgreSQL calls."""
import signal
from django.apps import apps
from django.db import connection, close_old_connections
from django.utils.module_loading import import_string
from dddp.postgres import PostgresGreenlet
from dddp.websocket import DDPWebSocketApplication
import gevent
import geventwebsocket
class DDPLauncher(object):
# shutdown existing connections
close_old_connections()
"""Orchestrate the setup and launching of DDP greenlets in a sane manner."""
# setup PostgresGreenlet to multiplex DB calls
pgworker = PostgresGreenlet(connection, debug=debug)
DDPWebSocketApplication.pgworker = pgworker
pgworker = None
# use settings.WSGI_APPLICATION or fallback to default Django WSGI app
from django.conf import settings
if hasattr(settings, 'WSGI_APPLICATION'):
wsgi_name = settings.WSGI_APPLICATION
wsgi_app = import_string(wsgi_name)
else:
from django.core.wsgi import get_wsgi_application
wsgi_app = get_wsgi_application()
wsgi_name = str(wsgi_app.__class__)
def __init__(self, debug=False, verbosity=1):
"""
Spawn greenlets for handling websockets and PostgreSQL calls.
resource = geventwebsocket.Resource(
collections.OrderedDict([
(r'/websocket', DDPWebSocketApplication),
(r'^/sockjs/\d+/\w+/websocket$', DDPWebSocketApplication),
(r'^/sockjs/\d+/\w+/xhr$', ddpp_sockjs_xhr),
(r'^/sockjs/info$', ddpp_sockjs_info),
(r'^/(?!(websocket|sockjs)/)', wsgi_app),
]),
)
Only one pgworker should be started for any process, and ideally only
one instance per node as well (ie: use IPC to communicate events between
processes on the same host). This class handles the former (per
process), per host IPC is an excercise left to you, dear reader - pull
requests are of course welcome. ;)
# setup WebSocketServer to dispatch web requests
servers = [
geventwebsocket.WebSocketServer(
(host, port),
resource,
debug=debug,
**{key:val for key, val in ssl_args.items() if val is not None}
For the record, there's no technical reason that you can't have multiple
pgworker instances running on one host (or even one process), it's just
inefficient to send exactly the same traffic multiple times between your
web servers and your database host.
"""
import logging
from django.apps import apps
from django.utils.module_loading import import_string
from dddp.websocket import DDPWebSocketApplication
self.verbosity = verbosity
self._stop_event = gevent.event.Event()
self._stop_event.set() # start in stopped state.
self.logger = logging.getLogger('dddp.launcher')
if DDPLauncher.pgworker is None:
from django.db import connection, close_old_connections
from dddp.postgres import PostgresGreenlet
# shutdown existing connections
close_old_connections()
DDPLauncher.pgworker = PostgresGreenlet(
connection, debug=debug,
)
# use settings.WSGI_APPLICATION or fallback to default Django WSGI app
from django.conf import settings
if hasattr(settings, 'WSGI_APPLICATION'):
self.wsgi_name = settings.WSGI_APPLICATION
self.wsgi_app = import_string(self.wsgi_name)
else:
from django.core.wsgi import get_wsgi_application
self.wsgi_app = get_wsgi_application()
self.wsgi_name = str(self.wsgi_app.__class__)
self.api = apps.get_app_config('dddp').api
self.api.pgworker = DDPLauncher.pgworker
DDPWebSocketApplication.api = self.api
# setup PostgresGreenlet to multiplex DB calls
DDPWebSocketApplication.pgworker = self.pgworker
self.resource = geventwebsocket.Resource(
collections.OrderedDict([
(r'/websocket', DDPWebSocketApplication),
(r'^/sockjs/\d+/\w+/websocket$', DDPWebSocketApplication),
(r'^/sockjs/\d+/\w+/xhr$', ddpp_sockjs_xhr),
(r'^/sockjs/info$', ddpp_sockjs_info),
(r'^/(?!(websocket|sockjs)/)', self.wsgi_app),
]),
)
for host, port
in listen
]
def killall(*args, **kwargs):
"""Kill all green threads."""
pgworker.stop()
for server in servers:
self.servers = []
self.threads = []
def print(self, msg, *args, **kwargs):
"""Print formatted msg if verbosity set at 1 or above."""
if self.verbosity >= 1:
print(msg, *args, **kwargs)
def add_web_servers(self, listen_addrs, debug=False, **ssl_args):
"""Add WebSocketServer for each (host, port) in listen_addrs."""
self.servers.extend(
self.get_web_server(listen_addr, debug=debug, **ssl_args)
for listen_addr in listen_addrs
)
def get_web_server(self, listen_addr, debug=False, **ssl_args):
"""Setup WebSocketServer on listen_addr (host, port)."""
return geventwebsocket.WebSocketServer(
listen_addr,
self.resource,
debug=debug,
**{key: val for key, val in ssl_args.items() if val is not None}
)
def get_backdoor_server(self, listen_addr, **context):
"""Add a backdoor (debug) server."""
from django.conf import settings
local_vars = {
'launcher': self,
'servers': self.servers,
'pgworker': self.pgworker,
'stop': self.stop,
'api': self.api,
'resource': self.resource,
'settings': settings,
'wsgi_app': self.wsgi_app,
'wsgi_name': self.wsgi_name,
}
local_vars.update(context)
return BackdoorServer(
listen_addr, banner='Django DDP', locals=local_vars,
)
def stop(self):
"""Stop all green threads."""
self.logger.debug('PostgresGreenlet stop')
self._stop_event.set()
# ask all threads to stop.
for server in self.servers + [DDPLauncher.pgworker]:
self.logger.debug('Stopping %s', server)
server.stop()
# die gracefully with SIGINT or SIGQUIT
gevent.signal(signal.SIGINT, killall)
gevent.signal(signal.SIGQUIT, killall)
def start(self):
"""Run PostgresGreenlet and web/debug servers."""
self.logger.debug('PostgresGreenlet start')
self._stop_event.clear()
self.print('=> Discovering DDP endpoints...')
for api_path in sorted(self.api.api_path_map()):
self.logger.debug(' %s', api_path)
print('=> Discovering DDP endpoints...')
api = apps.get_app_config('dddp').api
api.pgworker = pgworker
DDPWebSocketApplication.api = api
print(
'\n'.join(
' %s' % api_path
for api_path
in sorted(api.api_path_map())
),
)
# start greenlets
self.pgworker.start()
self.print('=> Started PostgresGreenlet.')
for server in self.servers:
thread = gevent.spawn(server.serve_forever)
self.threads.append(thread)
if isinstance(server, geventwebsocket.WebSocketServer):
self.print(
'=> App running at: %s://%s:%d/' % (
'https' if server.ssl_enabled else 'http',
server.server_host,
server.server_port,
),
)
elif isinstance(server, gevent.backdoor.BackdoorServer):
self.print(
'=> Debug service running at: telnet://%s:%d/' % (
server.server_host,
server.server_port,
),
)
self.print('=> Started your app (%s).' % self.wsgi_name)
# start greenlets
if debug_port:
from gevent.backdoor import BackdoorServer
servers.append(
BackdoorServer(
('127.0.0.1', debug_port),
banner='Django DDP',
locals={
'servers': servers,
'pgworker': pgworker,
'killall': killall,
'api': api,
'resource': resource,
'settings': settings,
'wsgi_app': wsgi_app,
'wsgi_name': wsgi_name,
},
)
)
pgworker.start()
print('=> Started PostgresGreenlet.')
threads = [
gevent.spawn(server.serve_forever)
for server
in servers
]
print('=> Started DDPWebSocketApplication.')
print('=> Started your app (%s).' % wsgi_name)
print('')
for host, port in listen:
print('=> App running at: http://%s:%d/' % (host, port))
gevent.joinall(threads)
pgworker.stop()
gevent.joinall([pgworker])
def run(self):
"""Run DDP greenlets."""
self.logger.debug('PostgresGreenlet run')
self.start()
self._stop_event.wait()
# wait for all threads to stop.
gevent.joinall(self.threads + [DDPLauncher.pgworker])
def addr(val, default_port=8000, defualt_host='localhost'):
@ -205,8 +264,37 @@ def addr(val, default_port=8000, defualt_host='localhost'):
return Addr(host, port)
def serve(listen, verbosity=1, debug_port=0, **ssl_args):
"""Spawn greenlets for handling websockets and PostgreSQL calls."""
launcher = DDPLauncher(debug=verbosity == 3, verbosity=verbosity)
if debug_port:
launcher.servers.append(
launcher.get_backdoor_server('localhost:%d' % debug_port)
)
launcher.add_web_servers(listen, **ssl_args)
# die gracefully with SIGINT or SIGQUIT
sigmap = {
val: name
for name, val
in vars(signal).items()
if name.startswith('SIG')
}
def sighandler(signum=None, frame=None):
"""Signal handler"""
launcher.logger.info(
'Received signal %s in frame %r',
sigmap.get(signum, signum),
frame,
)
launcher.stop()
for signum in [signal.SIGINT, signal.SIGQUIT]:
gevent.signal(signum, sighandler)
launcher.run()
def main():
import argparse
"""Main entry point for `dddp` command."""
parser = argparse.ArgumentParser(description=__doc__)
django = parser.add_argument_group('Django Options')
django.add_argument(