diff --git a/CHANGES.rst b/CHANGES.rst index 7bb9aef..def0434 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,23 @@ Change Log ========== +0.16.0 +------ +* New setting: `DDP_API_ENDPOINT_DECORATORS`. + This setting takes a list of dotted import paths to decorators which are applied to API endpoints. For example, enable New Relic instrumentation by adding the line below to your Django `settings.py`: + + .. code:: python + + DDP_API_ENDPOINT_DECORATORS = ['newrelic.agent.background_task'] + +* Fixed #7 -- Warn if using DB engines other than psycopg2 - thanks @Matvey-Kuk. +* Improvements to error/exception handling. +* Warn if many TX chunks are queued in case WebSocket has stalled. +* Bugfix thread locals setup when opening WebSocket. +* Add missing import for print function (Python 2). +* Work towards #16 -- Use `psycopg2cffi` compatibility if `psycopg2` not + installed. + 0.15.0 ------ * Renamed `Logs` collection and publication to `dddp.logs` to be consistent with naming conventions used elsewhere. diff --git a/dddp/__init__.py b/dddp/__init__.py index 321555c..f60ad98 100644 --- a/dddp/__init__.py +++ b/dddp/__init__.py @@ -6,7 +6,6 @@ from pkg_resources import get_distribution, DistributionNotFound from gevent.local import local from dddp import alea - try: _dist = get_distribution('django-ddp') if not __file__.startswith(os.path.join(_dist.location, 'django-ddp', '')): @@ -43,6 +42,14 @@ def greenify(): # ensure we don't greenify again _GREEN[True] = True + try: + # Use psycopg2 by default + import psycopg2 + except ImportError: + # Fallback to psycopg2cffi if required (eg: pypy) + from psycopg2cffi import compat + compat.register() + class AlreadyRegistered(Exception): diff --git a/dddp/api.py b/dddp/api.py index 189aac4..e23ccc4 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -21,6 +21,7 @@ except ImportError: from django.db.models import Expression as ExpressionNode from django.db.models.sql import aggregates as sql_aggregates from django.utils.encoding import force_text +from django.utils.module_loading import import_string from django.db import DatabaseError from django.db.models import signals import ejson @@ -34,6 +35,12 @@ from dddp.models import ( ) +API_ENDPOINT_DECORATORS = [ + import_string(dotted_path) + for dotted_path + in getattr(settings, 'DDP_API_ENDPOINT_DECORATORS', []) +] + XMIN = {'select': {'xmin': "'xmin'"}} @@ -88,16 +95,54 @@ class Array(aggregates.Aggregate): return value -def api_endpoint(path_or_func): - """Decorator to mark a method as an API endpoint for later registration.""" +def api_endpoint(path_or_func=None, decorate=True): + """ + Decorator to mark a method as an API endpoint for later registration. + + Args: + path_or_func: either the function to be decorated or its API path. + decorate (bool): Apply API_ENDPOINT_DECORATORS if True (default). + + Returns: + Callable: Decorated function (with optionally applied decorators). + + Examples: + + >>> class Counter(APIMixin): + ... value = 0 + ... + ... # default API path matches function name 'increment'. + ... @api_endpoint + ... def increment(self, amount): + ... '''Increment counter value by `amount`.''' + ... self.value += amount + ... return self.value + ... + ... # excplicitly set API path to 'Decrement'. + ... @api_endpoint('Decrement') + ... def decrement(self, amount): + ... '''Decrement counter value by `amount`.''' + ... self.value -= amount + ... return self.value + + """ + def maybe_decorated(func): + """Apply API_ENDPOINT_DECORATORS to func.""" + if decorate: + for decorator in API_ENDPOINT_DECORATORS: + func = decorator()(func) + return func if callable(path_or_func): path_or_func.api_path = path_or_func.__name__ - return path_or_func + return maybe_decorated(path_or_func) else: def _api_endpoint(func): """Decorator inner.""" - func.api_path = path_or_func - return func + if path_or_func is None: + func.api_path = func.__name__ + else: + func.api_path = path_or_func + return maybe_decorated(func) return _api_endpoint @@ -675,7 +720,7 @@ class DDP(APIMixin): if not silent: this.send({'msg': 'nosub', 'id': id_}) - @api_endpoint + @api_endpoint(decorate=False) def method(self, method, params, id_): """Invoke a method.""" try: diff --git a/dddp/apps.py b/dddp/apps.py index 6692e6b..fe0090b 100644 --- a/dddp/apps.py +++ b/dddp/apps.py @@ -1,6 +1,5 @@ """Django DDP app config.""" - -from __future__ import print_function +import warnings from django.apps import AppConfig from django.conf import settings, ImproperlyConfigured @@ -15,18 +14,19 @@ class DjangoDDPConfig(AppConfig): api = None name = 'dddp' verbose_name = 'Django DDP' - _in_migration = False def ready(self): """Initialisation for django-ddp (setup lookups and signal handlers).""" if not settings.DATABASES: raise ImproperlyConfigured('No databases configured.') for (alias, conf) in settings.DATABASES.items(): - if conf['ENGINE'] != 'django.db.backends.postgresql_psycopg2': - raise ImproperlyConfigured( - '%r uses %r: django-ddp only works with PostgreSQL.' % ( - alias, conf['backend'], - ) + engine = conf['ENGINE'] + if engine != 'django.db.backends.postgresql_psycopg2': + warnings.warn( + 'Database %r uses unsupported %r engine.' % ( + alias, engine, + ), + UserWarning, ) self.api = autodiscover() self.api.ready() diff --git a/dddp/websocket.py b/dddp/websocket.py index 508ec15..3da9efd 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -1,6 +1,6 @@ """Django DDP WebSocket service.""" -from __future__ import absolute_import +from __future__ import absolute_import, print_function import atexit import collections @@ -115,6 +115,12 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): def on_open(self): """Handle new websocket connection.""" + this.request = WSGIRequest(self.ws.environ) + this.ws = self + this.send = self.send + this.reply = self.reply + this.error = self.error + self.logger = self.ws.logger self.remote_ids = collections.defaultdict(set) @@ -139,13 +145,13 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): """Show remote address that connected to us.""" return self.remote_addr - def on_close(self, reason): + def on_close(self, *args, **kwargs): """Handle closing of websocket connection.""" if self.connection is not None: del self.pgworker.connections[self.connection.pk] self.connection.delete() self.connection = None - self.logger.info('- %s %s', self, reason or 'CLOSE') + self.logger.info('- %s %s', self, args or 'CLOSE') def on_message(self, message): """Process a message received from remote.""" @@ -170,7 +176,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): raw = msgs.pop(0) try: data = ejson.loads(raw) - except ValueError, err: + except (TypeError, ValueError), err: self.error(400, 'Data is not valid EJSON') continue if not isinstance(data, dict): @@ -179,9 +185,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): try: msg = data.pop('msg') except KeyError: - self.error( - 400, 'Bad request', offendingMessage=data, - ) + self.error(400, 'Bad request', offendingMessage=data) continue # dispatch message try: @@ -193,8 +197,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.error(err) except geventwebsocket.WebSocketError, err: self.ws.close() - except MeteorError, err: - self.error(err) @transaction.atomic def dispatch(self, msg, kwargs): @@ -227,11 +229,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): # buffer data until we get pre-requisite data if tx_id is None: tx_id = self.get_tx_id() - if self._tx_buffer: - self.logger.debug( - 'TX received %d, waiting for %d, have %r.', - tx_id, self._tx_next_id, self._tx_buffer, - ) self._tx_buffer[tx_id] = data # de-queue messages from buffer @@ -271,13 +268,22 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): except geventwebsocket.WebSocketError: self.ws.close() break + num_waiting = len(self._tx_buffer) + if num_waiting > 10: + self.logger.warn( + 'TX received %d, waiting for %d, have %d waiting: %r.', + tx_id, self._tx_next_id, num_waiting, self._tx_buffer, + ) def reply(self, msg, **kwargs): """Send EJSON reply to remote.""" kwargs['msg'] = msg self.send(kwargs) - def error(self, err, reason=None, detail=None, msg='error', **kwargs): + def error( + self, err, reason=None, detail=None, msg='error', exc_info=1, + **kwargs + ): """Send EJSON error to remote.""" if isinstance(err, MeteorError): ( @@ -301,36 +307,26 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): if kwargs: data.update(kwargs) record = { - 'exc_info': sys.exc_info(), 'extra': { 'request': this.request, }, } - if record['exc_info'] == (None, None, None): - del record['exc_info'] - self.logger.error('! %s %r', self, data, **record) + self.logger.error('! %s %r', self, data, exc_info=exc_info, **record) self.reply(msg, **data) def recv_connect(self, version=None, support=None, session=None): """DDP connect handler.""" + del session # Meteor doesn't even use this! if self.connection is not None: self.error( - 400, - 'Session already established.', - reason='Current session in detail.', + 400, 'Session already established.', detail=self.connection.connection_id, ) elif None in (version, support) or version not in self.versions: self.reply('failed', version=self.versions[0]) elif version not in support: - self.error('Client version/support mismatch.') + self.error(400, 'Client version/support mismatch.') else: - this.request = WSGIRequest(self.ws.environ) - this.ws = self - this.send = self.send - this.reply = self.reply - this.error = self.error - from dddp.models import Connection cur = connection.cursor() cur.execute('SELECT pg_backend_pid()') diff --git a/setup.py b/setup.py index 8c19cbf..bb32f9b 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ CLASSIFIERS = [ setup( name='django-ddp', - version='0.15.0', + version='0.16.0', description=__doc__, long_description=open('README.rst').read(), author='Tyson Clugg',