diff --git a/dddp/api.py b/dddp/api.py index d61d9aa..1780d27 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -597,6 +597,7 @@ class DDP(APIMixin): 'params_ejson': ejson.dumps(params), }, ) + this.subs.setdefault(sub.publication, set()).add(sub.pk) if not created: if not silent: this.send({'msg': 'ready', 'subs': [id_]}) @@ -636,6 +637,7 @@ class DDP(APIMixin): for obj in qs: payload = col.obj_change_as_msg(obj, REMOVED, meteor_ids) this.send(payload) + this.subs[sub.publication].remove(sub.pk) sub.delete() if not silent: this.send({'msg': 'nosub', 'id': id_}) diff --git a/dddp/ddp.py b/dddp/ddp.py new file mode 100644 index 0000000..0af0f11 --- /dev/null +++ b/dddp/ddp.py @@ -0,0 +1,22 @@ +from dddp import THREAD_LOCAL as this +from dddp.api import API, Publication +from django.contrib import auth + + +class Logs(Publication): + + users = auth.get_user_model() + + def get_queries(self): + user_pk = getattr(this, 'user_id', False) + if user_pk: + if self.users.objects.filter( + pk=user_pk, + is_active=True, + is_superuser=True, + ).exists(): + return [] + raise ValueError('User not permitted.') + + +API.register([Logs]) diff --git a/dddp/logging.py b/dddp/logging.py new file mode 100644 index 0000000..147873f --- /dev/null +++ b/dddp/logging.py @@ -0,0 +1,39 @@ +"""Django DDP logging helpers.""" +from __future__ import absolute_import, print_function + +import logging + +from dddp import THREAD_LOCAL as this, meteor_random_id, ADDED + + + + +class DDPHandler(logging.Handler): + + """Logging handler that streams log events via DDP to the current client.""" + + def __init__(self, *args, **kwargs): + print(self.__class__, args, kwargs) + self.logger = logging.getLogger('django.db.backends') + self.logger.info('Test') + super(DDPHandler, self).__init__(*args, **kwargs) + + def emit(self, record): + """Emit a formatted log record via DDP.""" + if getattr(this, 'subs', {}).get('Logs', False): + this.send({ + 'msg': ADDED, + 'collection': 'logs', + 'id': meteor_random_id('/collection/logs'), + 'fields': { + # 'name': record.name, + # 'levelno': record.levelno, + 'levelname': record.levelname, + # 'pathname': record.pathname, + # 'lineno': record.lineno, + 'msg': record.msg, + 'args': record.args, + # 'exc_info': record.exc_info, + # 'funcName': record.funcName, + }, + }) diff --git a/dddp/main.py b/dddp/main.py index 2b0f863..e8ab34a 100644 --- a/dddp/main.py +++ b/dddp/main.py @@ -60,7 +60,7 @@ def ddpp_sockjs_info(environ, start_response): ])) -def serve(listen, debug=False, **ssl_args): +def serve(listen, debug=False, verbosity=1, debug_port=0, **ssl_args): """Spawn greenlets for handling websockets and PostgreSQL calls.""" import signal from django.apps import apps @@ -99,7 +99,7 @@ def serve(listen, debug=False, **ssl_args): ) # setup WebSocketServer to dispatch web requests - webservers = [ + servers = [ geventwebsocket.WebSocketServer( (host, port), resource, @@ -113,8 +113,8 @@ def serve(listen, debug=False, **ssl_args): def killall(*args, **kwargs): """Kill all green threads.""" pgworker.stop() - for webserver in webservers: - webserver.stop() + for server in servers: + server.stop() # die gracefully with SIGINT or SIGQUIT gevent.signal(signal.SIGINT, killall) @@ -133,19 +133,38 @@ def serve(listen, debug=False, **ssl_args): ) # start greenlets + if debug_port: + from gevent.backdoor import BackdoorServer + servers.append( + BackdoorServer( + ('127.0.0.1', debug_port), + banner='Django DDP', + locals={ + 'servers': servers, + 'pgworker': pgworker, + 'killall': killall, + 'api': api, + 'resource': resource, + 'settings': settings, + 'wsgi_app': wsgi_app, + 'wsgi_name': wsgi_name, + }, + ) + ) + pgworker.start() print('=> Started PostgresGreenlet.') - web_threads = [ - gevent.spawn(webserver.serve_forever) - for webserver - in webservers + threads = [ + gevent.spawn(server.serve_forever) + for server + in servers ] print('=> Started DDPWebSocketApplication.') print('=> Started your app (%s).' % wsgi_name) print('') for host, port in listen: print('=> App running at: http://%s:%d/' % (host, port)) - gevent.joinall(web_threads) + gevent.joinall(threads) pgworker.stop() gevent.joinall([pgworker]) @@ -190,6 +209,14 @@ def main(): import argparse parser = argparse.ArgumentParser(description=__doc__) django = parser.add_argument_group('Django Options') + django.add_argument( + '--verbosity', '-v', metavar='VERBOSITY', dest='verbosity', type=int, + default=1, + ) + django.add_argument( + '--debug-port', metavar='DEBUG_PORT', dest='debug_port', type=int, + default=0, + ) django.add_argument( '--settings', metavar='SETTINGS', dest='settings', help="The Python path to a settings module, e.g. " @@ -218,8 +245,10 @@ def main(): os.environ['DJANGO_SETTINGS_MODULE'] = namespace.settings serve( namespace.listen or [Addr('localhost', 8000)], + debug_port=namespace.debug_port, keyfile=namespace.keyfile, certfile=namespace.certfile, + verbosity=namespace.verbosity, ) diff --git a/dddp/websocket.py b/dddp/websocket.py index b599e29..0676363 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -106,7 +106,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): version = None support = None connection = None - subs = None remote_ids = None base_handler = BaseHandler() @@ -131,7 +130,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( self.ws.environ, ) - self.subs = {} + this.subs = {} self.logger.info('+ %s OPEN', self) self.send('o') self.send('a["{\\"server_id\\":\\"0\\"}"]')