mirror of
https://github.com/jazzband/django-ddp.git
synced 2026-03-16 22:40:24 +00:00
356 lines
12 KiB
Python
356 lines
12 KiB
Python
"""Django DDP WebSocket service."""
|
|
|
|
from __future__ import absolute_import, print_function
|
|
|
|
import argparse
|
|
import collections
|
|
import os
|
|
import signal
|
|
|
|
import gevent
|
|
from gevent.backdoor import BackdoorServer
|
|
import gevent.event
|
|
import gevent.pywsgi
|
|
import geventwebsocket
|
|
import geventwebsocket.handler
|
|
|
|
|
|
Addr = collections.namedtuple('Addr', ['host', 'port'])
|
|
|
|
|
|
def common_headers(environ, **kwargs):
|
|
"""Return list of common headers for SockJS HTTP responses."""
|
|
return [
|
|
# DDP doesn't use cookies or HTTP level auth, so CSRF attacks are
|
|
# ineffective. We can safely allow cross-domain DDP connections and
|
|
# developers may choose to allow anonymous access to publications and
|
|
# RPC methods as they see fit. More to the point, developers should
|
|
# restrict access to publications and RPC endpoints as appropriate.
|
|
('Access-Control-Allow-Origin', '*'),
|
|
('Access-Control-Allow-Credentials', 'false'),
|
|
('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
|
|
('Connection', 'keep-alive'),
|
|
('Vary', 'Origin'),
|
|
]
|
|
|
|
|
|
def ddpp_sockjs_xhr(environ, start_response):
|
|
"""Dummy method that doesn't handle XHR requests."""
|
|
start_response(
|
|
'404 Not found',
|
|
[
|
|
('Content-Type', 'text/plain; charset=UTF-8'),
|
|
] + common_headers(environ),
|
|
)
|
|
yield 'No.'
|
|
|
|
|
|
def ddpp_sockjs_info(environ, start_response):
|
|
"""Inform client that WebSocket service is available."""
|
|
import random
|
|
import ejson
|
|
|
|
start_response(
|
|
'200 OK',
|
|
[
|
|
('Content-Type', 'application/json; charset=UTF-8'),
|
|
] + common_headers(environ),
|
|
)
|
|
yield ejson.dumps(collections.OrderedDict([
|
|
('websocket', True),
|
|
('origins', [
|
|
'*:*',
|
|
]),
|
|
('cookie_needed', False),
|
|
('entropy', random.getrandbits(32)),
|
|
]))
|
|
|
|
|
|
class DDPLauncher(object):
|
|
|
|
"""Orchestrate the setup and launching of DDP greenlets in a sane manner."""
|
|
|
|
pgworker = None
|
|
|
|
def __init__(self, debug=False, verbosity=1):
|
|
"""
|
|
Spawn greenlets for handling websockets and PostgreSQL calls.
|
|
|
|
Only one pgworker should be started for any process, and ideally only
|
|
one instance per node as well (ie: use IPC to communicate events between
|
|
processes on the same host). This class handles the former (per
|
|
process), per host IPC is an excercise left to you, dear reader - pull
|
|
requests are of course welcome. ;)
|
|
|
|
For the record, there's no technical reason that you can't have multiple
|
|
pgworker instances running on one host (or even one process), it's just
|
|
inefficient to send exactly the same traffic multiple times between your
|
|
web servers and your database host.
|
|
"""
|
|
import logging
|
|
from django.apps import apps
|
|
from django.utils.module_loading import import_string
|
|
from dddp.websocket import DDPWebSocketApplication
|
|
|
|
self.verbosity = verbosity
|
|
self._stop_event = gevent.event.Event()
|
|
self._stop_event.set() # start in stopped state.
|
|
self.logger = logging.getLogger('dddp.launcher')
|
|
|
|
if DDPLauncher.pgworker is None:
|
|
from django.db import connection, close_old_connections
|
|
from dddp.postgres import PostgresGreenlet
|
|
# shutdown existing connections
|
|
close_old_connections()
|
|
|
|
DDPLauncher.pgworker = PostgresGreenlet(connection)
|
|
|
|
# use settings.WSGI_APPLICATION or fallback to default Django WSGI app
|
|
from django.conf import settings
|
|
self.wsgi_app = None
|
|
if hasattr(settings, 'WSGI_APPLICATION'):
|
|
self.wsgi_name = settings.WSGI_APPLICATION
|
|
try:
|
|
self.wsgi_app = import_string(self.wsgi_name)
|
|
except ImportError:
|
|
pass
|
|
if self.wsgi_app is None:
|
|
from django.core.wsgi import get_wsgi_application
|
|
self.wsgi_app = get_wsgi_application()
|
|
self.wsgi_name = str(self.wsgi_app.__class__)
|
|
|
|
self.api = apps.get_app_config('dddp').api
|
|
self.api.pgworker = DDPLauncher.pgworker
|
|
DDPWebSocketApplication.api = self.api
|
|
|
|
# setup PostgresGreenlet to multiplex DB calls
|
|
DDPWebSocketApplication.pgworker = self.pgworker
|
|
|
|
self.resource = geventwebsocket.Resource(
|
|
collections.OrderedDict([
|
|
(r'/websocket', DDPWebSocketApplication),
|
|
(r'^/sockjs/\d+/\w+/websocket$', DDPWebSocketApplication),
|
|
(r'^/sockjs/\d+/\w+/xhr$', ddpp_sockjs_xhr),
|
|
(r'^/sockjs/info$', ddpp_sockjs_info),
|
|
(r'^/(?!(websocket|sockjs)/)', self.wsgi_app),
|
|
]),
|
|
)
|
|
|
|
self.servers = []
|
|
self.threads = []
|
|
|
|
def print(self, msg, *args, **kwargs):
|
|
"""Print formatted msg if verbosity set at 1 or above."""
|
|
if self.verbosity >= 1:
|
|
print(msg, *args, **kwargs)
|
|
|
|
def add_web_servers(self, listen_addrs, debug=False, **ssl_args):
|
|
"""Add WebSocketServer for each (host, port) in listen_addrs."""
|
|
self.servers.extend(
|
|
self.get_web_server(listen_addr, debug=debug, **ssl_args)
|
|
for listen_addr in listen_addrs
|
|
)
|
|
|
|
def get_web_server(self, listen_addr, debug=False, **ssl_args):
|
|
"""Setup WebSocketServer on listen_addr (host, port)."""
|
|
return geventwebsocket.WebSocketServer(
|
|
listen_addr,
|
|
self.resource,
|
|
debug=debug,
|
|
**{key: val for key, val in ssl_args.items() if val is not None}
|
|
)
|
|
|
|
def get_backdoor_server(self, listen_addr, **context):
|
|
"""Add a backdoor (debug) server."""
|
|
from django.conf import settings
|
|
local_vars = {
|
|
'launcher': self,
|
|
'servers': self.servers,
|
|
'pgworker': self.pgworker,
|
|
'stop': self.stop,
|
|
'api': self.api,
|
|
'resource': self.resource,
|
|
'settings': settings,
|
|
'wsgi_app': self.wsgi_app,
|
|
'wsgi_name': self.wsgi_name,
|
|
}
|
|
local_vars.update(context)
|
|
return BackdoorServer(
|
|
listen_addr, banner='Django DDP', locals=local_vars,
|
|
)
|
|
|
|
def stop(self):
|
|
"""Stop all green threads."""
|
|
self.logger.debug('PostgresGreenlet stop')
|
|
self._stop_event.set()
|
|
# ask all threads to stop.
|
|
for server in self.servers + [DDPLauncher.pgworker]:
|
|
self.logger.debug('Stopping %s', server)
|
|
server.stop()
|
|
# wait for all threads to stop.
|
|
gevent.joinall(self.threads + [DDPLauncher.pgworker])
|
|
self.threads = []
|
|
|
|
def start(self):
|
|
"""Run PostgresGreenlet and web/debug servers."""
|
|
self.logger.debug('PostgresGreenlet start')
|
|
self._stop_event.clear()
|
|
self.print('=> Discovering DDP endpoints...')
|
|
if self.verbosity > 1:
|
|
for api_path in sorted(self.api.api_path_map()):
|
|
print(' %s' % api_path)
|
|
|
|
# start greenlets
|
|
self.pgworker.start()
|
|
self.print('=> Started PostgresGreenlet.')
|
|
for server in self.servers:
|
|
thread = gevent.spawn(server.serve_forever)
|
|
gevent.sleep() # yield to thread in case it can't start
|
|
self.threads.append(thread)
|
|
if thread.dead:
|
|
# thread died, stop everything and re-raise the exception.
|
|
self.stop()
|
|
thread.get()
|
|
if isinstance(server, geventwebsocket.WebSocketServer):
|
|
self.print(
|
|
'=> App running at: %s://%s:%d/' % (
|
|
'https' if server.ssl_enabled else 'http',
|
|
server.server_host,
|
|
server.server_port,
|
|
),
|
|
)
|
|
elif isinstance(server, gevent.backdoor.BackdoorServer):
|
|
self.print(
|
|
'=> Debug service running at: telnet://%s:%d/' % (
|
|
server.server_host,
|
|
server.server_port,
|
|
),
|
|
)
|
|
self.print('=> Started your app (%s).' % self.wsgi_name)
|
|
|
|
def run(self):
|
|
"""Run DDP greenlets."""
|
|
self.logger.debug('PostgresGreenlet run')
|
|
self.start()
|
|
self._stop_event.wait()
|
|
# wait for all threads to stop.
|
|
gevent.joinall(self.threads + [DDPLauncher.pgworker])
|
|
self.threads = []
|
|
|
|
|
|
def addr(val, default_port=8000, defualt_host='localhost'):
|
|
"""
|
|
Convert a string of format host[:port] into Addr(host, port).
|
|
|
|
>>> addr('0:80')
|
|
Addr(host='0', port=80)
|
|
|
|
>>> addr('127.0.0.1:80')
|
|
Addr(host='127.0.0.1', port=80)
|
|
|
|
>>> addr('0.0.0.0', default_port=8000)
|
|
Addr(host='0.0.0.0', port=8000)
|
|
"""
|
|
import re
|
|
import socket
|
|
|
|
match = re.match(r'\A(?P<host>.*?)(:(?P<port>(\d+|\w+)))?\Z', val)
|
|
if match is None:
|
|
raise argparse.ArgumentTypeError(
|
|
'%r is not a valid host[:port] address.' % val
|
|
)
|
|
host, port = match.group('host', 'port')
|
|
|
|
if not host:
|
|
host = defualt_host
|
|
|
|
if not port:
|
|
port = default_port
|
|
elif port.isdigit():
|
|
port = int(port)
|
|
else:
|
|
port = socket.getservbyname(port)
|
|
|
|
return Addr(host, port)
|
|
|
|
|
|
def serve(listen, verbosity=1, debug_port=0, **ssl_args):
|
|
"""Spawn greenlets for handling websockets and PostgreSQL calls."""
|
|
launcher = DDPLauncher(debug=verbosity == 3, verbosity=verbosity)
|
|
if debug_port:
|
|
launcher.servers.append(
|
|
launcher.get_backdoor_server('localhost:%d' % debug_port)
|
|
)
|
|
launcher.add_web_servers(listen, **ssl_args)
|
|
# die gracefully with SIGINT or SIGQUIT
|
|
sigmap = {
|
|
val: name
|
|
for name, val
|
|
in vars(signal).items()
|
|
if name.startswith('SIG')
|
|
}
|
|
|
|
def sighandler(signum=None, frame=None):
|
|
"""Signal handler"""
|
|
launcher.logger.info(
|
|
'Received signal %s in frame %r',
|
|
sigmap.get(signum, signum),
|
|
frame,
|
|
)
|
|
launcher.stop()
|
|
for signum in [signal.SIGINT, signal.SIGQUIT]:
|
|
gevent.signal(signum, sighandler)
|
|
launcher.run()
|
|
|
|
|
|
def main():
|
|
"""Main entry point for `dddp` command."""
|
|
parser = argparse.ArgumentParser(description=__doc__)
|
|
django = parser.add_argument_group('Django Options')
|
|
django.add_argument(
|
|
'--verbosity', '-v', metavar='VERBOSITY', dest='verbosity', type=int,
|
|
default=1,
|
|
)
|
|
django.add_argument(
|
|
'--debug-port', metavar='DEBUG_PORT', dest='debug_port', type=int,
|
|
default=0,
|
|
)
|
|
django.add_argument(
|
|
'--settings', metavar='SETTINGS', dest='settings',
|
|
help="The Python path to a settings module, e.g. "
|
|
"\"myproject.settings.main\". If this isn't provided, the "
|
|
"DJANGO_SETTINGS_MODULE environment variable will be used.",
|
|
)
|
|
http = parser.add_argument_group('HTTP Options')
|
|
http.add_argument(
|
|
'listen', metavar='address[:port]', nargs='*', type=addr,
|
|
help='Listening address for HTTP(s) server.',
|
|
)
|
|
ssl = parser.add_argument_group('SSL Options')
|
|
ssl.add_argument('--ssl-version', metavar='SSL_VERSION', dest='ssl_version',
|
|
help="SSL version to use (see stdlib ssl module's) [3]",
|
|
choices=['1', '2', '3'], default='3')
|
|
ssl.add_argument('--certfile', metavar='FILE', dest='certfile',
|
|
help="SSL certificate file [None]")
|
|
ssl.add_argument('--ciphers', metavar='CIPHERS', dest='ciphers',
|
|
help="Ciphers to use (see stdlib ssl module's) [TLSv1]")
|
|
ssl.add_argument('--ca-certs', metavar='FILE', dest='ca_certs',
|
|
help="CA certificates file [None]")
|
|
ssl.add_argument('--keyfile', metavar='FILE', dest='keyfile',
|
|
help="SSL key file [None]")
|
|
namespace = parser.parse_args()
|
|
if namespace.settings:
|
|
os.environ['DJANGO_SETTINGS_MODULE'] = namespace.settings
|
|
serve(
|
|
namespace.listen or [Addr('localhost', 8000)],
|
|
debug_port=namespace.debug_port,
|
|
keyfile=namespace.keyfile,
|
|
certfile=namespace.certfile,
|
|
verbosity=namespace.verbosity,
|
|
)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
from dddp import greenify
|
|
greenify()
|
|
main()
|