From 73465d6ed40da86a93d350ec8dcd996f43dc83b8 Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Thu, 8 Oct 2015 11:37:03 +1100 Subject: [PATCH 1/8] Fixed #7 -- Warn if using DB engines other than psycopg2. --- dddp/apps.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/dddp/apps.py b/dddp/apps.py index 6692e6b..fe0090b 100644 --- a/dddp/apps.py +++ b/dddp/apps.py @@ -1,6 +1,5 @@ """Django DDP app config.""" - -from __future__ import print_function +import warnings from django.apps import AppConfig from django.conf import settings, ImproperlyConfigured @@ -15,18 +14,19 @@ class DjangoDDPConfig(AppConfig): api = None name = 'dddp' verbose_name = 'Django DDP' - _in_migration = False def ready(self): """Initialisation for django-ddp (setup lookups and signal handlers).""" if not settings.DATABASES: raise ImproperlyConfigured('No databases configured.') for (alias, conf) in settings.DATABASES.items(): - if conf['ENGINE'] != 'django.db.backends.postgresql_psycopg2': - raise ImproperlyConfigured( - '%r uses %r: django-ddp only works with PostgreSQL.' % ( - alias, conf['backend'], - ) + engine = conf['ENGINE'] + if engine != 'django.db.backends.postgresql_psycopg2': + warnings.warn( + 'Database %r uses unsupported %r engine.' % ( + alias, engine, + ), + UserWarning, ) self.api = autodiscover() self.api.ready() From 397e044dddc2e02c2e9cc50fdf4244719263dbc7 Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Thu, 8 Oct 2015 11:58:42 +1100 Subject: [PATCH 2/8] Work towards #16 -- Use `psycopg2cffi` compatibility if `psycopg2` not installed. --- dddp/__init__.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/dddp/__init__.py b/dddp/__init__.py index 321555c..f60ad98 100644 --- a/dddp/__init__.py +++ b/dddp/__init__.py @@ -6,7 +6,6 @@ from pkg_resources import get_distribution, DistributionNotFound from gevent.local import local from dddp import alea - try: _dist = get_distribution('django-ddp') if not __file__.startswith(os.path.join(_dist.location, 'django-ddp', '')): @@ -43,6 +42,14 @@ def greenify(): # ensure we don't greenify again _GREEN[True] = True + try: + # Use psycopg2 by default + import psycopg2 + except ImportError: + # Fallback to psycopg2cffi if required (eg: pypy) + from psycopg2cffi import compat + compat.register() + class AlreadyRegistered(Exception): From b14b2a427e3b9abffad5ec9bc6d053f7799353bf Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Tue, 13 Oct 2015 11:05:02 +1100 Subject: [PATCH 3/8] New setting `DDP_API_ENDPOINT_DECORATORS`. Takes a list of dotted import paths to decorators which are applied to API endpoints. For example, enable New Relic instrumentation with the following: ``` DDP_API_ENDPOINT_DECORATORS = ['newrelic.agent.background_task'] ``` --- dddp/api.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 51 insertions(+), 6 deletions(-) diff --git a/dddp/api.py b/dddp/api.py index 189aac4..e23ccc4 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -21,6 +21,7 @@ except ImportError: from django.db.models import Expression as ExpressionNode from django.db.models.sql import aggregates as sql_aggregates from django.utils.encoding import force_text +from django.utils.module_loading import import_string from django.db import DatabaseError from django.db.models import signals import ejson @@ -34,6 +35,12 @@ from dddp.models import ( ) +API_ENDPOINT_DECORATORS = [ + import_string(dotted_path) + for dotted_path + in getattr(settings, 'DDP_API_ENDPOINT_DECORATORS', []) +] + XMIN = {'select': {'xmin': "'xmin'"}} @@ -88,16 +95,54 @@ class Array(aggregates.Aggregate): return value -def api_endpoint(path_or_func): - """Decorator to mark a method as an API endpoint for later registration.""" +def api_endpoint(path_or_func=None, decorate=True): + """ + Decorator to mark a method as an API endpoint for later registration. + + Args: + path_or_func: either the function to be decorated or its API path. + decorate (bool): Apply API_ENDPOINT_DECORATORS if True (default). + + Returns: + Callable: Decorated function (with optionally applied decorators). + + Examples: + + >>> class Counter(APIMixin): + ... value = 0 + ... + ... # default API path matches function name 'increment'. + ... @api_endpoint + ... def increment(self, amount): + ... '''Increment counter value by `amount`.''' + ... self.value += amount + ... return self.value + ... + ... # excplicitly set API path to 'Decrement'. + ... @api_endpoint('Decrement') + ... def decrement(self, amount): + ... '''Decrement counter value by `amount`.''' + ... self.value -= amount + ... return self.value + + """ + def maybe_decorated(func): + """Apply API_ENDPOINT_DECORATORS to func.""" + if decorate: + for decorator in API_ENDPOINT_DECORATORS: + func = decorator()(func) + return func if callable(path_or_func): path_or_func.api_path = path_or_func.__name__ - return path_or_func + return maybe_decorated(path_or_func) else: def _api_endpoint(func): """Decorator inner.""" - func.api_path = path_or_func - return func + if path_or_func is None: + func.api_path = func.__name__ + else: + func.api_path = path_or_func + return maybe_decorated(func) return _api_endpoint @@ -675,7 +720,7 @@ class DDP(APIMixin): if not silent: this.send({'msg': 'nosub', 'id': id_}) - @api_endpoint + @api_endpoint(decorate=False) def method(self, method, params, id_): """Invoke a method.""" try: From 0bbf5ac10c14aa8ae307961a4bb57e5073738814 Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Tue, 13 Oct 2015 11:13:59 +1100 Subject: [PATCH 4/8] Add missing import for Python 2 print(). --- dddp/websocket.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dddp/websocket.py b/dddp/websocket.py index 508ec15..89e8a69 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -1,6 +1,6 @@ """Django DDP WebSocket service.""" -from __future__ import absolute_import +from __future__ import absolute_import, print_function import atexit import collections From 91a1b4a3db02c9c74ec0831c8e0d2d390b7279fc Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Tue, 13 Oct 2015 11:18:24 +1100 Subject: [PATCH 5/8] Bugfix thread locals setup when opening WebSocket. --- dddp/websocket.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/dddp/websocket.py b/dddp/websocket.py index 89e8a69..ae8e019 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -115,6 +115,12 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): def on_open(self): """Handle new websocket connection.""" + this.request = WSGIRequest(self.ws.environ) + this.ws = self + this.send = self.send + this.reply = self.reply + this.error = self.error + self.logger = self.ws.logger self.remote_ids = collections.defaultdict(set) @@ -325,12 +331,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): elif version not in support: self.error('Client version/support mismatch.') else: - this.request = WSGIRequest(self.ws.environ) - this.ws = self - this.send = self.send - this.reply = self.reply - this.error = self.error - from dddp.models import Connection cur = connection.cursor() cur.execute('SELECT pg_backend_pid()') From 2aa21c7a274a9ae5942281662c8748f9f4c383ba Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Tue, 13 Oct 2015 11:36:05 +1100 Subject: [PATCH 6/8] Warn if many TX chunks are waiting (stalled?) --- dddp/websocket.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/dddp/websocket.py b/dddp/websocket.py index ae8e019..c1db11a 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -233,11 +233,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): # buffer data until we get pre-requisite data if tx_id is None: tx_id = self.get_tx_id() - if self._tx_buffer: - self.logger.debug( - 'TX received %d, waiting for %d, have %r.', - tx_id, self._tx_next_id, self._tx_buffer, - ) self._tx_buffer[tx_id] = data # de-queue messages from buffer @@ -277,6 +272,12 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): except geventwebsocket.WebSocketError: self.ws.close() break + num_waiting = len(self._tx_buffer) + if num_waiting > 10: + self.logger.warn( + 'TX received %d, waiting for %d, have %d waiting: %r.', + tx_id, self._tx_next_id, num_waiting, self._tx_buffer, + ) def reply(self, msg, **kwargs): """Send EJSON reply to remote.""" From b80e50a6ac1cfa6d50f55c21ce550a93275c283b Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Tue, 13 Oct 2015 11:39:20 +1100 Subject: [PATCH 7/8] Improvements to error/exception handling. --- dddp/websocket.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/dddp/websocket.py b/dddp/websocket.py index c1db11a..3da9efd 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -145,13 +145,13 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): """Show remote address that connected to us.""" return self.remote_addr - def on_close(self, reason): + def on_close(self, *args, **kwargs): """Handle closing of websocket connection.""" if self.connection is not None: del self.pgworker.connections[self.connection.pk] self.connection.delete() self.connection = None - self.logger.info('- %s %s', self, reason or 'CLOSE') + self.logger.info('- %s %s', self, args or 'CLOSE') def on_message(self, message): """Process a message received from remote.""" @@ -176,7 +176,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): raw = msgs.pop(0) try: data = ejson.loads(raw) - except ValueError, err: + except (TypeError, ValueError), err: self.error(400, 'Data is not valid EJSON') continue if not isinstance(data, dict): @@ -185,9 +185,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): try: msg = data.pop('msg') except KeyError: - self.error( - 400, 'Bad request', offendingMessage=data, - ) + self.error(400, 'Bad request', offendingMessage=data) continue # dispatch message try: @@ -199,8 +197,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): self.error(err) except geventwebsocket.WebSocketError, err: self.ws.close() - except MeteorError, err: - self.error(err) @transaction.atomic def dispatch(self, msg, kwargs): @@ -284,7 +280,10 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): kwargs['msg'] = msg self.send(kwargs) - def error(self, err, reason=None, detail=None, msg='error', **kwargs): + def error( + self, err, reason=None, detail=None, msg='error', exc_info=1, + **kwargs + ): """Send EJSON error to remote.""" if isinstance(err, MeteorError): ( @@ -308,29 +307,25 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): if kwargs: data.update(kwargs) record = { - 'exc_info': sys.exc_info(), 'extra': { 'request': this.request, }, } - if record['exc_info'] == (None, None, None): - del record['exc_info'] - self.logger.error('! %s %r', self, data, **record) + self.logger.error('! %s %r', self, data, exc_info=exc_info, **record) self.reply(msg, **data) def recv_connect(self, version=None, support=None, session=None): """DDP connect handler.""" + del session # Meteor doesn't even use this! if self.connection is not None: self.error( - 400, - 'Session already established.', - reason='Current session in detail.', + 400, 'Session already established.', detail=self.connection.connection_id, ) elif None in (version, support) or version not in self.versions: self.reply('failed', version=self.versions[0]) elif version not in support: - self.error('Client version/support mismatch.') + self.error(400, 'Client version/support mismatch.') else: from dddp.models import Connection cur = connection.cursor() From 99cff8241e394cc09e62a1770785ebbcb4e5a7db Mon Sep 17 00:00:00 2001 From: Tyson Clugg Date: Tue, 13 Oct 2015 12:19:52 +1100 Subject: [PATCH 8/8] Update CHANGES.rst, bump version number. --- CHANGES.rst | 17 +++++++++++++++++ setup.py | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/CHANGES.rst b/CHANGES.rst index 7bb9aef..def0434 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,23 @@ Change Log ========== +0.16.0 +------ +* New setting: `DDP_API_ENDPOINT_DECORATORS`. + This setting takes a list of dotted import paths to decorators which are applied to API endpoints. For example, enable New Relic instrumentation by adding the line below to your Django `settings.py`: + + .. code:: python + + DDP_API_ENDPOINT_DECORATORS = ['newrelic.agent.background_task'] + +* Fixed #7 -- Warn if using DB engines other than psycopg2 - thanks @Matvey-Kuk. +* Improvements to error/exception handling. +* Warn if many TX chunks are queued in case WebSocket has stalled. +* Bugfix thread locals setup when opening WebSocket. +* Add missing import for print function (Python 2). +* Work towards #16 -- Use `psycopg2cffi` compatibility if `psycopg2` not + installed. + 0.15.0 ------ * Renamed `Logs` collection and publication to `dddp.logs` to be consistent with naming conventions used elsewhere. diff --git a/setup.py b/setup.py index 8c19cbf..bb32f9b 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ CLASSIFIERS = [ setup( name='django-ddp', - version='0.15.0', + version='0.16.0', description=__doc__, long_description=open('README.rst').read(), author='Tyson Clugg',