Refactored send_notify to add custom serialization support.

This commit is contained in:
Tyson Clugg 2015-05-06 19:40:54 +10:00
parent 55825b860a
commit c95e2faf2a
12 changed files with 302 additions and 382 deletions

View file

@ -53,11 +53,12 @@ class ObjectMapping(admin.ModelAdmin):
class SubscriptionCollectionInline(admin.TabularInline):
model = models.SubscriptionCollection
fields = [
'collection_class',
'model_name',
'collection_name',
]
readonly_fields = [
'name',
'collection_class',
'model_name',
'collection_name',
]
max_num = 0
@ -87,17 +88,18 @@ class Subscription(admin.ModelAdmin):
class SubscriptionCollection(admin.ModelAdmin):
search_fields = [
'name',
'collection_class',
'model_name',
'collection_name',
]
list_display = [
'__str__',
'subscription',
'name',
'collection_class',
'model_name',
'collection_name',
]
list_filter = [
'name',
'model_name',
'collection_name',
]

View file

@ -1,19 +1,33 @@
"""Django DDP API, Collections, Cursors and Publications."""
from __future__ import absolute_import
from __future__ import absolute_import, unicode_literals
# standard library
import collections
from copy import deepcopy
import heapq
import itertools
import sys
import traceback
from six.moves import range
# requirements
import dbarray
from django.conf import settings
from django.contrib.auth import get_user_model
from django.db import connection
from django.db import connection, connections
from django.db.models import aggregates, Q
from django.db.models.expressions import ExpressionNode
from django.db.models.sql import aggregates as sql_aggregates
from django.utils.encoding import force_text
from dddp import AlreadyRegistered, THREAD_LOCAL as this
from dddp.models import Subscription
from dddp.msg import obj_change_as_msg
from django.db import DatabaseError
from django.db.models import signals
import ejson
# django-ddp
from dddp import AlreadyRegistered, THREAD_LOCAL as this
from dddp.models import Connection, Subscription, get_meteor_id
XMIN = {'select': {'xmin': "'xmin'"}}
@ -136,8 +150,8 @@ class APIMixin(object):
return self.api_path_map()[api_path]
def collection_name(model):
"""Return collection name given model class."""
def model_name(model):
    """Return model name given model class."""
    # Django supports model._meta -> pylint: disable=W0212
    # NOTE(review): force_text(model._meta) presumably renders the Options
    # object as "app_label.modelname" — confirm against the Django version.
    return force_text(model._meta)
@ -157,7 +171,7 @@ class CollectionMeta(APIMeta):
model = attrs.get('model', None)
if attrs.get('name', None) is None and model is not None:
attrs.update(
name=collection_name(model),
name=model_name(model),
)
return super(CollectionMeta, mcs).__new__(mcs, name, bases, attrs)
@ -279,7 +293,7 @@ class Collection(APIMixin):
schema['type'] = 'String'
schema['relation'] = {
'name': field.name,
'collection': collection_name(rel.to),
'collection': model_name(rel.to),
}
choices = getattr(field, 'choices', None)
@ -314,7 +328,7 @@ class Collection(APIMixin):
'type': '[String]',
'relation': {
'name': field.name,
'collection': collection_name(field.rel.to),
'collection': model_name(field.rel.to),
},
}
@ -327,6 +341,35 @@ class Collection(APIMixin):
in self.field_schema()
}
def serialize(self, obj):
    """Generate a DDP msg for obj with specified msg type.

    Any F-expression attributes on `obj` are replaced by their concrete
    database values (re-read by primary key) before serializing, so the
    serializer never sees unevaluated expressions.
    """
    # check for F expressions assigned directly to instance attributes
    exps = [
        name for name, val in vars(obj).items()
        if isinstance(val, ExpressionNode)
    ]
    if exps:
        # clone/update obj with values but only for the expression fields
        obj = deepcopy(obj)
        # .values(...).get() returns a dict — iterate .items() to obtain
        # (field name, value) pairs; iterating the dict itself would yield
        # only the keys and break the tuple unpacking below.
        values = self.model.objects.values(*exps).get(pk=obj.pk)
        for name, val in values.items():
            setattr(obj, name, val)
    # run serialization now all fields are "concrete" (not F expressions)
    return this.serializer.serialize([obj])[0]
def obj_change_as_msg(self, obj, msg):
    """Return DDP change message of specified type (msg) for obj."""
    if msg in ('added', 'changed'):
        # full payload, with the primary key recast as a string `id`
        data = self.serialize(obj)
        data['id'] = str(data.pop('pk'))
    elif msg == 'removed':
        # removal messages only need to identify the object
        data = {'pk': get_meteor_id(obj)}
    else:
        raise ValueError('Invalid message type: %r' % msg)
    data.update(msg=msg, collection=self.name)
    return data
class PublicationMeta(APIMeta):
@ -335,7 +378,7 @@ class PublicationMeta(APIMeta):
def __new__(mcs, name, bases, attrs):
"""Create a new Publication class."""
attrs.update(
api_path_prefix_format='publications/{name}/',
api_path_prefix_format='publication/{name}/',
)
return super(PublicationMeta, mcs).__new__(mcs, name, bases, attrs)
@ -357,14 +400,14 @@ class Publication(APIMixin):
self.name,
),
)
return self.queries[:] or []
return self.queries[:]
@api_endpoint
def collections(self, *params):
"""Return list of collections for this publication."""
return sorted(
set(
hasattr(qs, 'model') and collection_name(qs.model) or qs[1]
hasattr(qs, 'model') and model_name(qs.model) or qs[1]
for qs
in self.get_queries(*params)
)
@ -376,6 +419,8 @@ def pub_path(publication_name):
return Publication.api_path_prefix_format.format(name=publication_name)
class DDP(APIMixin):
"""Django DDP API."""
@ -383,17 +428,36 @@ class DDP(APIMixin):
__metaclass__ = APIMeta
pgworker = None
_in_migration = False
class Msg(object):

    """DDP message type enumeration."""

    # Values are used directly as the `msg` field of outgoing DDP
    # collection-change messages (see Collection.obj_change_as_msg).
    ADDED = 'added'
    CHANGED = 'changed'
    REMOVED = 'removed'
def __init__(self):
    """DDP API init."""
    self._registry = {}
    self._subs = {}
    # self._tx_buffer collects outgoing messages which must be sent in order
    self._tx_buffer = []
    # Track the head of the queue (buffer) and the next msg to be sent.
    # itertools.count() yields sequential integers; the previous
    # itertools.repeat(range(sys.maxint)) yielded the *same range object*
    # on every next() call instead of successive ints, so message ids were
    # never valid integers.
    self._tx_buffer_id_gen = itertools.count()
    self._tx_next_id_gen = itertools.count()
    # start by waiting for the very first message
    self._tx_next_id = next(self._tx_next_id_gen)
def get_collection(self, model):
"""Return collection instance for given model."""
name = collection_name(model)
path = COLLECTION_PATH_FORMAT.format(name=name)
return self._registry[path]
name = model_name(model)
return self.get_col_by_name(name)
def get_col_by_name(self, name):
    """Return collection instance for given name."""
    # Collections are registered under COLLECTION_PATH_FORMAT paths;
    # raises KeyError if no collection with this name is registered.
    return self._registry[COLLECTION_PATH_FORMAT.format(name=name)]
@property
def api_providers(self):
@ -410,9 +474,7 @@ class DDP(APIMixin):
if hasattr(qs, 'model'):
return (qs, self.get_collection(qs.model))
elif isinstance(qs, (list, tuple)):
name = qs[1]
path = COLLECTION_PATH_FORMAT.format(name=name)
return (qs[0], self._registry[path])
return (qs[0], self.get_col_by_name(qs[1]))
else:
raise TypeError('Invalid query spec: %r' % qs)
@ -430,10 +492,6 @@ class DDP(APIMixin):
user_id=this.request.user.pk,
defaults={
'publication': pub.name,
'publication_class': '%s.%s' % (
pub.__class__.__module__,
pub.__class__.__name__,
),
'params_ejson': ejson.dumps(params),
},
)
@ -442,15 +500,15 @@ class DDP(APIMixin):
return
# re-read from DB so we can get transaction ID (xmin)
obj = Subscription.objects.extra(**XMIN).get(pk=obj.pk)
queries = {
collection.name: (collection, qs)
for (qs, collection)
queries = collections.OrderedDict(
(col.name, (col, qs))
for (qs, col)
in (
self.qs_and_collection(qs)
for qs
in pub.get_queries(*params)
)
}
)
self._subs[id_] = (this.ws, sorted(queries))
self.pgworker.subscribe(self.sub_notify, id_, sorted(queries))
# mergebox via MVCC! For details on how this is possible, read this:
@ -458,44 +516,42 @@ class DDP(APIMixin):
to_send = collections.OrderedDict(
(
name,
collection.objects_for_user(
col.objects_for_user(
user=this.request.user.pk,
qs=qs,
xmin__lte=obj.xmin,
),
)
for name, (collection, qs)
for name, (col, qs)
in queries.items()
)
for name, (collection, qs) in queries.items():
for name, (col, qs) in queries.items():
obj.collections.create(
name=collection_name(qs.model),
collection_class='%s.%s' % (
collection.__class__.__module__,
collection.__class__.__name__,
),
model_name=model_name(qs.model),
collection_name=name,
)
for other in Subscription.objects.filter(
connection=this.ws.connection,
collections__name__in=queries.keys(),
collections__collection_name__in=queries.keys(),
).exclude(
pk=obj.pk,
).order_by('pk').distinct():
other_pub = self._registry[pub_path(other.publication)]
for qs in other_pub.get_queries(*other.params):
qs, collection = self.qs_and_collection(qs)
if collection.name not in to_send:
qs, col = self.qs_and_collection(qs)
if col not in to_send:
continue
to_send[collection.name] = to_send[collection.name].exclude(
pk__in=collection.objects_for_user(
to_send[col] = to_send[col.name].exclude(
pk__in=col.objects_for_user(
user=this.request.user.pk,
qs=qs,
xmin__lte=obj.xmin,
).values('pk'),
)
for qs in to_send.values():
for collection_name, qs in to_send.items():
col = self.get_col_by_name(collection_name)
for obj in qs:
name, payload = obj_change_as_msg(obj, 'added')
payload = col.obj_change_as_msg(obj, self.Msg.ADDED)
this.send_msg(payload)
this.send_msg({'msg': 'ready', 'subs': [id_]})
@ -566,8 +622,132 @@ class DDP(APIMixin):
for api_provider in self.api_providers:
if isinstance(api_provider, Collection):
collection = api_provider
res[collection_name(collection.model)] = collection.schema()
res[model_name(collection.model)] = collection.schema()
return res
def ready(self):
    """Initialisation for django-ddp (setup lookups and signal handlers)."""
    # Mirror ORM mutations out to DDP subscribers via send_notify.
    signals.post_save.connect(self.on_save)
    signals.post_delete.connect(self.on_delete)
    signals.m2m_changed.connect(self.on_m2m_changed)
    # pre_migrate must be connected, otherwise self._in_migration is never
    # set and the handlers above would fire during migrations
    # (on_pre_migrate exists but was previously never wired up).
    signals.pre_migrate.connect(self.on_pre_migrate)
    signals.post_migrate.connect(self.on_post_migrate)
def on_save(self, sender, **kwargs):
    """Post-save signal handler."""
    # no notifications while migrations are running
    if self._in_migration:
        return
    if kwargs['created']:
        msg = self.Msg.ADDED
    else:
        msg = self.Msg.CHANGED
    self.send_notify(
        model=sender,
        obj=kwargs['instance'],
        msg=msg,
        using=kwargs['using'],
    )
def on_delete(self, sender, **kwargs):
    """Post-delete signal handler."""
    if self._in_migration:
        return
    self.send_notify(
        model=sender,
        obj=kwargs['instance'],
        # use the Msg enumeration for consistency with on_save (same value
        # as the previous 'removed' literal)
        msg=self.Msg.REMOVED,
        using=kwargs['using'],
    )
def on_m2m_changed(self, sender, **kwargs):
    """M2M-changed signal handler."""
    if self._in_migration:
        return
    # See https://docs.djangoproject.com/en/1.7/ref/signals/#m2m-changed
    if kwargs['action'] in (
        'post_add',
        'post_remove',
        'post_clear',
    ):
        if kwargs['reverse'] is False:
            # forward side: the single instance whose relation changed
            objs = [kwargs['instance']]
            model = objs[0].__class__
        else:
            # reverse side: every related object named in pk_set changed
            model = kwargs['model']
            objs = model.objects.filter(pk__in=kwargs['pk_set'])
        for obj in objs:
            self.send_notify(
                model=model,
                obj=obj,
                # use the Msg enumeration for consistency with on_save
                # (same value as the previous 'changed' literal)
                msg=self.Msg.CHANGED,
                using=kwargs['using'],
            )
def on_pre_migrate(self, sender, **kwargs):
    """Pre-migrate signal handler."""
    # Suppress change notifications while migrations run (checked by
    # on_save / on_delete / on_m2m_changed).
    self._in_migration = True
def on_post_migrate(self, sender, **kwargs):
    """Post-migrate signal handler."""
    # re-enable change notifications
    self._in_migration = False
    try:
        # Purge Connection rows left over from previous processes;
        # best-effort only — the table may not exist yet.
        Connection.objects.all().delete()
    except DatabaseError:  # pylint: disable=E0712
        pass
def send_notify(self, model, obj, msg, using):
    """Dispatch PostgreSQL async NOTIFY.

    Find all subscriptions whose publications include `obj`, filter them
    by per-collection user permissions, then NOTIFY one payload per
    collection tagged with the interested subscription ids.
    """
    mod_name = model_name(model)
    if mod_name.split('.', 1)[0] in ('migrations', 'dddp'):
        return  # never send migration or DDP internal models
    # cache of collection class -> permitted user ids (may be expensive)
    col_user_ids = {}
    col_sub_ids = collections.defaultdict(set)
    for sub in Subscription.objects.filter(
        collections__model_name=mod_name,
    ).prefetch_related('collections'):
        for qs, col in (
            self.qs_and_collection(qs)
            for qs
            in self._registry[
                'publication/%s/' % sub.publication
            ].get_queries(*sub.params)
        ):
            # check if obj is an instance of the model for the queryset
            if qs.model is not model:
                continue  # wrong model on queryset
            # check if obj is included in this subscription
            if not qs.filter(pk=obj.pk).exists():
                continue  # subscription doesn't include this obj
            # filter qs using user_rel paths on collection:
            # retrieve list of allowed users via collection (cached)
            try:
                user_ids = col_user_ids[col.__class__]
            except KeyError:
                user_ids = col_user_ids[col.__class__] = \
                    col.user_ids_for_object(obj)
            # check if user is in permitted list of users.  The previous
            # version tested `sub.user_id in user_ids` which skipped
            # exactly the users who *were* permitted (and notified those
            # who were not); the sense of the test is corrected here.
            if user_ids is None:
                pass  # unrestricted collection, anyone permitted to see.
            elif sub.user_id not in user_ids:
                continue  # not for this user
            col_sub_ids[col].add(sub.sub_id)
    if not col_sub_ids:
        get_meteor_id(obj)  # force creation of meteor ID using randomSeed
        return  # no subscribers for this object, nothing more to do.
    for col, sub_ids in col_sub_ids.items():
        payload = col.obj_change_as_msg(obj, msg)
        payload['_sub_ids'] = sorted(sub_ids)
        cursor = connections[using].cursor()
        cursor.execute(
            'NOTIFY "%s", %%s' % col.name,
            [
                ejson.dumps(payload),
            ],
        )
API = DDP()

View file

@ -6,87 +6,20 @@ from django.apps import AppConfig
from django.core import serializers
from django.conf import settings, ImproperlyConfigured
from django.db import DatabaseError
from django.db.models import signals, Field
from django.db.models import signals
from dddp.api import collection_name
from dddp.notify import send_notify
from dddp import autodiscover
from dddp.models import Connection
IN_MIGRATION = False
def on_save(sender, **kwargs):
"""Post-save signal handler."""
global IN_MIGRATION
if IN_MIGRATION: return
send_notify(
model=sender,
obj=kwargs['instance'],
msg=kwargs['created'] and 'added' or 'changed',
using=kwargs['using'],
)
def on_delete(sender, **kwargs):
"""Post-delete signal handler."""
global IN_MIGRATION
if IN_MIGRATION: return
send_notify(
model=sender,
obj=kwargs['instance'],
msg='removed',
using=kwargs['using'],
)
def on_m2m_changed(sender, **kwargs):
"""M2M-changed signal handler."""
global IN_MIGRATION
if IN_MIGRATION: return
# See https://docs.djangoproject.com/en/1.7/ref/signals/#m2m-changed
if kwargs['action'] in (
'post_add',
'post_remove',
'post_clear',
):
if kwargs['reverse'] == False:
objs = [kwargs['instance']]
model = objs[0].__class__
else:
model = kwargs['model']
objs = model.objects.filter(pk__in=kwargs['pk_set'])
for obj in objs:
send_notify(
model=model,
obj=obj,
msg='changed',
using=kwargs['using'],
)
def on_pre_migrate(sender, **kwargs):
global IN_MIGRATION
IN_MIGRATION = True
def on_post_migrate(sender, **kwargs):
"""Post-migrate signal handler."""
global IN_MIGRATION
IN_MIGRATION = False
try:
Connection.objects.all().delete()
except DatabaseError:
pass
class DjangoDDPConfig(AppConfig):
"""Django app config for django-ddp."""
api = None
name = 'dddp'
verbose_name = 'Django DDP'
_in_migration = False
def ready(self):
"""Initialisation for django-ddp (setup lookups and signal handlers)."""
@ -100,7 +33,5 @@ class DjangoDDPConfig(AppConfig):
alias, conf['backend'],
)
)
signals.post_save.connect(on_save)
signals.post_delete.connect(on_delete)
signals.m2m_changed.connect(on_m2m_changed)
signals.post_migrate.connect(on_post_migrate)
self.api = autodiscover()
self.api.ready()

View file

@ -62,9 +62,9 @@ def ddpp_sockjs_info(environ, start_response):
def serve(listen, debug=False):
"""Spawn greenlets for handling websockets and PostgreSQL calls."""
import signal
from django.apps import apps
from django.db import connection, close_old_connections
from django.utils.module_loading import import_string
from dddp import autodiscover
from dddp.postgres import PostgresGreenlet
from dddp.websocket import DDPWebSocketApplication
import gevent
@ -74,8 +74,8 @@ def serve(listen, debug=False):
close_old_connections()
# setup PostgresGreenlet to multiplex DB calls
postgres = PostgresGreenlet(connection, debug=debug)
DDPWebSocketApplication.pgworker = postgres
pgworker = PostgresGreenlet(connection, debug=debug)
DDPWebSocketApplication.pgworker = pgworker
# use settings.WSGI_APPLICATION or fallback to default Django WSGI app
from django.conf import settings
@ -108,7 +108,7 @@ def serve(listen, debug=False):
def killall(*args, **kwargs):
"""Kill all green threads."""
postgres.stop()
pgworker.stop()
for webserver in webservers:
webserver.stop()
@ -117,18 +117,19 @@ def serve(listen, debug=False):
gevent.signal(signal.SIGQUIT, killall)
print('=> Discovering DDP endpoints...')
ddp = autodiscover()
ddp.pgworker = postgres
api = apps.get_app_config('dddp').api
api.pgworker = pgworker
DDPWebSocketApplication.api = api
print(
'\n'.join(
' %s' % api_path
for api_path
in sorted(ddp.api_path_map())
in sorted(api.api_path_map())
),
)
# start greenlets
postgres.start()
pgworker.start()
print('=> Started PostgresGreenlet.')
web_threads = [
gevent.spawn(webserver.serve_forever)
@ -141,8 +142,8 @@ def serve(listen, debug=False):
for host, port in listen:
print('=> App running at: http://%s:%d/' % (host, port))
gevent.joinall(web_threads)
postgres.stop()
gevent.joinall([postgres])
pgworker.stop()
gevent.joinall([pgworker])
def addr(val, default_port=8000, defualt_host='localhost'):

View file

@ -1,161 +0,0 @@
"""Django DDP WebSocket service."""
from __future__ import print_function, absolute_import
import collections
import inspect
import optparse
import random
import signal
import socket
from django.core.management import get_commands, load_command_class
from django.core.management.base import BaseCommand
from django.db import connection, close_old_connections
from django.utils.module_loading import import_string
import ejson
import gevent
import gevent.monkey
import gevent.queue
import gevent.select
import geventwebsocket
import psycogreen.gevent
from dddp import autodiscover
from dddp.postgres import PostgresGreenlet
from dddp.websocket import DDPWebSocketApplication
def ddpp_sockjs_xhr(environ, start_response):
    """Dummy method that doesn't handle XHR requests."""
    # Only allow CORS from the referring origin (scheme://host[:port]).
    origin = '/'.join(environ['HTTP_REFERER'].split('/')[:3])
    headers = [
        ('Content-Type', 'text/plain; charset=UTF-8'),
        ('Access-Control-Allow-Origin', origin),
        ('Access-Control-Allow-Credentials', 'true'),
        # ('access-control-allow-credentials', 'true'),
        ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
        ('Connection', 'keep-alive'),
        ('Vary', 'Origin'),
    ]
    start_response('404 Not found', headers)
    yield 'No.'
def ddpp_sockjs_info(environ, start_response):
    """Inform client that WebSocket service is available."""
    start_response(
        '200 OK',
        [
            ('Content-Type', 'application/json; charset=UTF-8'),
            (
                # only allow CORS from the referring origin (scheme://host)
                'Access-Control-Allow-Origin',
                '/'.join(environ['HTTP_REFERER'].split('/')[:3]),
            ),
            ('Access-Control-Allow-Credentials', 'true'),
            # ('access-control-allow-credentials', 'true'),
            ('Cache-Control', 'no-store, no-cache, must-revalidate, max-age=0'),
            ('Connection', 'keep-alive'),
            ('Vary', 'Origin'),
        ],
    )
    # SockJS "info" payload: websocket transport enabled, any origin may
    # connect, entropy seeds the client-side RNG.
    yield ejson.dumps(collections.OrderedDict([
        ('websocket', True),
        ('origins', [
            '*:*',
        ]),
        ('cookie_needed', False),
        ('entropy', random.getrandbits(32)),
    ]))
app_name = get_commands()['runserver']
if isinstance(app_name, BaseCommand):
command_class = app_name
else:
command_class = load_command_class(app_name, 'runserver')
command_class = command_class.__class__
class Command(command_class):

    """Command to run DDP web service.

    Subclasses the project's configured ``runserver`` command so it inherits
    that command's HOST/PORT parsing and WSGI handler construction.
    """

    args = 'HOST PORT'
    help = 'Run DDP service'

    def run(self, *args, **options):
        """Spawn greenlets for handling websockets and PostgreSQL calls."""
        # shutdown existing connections, monkey patch stdlib for gevent.
        close_old_connections()
        gevent.monkey.patch_all()
        psycogreen.gevent.patch_psycopg()
        debug = int(options['verbosity']) > 1
        # setup PostgresGreenlet to multiplex DB calls
        postgres = PostgresGreenlet(connection, debug=debug)
        DDPWebSocketApplication.pgworker = postgres
        # use settings.WSGI_APPLICATION or fallback to default Django WSGI app
        from django.conf import settings
        wsgi_app = self.get_handler(*args, **options)
        if hasattr(settings, 'WSGI_APPLICATION'):
            wsgi_name = settings.WSGI_APPLICATION
        else:
            wsgi_name = str(wsgi_app.__class__)
        # route websocket/sockjs endpoints to DDP, everything else to Django
        resource = geventwebsocket.Resource({
            r'/websocket': DDPWebSocketApplication,
            r'^/sockjs/\d+/\w+/websocket$': DDPWebSocketApplication,
            r'^/sockjs/\d+/\w+/xhr$': ddpp_sockjs_xhr,
            r'^/sockjs/info$': ddpp_sockjs_info,
            r'^/(?!(websocket|sockjs)/)': wsgi_app,
        })
        # setup WebSocketServer to dispatch web requests
        host = self.addr
        port = self.port
        if port.isdigit():
            port = int(port)
        else:
            # named service (e.g. "http") -> numeric port
            port = socket.getservbyname(port)
        webserver = geventwebsocket.WebSocketServer(
            (host, port),
            resource,
            debug=debug,
        )

        def killall(*args, **kwargs):
            """Kill all green threads."""
            postgres.stop()
            webserver.stop()

        # die gracefully with SIGINT or SIGQUIT
        gevent.signal(signal.SIGINT, killall)
        gevent.signal(signal.SIGQUIT, killall)
        print('=> Discovering DDP endpoints...')
        ddp = autodiscover()
        ddp.pgworker = postgres
        print(
            '\n'.join(
                ' %s' % api_path
                for api_path
                in sorted(ddp.api_path_map())
            ),
        )
        # start greenlets
        postgres.start()
        print('=> Started PostgresGreenlet.')
        web = gevent.spawn(webserver.serve_forever)
        print('=> Started DDPWebSocketApplication.')
        print('=> Started your app (%s).' % wsgi_name)
        print('')
        print('=> App running at: http://%s:%d/' % (host, port))
        # block until both greenlets finish (i.e. after killall)
        gevent.joinall([postgres, web])

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):

    # Field renames on dddp.SubscriptionCollection:
    # collection_class -> collection_name, name -> model_name.
    dependencies = [
        ('dddp', '0006_auto_20150428_2245'),
    ]

    operations = [
        migrations.RenameField(
            model_name='subscriptioncollection',
            old_name='collection_class',
            new_name='collection_name',
        ),
        migrations.RenameField(
            model_name='subscriptioncollection',
            old_name='name',
            new_name='model_name',
        ),
    ]

View file

@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
class Migration(migrations.Migration):

    # Drop Subscription.publication_class — publications are now looked up
    # by name in the API registry rather than by dotted class path.
    dependencies = [
        ('dddp', '0007_auto_20150505_1302'),
    ]

    operations = [
        migrations.RemoveField(
            model_name='subscription',
            name='publication_class',
        ),
    ]

View file

@ -128,7 +128,6 @@ class Subscription(models.Model, object):
sub_id = models.CharField(max_length=17)
user = models.ForeignKey(settings.AUTH_USER_MODEL, blank=True, null=True)
publication = models.CharField(max_length=255)
publication_class = models.CharField(max_length=255)
params_ejson = models.TextField(default='{}')
class Meta(object):
@ -166,11 +165,13 @@ class SubscriptionCollection(models.Model):
"""Collections for a subscription."""
subscription = models.ForeignKey(Subscription, related_name='collections')
name = models.CharField(max_length=255)
collection_class = models.CharField(max_length=255)
model_name = models.CharField(max_length=255)
collection_name = models.CharField(max_length=255)
def __str__(self):
"""Human readable representation of colleciton for a subscription."""
return '%s' % (
self.name,
return u'%s \u200b %s (%s)' % (
self.subscription,
self.collection_name,
self.model_name,
)

View file

@ -1,76 +0,0 @@
"""Django DDP notification support."""
from __future__ import absolute_import
import collections
import ejson
from django.db import connections
from django.utils.module_loading import import_string
from dddp.api import collection_name
from dddp.models import get_meteor_id, Subscription
from dddp.msg import obj_change_as_msg
class ImportCache(collections.defaultdict):

    """Cache mapping dotted import path -> imported object.

    ``__missing__`` stores the imported object back into the mapping so
    each dotted path is only imported once; the previous ``@staticmethod``
    version returned the import without caching it, so every lookup of an
    absent key re-imported the path and the "cache" never filled.
    """

    def __missing__(self, key):
        val = self[key] = import_string(key)
        return val
def send_notify(model, obj, msg, using):
    """Dispatch PostgreSQL async NOTIFY.

    For each Subscription whose collections include this model, re-run the
    publication's queries to decide whether `obj` (and this user) should
    see the change, then NOTIFY the collection channel with the payload
    tagged with the interested subscription ids.
    """
    # cache of collection class path -> permitted user ids
    user_cache = {}
    col_name = collection_name(model)
    if col_name == 'migrations.migration':
        return  # never send migration models.
    if col_name.startswith('dddp.'):
        return  # don't send DDP internal models.
    sub_ids = set()
    for sub in Subscription.objects.filter(
            collections__name=col_name,
    ):
        # re-instantiate the publication from its stored dotted class path
        pub = _CLS_CACHE[sub.publication_class]()
        pub_queries = {
            collection_name(qs.model): qs
            for qs
            in pub.get_queries(*sub.params)
            if qs.model is model
        }
        for sub_col in sub.collections.filter(
                name=col_name,
        ):
            qs = pub_queries[sub_col.name]
            col = _CLS_CACHE[sub_col.collection_class]()
            # filter qs using user_rel paths on collection
            try:
                user_ids = user_cache[sub_col.collection_class]
            except KeyError:
                user_ids = user_cache.setdefault(
                    sub_col.collection_class, col.user_ids_for_object(obj)
                )
            if user_ids is None:
                pass  # unrestricted collection, anyone can see.
            elif sub.user_id not in user_ids:
                continue  # not for this user
            qs = col.get_queryset(qs)
            if qs.filter(pk=obj.pk).exists():
                sub_ids.add(sub.sub_id)
    if not sub_ids:
        get_meteor_id(obj)  # force creation of meteor ID using randomSeed
        return  # no subscribers for this object, nothing more to do.
    name, payload = obj_change_as_msg(obj, msg)
    # tag payload with interested subscriptions; pgworker fans it out
    payload['_sub_ids'] = sorted(sub_ids)
    cursor = connections[using].cursor()
    cursor.execute(
        'NOTIFY "%s", %%s' % name,
        [
            ejson.dumps(payload),
        ],
    )

View file

@ -15,7 +15,6 @@ from django.core.handlers.wsgi import WSGIRequest
from django.db import connection, transaction
from dddp import THREAD_LOCAL as this, alea
from dddp.api import API
class MeteorError(Exception):
@ -95,6 +94,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
'pre1',
'pre2',
]
api = None
logger = None
pgworker = None
remote_addr = None
@ -303,13 +303,13 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
def recv_sub(self, id_, name, params):
"""DDP sub handler."""
API.sub(id_, name, *params)
self.api.sub(id_, name, *params)
recv_sub.err = 'Malformed subscription'
def recv_unsub(self, id_=None):
"""DDP unsub handler."""
if id_:
API.unsub(id_)
self.api.unsub(id_)
else:
self.reply('nosub')
@ -318,6 +318,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
if randomSeed is not None:
this.random_streams.random_seed = randomSeed
this.alea_random = alea.Alea(randomSeed)
API.method(method, params, id_)
self.api.method(method, params, id_)
self.reply('updated', methods=[id_])
recv_method.err = 'Malformed method invocation'