Merge branch 'develop' into feature/pub_query_refactor

Conflicts:
	dddp/__init__.py
	dddp/accounts/ddp.py
This commit is contained in:
Tyson Clugg 2015-08-28 14:12:25 +10:00
commit e213f55437
17 changed files with 870 additions and 330 deletions

View file

@ -1,6 +1,60 @@
Change Log
==========
0.12.2
------
* Set blank=True on AleaIdField, allowing adding items without inventing
IDs yourself.
0.12.1
------
* Add `AleaIdMixin` which provides `aid = AleaIdField(unique=True)` to
models.
* Use `AleaIdField(unique=True)` wherever possible when translating
between Meteor style identifiers and Django primary keys, reducing
round trips to the database and hence drastically improving
performance when such fields are available.
0.12.0
------
* Get path to `star.json` from view config (defined in your urls.py)
instead of from settings.
* Dropped `dddp.server.views`, use `dddp.views` instead.
0.11.0
------
* Support more than 8KB of change data by splitting large payloads into
multiple chunks.
0.10.2
------
* Add `Logs` publication that can be configured to emit logs via DDP
through the use of the `dddp.logging.DDPHandler` log handler.
* Add option to dddp daemon to provide a BackdoorServer (telnet) for
interactive debugging (REPL) at runtime.
0.10.1
------
* Bugfix dddp.accounts forgot_password feature.
0.10.0
------
* Stop processing request middleware upon connection - see
https://github.com/commoncode/django-ddp/commit/e7b38b89db5c4e252ac37566f626b5e9e1651a29
for rationale. Access to `this.request.user` is gone.
* Add `this.user` handling to dddp.accounts.
0.9.14
------
* Fix ordering of user added vs login ready in dddp.accounts
authentication methods.
0.9.13
------
* Add dddp.models.get_object_ids helper function.
* Add ObjectMappingMixini abstract model mixin providing
GenericRelation back to ObjectMapping model.
0.9.12
------
* Bugfix /app.model/schema helper method on collections to work with

View file

@ -103,6 +103,19 @@ Add ddp.py to your Django application:
[Book, Author, AllBooks, BooksByAuthorEmail]
)
Start the Django DDP service:
.. code:: sh
DJANGO_SETTINGS_MODULE=myproject.settings dddp
Using django-ddp as a secondary DDP connection (RAPID DEVELOPMENT)
------------------------------------------------------------------
Running in this manner allows rapid development through use of the hot
code push features provided by Meteor.
Connect your Meteor application to the Django DDP service:
.. code:: javascript
@ -118,19 +131,53 @@ Connect your Meteor application to the Django DDP service:
Django.subscribe('BooksByAuthorEmail', 'janet@evanovich.com');
}
Start the Django DDP service:
.. code:: sh
DJANGO_SETTINGS_MODULE=myproject.settings dddp
In a separate terminal, start Meteor (from within your meteor
application directory):
Start Meteor (from within your meteor application directory):
.. code:: sh
meteor
Using django-ddp as the primary DDP connection (RECOMMENDED)
------------------------------------------------------------
If you'd prefer to not have two DDP connections (one to Meteor and one
to django-ddp) you can set the `DDP_DEFAULT_CONNECTION_URL` environment
variable to use the specified URL as the primary DDP connection in
Meteor. When doing this, you won't need to use `DDP.connect(...)` or
specify `{connection: Django}` on your collections. Running with
django-ddp as the primary connection is recommended, and indeed required
if you wish to use `dddp.accounts` to provide authentication using
`django.contrib.auth` to your meteor app.
.. code:: sh
DDP_DEFAULT_CONNECTION_URL=http://localhost:8000/ meteor
Serving your Meteor applications from django-ddp
------------------------------------------------
First, you will need to build your meteor app into a directory (examples
below assume target directory named `myapp`):
.. code:: sh
meteor build ../myapp
Then, add a MeteorView to your urls.py:
.. code:: python
from dddp.views import MeteorView
urlpatterns = patterns(
url('^(?P<path>/.*)$', MeteorView.as_view(
json_path=os.path.join(
settings.PROJ_ROOT, 'myapp', 'bundle', 'star.json',
),
),
)
Adding API endpoints (server method definitions)
------------------------------------------------

View file

@ -48,28 +48,27 @@ class ThreadLocal(local):
_init_done = False
def __init__(self, **default_factories):
def __init__(self):
"""Create new thread storage instance."""
if self._init_done:
raise SystemError('__init__ called too many times')
self._init_done = True
self._default_factories = default_factories
def __getattr__(self, name):
"""Create missing attributes using default factories."""
try:
factory = self._default_factories[name]
factory = THREAD_LOCAL_FACTORIES[name]
except KeyError:
raise AttributeError(name)
obj = factory()
setattr(self, name, obj)
return obj
return self.get(name, factory)
def get(self, name, factory, *factory_args, **factory_kwargs):
"""Get attribute, creating if required using specified factory."""
if not hasattr(self, name):
update_thread_local = getattr(factory, 'update_thread_local', True)
if (not update_thread_local) or (not hasattr(self, name)):
obj = factory(*factory_args, **factory_kwargs)
setattr(self, name, obj)
if update_thread_local:
setattr(self, name, obj)
return obj
return getattr(self, name)
@ -139,33 +138,21 @@ def tagged(tag_class, obj):
)(*[arg for arg in [obj] if arg is not None])
class UserIdBase(object):
"""Tag class for user_id."""
pass
def user_id():
"""Return tagged reference to this.request.user.id"""
return tagged(UserIdBase, THREAD_LOCAL.request.user.pk)
THREAD_LOCAL = ThreadLocal(
alea_random=alea.Alea,
random_streams=RandomStreams,
serializer=serializer_factory,
user_id=user_id,
)
THREAD_LOCAL_FACTORIES = {
'alea_random': alea.Alea,
'random_streams': RandomStreams,
'serializer': serializer_factory,
}
THREAD_LOCAL = ThreadLocal()
METEOR_ID_CHARS = u'23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz'
def meteor_random_id(name=None):
def meteor_random_id(name=None, length=17):
if name is None:
stream = THREAD_LOCAL.alea_random
else:
stream = THREAD_LOCAL.random_streams[name]
return stream.random_string(17, METEOR_ID_CHARS)
return stream.random_string(length, METEOR_ID_CHARS)
def autodiscover():

View file

@ -7,23 +7,30 @@ See http://docs.meteor.com/#/full/accounts_api for details of each method.
"""
from binascii import Error
import collections
import datetime
import hashlib
from ejson import loads, dumps
from django.conf import settings
from django.contrib import auth
from django.contrib.sessions.backends.db import SessionStore
from django.contrib.auth.signals import user_login_failed
from django.contrib.auth.signals import (
user_login_failed, user_logged_in, user_logged_out,
)
from django.dispatch import Signal
from django.utils import timezone
from dddp import THREAD_LOCAL as this, ADDED, REMOVED
from dddp import (
THREAD_LOCAL_FACTORIES, THREAD_LOCAL as this, ADDED, REMOVED,
meteor_random_id,
)
from dddp.models import get_meteor_id, get_object, Subscription
from dddp.api import API, APIMixin, api_endpoint, Collection, Publication
from dddp.websocket import MeteorError
# pylint dones't like lower case attribute names on modules, but it's the normal
# thing to do for Django signal names. --> pylint: disable=C0103
create_user = Signal(providing_args=['request', 'params'])
password_changed = Signal(providing_args=['request', 'user'])
forgot_password = Signal(providing_args=['request', 'user', 'token', 'expiry'])
@ -36,8 +43,71 @@ class HashPurpose(object):
PASSWORD_RESET = 'password_reset'
RESUME_LOGIN = 'resume_login'
CHANGE_EMAIL = 'change_email'
CREATE_USER = 'create_user'
HASH_DAYS_VALID = {
HashPurpose.PASSWORD_RESET: int(
getattr(
# keep possible attack window short to reduce chance of account
# takeover through later discovery of password reset email message.
settings, 'DDP_PASSWORD_RESET_DAYS_VALID', '1',
)
),
HashPurpose.RESUME_LOGIN: int(
getattr(
# balance security and useability by allowing users to resume their
# logins within a reasonable time, but not forever.
settings, 'DDP_LOGIN_RESUME_DAYS_VALID', '10',
)
),
}
def iter_auth_hashes(user, purpose, days_valid):
"""
Generate auth tokens tied to user and specified purpose.
The hash expires at midnight on the day of today + days_valid, such that
when days_valid=1 you get *at least* 24 hours to use the token.
"""
today = timezone.now().date()
for day in range(days_valid + 1):
yield hashlib.sha1(
'%s:%s:%s:%s:%s' % (
today - datetime.timedelta(days=day),
user.password,
purpose,
user.pk,
settings.SECRET_KEY,
),
).hexdigest()
def get_auth_hash(user, purpose):
"""Generate a user hash for a particular purpose."""
return iter_auth_hashes(user, purpose, days_valid=1).next()
def calc_expiry_time(days_valid):
"""Return specific time an auth_hash will expire."""
return (
timezone.now() + datetime.timedelta(days=days_valid + 1)
).replace(hour=0, minute=0, second=0, microsecond=0)
def get_user_token(user, purpose, days_valid):
"""Return login token info for given user."""
token = ''.join(
dumps([
user.get_username(),
get_auth_hash(user, purpose),
]).encode('base64').split('\n')
)
return {
'id': get_meteor_id(user),
'token': token,
'tokenExpires': calc_expiry_time(days_valid),
}
class Users(Collection):
@ -99,7 +169,7 @@ class Users(Collection):
return data
@staticmethod
def deserialize_profile(user, profile, key_prefix='', pop=False):
def deserialize_profile(profile, key_prefix='', pop=False):
"""De-serialize user profile fields into concrete model fields."""
result = {}
if pop:
@ -122,12 +192,14 @@ class Users(Collection):
@api_endpoint
def update(self, selector, update, options=None):
"""Update user data."""
# we're ignoring the `options` argument at this time
del options
user = get_object(
self.model, selector['_id'],
pk=this.request.user.pk,
pk=this.user_id,
)
profile_update = self.deserialize_profile(
user, update['$set'], key_prefix='profile.', pop=True,
update['$set'], key_prefix='profile.', pop=True,
)
if len(update['$set']) != 0:
raise MeteorError(400, 'Invalid update fields: %r')
@ -137,15 +209,22 @@ class Users(Collection):
user.save()
class LoginPublication(Publication):
class LoginServiceConfiguration(Publication):
"""Meteor Accounts emulation."""
"""Published list of authenitcation providers and their configuration."""
name = 'meteor.loginServiceConfiguration'
def get_queries(self):
"""Return queries for each set of data the user may see."""
yield Users.query(pk=this.request.user.pk)
queries = []
class LoggedInUser(Publication):
"""Meteor auto publication for showing logged in user."""
queries = [
(Users.model.objects.all(), 'users'),
]
class Auth(APIMixin):
@ -154,8 +233,22 @@ class Auth(APIMixin):
api_path_prefix = '' # auth endpoints don't have a common prefix
user_model = auth.get_user_model()
user_id = None
user_ddp_id = None
def update_subs(self, new_user_id):
def user_factory(self):
"""Retrieve the current user (or None) from the database."""
if this.user_id is None:
return None
return self.user_model.objects.get(pk=this.user_id)
user_factory.update_thread_local = False
def ready(self):
"""Called after AppConfig.ready()."""
THREAD_LOCAL_FACTORIES['user'] = self.user_factory
@staticmethod
def update_subs(new_user_id):
"""Update subs to send added/removed for collections with user_rel."""
for sub in Subscription.objects.filter(connection=this.ws.connection):
params = loads(sub.params_ejson)
@ -163,7 +256,7 @@ class Auth(APIMixin):
# calculate the querysets prior to update
pre = collections.OrderedDict([
(col, qs) for col, qs
(col, query) for col, query
in API.sub_unique_objects(sub, params, pub)
])
@ -173,30 +266,32 @@ class Auth(APIMixin):
# calculate the querysets after the update
post = collections.OrderedDict([
(col, qs) for col, qs
(col, query) for col, query
in API.sub_unique_objects(sub, params, pub)
])
# first pass, send `added` for objs unique to `post`
for col_post, qs in post.items():
for col_post, query in post.items():
try:
qs_pre = pre[col_post]
qs = qs.exclude(pk__in=qs_pre.order_by().values('pk'))
query = query.exclude(pk__in=qs_pre.order_by().values('pk'))
except KeyError:
# collection not included pre-auth, everything is added.
pass
for obj in qs:
for obj in query:
this.ws.send(col_post.obj_change_as_msg(obj, ADDED))
# second pass, send `removed` for objs unique to `pre`
for col_pre, qs in pre.items():
for col_pre, query in pre.items():
try:
qs_post = post[col_pre]
qs = qs.exclude(pk__in=qs_post.order_by().values('pk'))
query = query.exclude(
pk__in=qs_post.order_by().values('pk'),
)
except KeyError:
# collection not included post-auth, everything is removed.
pass
for obj in qs:
for obj in query:
this.ws.send(col_pre.obj_change_as_msg(obj, REMOVED))
@staticmethod
@ -209,55 +304,24 @@ class Auth(APIMixin):
)
raise MeteorError(403, 'Authentication failed.')
@staticmethod
def get_auth_hash(user, purpose):
"""Generate a user hash for a particular purpose."""
return hashlib.sha1(
':'.join([
settings.SECRET_KEY,
user.get_session_auth_hash(),
purpose,
])
).hexdigest()
@classmethod
def validated_user_and_session(cls, token, purpose):
"""Resolve and validate auth token, returns user and session objects."""
def validated_user(cls, token, purpose, days_valid):
"""Resolve and validate auth token, returns user object."""
try:
username, session_key, auth_hash = loads(token.decode('base64'))
username, auth_hash = loads(token.decode('base64'))
except (ValueError, Error):
cls.auth_failed(token=token)
try:
user = cls.user_model.objects.get(**{
cls.user_model.USERNAME_FIELD: username,
'is_active': True,
})
user.backend = 'django.contrib.auth.backends.ModelBackend'
except cls.user_model.DoesNotExist:
cls.auth_failed(username=username, token=token)
if cls.get_auth_hash(user, purpose) != auth_hash:
if auth_hash not in iter_auth_hashes(user, purpose, days_valid):
cls.auth_failed(username=username, token=token)
session = SessionStore(
session_key=session_key,
)
if session.get_expiry_date() <= timezone.now():
cls.auth_failed(username=username, token=token)
return (user, session)
@classmethod
def get_user_token(cls, user, session_key, expiry_date, purpose):
"""Return login token info for given user."""
token = ''.join(
dumps([
user.get_username(),
session_key,
cls.get_auth_hash(user, purpose),
]).encode('base64').split('\n')
)
return {
'id': get_meteor_id(user),
'token': token,
'tokenExpires': expiry_date,
}
return user
@staticmethod
def check_secure():
@ -344,20 +408,40 @@ class Auth(APIMixin):
user = auth.authenticate(
username=user.get_username(), password=params['password'],
)
auth.login(this.request, user)
self.update_subs(user.pk)
return self.get_user_token(
user=user,
session_key=this.request.session.session_key,
expiry_date=this.request.session.get_expiry_date(),
purpose=HashPurpose.CREATE_USER,
self.do_login(user)
return get_user_token(
user=user, purpose=HashPurpose.RESUME_LOGIN,
days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN],
)
def do_login(self, user):
"""Login a user."""
this.user_id = user.pk
this.user_ddp_id = get_meteor_id(user)
# silent subscription (sans sub/nosub msg) to LoggedInUser pub
this.user_sub_id = meteor_random_id()
API.do_sub(this.user_sub_id, 'LoggedInUser', silent=True)
self.update_subs(user.pk)
user_logged_in.send(
sender=user.__class__, request=this.request, user=user,
)
def do_logout(self):
"""Logout a user."""
# silent unsubscription (sans sub/nosub msg) from LoggedInUser pub
API.do_unsub(this.user_sub_id, silent=True)
del this.user_sub_id
self.update_subs(None)
user_logged_out.send(
sender=self.user_model, request=this.request, user=this.user,
)
this.user_id = None
this.user_ddp_id = None
@api_endpoint
def logout(self):
"""Logout current user."""
auth.logout(this.request)
self.update_subs(None)
self.do_logout()
@api_endpoint
def login(self, params):
@ -381,14 +465,10 @@ class Auth(APIMixin):
if user is not None:
# the password verified for the user
if user.is_active:
auth.login(this.request, user)
self.update_subs(user.pk)
this.request.session.save()
return self.get_user_token(
user=user,
session_key=this.request.session.session_key,
expiry_date=this.request.session.get_expiry_date(),
purpose=HashPurpose.RESUME_LOGIN,
self.do_login(user)
return get_user_token(
user=user, purpose=HashPurpose.RESUME_LOGIN,
days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN],
)
# Call to `authenticate` was unable to verify the username and password.
@ -408,26 +488,27 @@ class Auth(APIMixin):
# never allow insecure login
self.check_secure()
# pull the username, session_key and auth_hash from the token
user, session = self.validated_user_and_session(
# pull the username and auth_hash from the token
user = self.validated_user(
params['resume'], purpose=HashPurpose.RESUME_LOGIN,
days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN],
)
auth.login(this.request, user)
self.update_subs(user.pk)
this.request.session.save()
return self.get_user_token(
user=user,
session_key=session.session_key,
expiry_date=session.get_expiry_date(),
purpose=HashPurpose.RESUME_LOGIN,
self.do_login(user)
return get_user_token(
user=user, purpose=HashPurpose.RESUME_LOGIN,
days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN],
)
@api_endpoint('changePassword')
def change_password(self, old_password, new_password):
"""Change password."""
try:
user = this.user
except self.user_model.DoesNotExist:
self.auth_failed()
user = auth.authenticate(
username=this.request.user.get_username(),
username=user.get_username(),
password=self.get_password(old_password),
)
if user is None:
@ -453,11 +534,10 @@ class Auth(APIMixin):
except self.user_model.DoesNotExist:
self.auth_failed()
expiry_date = this.request.session.get_expiry_date()
token = self.get_user_token(
user=user, session_key=this.request.session.session_key,
expiry_date=expiry_date,
purpose=HashPurpose.PASSWORD_RESET,
days_valid = HASH_DAYS_VALID[HashPurpose.PASSWORD_RESET]
token = get_user_token(
user=user, purpose=HashPurpose.PASSWORD_RESET,
days_valid=days_valid,
)
forgot_password.send(
@ -465,20 +545,20 @@ class Auth(APIMixin):
user=user,
token=token,
request=this.request,
expiry_date=expiry_date,
expiry_date=calc_expiry_time(days_valid),
)
@api_endpoint('resetPassword')
def reset_password(self, token, new_password):
"""Reset password using a token received in email then logs user in."""
user, _ = self.validated_user_and_session(
user = self.validated_user(
token, purpose=HashPurpose.PASSWORD_RESET,
days_valid=HASH_DAYS_VALID[HashPurpose.PASSWORD_RESET],
)
user.set_password(new_password)
user.save()
auth.login(this.request, user)
self.update_subs(user.pk)
return {"userId": get_meteor_id(this.request.user)};
self.do_login(user)
return {"userId": this.user_ddp_id}
API.register([Users, LoginPublication, Auth])
API.register([Users, LoginServiceConfiguration, LoggedInUser, Auth])

View file

@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals, print_function
import collections
from copy import deepcopy
import traceback
import uuid
# requirements
import dbarray
@ -29,7 +30,9 @@ from dddp import (
AlreadyRegistered, THREAD_LOCAL as this, ADDED, CHANGED, REMOVED,
UserIdBase,
)
from dddp.models import Connection, Subscription, get_meteor_id, get_meteor_ids
from dddp.models import (
AleaIdField, Connection, Subscription, get_meteor_id, get_meteor_ids,
)
XMIN = {'select': {'xmin': "'xmin'"}}
@ -163,6 +166,10 @@ class APIMixin(object):
"""Return API endpoint for given api_path."""
return self.api_path_map()[api_path]
def ready(self):
"""Initialisation (setup lookups and signal handlers)."""
pass
def model_name(model):
"""Return model name given model class."""
@ -484,6 +491,15 @@ class Collection(APIMixin):
)
elif isinstance(field, django.contrib.postgres.fields.ArrayField):
fields[field.name] = field.to_python(fields.pop(field.name))
elif (
isinstance(field, AleaIdField)
) and (
not field.null
) and (
field.name == 'aid'
):
# This will be sent as the `id`, don't send it in `fields`.
fields.pop(field.name)
for field in meta.local_many_to_many:
fields['%s_ids' % field.name] = get_meteor_ids(
field.rel.to, fields.pop(field.name),
@ -648,30 +664,37 @@ class DDP(APIMixin):
@api_endpoint
def sub(self, id_, name, *params):
"""Create subscription, send matched objects that haven't been sent."""
return self.do_sub(id_, name, False, *params)
def do_sub(self, id_, name, silent, *params):
"""Subscribe the current thread to the specified publication."""
try:
pub = self.get_pub_by_name(name)
except KeyError:
this.send({
'msg': 'nosub',
'error': {
'error': 404,
'errorType': 'Meteor.Error',
'message': 'Subscription not found [404]',
'reason': 'Subscription not found',
},
})
if not silent:
this.send({
'msg': 'nosub',
'error': {
'error': 404,
'errorType': 'Meteor.Error',
'message': 'Subscription not found [404]',
'reason': 'Subscription not found',
},
})
return
sub, created = Subscription.objects.get_or_create(
connection_id=this.ws.connection.pk,
sub_id=id_,
user_id=this.request.user.pk,
user_id=getattr(this, 'user_id', None),
defaults={
'publication': pub.name,
'params_ejson': ejson.dumps(params),
},
)
this.subs.setdefault(sub.publication, set()).add(sub.pk)
if not created:
this.send({'msg': 'ready', 'subs': [id_]})
if not silent:
this.send({'msg': 'ready', 'subs': [id_]})
return
# re-read from DB so we can get transaction ID (xmin)
sub = Subscription.objects.extra(**XMIN).get(pk=sub.pk)
@ -682,29 +705,55 @@ class DDP(APIMixin):
model_name=model_name(qs.model),
collection_name=col.name,
)
meteor_ids = get_meteor_ids(
qs.model, qs.values_list('pk', flat=True),
)
if isinstance(col.model._meta.pk, AleaIdField):
meteor_ids = None
elif len([
field
for field
in col.model._meta.local_fields
if (
isinstance(field, AleaIdField)
) and (
field.unique
) and (
not field.null
)
]) == 1:
meteor_ids = None
else:
meteor_ids = get_meteor_ids(
qs.model, qs.values_list('pk', flat=True),
)
for obj in qs:
payload = col.obj_change_as_msg(obj, ADDED, meteor_ids)
this.send(payload)
this.send({'msg': 'ready', 'subs': [id_]})
if not silent:
this.send({'msg': 'ready', 'subs': [id_]})
@api_endpoint
def unsub(self, id_):
"""Remove a subscription."""
self.do_unsub(id_, False)
def do_unsub(self, id_, silent):
"""Unsubscribe the current thread from the specified subscription id."""
sub = Subscription.objects.get(
connection=this.ws.connection, sub_id=id_,
)
for col, qs in self.sub_unique_objects(sub):
meteor_ids = get_meteor_ids(
qs.model, qs.values_list('pk', flat=True),
)
if isinstance(col.model._meta.pk, AleaIdField):
meteor_ids = None
else:
meteor_ids = get_meteor_ids(
qs.model, qs.values_list('pk', flat=True),
)
for obj in qs:
payload = col.obj_change_as_msg(obj, REMOVED, meteor_ids)
this.send(payload)
this.subs[sub.publication].remove(sub.pk)
sub.delete()
this.send({'msg': 'nosub', 'id': id_})
if not silent:
this.send({'msg': 'nosub', 'id': id_})
@api_endpoint
def method(self, method, params, id_):
@ -712,7 +761,17 @@ class DDP(APIMixin):
try:
handler = self.api_path_map()[method]
except KeyError:
this.error('Unknown method: %s' % method)
print('Unknown method: %s %r' % (method, params))
this.send({
'msg': 'result',
'id': id_,
'error': {
'error': 404,
'errorType': 'Meteor.Error',
'message': 'Unknown method: %s %r' % (method, params),
'reason': 'Method not found',
},
})
return
params_repr = repr(params)
try:
@ -776,6 +835,9 @@ class DDP(APIMixin):
signals.post_save.connect(self.on_post_save)
signals.post_delete.connect(self.on_post_delete)
signals.m2m_changed.connect(self.on_m2m_changed)
# call ready on each registered API endpoint
for api_provider in self.api_providers:
api_provider.ready()
def on_pre_migrate(self, sender, **kwargs):
"""Pre-migrate signal handler."""
@ -939,13 +1001,32 @@ class DDP(APIMixin):
if my_connection_id in connection_ids:
# msg must go to connection that initiated the change
payload['_tx_id'] = this.ws.get_tx_id()
# header is sent in every payload
header = {
'uuid': uuid.uuid1().int, # UUID1 should be unique
'seq': 1, # increments for each 8KB chunk
'fin': 0, # zero if more chunks expected, 1 if last chunk.
}
data = ejson.dumps(payload)
cursor = connections[using].cursor()
cursor.execute(
'NOTIFY "ddp", %s',
[
ejson.dumps(payload),
],
)
while data:
hdr = ejson.dumps(header)
# use all available payload space for chunk
max_len = 8000 - len(hdr) - 100
# take a chunk from data
chunk, data = data[:max_len], data[max_len:]
if not data:
# last chunk, set fin=1.
header['fin'] = 1
hdr = ejson.dumps(header)
# print('NOTIFY: %s' % hdr)
cursor.execute(
'NOTIFY "ddp", %s',
[
'%s|%s' % (hdr, chunk), # pipe separates hdr|chunk.
],
)
header['seq'] += 1 # increment sequence.
API = DDP()

22
dddp/ddp.py Normal file
View file

@ -0,0 +1,22 @@
from dddp import THREAD_LOCAL as this
from dddp.api import API, Publication
from django.contrib import auth
class Logs(Publication):
users = auth.get_user_model()
def get_queries(self):
user_pk = getattr(this, 'user_id', False)
if user_pk:
if self.users.objects.filter(
pk=user_pk,
is_active=True,
is_superuser=True,
).exists():
return []
raise ValueError('User not permitted.')
API.register([Logs])

38
dddp/logging.py Normal file
View file

@ -0,0 +1,38 @@
"""Django DDP logging helpers."""
from __future__ import absolute_import, print_function
import logging
from dddp import THREAD_LOCAL as this, meteor_random_id, ADDED
class DDPHandler(logging.Handler):
"""Logging handler that streams log events via DDP to the current client."""
def __init__(self, *args, **kwargs):
self.logger = logging.getLogger('django.db.backends')
self.logger.info('Test')
super(DDPHandler, self).__init__(*args, **kwargs)
def emit(self, record):
"""Emit a formatted log record via DDP."""
if getattr(this, 'subs', {}).get('Logs', False):
this.send({
'msg': ADDED,
'collection': 'logs',
'id': meteor_random_id('/collection/logs'),
'fields': {
# 'name': record.name,
# 'levelno': record.levelno,
'levelname': record.levelname,
# 'pathname': record.pathname,
# 'lineno': record.lineno,
'msg': record.msg,
'args': record.args,
# 'exc_info': record.exc_info,
# 'funcName': record.funcName,
},
})

View file

@ -60,7 +60,7 @@ def ddpp_sockjs_info(environ, start_response):
]))
def serve(listen, debug=False, **ssl_args):
def serve(listen, debug=False, verbosity=1, debug_port=0, **ssl_args):
"""Spawn greenlets for handling websockets and PostgreSQL calls."""
import signal
from django.apps import apps
@ -99,7 +99,7 @@ def serve(listen, debug=False, **ssl_args):
)
# setup WebSocketServer to dispatch web requests
webservers = [
servers = [
geventwebsocket.WebSocketServer(
(host, port),
resource,
@ -113,8 +113,8 @@ def serve(listen, debug=False, **ssl_args):
def killall(*args, **kwargs):
"""Kill all green threads."""
pgworker.stop()
for webserver in webservers:
webserver.stop()
for server in servers:
server.stop()
# die gracefully with SIGINT or SIGQUIT
gevent.signal(signal.SIGINT, killall)
@ -133,19 +133,38 @@ def serve(listen, debug=False, **ssl_args):
)
# start greenlets
if debug_port:
from gevent.backdoor import BackdoorServer
servers.append(
BackdoorServer(
('127.0.0.1', debug_port),
banner='Django DDP',
locals={
'servers': servers,
'pgworker': pgworker,
'killall': killall,
'api': api,
'resource': resource,
'settings': settings,
'wsgi_app': wsgi_app,
'wsgi_name': wsgi_name,
},
)
)
pgworker.start()
print('=> Started PostgresGreenlet.')
web_threads = [
gevent.spawn(webserver.serve_forever)
for webserver
in webservers
threads = [
gevent.spawn(server.serve_forever)
for server
in servers
]
print('=> Started DDPWebSocketApplication.')
print('=> Started your app (%s).' % wsgi_name)
print('')
for host, port in listen:
print('=> App running at: http://%s:%d/' % (host, port))
gevent.joinall(web_threads)
gevent.joinall(threads)
pgworker.stop()
gevent.joinall([pgworker])
@ -190,6 +209,14 @@ def main():
import argparse
parser = argparse.ArgumentParser(description=__doc__)
django = parser.add_argument_group('Django Options')
django.add_argument(
'--verbosity', '-v', metavar='VERBOSITY', dest='verbosity', type=int,
default=1,
)
django.add_argument(
'--debug-port', metavar='DEBUG_PORT', dest='debug_port', type=int,
default=0,
)
django.add_argument(
'--settings', metavar='SETTINGS', dest='settings',
help="The Python path to a settings module, e.g. "
@ -218,8 +245,10 @@ def main():
os.environ['DJANGO_SETTINGS_MODULE'] = namespace.settings
serve(
namespace.listen or [Addr('localhost', 8000)],
debug_port=namespace.debug_port,
keyfile=namespace.keyfile,
certfile=namespace.certfile,
verbosity=namespace.verbosity,
)

View file

@ -0,0 +1,20 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import models, migrations
import dddp.models
class Migration(migrations.Migration):
dependencies = [
('dddp', '0008_remove_subscription_publication_class'),
]
operations = [
migrations.AlterField(
model_name='connection',
name='connection_id',
field=dddp.models.AleaIdField(max_length=17, verbose_name=b'Alea ID'),
),
]

View file

@ -1,4 +1,7 @@
import functools
from django.db import migrations
from django.db.migrations.operations.base import Operation
from dddp.models import AleaIdField, get_meteor_id
class TruncateOperation(Operation):
@ -35,3 +38,48 @@ class TruncateOperation(Operation):
def describe(self):
"""Describe what the operation does in console output."""
return "Truncate tables"
def set_default_forwards(app_name, operation, apps, schema_editor):
"""Set default value for AleaIdField."""
model = apps.get_model(app_name, operation.model_name)
for obj_pk in model.objects.values_list('pk', flat=True):
model.objects.filter(pk=obj_pk).update(**{
operation.name: get_meteor_id(model, obj_pk),
})
def set_default_reverse(app_name, operation, apps, schema_editor):
"""Unset default value for AleaIdField."""
model = apps.get_model(app_name, operation.model_name)
for obj_pk in model.objects.values_list('pk', flat=True):
get_meteor_id(model, obj_pk)
class DefaultAleaIdOperations(object):
def __init__(self, app_name):
self.app_name = app_name
def __add__(self, operations):
default_operations = []
for operation in operations:
if not isinstance(operation, migrations.AlterField):
continue
if not isinstance(operation.field, AleaIdField):
continue
if operation.name != 'aid':
continue
if operation.field.null:
continue
default_operations.append(
migrations.RunPython(
code=functools.partial(
set_default_forwards, self.app_name, operation,
),
reverse_code=functools.partial(
set_default_reverse, self.app_name, operation,
),
)
)
return default_operations + operations

View file

@ -2,8 +2,10 @@
from __future__ import absolute_import
import collections
import os
from django.db import models, transaction
from django.db.models.fields import NOT_PROVIDED
from django.conf import settings
from django.contrib.contenttypes.fields import (
GenericRelation, GenericForeignKey,
@ -15,7 +17,6 @@ import ejson
from dddp import meteor_random_id
@transaction.atomic
def get_meteor_id(obj_or_model, obj_pk=None):
"""Return an Alea ID for the given object."""
if obj_or_model is None:
@ -26,10 +27,39 @@ def get_meteor_id(obj_or_model, obj_pk=None):
if model is ObjectMapping:
# this doesn't make sense - raise TypeError
raise TypeError("Can't map ObjectMapping instances through self.")
if obj_or_model is not model and obj_pk is None:
obj_pk = str(obj_or_model.pk)
# try getting value of AleaIdField straight from instance if possible
if isinstance(obj_or_model, model):
# obj_or_model is an instance, not a model.
if isinstance(meta.pk, AleaIdField):
return obj_or_model.pk
if obj_pk is None:
# fall back to primary key, but coerce as string type for lookup.
obj_pk = str(obj_or_model.pk)
alea_unique_fields = [
field
for field in meta.local_fields
if isinstance(field, AleaIdField) and field.unique
]
if len(alea_unique_fields) == 1:
# found an AleaIdField with unique=True, assume it's got the value.
aid = alea_unique_fields[0].attname
if isinstance(obj_or_model, model):
val = getattr(obj_or_model, aid)
elif obj_pk is None:
val = None
else:
val = model.objects.values_list(aid, flat=True).get(
pk=obj_pk,
)
if val:
return val
if obj_pk is None:
# bail out if args are (model, pk) but pk is None.
return None
# fallback to using AleaIdField from ObjectMapping model.
content_type = ContentType.objects.get_for_model(model)
try:
return ObjectMapping.objects.values_list(
@ -47,37 +77,74 @@ def get_meteor_id(obj_or_model, obj_pk=None):
get_meteor_id.short_description = 'DDP ID' # nice title for admin list_display
@transaction.atomic
def get_meteor_ids(model, object_ids):
"""Return Alea ID mapping for all given ids of specified model."""
content_type = ContentType.objects.get_for_model(model)
# Django model._meta is now public API -> pylint: disable=W0212
meta = model._meta
result = collections.OrderedDict(
(str(obj_pk), None)
for obj_pk
in object_ids
)
for obj_pk, meteor_id in ObjectMapping.objects.filter(
if isinstance(meta.pk, AleaIdField):
# primary_key is an AleaIdField, use it.
return collections.OrderedDict(
(obj_pk, obj_pk) for obj_pk in object_ids
)
alea_unique_fields = [
field
for field in meta.local_fields
if isinstance(field, AleaIdField) and field.unique and not field.null
]
if len(alea_unique_fields) == 1:
aid = alea_unique_fields[0].name
query = model.objects.filter(
pk__in=object_ids,
).values_list('pk', aid)
else:
content_type = ContentType.objects.get_for_model(model)
query = ObjectMapping.objects.filter(
content_type=content_type,
object_id__in=list(result)
).values_list('object_id', 'meteor_id'):
).values_list('object_id', 'meteor_id')
for obj_pk, meteor_id in query:
result[obj_pk] = meteor_id
for obj_pk, meteor_id in result.items():
if meteor_id is None:
# Django model._meta is now public API -> pylint: disable=W0212
result[obj_pk] = ObjectMapping.objects.create(
content_type=content_type,
object_id=obj_pk,
meteor_id=meteor_random_id('/collection/%s' % model._meta),
).meteor_id
result[obj_pk] = get_meteor_id(model, obj_pk)
return result
@transaction.atomic
def get_object_id(model, meteor_id):
"""Return an object ID for the given meteor_id."""
if meteor_id is None:
return None
# Django model._meta is now public API -> pylint: disable=W0212
meta = model._meta
if model is ObjectMapping:
# this doesn't make sense - raise TypeError
raise TypeError("Can't map ObjectMapping instances through self.")
if isinstance(meta.pk, AleaIdField):
# meteor_id is the primary key
return meteor_id
alea_unique_fields = [
field
for field in meta.local_fields
if isinstance(field, AleaIdField) and field.unique
]
if len(alea_unique_fields) == 1:
# found an AleaIdField with unique=True, assume it's got the value.
val = model.objects.values_list(
'pk', flat=True,
).get(**{
alea_unique_fields[0].attname: meteor_id,
})
if val:
return val
content_type = ContentType.objects.get_for_model(model)
return ObjectMapping.objects.filter(
content_type=content_type,
@ -85,29 +152,57 @@ def get_object_id(model, meteor_id):
).values_list('object_id', flat=True).get()
@transaction.atomic
def get_object_ids(model, meteor_ids):
"""Return all object IDs for the given meteor_ids."""
if model is ObjectMapping:
# this doesn't make sense - raise TypeError
raise TypeError("Can't map ObjectMapping instances through self.")
content_type = ContentType.objects.get_for_model(model)
# Django model._meta is now public API -> pylint: disable=W0212
meta = model._meta
alea_unique_fields = [
field
for field in meta.local_fields
if isinstance(field, AleaIdField) and field.unique and not field.null
]
result = collections.OrderedDict(
(str(meteor_id), None)
for meteor_id
in meteor_ids
)
for meteor_id, object_id in ObjectMapping.objects.filter(
content_type=content_type,
meteor_id__in=meteor_ids,
).values_list('meteor_id', 'object_id'):
if len(alea_unique_fields) == 1:
aid = alea_unique_fields[0].name
query = model.objects.filter(**{
'%s__in' % aid: meteor_ids,
}).values_list(aid, 'pk')
else:
content_type = ContentType.objects.get_for_model(model)
query = ObjectMapping.objects.filter(
content_type=content_type,
meteor_id__in=meteor_ids,
).values_list('meteor_id', 'object_id')
for meteor_id, object_id in query:
result[meteor_id] = object_id
return result
@transaction.atomic
def get_object(model, meteor_id, *args, **kwargs):
"""Return an object for the given meteor_id."""
# Django model._meta is now public API -> pylint: disable=W0212
meta = model._meta
if isinstance(meta.pk, AleaIdField):
# meteor_id is the primary key
return model.objects.filter(*args, **kwargs).get(pk=meteor_id)
alea_unique_fields = [
field
for field in meta.local_fields
if isinstance(field, AleaIdField) and field.unique and not field.null
]
if len(alea_unique_fields) == 1:
return model.objects.filter(*args, **kwargs).get(**{
alea_unique_fields[0].name: meteor_id,
})
return model.objects.filter(*args, **kwargs).get(
pk=get_object_id(model, meteor_id),
)
@ -119,12 +214,59 @@ class AleaIdField(models.CharField):
def __init__(self, *args, **kwargs):
"""Assume max_length of 17 to match Meteor implementation."""
kwargs.update(
default=meteor_random_id,
max_length=17,
)
kwargs['blank'] = True
kwargs.setdefault('verbose_name', 'Alea ID')
kwargs.setdefault('max_length', 17)
super(AleaIdField, self).__init__(*args, **kwargs)
def deconstruct(self):
"""Return arguments to pass to __init__() to re-create this field."""
name, path, args, kwargs = super(AleaIdField, self).deconstruct()
del kwargs['blank']
return name, path, args, kwargs
def get_seeded_value(self, instance):
"""Generate a syncronised value."""
# Django model._meta is public API -> pylint: disable=W0212
return meteor_random_id(
'/collection/%s' % instance._meta, self.max_length,
)
def get_pk_value_on_save(self, instance):
"""Generate ID if required."""
value = super(AleaIdField, self).get_pk_value_on_save(instance)
if not value:
value = self.get_seeded_value(instance)
return value
def pre_save(self, model_instance, add):
"""Generate ID if required."""
value = super(AleaIdField, self).pre_save(model_instance, add)
if (not value) and self.default in (meteor_random_id, NOT_PROVIDED):
value = self.get_seeded_value(model_instance)
setattr(model_instance, self.attname, value)
return value
# Please don't hate me...
AID_KWARGS = {}
if os.environ.get('AID_MIGRATE_STEP', '') == '1':
AID_KWARGS['null'] = True # ...please?
class AleaIdMixin(models.Model):
"""Django model mixin that provides AleaIdField field (as aid)."""
aid = AleaIdField(unique=True, editable=True, **AID_KWARGS)
class Meta(object):
"""Model meta options for AleaIdMixin."""
abstract = True
@python_2_unicode_compatible
class ObjectMapping(models.Model):

View file

@ -22,6 +22,7 @@ class PostgresGreenlet(gevent.Greenlet):
# queues for processing incoming sub/unsub requests and processing
self.connections = {}
self.chunks = {}
self._stop_event = gevent.event.Event()
# connect to DB in async mode
@ -62,7 +63,29 @@ class PostgresGreenlet(gevent.Greenlet):
"Got NOTIFY (pid=%d, payload=%r)",
notify.pid, notify.payload,
)
data = ejson.loads(notify.payload)
# read the header and check seq/fin.
hdr, chunk = notify.payload.split('|', 1)
# print('RECEIVE: %s' % hdr)
header = ejson.loads(hdr)
uuid = header['uuid']
size, chunks = self.chunks.setdefault(uuid, [0, {}])
if header['fin']:
size = self.chunks[uuid][0] = header['seq']
# stash the chunk
chunks[header['seq']] = chunk
if len(chunks) != size:
# haven't got all the chunks yet
continue # process next NOTIFY in loop
# got the last chunk -> process it.
data = ''.join(
chunk for _, chunk in sorted(chunks.items())
)
del self.chunks[uuid] # don't forget to cleanup!
data = ejson.loads(data)
sender = data.pop('_sender', None)
tx_id = data.pop('_tx_id', None)
for connection_id in data.pop('_connection_ids'):

View file

@ -1 +0,0 @@
default_app_config = 'dddp.server.apps.ServerConfig'

View file

@ -1,58 +0,0 @@
"""Django DDP Server views."""
from __future__ import print_function, absolute_import, unicode_literals
from ejson import dumps
from django.apps import apps
from django.conf import settings
from django.http import HttpResponse
from django.views.generic import View
STAR_JSON_SETTING_NAME = 'METEOR_STAR_JSON'
class MeteorView(View):
"""Django DDP Meteor server view."""
http_method_names = ['get', 'head']
app = None
runtime_config = None
def __init__(self, **kwargs):
"""Initialisation for Django DDP server view."""
super(MeteorView, self).__init__(**kwargs)
self.app = apps.get_app_config('server')
def get(self, request, path):
"""Return HTML (or other related content) for Meteor."""
if path == '/meteor_runtime_config.js':
config = {
'DDP_DEFAULT_CONNECTION_URL': request.build_absolute_uri('/'),
'ROOT_URL': request.build_absolute_uri(
'%s/' % self.runtime_config.get('ROOT_URL_PATH_PREFIX', ''),
),
'ROOT_URL_PATH_PREFIX': '',
}
# Use HTTPS instead of HTTP if SECURE_SSL_REDIRECT is set
if config['DDP_DEFAULT_CONNECTION_URL'].startswith('http:') \
and settings.SECURE_SSL_REDIRECT:
config['DDP_DEFAULT_CONNECTION_URL'] = 'https:%s' % (
config['DDP_DEFAULT_CONNECTION_URL'].split(':', 1)[1],
)
config.update(self.runtime_config)
return HttpResponse(
'__meteor_runtime_config__ = %s;' % dumps(config),
content_type='text/javascript',
)
try:
file_path, content_type = self.app.url_map[path]
with open(file_path, 'r') as content:
return HttpResponse(
content.read(),
content_type=content_type,
)
except KeyError:
print(path)
return HttpResponse(self.app.html)
# raise Http404

View file

@ -1,20 +1,18 @@
"""Django DDP Server app config."""
"""Django DDP Server views."""
from __future__ import print_function, absolute_import, unicode_literals
import io
import mimetypes
import os.path
from django.apps import AppConfig
from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from ejson import dumps, loads
from django.conf import settings
from django.http import HttpResponse
from django.views.generic import View
import pybars
STAR_JSON_SETTING_NAME = 'METEOR_STAR_JSON'
def read(path, default=None, encoding='utf8'):
"""Read encoded contents from specified path or return default."""
if not path:
@ -34,12 +32,14 @@ def read_json(path):
return loads(json_file.read())
class ServerConfig(AppConfig):
class MeteorView(View):
"""Django config for dddp.server app."""
"""Django DDP Meteor server view."""
name = 'dddp.server'
verbose_name = 'Django DDP Meteor Web Server'
http_method_names = ['get', 'head']
json_path = None
runtime_config = None
manifest = None
program_json = None
@ -47,8 +47,6 @@ class ServerConfig(AppConfig):
runtime_config = None
star_json = None # top level layout
server_json = None # web server layout
web_browser_json = None # web.browser (client) layout
url_map = None
internal_map = None
@ -57,21 +55,21 @@ class ServerConfig(AppConfig):
client_map = None # web.browser (client) URL to path map
html = '<!DOCTYPE html>\n<html><head><title>DDP App</title></head></html>'
def ready(self):
"""Configure Django DDP server app."""
mimetypes.init() # read and process /etc/mime.types
root_url_path_prefix = ''
bundled_js_css_prefix = '/'
def __init__(self, **kwargs):
"""Initialisation for Django DDP server view."""
# super(...).__init__ assigns kwargs to instance.
super(MeteorView, self).__init__(**kwargs)
# read and process /etc/mime.types
mimetypes.init()
self.url_map = {}
try:
json_path = getattr(settings, STAR_JSON_SETTING_NAME)
except AttributeError:
raise ImproperlyConfigured(
'%s setting required by dddp.server.view.' % (
STAR_JSON_SETTING_NAME,
),
)
self.star_json = read_json(json_path)
# process `star_json`
self.star_json = read_json(self.json_path)
star_format = self.star_json['format']
if star_format != 'site-archive-pre1':
raise ValueError(
@ -82,19 +80,20 @@ class ServerConfig(AppConfig):
for program in self.star_json['programs']
}
# process `bundle/programs/server/program.json` from build dir
server_json_path = os.path.join(
os.path.dirname(json_path),
os.path.dirname(self.json_path),
os.path.dirname(programs['server']['path']),
'program.json',
)
self.server_json = read_json(server_json_path)
server_format = self.server_json['format']
server_json = read_json(server_json_path)
server_format = server_json['format']
if server_format != 'javascript-image-pre1':
raise ValueError(
'Unknown Meteor server format: %r' % server_format,
)
self.server_load_map = {}
for item in self.server_json['load']:
for item in server_json['load']:
item['path_full'] = os.path.join(
os.path.dirname(server_json_path),
item['path'],
@ -124,12 +123,13 @@ class ServerConfig(AppConfig):
],
)
# process `bundle/programs/web.browser/program.json` from build dir
web_browser_json_path = os.path.join(
os.path.dirname(json_path),
os.path.dirname(self.json_path),
programs['web.browser']['path'],
)
self.web_browser_json = read_json(web_browser_json_path)
web_browser_format = self.web_browser_json['format']
web_browser_json = read_json(web_browser_json_path)
web_browser_format = web_browser_json['format']
if web_browser_format != 'web-program-pre1':
raise ValueError(
'Unknown Meteor web.browser format: %r' % (
@ -138,7 +138,7 @@ class ServerConfig(AppConfig):
)
self.client_map = {}
self.internal_map = {}
for item in self.web_browser_json['manifest']:
for item in web_browser_json['manifest']:
item['path_full'] = os.path.join(
os.path.dirname(web_browser_json_path),
item['path'],
@ -159,19 +159,19 @@ class ServerConfig(AppConfig):
config = {
'css': [
{'url': item['path']}
for item in self.web_browser_json['manifest']
for item in web_browser_json['manifest']
if item['type'] == 'css' and item['where'] == 'client'
],
'js': [
{'url': item['path']}
for item in self.web_browser_json['manifest']
for item in web_browser_json['manifest']
if item['type'] == 'js' and item['where'] == 'client'
],
'meteorRuntimeConfig': '"%s"' % (
dumps(self.runtime_config)
),
'rootUrlPathPrefix': '/app',
'bundledJsCssPrefix': '/app/',
'rootUrlPathPrefix': self.root_url_path_prefix,
'bundledJsCssPrefix': self.bundled_js_css_prefix,
'inlineScriptsAllowed': False,
'inline': None,
'head': read(
@ -187,3 +187,36 @@ class ServerConfig(AppConfig):
compiler = pybars.Compiler()
tmpl = compiler.compile(tmpl_raw)
self.html = '<!DOCTYPE html>\n%s' % tmpl(config)
def get(self, request, path):
"""Return HTML (or other related content) for Meteor."""
if path == '/meteor_runtime_config.js':
config = {
'DDP_DEFAULT_CONNECTION_URL': request.build_absolute_uri('/'),
'ROOT_URL': request.build_absolute_uri(
'%s/' % self.runtime_config.get('ROOT_URL_PATH_PREFIX', ''),
),
'ROOT_URL_PATH_PREFIX': '',
}
# Use HTTPS instead of HTTP if SECURE_SSL_REDIRECT is set
if config['DDP_DEFAULT_CONNECTION_URL'].startswith('http:') \
and settings.SECURE_SSL_REDIRECT:
config['DDP_DEFAULT_CONNECTION_URL'] = 'https:%s' % (
config['DDP_DEFAULT_CONNECTION_URL'].split(':', 1)[1],
)
config.update(self.runtime_config)
return HttpResponse(
'__meteor_runtime_config__ = %s;' % dumps(config),
content_type='text/javascript',
)
try:
file_path, content_type = self.url_map[path]
with open(file_path, 'r') as content:
return HttpResponse(
content.read(),
content_type=content_type,
)
except KeyError:
print(path)
return HttpResponse(self.html)
# raise Http404

View file

@ -106,8 +106,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
version = None
support = None
connection = None
subs = None
request = None
remote_ids = None
base_handler = BaseHandler()
@ -132,7 +130,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
'{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format(
self.ws.environ,
)
self.subs = {}
this.subs = {}
self.logger.info('+ %s OPEN', self)
self.send('o')
self.send('a["{\\"server_id\\":\\"0\\"}"]')
@ -160,32 +158,35 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
try:
msgs = ejson.loads(message)
except ValueError, err:
raise MeteorError(400, 'Data is not valid EJSON')
self.error(400, 'Data is not valid EJSON')
return
if not isinstance(msgs, list):
raise MeteorError(400, 'Invalid EJSON messages')
self.error(400, 'Invalid EJSON messages')
return
# process individual messages
while msgs:
# parse message payload
raw = msgs.pop(0)
try:
try:
data = ejson.loads(raw)
except ValueError, err:
raise MeteorError(400, 'Data is not valid EJSON')
if not isinstance(data, dict):
self.error(400, 'Invalid EJSON message payload', raw)
continue
try:
msg = data.pop('msg')
except KeyError:
raise MeteorError(
400, 'Bad request', None, {'offendingMessage': data}
)
# dispatch message
data = ejson.loads(raw)
except ValueError, err:
self.error(400, 'Data is not valid EJSON')
continue
if not isinstance(data, dict):
self.error(400, 'Invalid EJSON message payload', raw)
continue
try:
msg = data.pop('msg')
except KeyError:
self.error(
400, 'Bad request', offendingMessage=data,
)
continue
# dispatch message
try:
self.dispatch(msg, data)
except MeteorError, err:
traceback.print_exc()
self.error(err)
except Exception, err:
traceback.print_exc()
@ -200,13 +201,16 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
"""Dispatch msg to appropriate recv_foo handler."""
# enforce calling 'connect' first
if self.connection is None and msg != 'connect':
raise MeteorError(400, 'Must connect first')
self.error(400, 'Must connect first')
return
# lookup method handler
try:
handler = getattr(self, 'recv_%s' % msg)
except (AttributeError, UnicodeEncodeError):
raise MeteorError(404, 'Method not found')
print('Method not found: %s %r' % (msg, kwargs))
self.error(404, 'Method not found', msg='result')
return
# validate handler arguments
validate_kwargs(handler, kwargs)
@ -216,7 +220,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
handler(**kwargs)
except Exception, err: # print stack trace --> pylint: disable=W0703
traceback.print_exc()
self.error(MeteorError(500, 'Internal server error', err))
self.error(500, 'Internal server error', err)
def send(self, data, tx_id=None):
"""Send `data` (raw string or EJSON payload) to WebSocket client."""
@ -273,7 +277,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
kwargs['msg'] = msg
self.send(kwargs)
def error(self, err, reason=None, detail=None, **kwargs):
def error(self, err, reason=None, detail=None, msg='error', **kwargs):
"""Send EJSON error to remote."""
if isinstance(err, MeteorError):
(
@ -297,12 +301,13 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
if kwargs:
data.update(kwargs)
self.logger.error('! %s %r', self, data)
self.reply('error', **data)
self.reply(msg, **data)
def recv_connect(self, version=None, support=None, session=None):
"""DDP connect handler."""
if self.connection is not None:
self.error(
400,
'Session already established.',
reason='Current session in detail.',
detail=self.connection.connection_id,
@ -312,21 +317,11 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication):
elif version not in support:
self.error('Client version/support mismatch.')
else:
self.request = WSGIRequest(self.ws.environ)
# Apply request middleware (so we get request.user and other attrs)
# pylint: disable=protected-access
if self.base_handler._request_middleware is None:
self.base_handler.load_middleware()
for middleware_method in self.base_handler._request_middleware:
response = middleware_method(self.request)
if response:
raise ValueError(response)
this.request = WSGIRequest(self.ws.environ)
this.ws = self
this.request = self.request
this.send = self.send
this.reply = self.reply
this.error = self.error
this.request.session.save()
from dddp.models import Connection
cur = connection.cursor()

View file

@ -5,7 +5,7 @@ from setuptools import setup, find_packages
setup(
name='django-ddp',
version='0.9.12',
version='0.12.2',
description=__doc__,
long_description=open('README.rst').read(),
author='Tyson Clugg',