mirror of
https://github.com/jazzband/django-ddp.git
synced 2026-03-16 22:40:24 +00:00
Merge pull request #40 from MEERQAT/feature/client_errors
Consistent handling of client side vs server side errors.
This commit is contained in:
commit
b2eb2d4436
6 changed files with 233 additions and 190 deletions
|
|
@ -42,6 +42,45 @@ def greenify():
|
|||
patch_psycopg()
|
||||
|
||||
|
||||
class MeteorError(Exception):
|
||||
|
||||
"""
|
||||
MeteorError.
|
||||
|
||||
This exception can be thrown by DDP API endpoints (methods) and publication
|
||||
methods. MeteorError is not expected to be logged or shown by the server,
|
||||
leaving all handling of the error condition for the client.
|
||||
|
||||
Args:
|
||||
error (str): A string code uniquely identifying this kind of error.
|
||||
This string should be used by clients to determine the appropriate
|
||||
action to take, instead of attempting to parse the reason or detail
|
||||
fields.
|
||||
reason (Optional[str]): A short human-readable summary of the error.
|
||||
detail (Optional[str]): Additional information about the error.
|
||||
When returning errors to clients, Django DDP will default this to a
|
||||
textual stack trace if `django.conf.settings.DEBUG` is `True`.
|
||||
"""
|
||||
|
||||
def __init__(self, error, reason=None, details=None, **kwargs):
|
||||
"""MeteorError constructor."""
|
||||
super(MeteorError, self).__init__(error, reason, details, kwargs)
|
||||
|
||||
def as_dict(self, **kwargs):
|
||||
"""Return an error dict for self.args and kwargs."""
|
||||
error, reason, details, err_kwargs = self.args
|
||||
result = {
|
||||
key: val
|
||||
for key, val in {
|
||||
'error': error, 'reason': reason, 'details': details,
|
||||
}.items()
|
||||
if val is not None
|
||||
}
|
||||
result.update(err_kwargs)
|
||||
result.update(kwargs)
|
||||
return result
|
||||
|
||||
|
||||
class AlreadyRegistered(Exception):
|
||||
|
||||
"""Raised when registering over the top of an existing registration."""
|
||||
|
|
@ -125,7 +164,7 @@ THREAD_LOCAL_FACTORIES = {
|
|||
'user_ddp_id': lambda: None,
|
||||
'user': lambda: None,
|
||||
}
|
||||
THREAD_LOCAL = ThreadLocal()
|
||||
THREAD_LOCAL = this = ThreadLocal() # pylint: disable=invalid-name
|
||||
METEOR_ID_CHARS = u'23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz'
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -21,12 +21,12 @@ from django.dispatch import Signal
|
|||
from django.utils import timezone
|
||||
|
||||
from dddp import (
|
||||
THREAD_LOCAL_FACTORIES, THREAD_LOCAL as this, ADDED, REMOVED,
|
||||
THREAD_LOCAL_FACTORIES, this, MeteorError,
|
||||
ADDED, REMOVED,
|
||||
meteor_random_id,
|
||||
)
|
||||
from dddp.models import get_meteor_id, get_object, Subscription
|
||||
from dddp.api import API, APIMixin, api_endpoint, Collection, Publication
|
||||
from dddp.websocket import MeteorError
|
||||
|
||||
|
||||
# pylint dones't like lower case attribute names on modules, but it's the normal
|
||||
|
|
@ -190,7 +190,7 @@ class Users(Collection):
|
|||
if key == prefixed('name'):
|
||||
result['full_name'] = val
|
||||
else:
|
||||
raise ValueError('Bad profile key: %r' % key)
|
||||
raise MeteorError(400, 'Bad profile key: %r' % key)
|
||||
return result
|
||||
|
||||
@api_endpoint
|
||||
|
|
@ -384,7 +384,7 @@ class Auth(APIMixin):
|
|||
# malicious MITM. Also as no salt is used with hashing, the
|
||||
# passwords are vulnerable to rainbow-table lookups anyway.
|
||||
#
|
||||
# If you're doing security, do it right from the very outset. Fors
|
||||
# If you're doing security, do it right from the very outset. For
|
||||
# web services that means using SSL and not relying on half-baked
|
||||
# security concepts put together by people with no security
|
||||
# background.
|
||||
|
|
@ -393,9 +393,10 @@ class Auth(APIMixin):
|
|||
# until upstream developers see the light and drop the password
|
||||
# hashing mis-feature.
|
||||
raise MeteorError(
|
||||
400,
|
||||
"Outmoded password hashing, run "
|
||||
"`meteor add tysonclugg:accounts-secure` to fix.",
|
||||
426,
|
||||
"Outmoded password hashing: "
|
||||
"https://github.com/meteor/meteor/issues/4363",
|
||||
upgrade='meteor add tysonclugg:accounts-secure',
|
||||
)
|
||||
|
||||
@api_endpoint('createUser')
|
||||
|
|
@ -407,7 +408,9 @@ class Auth(APIMixin):
|
|||
params=params,
|
||||
)
|
||||
if len(receivers) == 0:
|
||||
raise MeteorError(501, 'Handler for `create_user` not registered.')
|
||||
raise NotImplementedError(
|
||||
'Handler for `create_user` not registered.'
|
||||
)
|
||||
user = receivers[0][1]
|
||||
user = auth.authenticate(
|
||||
username=user.get_username(), password=params['password'],
|
||||
|
|
|
|||
|
|
@ -26,7 +26,7 @@ class AccountsTestCase(tests.DDPServerTestCase):
|
|||
msgs = sockjs.recv()
|
||||
self.assertEqual(
|
||||
msgs, [
|
||||
{'msg': 'connected', 'session': msgs[0].get('session', None)},
|
||||
{'msg': 'connected', 'session': msgs[0]['session']},
|
||||
],
|
||||
)
|
||||
|
||||
|
|
@ -37,12 +37,10 @@ class AccountsTestCase(tests.DDPServerTestCase):
|
|||
self.assertEqual(
|
||||
msgs, [
|
||||
{
|
||||
'msg': 'result',
|
||||
'msg': 'result', 'id': id_,
|
||||
'error': {
|
||||
'error': 500,
|
||||
'reason': "(403, 'Authentication failed.')",
|
||||
'error': 403, 'reason': 'Authentication failed.',
|
||||
},
|
||||
'id': id_,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
|
|
|||
78
dddp/api.py
78
dddp/api.py
|
|
@ -4,7 +4,7 @@ from __future__ import absolute_import, unicode_literals, print_function
|
|||
# standard library
|
||||
import collections
|
||||
from copy import deepcopy
|
||||
import traceback
|
||||
import inspect
|
||||
import uuid
|
||||
|
||||
# requirements
|
||||
|
|
@ -25,9 +25,7 @@ import ejson
|
|||
import six
|
||||
|
||||
# django-ddp
|
||||
from dddp import (
|
||||
AlreadyRegistered, THREAD_LOCAL as this, ADDED, CHANGED, REMOVED,
|
||||
)
|
||||
from dddp import AlreadyRegistered, this, ADDED, CHANGED, REMOVED, MeteorError
|
||||
from dddp.models import (
|
||||
AleaIdField, Connection, Subscription, get_meteor_id, get_meteor_ids,
|
||||
)
|
||||
|
|
@ -124,7 +122,7 @@ class APIMeta(type):
|
|||
|
||||
"""DDP API metaclass."""
|
||||
|
||||
def __new__(mcs, name, bases, attrs):
|
||||
def __new__(cls, name, bases, attrs):
|
||||
"""Create a new APIMixin class."""
|
||||
attrs['name'] = attrs.pop('name', None) or name
|
||||
name_format = attrs.get('name_format', None)
|
||||
|
|
@ -135,7 +133,7 @@ class APIMeta(type):
|
|||
pass
|
||||
elif api_path_prefix_format is not None:
|
||||
attrs['api_path_prefix'] = api_path_prefix_format.format(**attrs)
|
||||
return super(APIMeta, mcs).__new__(mcs, name, bases, attrs)
|
||||
return super(APIMeta, cls).__new__(cls, name, bases, attrs)
|
||||
|
||||
|
||||
class APIMixin(object):
|
||||
|
|
@ -636,19 +634,7 @@ class DDP(APIMixin):
|
|||
@api_endpoint
|
||||
def sub(self, id_, name, *params):
|
||||
"""Create subscription, send matched objects that haven't been sent."""
|
||||
try:
|
||||
return self.do_sub(id_, name, False, *params)
|
||||
except Exception as err:
|
||||
this.send({
|
||||
'msg': 'nosub',
|
||||
'id': id_,
|
||||
'error': {
|
||||
'error': 500,
|
||||
'errorType': 'Meteor.Error',
|
||||
'message': '%s' % err,
|
||||
'reason': 'Subscription failed',
|
||||
},
|
||||
})
|
||||
return self.do_sub(id_, name, False, *params)
|
||||
|
||||
@transaction.atomic
|
||||
def do_sub(self, id_, name, silent, *params):
|
||||
|
|
@ -657,16 +643,7 @@ class DDP(APIMixin):
|
|||
pub = self.get_pub_by_name(name)
|
||||
except KeyError:
|
||||
if not silent:
|
||||
this.send({
|
||||
'msg': 'nosub',
|
||||
'id': id_,
|
||||
'error': {
|
||||
'error': 404,
|
||||
'errorType': 'Meteor.Error',
|
||||
'message': 'Subscription not found [404]',
|
||||
'reason': 'Subscription not found',
|
||||
},
|
||||
})
|
||||
raise MeteorError(404, 'Subscription not found')
|
||||
return
|
||||
sub, created = Subscription.objects.get_or_create(
|
||||
connection_id=this.ws.connection.pk,
|
||||
|
|
@ -747,41 +724,16 @@ class DDP(APIMixin):
|
|||
try:
|
||||
handler = self.api_path_map()[method]
|
||||
except KeyError:
|
||||
print('Unknown method: %s %r' % (method, params))
|
||||
this.send({
|
||||
'msg': 'result',
|
||||
'id': id_,
|
||||
'error': {
|
||||
'error': 404,
|
||||
'errorType': 'Meteor.Error',
|
||||
'message': 'Unknown method: %s %r' % (method, params),
|
||||
'reason': 'Method not found',
|
||||
},
|
||||
})
|
||||
return
|
||||
params_repr = repr(params)
|
||||
raise MeteorError(404, 'Method not found', method)
|
||||
try:
|
||||
result = handler(*params)
|
||||
msg = {'msg': 'result', 'id': id_}
|
||||
if result is not None:
|
||||
msg['result'] = result
|
||||
this.send(msg)
|
||||
except Exception as err: # log err+stack trace -> pylint: disable=W0703
|
||||
details = traceback.format_exc()
|
||||
print(id_, method, params_repr)
|
||||
print(details)
|
||||
this.ws.logger.error(err, exc_info=True)
|
||||
msg = {
|
||||
'msg': 'result',
|
||||
'id': id_,
|
||||
'error': {
|
||||
'error': 500,
|
||||
'reason': str(err),
|
||||
},
|
||||
}
|
||||
if settings.DEBUG:
|
||||
msg['error']['details'] = details
|
||||
this.send(msg)
|
||||
inspect.getcallargs(handler, *params)
|
||||
except TypeError as err:
|
||||
raise MeteorError(400, '%s' % err)
|
||||
result = handler(*params)
|
||||
msg = {'msg': 'result', 'id': id_}
|
||||
if result is not None:
|
||||
msg['result'] = result
|
||||
this.send(msg)
|
||||
|
||||
def register(self, api_or_iterable):
|
||||
"""Register an API endpoint."""
|
||||
|
|
|
|||
|
|
@ -228,12 +228,12 @@ class DDPServerTestCase(django.test.TransactionTestCase):
|
|||
return self.server.url(path)
|
||||
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
class HttpTestCase(DDPServerTestCase):
|
||||
|
||||
"""Test that server launches and handles HTTP requests."""
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
def test_get(self):
|
||||
"""Perform HTTP GET."""
|
||||
import requests
|
||||
|
|
@ -241,12 +241,12 @@ class HttpTestCase(DDPServerTestCase):
|
|||
self.assertEqual(resp.status_code, 200)
|
||||
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
class WebSocketTestCase(DDPServerTestCase):
|
||||
|
||||
"""Test that server launches and handles WebSocket connections."""
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
def test_sockjs_connect_ping(self):
|
||||
"""SockJS connect."""
|
||||
sockjs = self.server.sockjs('/sockjs/1/a/websocket')
|
||||
|
|
@ -282,8 +282,6 @@ class WebSocketTestCase(DDPServerTestCase):
|
|||
|
||||
sockjs.close()
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
def test_sockjs_connect_sub_unsub(self):
|
||||
"""SockJS connect."""
|
||||
sockjs = self.server.sockjs('/sockjs/1/a/websocket')
|
||||
|
|
@ -318,8 +316,6 @@ class WebSocketTestCase(DDPServerTestCase):
|
|||
|
||||
sockjs.close()
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
def test_call_missing_arguments(self):
|
||||
"""Connect and login without any arguments."""
|
||||
sockjs = self.server.sockjs('/sockjs/1/a/websocket')
|
||||
|
|
@ -349,7 +345,7 @@ class WebSocketTestCase(DDPServerTestCase):
|
|||
{
|
||||
'msg': 'result',
|
||||
'error': {
|
||||
'error': 500,
|
||||
'error': 400,
|
||||
'reason':
|
||||
'login() takes exactly 2 arguments (1 given)',
|
||||
},
|
||||
|
|
@ -360,8 +356,6 @@ class WebSocketTestCase(DDPServerTestCase):
|
|||
|
||||
sockjs.close()
|
||||
|
||||
# gevent-websocket doesn't work with Python 3 yet
|
||||
@expected_failure_if(sys.version_info.major == 3)
|
||||
def test_call_extra_arguments(self):
|
||||
"""Connect and login with extra arguments."""
|
||||
with self.server.sockjs('/sockjs/1/a/websocket') as sockjs:
|
||||
|
|
@ -394,7 +388,7 @@ class WebSocketTestCase(DDPServerTestCase):
|
|||
{
|
||||
'msg': 'result',
|
||||
'error': {
|
||||
'error': 500,
|
||||
'error': 400,
|
||||
'reason':
|
||||
'login() takes exactly 2 arguments (3 given)',
|
||||
},
|
||||
|
|
@ -404,6 +398,7 @@ class WebSocketTestCase(DDPServerTestCase):
|
|||
)
|
||||
|
||||
|
||||
|
||||
def load_tests(loader, tests, pattern):
|
||||
"""Specify which test cases to run."""
|
||||
del pattern
|
||||
|
|
|
|||
|
|
@ -14,19 +14,44 @@ from six.moves import range as irange
|
|||
import ejson
|
||||
import gevent
|
||||
import geventwebsocket
|
||||
from django.conf import settings
|
||||
from django.core import signals
|
||||
from django.core.handlers.base import BaseHandler
|
||||
from django.core.handlers.wsgi import WSGIRequest
|
||||
from django.db import connection, transaction
|
||||
|
||||
from dddp import THREAD_LOCAL as this, alea, ADDED, CHANGED, REMOVED
|
||||
from dddp import alea, this, ADDED, CHANGED, REMOVED, MeteorError
|
||||
|
||||
|
||||
class MeteorError(Exception):
|
||||
def safe_call(func, *args, **kwargs):
|
||||
"""
|
||||
Call `func(*args, **kwargs)` but NEVER raise an exception.
|
||||
|
||||
"""MeteorError."""
|
||||
Useful in situations such as inside exception handlers where calls to
|
||||
`logging.error` try to send email, but the SMTP server isn't always
|
||||
availalbe and you don't want your exception handler blowing up.
|
||||
"""
|
||||
try:
|
||||
return None, func(*args, **kwargs)
|
||||
except Exception: # pylint: disable=broad-except
|
||||
# something went wrong during the call, return a stack trace that can
|
||||
# be dealt with by the caller
|
||||
return traceback.format_exc(), None
|
||||
|
||||
pass
|
||||
|
||||
def dprint(name, val):
|
||||
"""Debug print name and val."""
|
||||
from pprint import pformat
|
||||
print(
|
||||
'% 5s: %s' % (
|
||||
name,
|
||||
'\n '.join(
|
||||
pformat(
|
||||
val, indent=4, width=75,
|
||||
).split('\n')
|
||||
),
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
def validate_kwargs(func, kwargs):
|
||||
|
|
@ -65,11 +90,12 @@ def validate_kwargs(func, kwargs):
|
|||
]
|
||||
if missing:
|
||||
raise MeteorError(
|
||||
400,
|
||||
func.err,
|
||||
'Missing required arguments to %s: %s' % (
|
||||
func_name,
|
||||
' '.join(missing),
|
||||
),
|
||||
getattr(func, 'err', None),
|
||||
)
|
||||
|
||||
# figure out what is extra
|
||||
|
|
@ -79,10 +105,9 @@ def validate_kwargs(func, kwargs):
|
|||
]
|
||||
if extra:
|
||||
raise MeteorError(
|
||||
'Unknown arguments to %s: %s' % (
|
||||
func_name,
|
||||
' '.join(extra),
|
||||
),
|
||||
400,
|
||||
func.err,
|
||||
'Unknown arguments to %s: %s' % (func_name, ' '.join(extra)),
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -121,12 +146,11 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
this.ws = self
|
||||
this.send = self.send
|
||||
this.reply = self.reply
|
||||
this.error = self.error
|
||||
|
||||
self.logger = self.ws.logger
|
||||
self.remote_ids = collections.defaultdict(set)
|
||||
|
||||
# self._tx_buffer collects outgoing messages which must be sent in order
|
||||
# `_tx_buffer` collects outgoing messages which must be sent in order
|
||||
self._tx_buffer = {}
|
||||
# track the head of the queue (buffer) and the next msg to be sent
|
||||
self._tx_buffer_id_gen = itertools.cycle(irange(sys.maxint))
|
||||
|
|
@ -139,7 +163,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
self.ws.environ,
|
||||
)
|
||||
this.subs = {}
|
||||
self.logger.info('+ %s OPEN', self)
|
||||
safe_call(self.logger.info, '+ %s OPEN', self)
|
||||
self.send('o')
|
||||
self.send('a["{\\"server_id\\":\\"0\\"}"]')
|
||||
|
||||
|
|
@ -154,83 +178,144 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
self.connection.delete()
|
||||
self.connection = None
|
||||
signals.request_finished.send(sender=self.__class__)
|
||||
self.logger.info('- %s %s', self, args or 'CLOSE')
|
||||
safe_call(self.logger.info, '- %s %s', self, args or 'CLOSE')
|
||||
|
||||
def on_message(self, message):
|
||||
"""Process a message received from remote."""
|
||||
if self.ws.closed:
|
||||
return None
|
||||
try:
|
||||
self.logger.debug('< %s %r', self, message)
|
||||
|
||||
# parse message set
|
||||
try:
|
||||
msgs = ejson.loads(message)
|
||||
except ValueError as err:
|
||||
self.error(400, 'Data is not valid EJSON')
|
||||
return
|
||||
if not isinstance(msgs, list):
|
||||
self.error(400, 'Invalid EJSON messages')
|
||||
return
|
||||
safe_call(self.logger.debug, '< %s %r', self, message)
|
||||
|
||||
# process individual messages
|
||||
while msgs:
|
||||
# parse message payload
|
||||
raw = msgs.pop(0)
|
||||
try:
|
||||
data = ejson.loads(raw)
|
||||
except (TypeError, ValueError) as err:
|
||||
self.error(400, 'Data is not valid EJSON')
|
||||
continue
|
||||
if not isinstance(data, dict):
|
||||
self.error(400, 'Invalid EJSON message payload', raw)
|
||||
continue
|
||||
try:
|
||||
msg = data.pop('msg')
|
||||
except KeyError:
|
||||
self.error(400, 'Bad request', offendingMessage=data)
|
||||
continue
|
||||
# dispatch message
|
||||
try:
|
||||
self.dispatch(msg, data)
|
||||
except MeteorError as err:
|
||||
self.error(err)
|
||||
except Exception as err:
|
||||
traceback.print_exc()
|
||||
self.error(err)
|
||||
for data in self.ddp_frames_from_message(message):
|
||||
self.process_ddp(data)
|
||||
# emit request_finished signal to close DB connections
|
||||
signals.request_finished.send(sender=self.__class__)
|
||||
if msgs:
|
||||
# yield to other greenlets before processing next msg
|
||||
gevent.sleep()
|
||||
except geventwebsocket.WebSocketError as err:
|
||||
except geventwebsocket.WebSocketError:
|
||||
self.ws.close()
|
||||
|
||||
def ddp_frames_from_message(self, message):
|
||||
"""Yield DDP messages from a raw WebSocket message."""
|
||||
# parse message set
|
||||
try:
|
||||
msgs = ejson.loads(message)
|
||||
except ValueError:
|
||||
self.reply(
|
||||
'error', error=400, reason='Data is not valid EJSON',
|
||||
)
|
||||
raise StopIteration
|
||||
if not isinstance(msgs, list):
|
||||
self.reply(
|
||||
'error', error=400, reason='Invalid EJSON messages',
|
||||
)
|
||||
raise StopIteration
|
||||
# process individual messages
|
||||
while msgs:
|
||||
# pop raw message from the list
|
||||
raw = msgs.pop(0)
|
||||
# parse message payload
|
||||
try:
|
||||
data = ejson.loads(raw)
|
||||
except (TypeError, ValueError):
|
||||
data = None
|
||||
if not isinstance(data, dict):
|
||||
self.reply(
|
||||
'error', error=400,
|
||||
reason='Invalid SockJS DDP payload',
|
||||
offendingMessage=raw,
|
||||
)
|
||||
yield data
|
||||
if msgs:
|
||||
# yield to other greenlets before processing next msg
|
||||
gevent.sleep()
|
||||
|
||||
def process_ddp(self, data):
|
||||
"""Process a single DDP message."""
|
||||
msg_id = data.get('id', None)
|
||||
try:
|
||||
msg = data.pop('msg')
|
||||
except KeyError:
|
||||
self.reply(
|
||||
'error', reason='Bad request',
|
||||
offendingMessage=data,
|
||||
)
|
||||
return
|
||||
try:
|
||||
# dispatch message
|
||||
self.dispatch(msg, data)
|
||||
except Exception as err: # pylint: disable=broad-except
|
||||
# This should be the only protocol exception handler
|
||||
kwargs = {
|
||||
'msg': {'method': 'result'}.get(msg, 'error'),
|
||||
}
|
||||
if msg_id is not None:
|
||||
kwargs['id'] = msg_id
|
||||
if isinstance(err, MeteorError):
|
||||
error = err.as_dict()
|
||||
else:
|
||||
error = {
|
||||
'error': 500,
|
||||
'reason': 'Internal server error',
|
||||
}
|
||||
if kwargs['msg'] == 'error':
|
||||
kwargs.update(error)
|
||||
else:
|
||||
kwargs['error'] = error
|
||||
if not isinstance(err, MeteorError):
|
||||
# not a client error, should always be logged.
|
||||
stack, _ = safe_call(
|
||||
self.logger.error, '%r %r', msg, data, exc_info=1,
|
||||
)
|
||||
if stack is not None:
|
||||
# something went wrong while logging the error, revert to
|
||||
# writing a stack trace to stderr.
|
||||
traceback.print_exc(file=sys.stderr)
|
||||
sys.stderr.write(
|
||||
'Additionally, while handling the above error the '
|
||||
'following error was encountered:\n'
|
||||
)
|
||||
sys.stderr.write(stack)
|
||||
elif settings.DEBUG:
|
||||
print('ERROR: %s' % err)
|
||||
dprint('msg', msg)
|
||||
dprint('data', data)
|
||||
error.setdefault('details', traceback.format_exc())
|
||||
# print stack trace for client errors when DEBUG is True.
|
||||
print(error['details'])
|
||||
self.reply(**kwargs)
|
||||
if msg_id and msg == 'method':
|
||||
self.reply('updated', methods=[msg_id])
|
||||
|
||||
@transaction.atomic
|
||||
def dispatch(self, msg, kwargs):
|
||||
"""Dispatch msg to appropriate recv_foo handler."""
|
||||
# enforce calling 'connect' first
|
||||
if self.connection is None and msg != 'connect':
|
||||
self.error(400, 'Must connect first')
|
||||
self.reply('error', reason='Must connect first')
|
||||
return
|
||||
if msg == 'method':
|
||||
if (
|
||||
'method' not in kwargs
|
||||
) or (
|
||||
'id' not in kwargs
|
||||
):
|
||||
self.reply(
|
||||
'error', error=400, reason='Malformed method invocation',
|
||||
)
|
||||
return
|
||||
|
||||
# lookup method handler
|
||||
try:
|
||||
handler = getattr(self, 'recv_%s' % msg)
|
||||
except (AttributeError, UnicodeEncodeError):
|
||||
print('Method not found: %s %r' % (msg, kwargs))
|
||||
self.error(404, 'Method not found', msg='result')
|
||||
return
|
||||
raise MeteorError(404, 'Method not found')
|
||||
|
||||
# validate handler arguments
|
||||
validate_kwargs(handler, kwargs)
|
||||
|
||||
# dispatch to handler
|
||||
try:
|
||||
handler(**kwargs)
|
||||
except Exception as err: # re-raise err --> pylint: disable=W0703
|
||||
self.error(500, 'Internal server error', err)
|
||||
raise
|
||||
handler(**kwargs)
|
||||
|
||||
def send(self, data, tx_id=None):
|
||||
"""Send `data` (raw string or EJSON payload) to WebSocket client."""
|
||||
|
|
@ -244,7 +329,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
# pull next message from buffer
|
||||
data = self._tx_buffer.pop(self._tx_next_id)
|
||||
if self._tx_buffer:
|
||||
self.logger.debug('TX found %d', self._tx_next_id)
|
||||
safe_call(self.logger.debug, 'TX found %d', self._tx_next_id)
|
||||
# advance next message ID
|
||||
self._tx_next_id = next(self._tx_next_id_gen)
|
||||
if not isinstance(data, basestring):
|
||||
|
|
@ -270,15 +355,17 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
continue # client doesn't have this, don't send.
|
||||
data = 'a%s' % ejson.dumps([ejson.dumps(data)])
|
||||
# send message
|
||||
self.logger.debug('> %s %r', self, data)
|
||||
safe_call(self.logger.debug, '> %s %r', self, data)
|
||||
try:
|
||||
self.ws.send(data)
|
||||
except geventwebsocket.WebSocketError:
|
||||
self.ws.close()
|
||||
self._tx_buffer.clear()
|
||||
break
|
||||
num_waiting = len(self._tx_buffer)
|
||||
if num_waiting > 10:
|
||||
self.logger.warn(
|
||||
safe_call(
|
||||
self.logger.warn,
|
||||
'TX received %d, waiting for %d, have %d waiting: %r.',
|
||||
tx_id, self._tx_next_id, num_waiting, self._tx_buffer,
|
||||
)
|
||||
|
|
@ -288,52 +375,18 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
kwargs['msg'] = msg
|
||||
self.send(kwargs)
|
||||
|
||||
def error(
|
||||
self, err, reason=None, detail=None, msg='error', exc_info=1,
|
||||
**kwargs
|
||||
):
|
||||
"""Send EJSON error to remote."""
|
||||
if isinstance(err, MeteorError):
|
||||
(
|
||||
err, reason, detail, kwargs,
|
||||
) = (
|
||||
err.args[:] + (None, None, None, None)
|
||||
)[:4]
|
||||
elif isinstance(err, Exception):
|
||||
reason = str(err)
|
||||
data = {
|
||||
'error': '%s' % (err or '<UNKNOWN_ERROR>'),
|
||||
}
|
||||
if reason:
|
||||
if reason is Exception:
|
||||
reason = str(reason)
|
||||
data['reason'] = reason
|
||||
if detail:
|
||||
if isinstance(detail, Exception):
|
||||
detail = str(detail)
|
||||
data['detail'] = detail
|
||||
if kwargs:
|
||||
data.update(kwargs)
|
||||
record = {
|
||||
'extra': {
|
||||
'request': this.request,
|
||||
},
|
||||
}
|
||||
self.logger.error('! %s %r', self, data, exc_info=exc_info, **record)
|
||||
self.reply(msg, **data)
|
||||
|
||||
def recv_connect(self, version=None, support=None, session=None):
|
||||
"""DDP connect handler."""
|
||||
del session # Meteor doesn't even use this!
|
||||
if self.connection is not None:
|
||||
self.error(
|
||||
raise MeteorError(
|
||||
400, 'Session already established.',
|
||||
detail=self.connection.connection_id,
|
||||
self.connection.connection_id,
|
||||
)
|
||||
elif None in (version, support) or version not in self.versions:
|
||||
self.reply('failed', version=self.versions[0])
|
||||
elif version not in support:
|
||||
self.error(400, 'Client version/support mismatch.')
|
||||
raise MeteorError(400, 'Client version/support mismatch.')
|
||||
else:
|
||||
from dddp.models import Connection
|
||||
cur = connection.cursor()
|
||||
|
|
@ -352,6 +405,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
self.pgworker.connections[self.connection.pk] = self
|
||||
atexit.register(self.on_close, 'Shutting down.')
|
||||
self.reply('connected', session=self.connection.connection_id)
|
||||
recv_connect.err = 'Malformed connect'
|
||||
|
||||
def recv_ping(self, id_=None):
|
||||
"""DDP ping handler."""
|
||||
|
|
@ -359,6 +413,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
self.reply('pong')
|
||||
else:
|
||||
self.reply('pong', id=id_)
|
||||
recv_ping.err = 'Malformed ping'
|
||||
|
||||
def recv_sub(self, id_, name, params):
|
||||
"""DDP sub handler."""
|
||||
|
|
@ -371,6 +426,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
|
|||
self.api.unsub(id_)
|
||||
else:
|
||||
self.reply('nosub')
|
||||
recv_unsub.err = 'Malformed unsubscription'
|
||||
|
||||
def recv_method(self, method, params, id_, randomSeed=None):
|
||||
"""DDP method handler."""
|
||||
|
|
|
|||
Loading…
Reference in a new issue