diff --git a/dddp/api.py b/dddp/api.py index dad34e3..26840eb 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -749,8 +749,10 @@ class DDP(APIMixin): payload['_sub_ids'] = sorted(sub_ids) try: ws = this.ws - payload['_tx_id'] = ws.get_tx_id() payload['_sender'] = ws.connection.pk + if set(sub_ids).intersection(self._subs): + # message must go to connection that initiated the change + payload['_tx_id'] = ws.get_tx_id() except AttributeError: pass cursor = connections[using].cursor() diff --git a/dddp/websocket.py b/dddp/websocket.py index e48422d..497fffa 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -128,29 +128,12 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): # start by waiting for the very first message self._tx_next_id = next(self._tx_next_id_gen) - self.request = WSGIRequest(self.ws.environ) - # Apply request middleware (so we get request.user and other attrs) - # pylint: disable=protected-access - if self.base_handler._request_middleware is None: - self.base_handler.load_middleware() - for middleware_method in self.base_handler._request_middleware: - response = middleware_method(self.request) - if response: - raise ValueError(response) - this.ws = self - this.request = self.request - this.send = self.send - this.send_msg = self.send_msg - this.reply = self.reply - this.error = self.error - this.request.session.save() - this.remote_addr = self.remote_addr = \ '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( self.ws.environ, ) self.subs = {} - self.logger.info('+ %s OPEN %s', self, this.request.user) + self.logger.info('+ %s OPEN', self) self.send('o') self.send('a["{\\"server_id\\":\\"0\\"}"]') @@ -239,12 +222,19 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): # buffer data until we get pre-requisite data if tx_id is None: tx_id = self.get_tx_id() + if self._tx_buffer: + self.logger.debug( + 'TX received %d, waiting for %d, have %r.', + tx_id, self._tx_next_id, sorted(self._tx_buffer), + ) self._tx_buffer[tx_id] = data # de-queue messages from buffer while self._tx_next_id in self._tx_buffer: # pull next message from buffer data = self._tx_buffer.pop(self._tx_next_id) + if self._tx_buffer: + self.logger.debug('TX found %d', self._tx_next_id) # advance next message ID self._tx_next_id = next(self._tx_next_id_gen) # send message @@ -320,6 +310,23 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): elif version not in support: self.error('Client version/support mismatch.') else: + self.request = WSGIRequest(self.ws.environ) + # Apply request middleware (so we get request.user and other attrs) + # pylint: disable=protected-access + if self.base_handler._request_middleware is None: + self.base_handler.load_middleware() + for middleware_method in self.base_handler._request_middleware: + response = middleware_method(self.request) + if response: + raise ValueError(response) + this.ws = self + this.request = self.request + this.send = self.send + this.send_msg = self.send_msg + this.reply = self.reply + this.error = self.error + this.request.session.save() + from dddp.models import Connection cur = connection.cursor() cur.execute('SELECT pg_backend_pid()')