diff --git a/dddp/__init__.py b/dddp/__init__.py index e027d7f..5e40797 100644 --- a/dddp/__init__.py +++ b/dddp/__init__.py @@ -42,6 +42,45 @@ def greenify(): patch_psycopg() +class MeteorError(Exception): + + """ + MeteorError. + + This exception can be thrown by DDP API endpoints (methods) and publication + methods. MeteorError is not expected to be logged or shown by the server, + leaving all handling of the error condition for the client. + + Args: + error (str): A string code uniquely identifying this kind of error. + This string should be used by clients to determine the appropriate + action to take, instead of attempting to parse the reason or detail + fields. + reason (Optional[str]): A short human-readable summary of the error. + detail (Optional[str]): Additional information about the error. + When returning errors to clients, Django DDP will default this to a + textual stack trace if `django.conf.settings.DEBUG` is `True`. + """ + + def __init__(self, error, reason=None, details=None, **kwargs): + """MeteorError constructor.""" + super(MeteorError, self).__init__(error, reason, details, kwargs) + + def as_dict(self, **kwargs): + """Return an error dict for self.args and kwargs.""" + error, reason, details, err_kwargs = self.args + result = { + key: val + for key, val in { + 'error': error, 'reason': reason, 'details': details, + }.items() + if val is not None + } + result.update(err_kwargs) + result.update(kwargs) + return result + + class AlreadyRegistered(Exception): """Raised when registering over the top of an existing registration.""" @@ -125,7 +164,7 @@ THREAD_LOCAL_FACTORIES = { 'user_ddp_id': lambda: None, 'user': lambda: None, } -THREAD_LOCAL = ThreadLocal() +THREAD_LOCAL = this = ThreadLocal() # pylint: disable=invalid-name METEOR_ID_CHARS = u'23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz' diff --git a/dddp/accounts/ddp.py b/dddp/accounts/ddp.py index 8896f1c..a1d1d3d 100644 --- a/dddp/accounts/ddp.py +++ b/dddp/accounts/ddp.py @@ -21,12 +21,12 @@ from django.dispatch import Signal from django.utils import timezone from dddp import ( - THREAD_LOCAL_FACTORIES, THREAD_LOCAL as this, ADDED, REMOVED, + THREAD_LOCAL_FACTORIES, this, MeteorError, + ADDED, REMOVED, meteor_random_id, ) from dddp.models import get_meteor_id, get_object, Subscription from dddp.api import API, APIMixin, api_endpoint, Collection, Publication -from dddp.websocket import MeteorError # pylint dones't like lower case attribute names on modules, but it's the normal @@ -190,7 +190,7 @@ class Users(Collection): if key == prefixed('name'): result['full_name'] = val else: - raise ValueError('Bad profile key: %r' % key) + raise MeteorError(400, 'Bad profile key: %r' % key) return result @api_endpoint @@ -384,7 +384,7 @@ class Auth(APIMixin): # malicious MITM. Also as no salt is used with hashing, the # passwords are vulnerable to rainbow-table lookups anyway. # - # If you're doing security, do it right from the very outset. Fors + # If you're doing security, do it right from the very outset. For # web services that means using SSL and not relying on half-baked # security concepts put together by people with no security # background. @@ -393,9 +393,10 @@ class Auth(APIMixin): # until upstream developers see the light and drop the password # hashing mis-feature. raise MeteorError( - 400, - "Outmoded password hashing, run " - "`meteor add tysonclugg:accounts-secure` to fix.", + 426, + "Outmoded password hashing: " + "https://github.com/meteor/meteor/issues/4363", + upgrade='meteor add tysonclugg:accounts-secure', ) @api_endpoint('createUser') @@ -407,7 +408,9 @@ class Auth(APIMixin): params=params, ) if len(receivers) == 0: - raise MeteorError(501, 'Handler for `create_user` not registered.') + raise NotImplementedError( + 'Handler for `create_user` not registered.' + ) user = receivers[0][1] user = auth.authenticate( username=user.get_username(), password=params['password'], diff --git a/dddp/accounts/tests.py b/dddp/accounts/tests.py index ea9fb18..4922c54 100644 --- a/dddp/accounts/tests.py +++ b/dddp/accounts/tests.py @@ -26,7 +26,7 @@ class AccountsTestCase(tests.DDPServerTestCase): msgs = sockjs.recv() self.assertEqual( msgs, [ - {'msg': 'connected', 'session': msgs[0].get('session', None)}, + {'msg': 'connected', 'session': msgs[0]['session']}, ], ) @@ -37,12 +37,10 @@ class AccountsTestCase(tests.DDPServerTestCase): self.assertEqual( msgs, [ { - 'msg': 'result', + 'msg': 'result', 'id': id_, 'error': { - 'error': 500, - 'reason': "(403, 'Authentication failed.')", + 'error': 403, 'reason': 'Authentication failed.', }, - 'id': id_, }, ], ) diff --git a/dddp/api.py b/dddp/api.py index 0656c02..e3eac98 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals, print_function # standard library import collections from copy import deepcopy -import traceback +import inspect import uuid # requirements @@ -25,9 +25,7 @@ import ejson import six # django-ddp -from dddp import ( - AlreadyRegistered, THREAD_LOCAL as this, ADDED, CHANGED, REMOVED, -) +from dddp import AlreadyRegistered, this, ADDED, CHANGED, REMOVED, MeteorError from dddp.models import ( AleaIdField, Connection, Subscription, get_meteor_id, get_meteor_ids, ) @@ -124,7 +122,7 @@ class APIMeta(type): """DDP API metaclass.""" - def __new__(mcs, name, bases, attrs): + def __new__(cls, name, bases, attrs): """Create a new APIMixin class.""" attrs['name'] = attrs.pop('name', None) or name name_format = attrs.get('name_format', None) @@ -135,7 +133,7 @@ class APIMeta(type): pass elif api_path_prefix_format is not None: attrs['api_path_prefix'] = api_path_prefix_format.format(**attrs) - return super(APIMeta, mcs).__new__(mcs, name, bases, attrs) + return super(APIMeta, cls).__new__(cls, name, bases, attrs) class APIMixin(object): @@ -636,19 +634,7 @@ class DDP(APIMixin): @api_endpoint def sub(self, id_, name, *params): """Create subscription, send matched objects that haven't been sent.""" - try: - return self.do_sub(id_, name, False, *params) - except Exception as err: - this.send({ - 'msg': 'nosub', - 'id': id_, - 'error': { - 'error': 500, - 'errorType': 'Meteor.Error', - 'message': '%s' % err, - 'reason': 'Subscription failed', - }, - }) + return self.do_sub(id_, name, False, *params) @transaction.atomic def do_sub(self, id_, name, silent, *params): @@ -657,16 +643,7 @@ class DDP(APIMixin): pub = self.get_pub_by_name(name) except KeyError: if not silent: - this.send({ - 'msg': 'nosub', - 'id': id_, - 'error': { - 'error': 404, - 'errorType': 'Meteor.Error', - 'message': 'Subscription not found [404]', - 'reason': 'Subscription not found', - }, - }) + raise MeteorError(404, 'Subscription not found') return sub, created = Subscription.objects.get_or_create( connection_id=this.ws.connection.pk, @@ -747,41 +724,16 @@ class DDP(APIMixin): try: handler = self.api_path_map()[method] except KeyError: - print('Unknown method: %s %r' % (method, params)) - this.send({ - 'msg': 'result', - 'id': id_, - 'error': { - 'error': 404, - 'errorType': 'Meteor.Error', - 'message': 'Unknown method: %s %r' % (method, params), - 'reason': 'Method not found', - }, - }) - return - params_repr = repr(params) + raise MeteorError(404, 'Method not found', method) try: - result = handler(*params) - msg = {'msg': 'result', 'id': id_} - if result is not None: - msg['result'] = result - this.send(msg) - except Exception as err: # log err+stack trace -> pylint: disable=W0703 - details = traceback.format_exc() - print(id_, method, params_repr) - print(details) - this.ws.logger.error(err, exc_info=True) - msg = { - 'msg': 'result', - 'id': id_, - 'error': { - 'error': 500, - 'reason': str(err), - }, - } - if settings.DEBUG: - msg['error']['details'] = details - this.send(msg) + inspect.getcallargs(handler, *params) + except TypeError as err: + raise MeteorError(400, '%s' % err) + result = handler(*params) + msg = {'msg': 'result', 'id': id_} + if result is not None: + msg['result'] = result + this.send(msg) def register(self, api_or_iterable): """Register an API endpoint.""" diff --git a/dddp/tests.py b/dddp/tests.py index 22a0081..3286ae1 100644 --- a/dddp/tests.py +++ b/dddp/tests.py @@ -228,12 +228,12 @@ class DDPServerTestCase(django.test.TransactionTestCase): return self.server.url(path) +# gevent-websocket doesn't work with Python 3 yet +@expected_failure_if(sys.version_info.major == 3) class HttpTestCase(DDPServerTestCase): """Test that server launches and handles HTTP requests.""" - # gevent-websocket doesn't work with Python 3 yet - @expected_failure_if(sys.version_info.major == 3) def test_get(self): """Perform HTTP GET.""" import requests @@ -241,12 +241,12 @@ class HttpTestCase(DDPServerTestCase): self.assertEqual(resp.status_code, 200) +# gevent-websocket doesn't work with Python 3 yet +@expected_failure_if(sys.version_info.major == 3) class WebSocketTestCase(DDPServerTestCase): """Test that server launches and handles WebSocket connections.""" - # gevent-websocket doesn't work with Python 3 yet - @expected_failure_if(sys.version_info.major == 3) def test_sockjs_connect_ping(self): """SockJS connect.""" sockjs = self.server.sockjs('/sockjs/1/a/websocket') @@ -282,8 +282,6 @@ class WebSocketTestCase(DDPServerTestCase): sockjs.close() - # gevent-websocket doesn't work with Python 3 yet - @expected_failure_if(sys.version_info.major == 3) def test_sockjs_connect_sub_unsub(self): """SockJS connect.""" sockjs = self.server.sockjs('/sockjs/1/a/websocket') @@ -318,8 +316,6 @@ class WebSocketTestCase(DDPServerTestCase): sockjs.close() - # gevent-websocket doesn't work with Python 3 yet - @expected_failure_if(sys.version_info.major == 3) def test_call_missing_arguments(self): """Connect and login without any arguments.""" sockjs = self.server.sockjs('/sockjs/1/a/websocket') @@ -349,7 +345,7 @@ class WebSocketTestCase(DDPServerTestCase): { 'msg': 'result', 'error': { - 'error': 500, + 'error': 400, 'reason': 'login() takes exactly 2 arguments (1 given)', }, @@ -360,8 +356,6 @@ class WebSocketTestCase(DDPServerTestCase): sockjs.close() - # gevent-websocket doesn't work with Python 3 yet - @expected_failure_if(sys.version_info.major == 3) def test_call_extra_arguments(self): """Connect and login with extra arguments.""" with self.server.sockjs('/sockjs/1/a/websocket') as sockjs: @@ -394,7 +388,7 @@ class WebSocketTestCase(DDPServerTestCase): { 'msg': 'result', 'error': { - 'error': 500, + 'error': 400, 'reason': 'login() takes exactly 2 arguments (3 given)', }, @@ -404,6 +398,7 @@ class WebSocketTestCase(DDPServerTestCase): ) + def load_tests(loader, tests, pattern): """Specify which test cases to run.""" del pattern diff --git a/dddp/websocket.py b/dddp/websocket.py index c3835ca..940512f 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -14,19 +14,44 @@ from six.moves import range as irange import ejson import gevent import geventwebsocket +from django.conf import settings from django.core import signals from django.core.handlers.base import BaseHandler from django.core.handlers.wsgi import WSGIRequest from django.db import connection, transaction -from dddp import THREAD_LOCAL as this, alea, ADDED, CHANGED, REMOVED +from dddp import alea, this, ADDED, CHANGED, REMOVED, MeteorError -class MeteorError(Exception): +def safe_call(func, *args, **kwargs): + """ + Call `func(*args, **kwargs)` but NEVER raise an exception. - """MeteorError.""" + Useful in situations such as inside exception handlers where calls to + `logging.error` try to send email, but the SMTP server isn't always + availalbe and you don't want your exception handler blowing up. + """ + try: + return None, func(*args, **kwargs) + except Exception: # pylint: disable=broad-except + # something went wrong during the call, return a stack trace that can + # be dealt with by the caller + return traceback.format_exc(), None - pass + +def dprint(name, val): + """Debug print name and val.""" + from pprint import pformat + print( + '% 5s: %s' % ( + name, + '\n '.join( + pformat( + val, indent=4, width=75, + ).split('\n') + ), + ), + ) def validate_kwargs(func, kwargs): @@ -65,11 +90,12 @@ def validate_kwargs(func, kwargs): ] if missing: raise MeteorError( + 400, + func.err, 'Missing required arguments to %s: %s' % ( func_name, ' '.join(missing), ), - getattr(func, 'err', None), ) # figure out what is extra @@ -79,10 +105,9 @@ def validate_kwargs(func, kwargs): ] if extra: raise MeteorError( - 'Unknown arguments to %s: %s' % ( - func_name, - ' '.join(extra), - ), + 400, + func.err, + 'Unknown arguments to %s: %s' % (func_name, ' '.join(extra)), ) @@ -121,12 +146,11 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): this.ws = self this.send = self.send this.reply = self.reply - this.error = self.error self.logger = self.ws.logger self.remote_ids = collections.defaultdict(set) - # self._tx_buffer collects outgoing messages which must be sent in order + # `_tx_buffer` collects outgoing messages which must be sent in order self._tx_buffer = {} # track the head of the queue (buffer) and the next msg to be sent self._tx_buffer_id_gen = itertools.cycle(irange(sys.maxint)) @@ -139,7 +163,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.ws.environ, ) this.subs = {} - self.logger.info('+ %s OPEN', self) + safe_call(self.logger.info, '+ %s OPEN', self) self.send('o') self.send('a["{\\"server_id\\":\\"0\\"}"]') @@ -154,83 +178,144 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.connection.delete() self.connection = None signals.request_finished.send(sender=self.__class__) - self.logger.info('- %s %s', self, args or 'CLOSE') + safe_call(self.logger.info, '- %s %s', self, args or 'CLOSE') def on_message(self, message): """Process a message received from remote.""" if self.ws.closed: return None try: - self.logger.debug('< %s %r', self, message) - - # parse message set - try: - msgs = ejson.loads(message) - except ValueError as err: - self.error(400, 'Data is not valid EJSON') - return - if not isinstance(msgs, list): - self.error(400, 'Invalid EJSON messages') - return + safe_call(self.logger.debug, '< %s %r', self, message) # process individual messages - while msgs: - # parse message payload - raw = msgs.pop(0) - try: - data = ejson.loads(raw) - except (TypeError, ValueError) as err: - self.error(400, 'Data is not valid EJSON') - continue - if not isinstance(data, dict): - self.error(400, 'Invalid EJSON message payload', raw) - continue - try: - msg = data.pop('msg') - except KeyError: - self.error(400, 'Bad request', offendingMessage=data) - continue - # dispatch message - try: - self.dispatch(msg, data) - except MeteorError as err: - self.error(err) - except Exception as err: - traceback.print_exc() - self.error(err) + for data in self.ddp_frames_from_message(message): + self.process_ddp(data) # emit request_finished signal to close DB connections signals.request_finished.send(sender=self.__class__) - if msgs: - # yield to other greenlets before processing next msg - gevent.sleep() - except geventwebsocket.WebSocketError as err: + except geventwebsocket.WebSocketError: self.ws.close() + def ddp_frames_from_message(self, message): + """Yield DDP messages from a raw WebSocket message.""" + # parse message set + try: + msgs = ejson.loads(message) + except ValueError: + self.reply( + 'error', error=400, reason='Data is not valid EJSON', + ) + raise StopIteration + if not isinstance(msgs, list): + self.reply( + 'error', error=400, reason='Invalid EJSON messages', + ) + raise StopIteration + # process individual messages + while msgs: + # pop raw message from the list + raw = msgs.pop(0) + # parse message payload + try: + data = ejson.loads(raw) + except (TypeError, ValueError): + data = None + if not isinstance(data, dict): + self.reply( + 'error', error=400, + reason='Invalid SockJS DDP payload', + offendingMessage=raw, + ) + yield data + if msgs: + # yield to other greenlets before processing next msg + gevent.sleep() + + def process_ddp(self, data): + """Process a single DDP message.""" + msg_id = data.get('id', None) + try: + msg = data.pop('msg') + except KeyError: + self.reply( + 'error', reason='Bad request', + offendingMessage=data, + ) + return + try: + # dispatch message + self.dispatch(msg, data) + except Exception as err: # pylint: disable=broad-except + # This should be the only protocol exception handler + kwargs = { + 'msg': {'method': 'result'}.get(msg, 'error'), + } + if msg_id is not None: + kwargs['id'] = msg_id + if isinstance(err, MeteorError): + error = err.as_dict() + else: + error = { + 'error': 500, + 'reason': 'Internal server error', + } + if kwargs['msg'] == 'error': + kwargs.update(error) + else: + kwargs['error'] = error + if not isinstance(err, MeteorError): + # not a client error, should always be logged. + stack, _ = safe_call( + self.logger.error, '%r %r', msg, data, exc_info=1, + ) + if stack is not None: + # something went wrong while logging the error, revert to + # writing a stack trace to stderr. + traceback.print_exc(file=sys.stderr) + sys.stderr.write( + 'Additionally, while handling the above error the ' + 'following error was encountered:\n' + ) + sys.stderr.write(stack) + elif settings.DEBUG: + print('ERROR: %s' % err) + dprint('msg', msg) + dprint('data', data) + error.setdefault('details', traceback.format_exc()) + # print stack trace for client errors when DEBUG is True. + print(error['details']) + self.reply(**kwargs) + if msg_id and msg == 'method': + self.reply('updated', methods=[msg_id]) + @transaction.atomic def dispatch(self, msg, kwargs): """Dispatch msg to appropriate recv_foo handler.""" # enforce calling 'connect' first if self.connection is None and msg != 'connect': - self.error(400, 'Must connect first') + self.reply('error', reason='Must connect first') return + if msg == 'method': + if ( + 'method' not in kwargs + ) or ( + 'id' not in kwargs + ): + self.reply( + 'error', error=400, reason='Malformed method invocation', + ) + return # lookup method handler try: handler = getattr(self, 'recv_%s' % msg) except (AttributeError, UnicodeEncodeError): - print('Method not found: %s %r' % (msg, kwargs)) - self.error(404, 'Method not found', msg='result') - return + raise MeteorError(404, 'Method not found') # validate handler arguments validate_kwargs(handler, kwargs) # dispatch to handler - try: - handler(**kwargs) - except Exception as err: # re-raise err --> pylint: disable=W0703 - self.error(500, 'Internal server error', err) - raise + handler(**kwargs) def send(self, data, tx_id=None): """Send `data` (raw string or EJSON payload) to WebSocket client.""" @@ -244,7 +329,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): # pull next message from buffer data = self._tx_buffer.pop(self._tx_next_id) if self._tx_buffer: - self.logger.debug('TX found %d', self._tx_next_id) + safe_call(self.logger.debug, 'TX found %d', self._tx_next_id) # advance next message ID self._tx_next_id = next(self._tx_next_id_gen) if not isinstance(data, basestring): @@ -270,15 +355,17 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): continue # client doesn't have this, don't send. data = 'a%s' % ejson.dumps([ejson.dumps(data)]) # send message - self.logger.debug('> %s %r', self, data) + safe_call(self.logger.debug, '> %s %r', self, data) try: self.ws.send(data) except geventwebsocket.WebSocketError: self.ws.close() + self._tx_buffer.clear() break num_waiting = len(self._tx_buffer) if num_waiting > 10: - self.logger.warn( + safe_call( + self.logger.warn, 'TX received %d, waiting for %d, have %d waiting: %r.', tx_id, self._tx_next_id, num_waiting, self._tx_buffer, ) @@ -288,52 +375,18 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): kwargs['msg'] = msg self.send(kwargs) - def error( - self, err, reason=None, detail=None, msg='error', exc_info=1, - **kwargs - ): - """Send EJSON error to remote.""" - if isinstance(err, MeteorError): - ( - err, reason, detail, kwargs, - ) = ( - err.args[:] + (None, None, None, None) - )[:4] - elif isinstance(err, Exception): - reason = str(err) - data = { - 'error': '%s' % (err or ''), - } - if reason: - if reason is Exception: - reason = str(reason) - data['reason'] = reason - if detail: - if isinstance(detail, Exception): - detail = str(detail) - data['detail'] = detail - if kwargs: - data.update(kwargs) - record = { - 'extra': { - 'request': this.request, - }, - } - self.logger.error('! %s %r', self, data, exc_info=exc_info, **record) - self.reply(msg, **data) - def recv_connect(self, version=None, support=None, session=None): """DDP connect handler.""" del session # Meteor doesn't even use this! if self.connection is not None: - self.error( + raise MeteorError( 400, 'Session already established.', - detail=self.connection.connection_id, + self.connection.connection_id, ) elif None in (version, support) or version not in self.versions: self.reply('failed', version=self.versions[0]) elif version not in support: - self.error(400, 'Client version/support mismatch.') + raise MeteorError(400, 'Client version/support mismatch.') else: from dddp.models import Connection cur = connection.cursor() @@ -352,6 +405,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.pgworker.connections[self.connection.pk] = self atexit.register(self.on_close, 'Shutting down.') self.reply('connected', session=self.connection.connection_id) + recv_connect.err = 'Malformed connect' def recv_ping(self, id_=None): """DDP ping handler.""" @@ -359,6 +413,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.reply('pong') else: self.reply('pong', id=id_) + recv_ping.err = 'Malformed ping' def recv_sub(self, id_, name, params): """DDP sub handler.""" @@ -371,6 +426,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.api.unsub(id_) else: self.reply('nosub') + recv_unsub.err = 'Malformed unsubscription' def recv_method(self, method, params, id_, randomSeed=None): """DDP method handler."""