diff --git a/CHANGES.rst b/CHANGES.rst index 39d433a..2b98e96 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,60 @@ Change Log ========== +0.12.2 +------ +* Set blank=True on AleaIdField, allowing adding items without inventing + IDs yourself. + +0.12.1 +------ +* Add `AleaIdMixin` which provides `aid = AleaIdField(unique=True)` to + models. +* Use `AleaIdField(unique=True)` wherever possible when translating + between Meteor style identifiers and Django primary keys, reducing + round trips to the database and hence drastically improving + performance when such fields are available. + +0.12.0 +------ +* Get path to `star.json` from view config (defined in your urls.py) + instead of from settings. +* Dropped `dddp.server.views`, use `dddp.views` instead. + +0.11.0 +------ +* Support more than 8KB of change data by splitting large payloads into + multiple chunks. + +0.10.2 +------ +* Add `Logs` publication that can be configured to emit logs via DDP + through the use of the `dddp.logging.DDPHandler` log handler. +* Add option to dddp daemon to provide a BackdoorServer (telnet) for + interactive debugging (REPL) at runtime. + +0.10.1 +------ +* Bugfix dddp.accounts forgot_password feature. + +0.10.0 +------ +* Stop processing request middleware upon connection - see + https://github.com/commoncode/django-ddp/commit/e7b38b89db5c4e252ac37566f626b5e9e1651a29 + for rationale. Access to `this.request.user` is gone. +* Add `this.user` handling to dddp.accounts. + +0.9.14 +------ +* Fix ordering of user added vs login ready in dddp.accounts + authentication methods. + +0.9.13 +------ +* Add dddp.models.get_object_ids helper function. +* Add ObjectMappingMixini abstract model mixin providing + GenericRelation back to ObjectMapping model. + 0.9.12 ------ * Bugfix /app.model/schema helper method on collections to work with diff --git a/README.rst b/README.rst index afd6ece..367cc6e 100644 --- a/README.rst +++ b/README.rst @@ -103,6 +103,19 @@ Add ddp.py to your Django application: [Book, Author, AllBooks, BooksByAuthorEmail] ) +Start the Django DDP service: + +.. code:: sh + + DJANGO_SETTINGS_MODULE=myproject.settings dddp + + +Using django-ddp as a secondary DDP connection (RAPID DEVELOPMENT) +------------------------------------------------------------------ + +Running in this manner allows rapid development through use of the hot +code push features provided by Meteor. + Connect your Meteor application to the Django DDP service: .. code:: javascript @@ -118,19 +131,53 @@ Connect your Meteor application to the Django DDP service: Django.subscribe('BooksByAuthorEmail', 'janet@evanovich.com'); } -Start the Django DDP service: - -.. code:: sh - - DJANGO_SETTINGS_MODULE=myproject.settings dddp - -In a separate terminal, start Meteor (from within your meteor -application directory): +Start Meteor (from within your meteor application directory): .. code:: sh meteor +Using django-ddp as the primary DDP connection (RECOMMENDED) +------------------------------------------------------------ + +If you'd prefer to not have two DDP connections (one to Meteor and one +to django-ddp) you can set the `DDP_DEFAULT_CONNECTION_URL` environment +variable to use the specified URL as the primary DDP connection in +Meteor. When doing this, you won't need to use `DDP.connect(...)` or +specify `{connection: Django}` on your collections. Running with +django-ddp as the primary connection is recommended, and indeed required +if you wish to use `dddp.accounts` to provide authentication using +`django.contrib.auth` to your meteor app. + +.. code:: sh + + DDP_DEFAULT_CONNECTION_URL=http://localhost:8000/ meteor + + +Serving your Meteor applications from django-ddp +------------------------------------------------ + +First, you will need to build your meteor app into a directory (examples +below assume target directory named `myapp`): + +.. code:: sh + + meteor build ../myapp + +Then, add a MeteorView to your urls.py: + +.. code:: python + + from dddp.views import MeteorView + + urlpatterns = patterns( + url('^(?P/.*)$', MeteorView.as_view( + json_path=os.path.join( + settings.PROJ_ROOT, 'myapp', 'bundle', 'star.json', + ), + ), + ) + Adding API endpoints (server method definitions) ------------------------------------------------ diff --git a/dddp/__init__.py b/dddp/__init__.py index e4a856a..86fef52 100644 --- a/dddp/__init__.py +++ b/dddp/__init__.py @@ -48,28 +48,27 @@ class ThreadLocal(local): _init_done = False - def __init__(self, **default_factories): + def __init__(self): """Create new thread storage instance.""" if self._init_done: raise SystemError('__init__ called too many times') self._init_done = True - self._default_factories = default_factories def __getattr__(self, name): """Create missing attributes using default factories.""" try: - factory = self._default_factories[name] + factory = THREAD_LOCAL_FACTORIES[name] except KeyError: raise AttributeError(name) - obj = factory() - setattr(self, name, obj) - return obj + return self.get(name, factory) def get(self, name, factory, *factory_args, **factory_kwargs): """Get attribute, creating if required using specified factory.""" - if not hasattr(self, name): + update_thread_local = getattr(factory, 'update_thread_local', True) + if (not update_thread_local) or (not hasattr(self, name)): obj = factory(*factory_args, **factory_kwargs) - setattr(self, name, obj) + if update_thread_local: + setattr(self, name, obj) return obj return getattr(self, name) @@ -139,33 +138,21 @@ def tagged(tag_class, obj): )(*[arg for arg in [obj] if arg is not None]) -class UserIdBase(object): - - """Tag class for user_id.""" - - pass - - -def user_id(): - """Return tagged reference to this.request.user.id""" - return tagged(UserIdBase, THREAD_LOCAL.request.user.pk) - - -THREAD_LOCAL = ThreadLocal( - alea_random=alea.Alea, - random_streams=RandomStreams, - serializer=serializer_factory, - user_id=user_id, -) +THREAD_LOCAL_FACTORIES = { + 'alea_random': alea.Alea, + 'random_streams': RandomStreams, + 'serializer': serializer_factory, +} +THREAD_LOCAL = ThreadLocal() METEOR_ID_CHARS = u'23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz' -def meteor_random_id(name=None): +def meteor_random_id(name=None, length=17): if name is None: stream = THREAD_LOCAL.alea_random else: stream = THREAD_LOCAL.random_streams[name] - return stream.random_string(17, METEOR_ID_CHARS) + return stream.random_string(length, METEOR_ID_CHARS) def autodiscover(): diff --git a/dddp/accounts/ddp.py b/dddp/accounts/ddp.py index 3266ed7..8ebb25a 100644 --- a/dddp/accounts/ddp.py +++ b/dddp/accounts/ddp.py @@ -7,23 +7,30 @@ See http://docs.meteor.com/#/full/accounts_api for details of each method. """ from binascii import Error import collections +import datetime import hashlib from ejson import loads, dumps from django.conf import settings from django.contrib import auth -from django.contrib.sessions.backends.db import SessionStore -from django.contrib.auth.signals import user_login_failed +from django.contrib.auth.signals import ( + user_login_failed, user_logged_in, user_logged_out, +) from django.dispatch import Signal from django.utils import timezone -from dddp import THREAD_LOCAL as this, ADDED, REMOVED +from dddp import ( + THREAD_LOCAL_FACTORIES, THREAD_LOCAL as this, ADDED, REMOVED, + meteor_random_id, +) from dddp.models import get_meteor_id, get_object, Subscription from dddp.api import API, APIMixin, api_endpoint, Collection, Publication from dddp.websocket import MeteorError +# pylint dones't like lower case attribute names on modules, but it's the normal +# thing to do for Django signal names. --> pylint: disable=C0103 create_user = Signal(providing_args=['request', 'params']) password_changed = Signal(providing_args=['request', 'user']) forgot_password = Signal(providing_args=['request', 'user', 'token', 'expiry']) @@ -36,8 +43,71 @@ class HashPurpose(object): PASSWORD_RESET = 'password_reset' RESUME_LOGIN = 'resume_login' - CHANGE_EMAIL = 'change_email' - CREATE_USER = 'create_user' + + +HASH_DAYS_VALID = { + HashPurpose.PASSWORD_RESET: int( + getattr( + # keep possible attack window short to reduce chance of account + # takeover through later discovery of password reset email message. + settings, 'DDP_PASSWORD_RESET_DAYS_VALID', '1', + ) + ), + HashPurpose.RESUME_LOGIN: int( + getattr( + # balance security and useability by allowing users to resume their + # logins within a reasonable time, but not forever. + settings, 'DDP_LOGIN_RESUME_DAYS_VALID', '10', + ) + ), +} + + +def iter_auth_hashes(user, purpose, days_valid): + """ + Generate auth tokens tied to user and specified purpose. + + The hash expires at midnight on the day of today + days_valid, such that + when days_valid=1 you get *at least* 24 hours to use the token. + """ + today = timezone.now().date() + for day in range(days_valid + 1): + yield hashlib.sha1( + '%s:%s:%s:%s:%s' % ( + today - datetime.timedelta(days=day), + user.password, + purpose, + user.pk, + settings.SECRET_KEY, + ), + ).hexdigest() + + +def get_auth_hash(user, purpose): + """Generate a user hash for a particular purpose.""" + return iter_auth_hashes(user, purpose, days_valid=1).next() + + +def calc_expiry_time(days_valid): + """Return specific time an auth_hash will expire.""" + return ( + timezone.now() + datetime.timedelta(days=days_valid + 1) + ).replace(hour=0, minute=0, second=0, microsecond=0) + + +def get_user_token(user, purpose, days_valid): + """Return login token info for given user.""" + token = ''.join( + dumps([ + user.get_username(), + get_auth_hash(user, purpose), + ]).encode('base64').split('\n') + ) + return { + 'id': get_meteor_id(user), + 'token': token, + 'tokenExpires': calc_expiry_time(days_valid), + } class Users(Collection): @@ -99,7 +169,7 @@ class Users(Collection): return data @staticmethod - def deserialize_profile(user, profile, key_prefix='', pop=False): + def deserialize_profile(profile, key_prefix='', pop=False): """De-serialize user profile fields into concrete model fields.""" result = {} if pop: @@ -122,12 +192,14 @@ class Users(Collection): @api_endpoint def update(self, selector, update, options=None): """Update user data.""" + # we're ignoring the `options` argument at this time + del options user = get_object( self.model, selector['_id'], - pk=this.request.user.pk, + pk=this.user_id, ) profile_update = self.deserialize_profile( - user, update['$set'], key_prefix='profile.', pop=True, + update['$set'], key_prefix='profile.', pop=True, ) if len(update['$set']) != 0: raise MeteorError(400, 'Invalid update fields: %r') @@ -137,15 +209,22 @@ class Users(Collection): user.save() -class LoginPublication(Publication): +class LoginServiceConfiguration(Publication): - """Meteor Accounts emulation.""" + """Published list of authenitcation providers and their configuration.""" name = 'meteor.loginServiceConfiguration' - def get_queries(self): - """Return queries for each set of data the user may see.""" - yield Users.query(pk=this.request.user.pk) + queries = [] + + +class LoggedInUser(Publication): + + """Meteor auto publication for showing logged in user.""" + + queries = [ + (Users.model.objects.all(), 'users'), + ] class Auth(APIMixin): @@ -154,8 +233,22 @@ class Auth(APIMixin): api_path_prefix = '' # auth endpoints don't have a common prefix user_model = auth.get_user_model() + user_id = None + user_ddp_id = None - def update_subs(self, new_user_id): + def user_factory(self): + """Retrieve the current user (or None) from the database.""" + if this.user_id is None: + return None + return self.user_model.objects.get(pk=this.user_id) + user_factory.update_thread_local = False + + def ready(self): + """Called after AppConfig.ready().""" + THREAD_LOCAL_FACTORIES['user'] = self.user_factory + + @staticmethod + def update_subs(new_user_id): """Update subs to send added/removed for collections with user_rel.""" for sub in Subscription.objects.filter(connection=this.ws.connection): params = loads(sub.params_ejson) @@ -163,7 +256,7 @@ class Auth(APIMixin): # calculate the querysets prior to update pre = collections.OrderedDict([ - (col, qs) for col, qs + (col, query) for col, query in API.sub_unique_objects(sub, params, pub) ]) @@ -173,30 +266,32 @@ class Auth(APIMixin): # calculate the querysets after the update post = collections.OrderedDict([ - (col, qs) for col, qs + (col, query) for col, query in API.sub_unique_objects(sub, params, pub) ]) # first pass, send `added` for objs unique to `post` - for col_post, qs in post.items(): + for col_post, query in post.items(): try: qs_pre = pre[col_post] - qs = qs.exclude(pk__in=qs_pre.order_by().values('pk')) + query = query.exclude(pk__in=qs_pre.order_by().values('pk')) except KeyError: # collection not included pre-auth, everything is added. pass - for obj in qs: + for obj in query: this.ws.send(col_post.obj_change_as_msg(obj, ADDED)) # second pass, send `removed` for objs unique to `pre` - for col_pre, qs in pre.items(): + for col_pre, query in pre.items(): try: qs_post = post[col_pre] - qs = qs.exclude(pk__in=qs_post.order_by().values('pk')) + query = query.exclude( + pk__in=qs_post.order_by().values('pk'), + ) except KeyError: # collection not included post-auth, everything is removed. pass - for obj in qs: + for obj in query: this.ws.send(col_pre.obj_change_as_msg(obj, REMOVED)) @staticmethod @@ -209,55 +304,24 @@ class Auth(APIMixin): ) raise MeteorError(403, 'Authentication failed.') - @staticmethod - def get_auth_hash(user, purpose): - """Generate a user hash for a particular purpose.""" - return hashlib.sha1( - ':'.join([ - settings.SECRET_KEY, - user.get_session_auth_hash(), - purpose, - ]) - ).hexdigest() - @classmethod - def validated_user_and_session(cls, token, purpose): - """Resolve and validate auth token, returns user and session objects.""" + def validated_user(cls, token, purpose, days_valid): + """Resolve and validate auth token, returns user object.""" try: - username, session_key, auth_hash = loads(token.decode('base64')) + username, auth_hash = loads(token.decode('base64')) except (ValueError, Error): cls.auth_failed(token=token) try: user = cls.user_model.objects.get(**{ cls.user_model.USERNAME_FIELD: username, + 'is_active': True, }) user.backend = 'django.contrib.auth.backends.ModelBackend' except cls.user_model.DoesNotExist: cls.auth_failed(username=username, token=token) - if cls.get_auth_hash(user, purpose) != auth_hash: + if auth_hash not in iter_auth_hashes(user, purpose, days_valid): cls.auth_failed(username=username, token=token) - session = SessionStore( - session_key=session_key, - ) - if session.get_expiry_date() <= timezone.now(): - cls.auth_failed(username=username, token=token) - return (user, session) - - @classmethod - def get_user_token(cls, user, session_key, expiry_date, purpose): - """Return login token info for given user.""" - token = ''.join( - dumps([ - user.get_username(), - session_key, - cls.get_auth_hash(user, purpose), - ]).encode('base64').split('\n') - ) - return { - 'id': get_meteor_id(user), - 'token': token, - 'tokenExpires': expiry_date, - } + return user @staticmethod def check_secure(): @@ -344,20 +408,40 @@ class Auth(APIMixin): user = auth.authenticate( username=user.get_username(), password=params['password'], ) - auth.login(this.request, user) - self.update_subs(user.pk) - return self.get_user_token( - user=user, - session_key=this.request.session.session_key, - expiry_date=this.request.session.get_expiry_date(), - purpose=HashPurpose.CREATE_USER, + self.do_login(user) + return get_user_token( + user=user, purpose=HashPurpose.RESUME_LOGIN, + days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN], ) + def do_login(self, user): + """Login a user.""" + this.user_id = user.pk + this.user_ddp_id = get_meteor_id(user) + # silent subscription (sans sub/nosub msg) to LoggedInUser pub + this.user_sub_id = meteor_random_id() + API.do_sub(this.user_sub_id, 'LoggedInUser', silent=True) + self.update_subs(user.pk) + user_logged_in.send( + sender=user.__class__, request=this.request, user=user, + ) + + def do_logout(self): + """Logout a user.""" + # silent unsubscription (sans sub/nosub msg) from LoggedInUser pub + API.do_unsub(this.user_sub_id, silent=True) + del this.user_sub_id + self.update_subs(None) + user_logged_out.send( + sender=self.user_model, request=this.request, user=this.user, + ) + this.user_id = None + this.user_ddp_id = None + @api_endpoint def logout(self): """Logout current user.""" - auth.logout(this.request) - self.update_subs(None) + self.do_logout() @api_endpoint def login(self, params): @@ -381,14 +465,10 @@ class Auth(APIMixin): if user is not None: # the password verified for the user if user.is_active: - auth.login(this.request, user) - self.update_subs(user.pk) - this.request.session.save() - return self.get_user_token( - user=user, - session_key=this.request.session.session_key, - expiry_date=this.request.session.get_expiry_date(), - purpose=HashPurpose.RESUME_LOGIN, + self.do_login(user) + return get_user_token( + user=user, purpose=HashPurpose.RESUME_LOGIN, + days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN], ) # Call to `authenticate` was unable to verify the username and password. @@ -408,26 +488,27 @@ class Auth(APIMixin): # never allow insecure login self.check_secure() - # pull the username, session_key and auth_hash from the token - user, session = self.validated_user_and_session( + # pull the username and auth_hash from the token + user = self.validated_user( params['resume'], purpose=HashPurpose.RESUME_LOGIN, + days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN], ) - auth.login(this.request, user) - self.update_subs(user.pk) - this.request.session.save() - return self.get_user_token( - user=user, - session_key=session.session_key, - expiry_date=session.get_expiry_date(), - purpose=HashPurpose.RESUME_LOGIN, + self.do_login(user) + return get_user_token( + user=user, purpose=HashPurpose.RESUME_LOGIN, + days_valid=HASH_DAYS_VALID[HashPurpose.RESUME_LOGIN], ) @api_endpoint('changePassword') def change_password(self, old_password, new_password): """Change password.""" + try: + user = this.user + except self.user_model.DoesNotExist: + self.auth_failed() user = auth.authenticate( - username=this.request.user.get_username(), + username=user.get_username(), password=self.get_password(old_password), ) if user is None: @@ -453,11 +534,10 @@ class Auth(APIMixin): except self.user_model.DoesNotExist: self.auth_failed() - expiry_date = this.request.session.get_expiry_date() - token = self.get_user_token( - user=user, session_key=this.request.session.session_key, - expiry_date=expiry_date, - purpose=HashPurpose.PASSWORD_RESET, + days_valid = HASH_DAYS_VALID[HashPurpose.PASSWORD_RESET] + token = get_user_token( + user=user, purpose=HashPurpose.PASSWORD_RESET, + days_valid=days_valid, ) forgot_password.send( @@ -465,20 +545,20 @@ class Auth(APIMixin): user=user, token=token, request=this.request, - expiry_date=expiry_date, + expiry_date=calc_expiry_time(days_valid), ) @api_endpoint('resetPassword') def reset_password(self, token, new_password): """Reset password using a token received in email then logs user in.""" - user, _ = self.validated_user_and_session( + user = self.validated_user( token, purpose=HashPurpose.PASSWORD_RESET, + days_valid=HASH_DAYS_VALID[HashPurpose.PASSWORD_RESET], ) user.set_password(new_password) user.save() - auth.login(this.request, user) - self.update_subs(user.pk) - return {"userId": get_meteor_id(this.request.user)}; + self.do_login(user) + return {"userId": this.user_ddp_id} -API.register([Users, LoginPublication, Auth]) +API.register([Users, LoginServiceConfiguration, LoggedInUser, Auth]) diff --git a/dddp/api.py b/dddp/api.py index 3815fb0..2d02c3a 100644 --- a/dddp/api.py +++ b/dddp/api.py @@ -5,6 +5,7 @@ from __future__ import absolute_import, unicode_literals, print_function import collections from copy import deepcopy import traceback +import uuid # requirements import dbarray @@ -29,7 +30,9 @@ from dddp import ( AlreadyRegistered, THREAD_LOCAL as this, ADDED, CHANGED, REMOVED, UserIdBase, ) -from dddp.models import Connection, Subscription, get_meteor_id, get_meteor_ids +from dddp.models import ( + AleaIdField, Connection, Subscription, get_meteor_id, get_meteor_ids, +) XMIN = {'select': {'xmin': "'xmin'"}} @@ -163,6 +166,10 @@ class APIMixin(object): """Return API endpoint for given api_path.""" return self.api_path_map()[api_path] + def ready(self): + """Initialisation (setup lookups and signal handlers).""" + pass + def model_name(model): """Return model name given model class.""" @@ -484,6 +491,15 @@ class Collection(APIMixin): ) elif isinstance(field, django.contrib.postgres.fields.ArrayField): fields[field.name] = field.to_python(fields.pop(field.name)) + elif ( + isinstance(field, AleaIdField) + ) and ( + not field.null + ) and ( + field.name == 'aid' + ): + # This will be sent as the `id`, don't send it in `fields`. + fields.pop(field.name) for field in meta.local_many_to_many: fields['%s_ids' % field.name] = get_meteor_ids( field.rel.to, fields.pop(field.name), @@ -648,30 +664,37 @@ class DDP(APIMixin): @api_endpoint def sub(self, id_, name, *params): """Create subscription, send matched objects that haven't been sent.""" + return self.do_sub(id_, name, False, *params) + + def do_sub(self, id_, name, silent, *params): + """Subscribe the current thread to the specified publication.""" try: pub = self.get_pub_by_name(name) except KeyError: - this.send({ - 'msg': 'nosub', - 'error': { - 'error': 404, - 'errorType': 'Meteor.Error', - 'message': 'Subscription not found [404]', - 'reason': 'Subscription not found', - }, - }) + if not silent: + this.send({ + 'msg': 'nosub', + 'error': { + 'error': 404, + 'errorType': 'Meteor.Error', + 'message': 'Subscription not found [404]', + 'reason': 'Subscription not found', + }, + }) return sub, created = Subscription.objects.get_or_create( connection_id=this.ws.connection.pk, sub_id=id_, - user_id=this.request.user.pk, + user_id=getattr(this, 'user_id', None), defaults={ 'publication': pub.name, 'params_ejson': ejson.dumps(params), }, ) + this.subs.setdefault(sub.publication, set()).add(sub.pk) if not created: - this.send({'msg': 'ready', 'subs': [id_]}) + if not silent: + this.send({'msg': 'ready', 'subs': [id_]}) return # re-read from DB so we can get transaction ID (xmin) sub = Subscription.objects.extra(**XMIN).get(pk=sub.pk) @@ -682,29 +705,55 @@ class DDP(APIMixin): model_name=model_name(qs.model), collection_name=col.name, ) - meteor_ids = get_meteor_ids( - qs.model, qs.values_list('pk', flat=True), - ) + if isinstance(col.model._meta.pk, AleaIdField): + meteor_ids = None + elif len([ + field + for field + in col.model._meta.local_fields + if ( + isinstance(field, AleaIdField) + ) and ( + field.unique + ) and ( + not field.null + ) + ]) == 1: + meteor_ids = None + else: + meteor_ids = get_meteor_ids( + qs.model, qs.values_list('pk', flat=True), + ) for obj in qs: payload = col.obj_change_as_msg(obj, ADDED, meteor_ids) this.send(payload) - this.send({'msg': 'ready', 'subs': [id_]}) + if not silent: + this.send({'msg': 'ready', 'subs': [id_]}) @api_endpoint def unsub(self, id_): """Remove a subscription.""" + self.do_unsub(id_, False) + + def do_unsub(self, id_, silent): + """Unsubscribe the current thread from the specified subscription id.""" sub = Subscription.objects.get( connection=this.ws.connection, sub_id=id_, ) for col, qs in self.sub_unique_objects(sub): - meteor_ids = get_meteor_ids( - qs.model, qs.values_list('pk', flat=True), - ) + if isinstance(col.model._meta.pk, AleaIdField): + meteor_ids = None + else: + meteor_ids = get_meteor_ids( + qs.model, qs.values_list('pk', flat=True), + ) for obj in qs: payload = col.obj_change_as_msg(obj, REMOVED, meteor_ids) this.send(payload) + this.subs[sub.publication].remove(sub.pk) sub.delete() - this.send({'msg': 'nosub', 'id': id_}) + if not silent: + this.send({'msg': 'nosub', 'id': id_}) @api_endpoint def method(self, method, params, id_): @@ -712,7 +761,17 @@ class DDP(APIMixin): try: handler = self.api_path_map()[method] except KeyError: - this.error('Unknown method: %s' % method) + print('Unknown method: %s %r' % (method, params)) + this.send({ + 'msg': 'result', + 'id': id_, + 'error': { + 'error': 404, + 'errorType': 'Meteor.Error', + 'message': 'Unknown method: %s %r' % (method, params), + 'reason': 'Method not found', + }, + }) return params_repr = repr(params) try: @@ -776,6 +835,9 @@ class DDP(APIMixin): signals.post_save.connect(self.on_post_save) signals.post_delete.connect(self.on_post_delete) signals.m2m_changed.connect(self.on_m2m_changed) + # call ready on each registered API endpoint + for api_provider in self.api_providers: + api_provider.ready() def on_pre_migrate(self, sender, **kwargs): """Pre-migrate signal handler.""" @@ -939,13 +1001,32 @@ class DDP(APIMixin): if my_connection_id in connection_ids: # msg must go to connection that initiated the change payload['_tx_id'] = this.ws.get_tx_id() + # header is sent in every payload + header = { + 'uuid': uuid.uuid1().int, # UUID1 should be unique + 'seq': 1, # increments for each 8KB chunk + 'fin': 0, # zero if more chunks expected, 1 if last chunk. + } + data = ejson.dumps(payload) cursor = connections[using].cursor() - cursor.execute( - 'NOTIFY "ddp", %s', - [ - ejson.dumps(payload), - ], - ) + while data: + hdr = ejson.dumps(header) + # use all available payload space for chunk + max_len = 8000 - len(hdr) - 100 + # take a chunk from data + chunk, data = data[:max_len], data[max_len:] + if not data: + # last chunk, set fin=1. + header['fin'] = 1 + hdr = ejson.dumps(header) + # print('NOTIFY: %s' % hdr) + cursor.execute( + 'NOTIFY "ddp", %s', + [ + '%s|%s' % (hdr, chunk), # pipe separates hdr|chunk. + ], + ) + header['seq'] += 1 # increment sequence. API = DDP() diff --git a/dddp/ddp.py b/dddp/ddp.py new file mode 100644 index 0000000..0af0f11 --- /dev/null +++ b/dddp/ddp.py @@ -0,0 +1,22 @@ +from dddp import THREAD_LOCAL as this +from dddp.api import API, Publication +from django.contrib import auth + + +class Logs(Publication): + + users = auth.get_user_model() + + def get_queries(self): + user_pk = getattr(this, 'user_id', False) + if user_pk: + if self.users.objects.filter( + pk=user_pk, + is_active=True, + is_superuser=True, + ).exists(): + return [] + raise ValueError('User not permitted.') + + +API.register([Logs]) diff --git a/dddp/logging.py b/dddp/logging.py new file mode 100644 index 0000000..121e542 --- /dev/null +++ b/dddp/logging.py @@ -0,0 +1,38 @@ +"""Django DDP logging helpers.""" +from __future__ import absolute_import, print_function + +import logging + +from dddp import THREAD_LOCAL as this, meteor_random_id, ADDED + + + + +class DDPHandler(logging.Handler): + + """Logging handler that streams log events via DDP to the current client.""" + + def __init__(self, *args, **kwargs): + self.logger = logging.getLogger('django.db.backends') + self.logger.info('Test') + super(DDPHandler, self).__init__(*args, **kwargs) + + def emit(self, record): + """Emit a formatted log record via DDP.""" + if getattr(this, 'subs', {}).get('Logs', False): + this.send({ + 'msg': ADDED, + 'collection': 'logs', + 'id': meteor_random_id('/collection/logs'), + 'fields': { + # 'name': record.name, + # 'levelno': record.levelno, + 'levelname': record.levelname, + # 'pathname': record.pathname, + # 'lineno': record.lineno, + 'msg': record.msg, + 'args': record.args, + # 'exc_info': record.exc_info, + # 'funcName': record.funcName, + }, + }) diff --git a/dddp/main.py b/dddp/main.py index 2b0f863..e8ab34a 100644 --- a/dddp/main.py +++ b/dddp/main.py @@ -60,7 +60,7 @@ def ddpp_sockjs_info(environ, start_response): ])) -def serve(listen, debug=False, **ssl_args): +def serve(listen, debug=False, verbosity=1, debug_port=0, **ssl_args): """Spawn greenlets for handling websockets and PostgreSQL calls.""" import signal from django.apps import apps @@ -99,7 +99,7 @@ def serve(listen, debug=False, **ssl_args): ) # setup WebSocketServer to dispatch web requests - webservers = [ + servers = [ geventwebsocket.WebSocketServer( (host, port), resource, @@ -113,8 +113,8 @@ def serve(listen, debug=False, **ssl_args): def killall(*args, **kwargs): """Kill all green threads.""" pgworker.stop() - for webserver in webservers: - webserver.stop() + for server in servers: + server.stop() # die gracefully with SIGINT or SIGQUIT gevent.signal(signal.SIGINT, killall) @@ -133,19 +133,38 @@ def serve(listen, debug=False, **ssl_args): ) # start greenlets + if debug_port: + from gevent.backdoor import BackdoorServer + servers.append( + BackdoorServer( + ('127.0.0.1', debug_port), + banner='Django DDP', + locals={ + 'servers': servers, + 'pgworker': pgworker, + 'killall': killall, + 'api': api, + 'resource': resource, + 'settings': settings, + 'wsgi_app': wsgi_app, + 'wsgi_name': wsgi_name, + }, + ) + ) + pgworker.start() print('=> Started PostgresGreenlet.') - web_threads = [ - gevent.spawn(webserver.serve_forever) - for webserver - in webservers + threads = [ + gevent.spawn(server.serve_forever) + for server + in servers ] print('=> Started DDPWebSocketApplication.') print('=> Started your app (%s).' % wsgi_name) print('') for host, port in listen: print('=> App running at: http://%s:%d/' % (host, port)) - gevent.joinall(web_threads) + gevent.joinall(threads) pgworker.stop() gevent.joinall([pgworker]) @@ -190,6 +209,14 @@ def main(): import argparse parser = argparse.ArgumentParser(description=__doc__) django = parser.add_argument_group('Django Options') + django.add_argument( + '--verbosity', '-v', metavar='VERBOSITY', dest='verbosity', type=int, + default=1, + ) + django.add_argument( + '--debug-port', metavar='DEBUG_PORT', dest='debug_port', type=int, + default=0, + ) django.add_argument( '--settings', metavar='SETTINGS', dest='settings', help="The Python path to a settings module, e.g. " @@ -218,8 +245,10 @@ def main(): os.environ['DJANGO_SETTINGS_MODULE'] = namespace.settings serve( namespace.listen or [Addr('localhost', 8000)], + debug_port=namespace.debug_port, keyfile=namespace.keyfile, certfile=namespace.certfile, + verbosity=namespace.verbosity, ) diff --git a/dddp/migrations/0009_auto_20150812_0856.py b/dddp/migrations/0009_auto_20150812_0856.py new file mode 100644 index 0000000..3247443 --- /dev/null +++ b/dddp/migrations/0009_auto_20150812_0856.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import models, migrations +import dddp.models + + +class Migration(migrations.Migration): + + dependencies = [ + ('dddp', '0008_remove_subscription_publication_class'), + ] + + operations = [ + migrations.AlterField( + model_name='connection', + name='connection_id', + field=dddp.models.AleaIdField(max_length=17, verbose_name=b'Alea ID'), + ), + ] diff --git a/dddp/migrations/__init__.py b/dddp/migrations/__init__.py index 29844c7..51b84bc 100644 --- a/dddp/migrations/__init__.py +++ b/dddp/migrations/__init__.py @@ -1,4 +1,7 @@ +import functools +from django.db import migrations from django.db.migrations.operations.base import Operation +from dddp.models import AleaIdField, get_meteor_id class TruncateOperation(Operation): @@ -35,3 +38,48 @@ class TruncateOperation(Operation): def describe(self): """Describe what the operation does in console output.""" return "Truncate tables" + + +def set_default_forwards(app_name, operation, apps, schema_editor): + """Set default value for AleaIdField.""" + model = apps.get_model(app_name, operation.model_name) + for obj_pk in model.objects.values_list('pk', flat=True): + model.objects.filter(pk=obj_pk).update(**{ + operation.name: get_meteor_id(model, obj_pk), + }) + + +def set_default_reverse(app_name, operation, apps, schema_editor): + """Unset default value for AleaIdField.""" + model = apps.get_model(app_name, operation.model_name) + for obj_pk in model.objects.values_list('pk', flat=True): + get_meteor_id(model, obj_pk) + + +class DefaultAleaIdOperations(object): + + def __init__(self, app_name): + self.app_name = app_name + + def __add__(self, operations): + default_operations = [] + for operation in operations: + if not isinstance(operation, migrations.AlterField): + continue + if not isinstance(operation.field, AleaIdField): + continue + if operation.name != 'aid': + continue + if operation.field.null: + continue + default_operations.append( + migrations.RunPython( + code=functools.partial( + set_default_forwards, self.app_name, operation, + ), + reverse_code=functools.partial( + set_default_reverse, self.app_name, operation, + ), + ) + ) + return default_operations + operations diff --git a/dddp/models.py b/dddp/models.py index b4515f0..89d36b6 100644 --- a/dddp/models.py +++ b/dddp/models.py @@ -2,8 +2,10 @@ from __future__ import absolute_import import collections +import os from django.db import models, transaction +from django.db.models.fields import NOT_PROVIDED from django.conf import settings from django.contrib.contenttypes.fields import ( GenericRelation, GenericForeignKey, @@ -15,7 +17,6 @@ import ejson from dddp import meteor_random_id -@transaction.atomic def get_meteor_id(obj_or_model, obj_pk=None): """Return an Alea ID for the given object.""" if obj_or_model is None: @@ -26,10 +27,39 @@ def get_meteor_id(obj_or_model, obj_pk=None): if model is ObjectMapping: # this doesn't make sense - raise TypeError raise TypeError("Can't map ObjectMapping instances through self.") - if obj_or_model is not model and obj_pk is None: - obj_pk = str(obj_or_model.pk) + + # try getting value of AleaIdField straight from instance if possible + if isinstance(obj_or_model, model): + # obj_or_model is an instance, not a model. + if isinstance(meta.pk, AleaIdField): + return obj_or_model.pk + if obj_pk is None: + # fall back to primary key, but coerce as string type for lookup. + obj_pk = str(obj_or_model.pk) + alea_unique_fields = [ + field + for field in meta.local_fields + if isinstance(field, AleaIdField) and field.unique + ] + if len(alea_unique_fields) == 1: + # found an AleaIdField with unique=True, assume it's got the value. + aid = alea_unique_fields[0].attname + if isinstance(obj_or_model, model): + val = getattr(obj_or_model, aid) + elif obj_pk is None: + val = None + else: + val = model.objects.values_list(aid, flat=True).get( + pk=obj_pk, + ) + if val: + return val + if obj_pk is None: + # bail out if args are (model, pk) but pk is None. return None + + # fallback to using AleaIdField from ObjectMapping model. content_type = ContentType.objects.get_for_model(model) try: return ObjectMapping.objects.values_list( @@ -47,37 +77,74 @@ def get_meteor_id(obj_or_model, obj_pk=None): get_meteor_id.short_description = 'DDP ID' # nice title for admin list_display -@transaction.atomic def get_meteor_ids(model, object_ids): """Return Alea ID mapping for all given ids of specified model.""" - content_type = ContentType.objects.get_for_model(model) + # Django model._meta is now public API -> pylint: disable=W0212 + meta = model._meta result = collections.OrderedDict( (str(obj_pk), None) for obj_pk in object_ids ) - for obj_pk, meteor_id in ObjectMapping.objects.filter( + if isinstance(meta.pk, AleaIdField): + # primary_key is an AleaIdField, use it. + return collections.OrderedDict( + (obj_pk, obj_pk) for obj_pk in object_ids + ) + alea_unique_fields = [ + field + for field in meta.local_fields + if isinstance(field, AleaIdField) and field.unique and not field.null + ] + if len(alea_unique_fields) == 1: + aid = alea_unique_fields[0].name + query = model.objects.filter( + pk__in=object_ids, + ).values_list('pk', aid) + else: + content_type = ContentType.objects.get_for_model(model) + query = ObjectMapping.objects.filter( content_type=content_type, object_id__in=list(result) - ).values_list('object_id', 'meteor_id'): + ).values_list('object_id', 'meteor_id') + for obj_pk, meteor_id in query: result[obj_pk] = meteor_id for obj_pk, meteor_id in result.items(): if meteor_id is None: - # Django model._meta is now public API -> pylint: disable=W0212 - result[obj_pk] = ObjectMapping.objects.create( - content_type=content_type, - object_id=obj_pk, - meteor_id=meteor_random_id('/collection/%s' % model._meta), - ).meteor_id + result[obj_pk] = get_meteor_id(model, obj_pk) return result -@transaction.atomic def get_object_id(model, meteor_id): """Return an object ID for the given meteor_id.""" + if meteor_id is None: + return None + # Django model._meta is now public API -> pylint: disable=W0212 + meta = model._meta + if model is ObjectMapping: # this doesn't make sense - raise TypeError raise TypeError("Can't map ObjectMapping instances through self.") + + if isinstance(meta.pk, AleaIdField): + # meteor_id is the primary key + return meteor_id + + alea_unique_fields = [ + field + for field in meta.local_fields + if isinstance(field, AleaIdField) and field.unique + ] + if len(alea_unique_fields) == 1: + # found an AleaIdField with unique=True, assume it's got the value. + val = model.objects.values_list( + 'pk', flat=True, + ).get(**{ + alea_unique_fields[0].attname: meteor_id, + }) + if val: + return val + content_type = ContentType.objects.get_for_model(model) return ObjectMapping.objects.filter( content_type=content_type, @@ -85,29 +152,57 @@ def get_object_id(model, meteor_id): ).values_list('object_id', flat=True).get() -@transaction.atomic def get_object_ids(model, meteor_ids): """Return all object IDs for the given meteor_ids.""" if model is ObjectMapping: # this doesn't make sense - raise TypeError raise TypeError("Can't map ObjectMapping instances through self.") - content_type = ContentType.objects.get_for_model(model) + # Django model._meta is now public API -> pylint: disable=W0212 + meta = model._meta + alea_unique_fields = [ + field + for field in meta.local_fields + if isinstance(field, AleaIdField) and field.unique and not field.null + ] result = collections.OrderedDict( (str(meteor_id), None) for meteor_id in meteor_ids ) - for meteor_id, object_id in ObjectMapping.objects.filter( - content_type=content_type, - meteor_id__in=meteor_ids, - ).values_list('meteor_id', 'object_id'): + if len(alea_unique_fields) == 1: + aid = alea_unique_fields[0].name + query = model.objects.filter(**{ + '%s__in' % aid: meteor_ids, + }).values_list(aid, 'pk') + else: + content_type = ContentType.objects.get_for_model(model) + query = ObjectMapping.objects.filter( + content_type=content_type, + meteor_id__in=meteor_ids, + ).values_list('meteor_id', 'object_id') + for meteor_id, object_id in query: result[meteor_id] = object_id return result -@transaction.atomic def get_object(model, meteor_id, *args, **kwargs): """Return an object for the given meteor_id.""" + # Django model._meta is now public API -> pylint: disable=W0212 + meta = model._meta + if isinstance(meta.pk, AleaIdField): + # meteor_id is the primary key + return model.objects.filter(*args, **kwargs).get(pk=meteor_id) + + alea_unique_fields = [ + field + for field in meta.local_fields + if isinstance(field, AleaIdField) and field.unique and not field.null + ] + if len(alea_unique_fields) == 1: + return model.objects.filter(*args, **kwargs).get(**{ + alea_unique_fields[0].name: meteor_id, + }) + return model.objects.filter(*args, **kwargs).get( pk=get_object_id(model, meteor_id), ) @@ -119,12 +214,59 @@ class AleaIdField(models.CharField): def __init__(self, *args, **kwargs): """Assume max_length of 17 to match Meteor implementation.""" - kwargs.update( - default=meteor_random_id, - max_length=17, - ) + kwargs['blank'] = True + kwargs.setdefault('verbose_name', 'Alea ID') + kwargs.setdefault('max_length', 17) super(AleaIdField, self).__init__(*args, **kwargs) + def deconstruct(self): + """Return arguments to pass to __init__() to re-create this field.""" + name, path, args, kwargs = super(AleaIdField, self).deconstruct() + del kwargs['blank'] + return name, path, args, kwargs + + def get_seeded_value(self, instance): + """Generate a syncronised value.""" + # Django model._meta is public API -> pylint: disable=W0212 + return meteor_random_id( + '/collection/%s' % instance._meta, self.max_length, + ) + + def get_pk_value_on_save(self, instance): + """Generate ID if required.""" + value = super(AleaIdField, self).get_pk_value_on_save(instance) + if not value: + value = self.get_seeded_value(instance) + return value + + def pre_save(self, model_instance, add): + """Generate ID if required.""" + value = super(AleaIdField, self).pre_save(model_instance, add) + if (not value) and self.default in (meteor_random_id, NOT_PROVIDED): + value = self.get_seeded_value(model_instance) + setattr(model_instance, self.attname, value) + return value + + +# Please don't hate me... +AID_KWARGS = {} + +if os.environ.get('AID_MIGRATE_STEP', '') == '1': + AID_KWARGS['null'] = True # ...please? + + +class AleaIdMixin(models.Model): + + """Django model mixin that provides AleaIdField field (as aid).""" + + aid = AleaIdField(unique=True, editable=True, **AID_KWARGS) + + class Meta(object): + + """Model meta options for AleaIdMixin.""" + + abstract = True + @python_2_unicode_compatible class ObjectMapping(models.Model): diff --git a/dddp/postgres.py b/dddp/postgres.py index 2c2404c..51844d0 100644 --- a/dddp/postgres.py +++ b/dddp/postgres.py @@ -22,6 +22,7 @@ class PostgresGreenlet(gevent.Greenlet): # queues for processing incoming sub/unsub requests and processing self.connections = {} + self.chunks = {} self._stop_event = gevent.event.Event() # connect to DB in async mode @@ -62,7 +63,29 @@ class PostgresGreenlet(gevent.Greenlet): "Got NOTIFY (pid=%d, payload=%r)", notify.pid, notify.payload, ) - data = ejson.loads(notify.payload) + + # read the header and check seq/fin. + hdr, chunk = notify.payload.split('|', 1) + # print('RECEIVE: %s' % hdr) + header = ejson.loads(hdr) + uuid = header['uuid'] + size, chunks = self.chunks.setdefault(uuid, [0, {}]) + if header['fin']: + size = self.chunks[uuid][0] = header['seq'] + + # stash the chunk + chunks[header['seq']] = chunk + + if len(chunks) != size: + # haven't got all the chunks yet + continue # process next NOTIFY in loop + + # got the last chunk -> process it. + data = ''.join( + chunk for _, chunk in sorted(chunks.items()) + ) + del self.chunks[uuid] # don't forget to cleanup! + data = ejson.loads(data) sender = data.pop('_sender', None) tx_id = data.pop('_tx_id', None) for connection_id in data.pop('_connection_ids'): diff --git a/dddp/server/__init__.py b/dddp/server/__init__.py deleted file mode 100644 index cea26b4..0000000 --- a/dddp/server/__init__.py +++ /dev/null @@ -1 +0,0 @@ -default_app_config = 'dddp.server.apps.ServerConfig' diff --git a/dddp/server/views.py b/dddp/server/views.py deleted file mode 100644 index 59e4575..0000000 --- a/dddp/server/views.py +++ /dev/null @@ -1,58 +0,0 @@ -"""Django DDP Server views.""" -from __future__ import print_function, absolute_import, unicode_literals -from ejson import dumps -from django.apps import apps -from django.conf import settings -from django.http import HttpResponse -from django.views.generic import View - - -STAR_JSON_SETTING_NAME = 'METEOR_STAR_JSON' - - -class MeteorView(View): - - """Django DDP Meteor server view.""" - - http_method_names = ['get', 'head'] - - app = None - runtime_config = None - - def __init__(self, **kwargs): - """Initialisation for Django DDP server view.""" - super(MeteorView, self).__init__(**kwargs) - self.app = apps.get_app_config('server') - - def get(self, request, path): - """Return HTML (or other related content) for Meteor.""" - if path == '/meteor_runtime_config.js': - config = { - 'DDP_DEFAULT_CONNECTION_URL': request.build_absolute_uri('/'), - 'ROOT_URL': request.build_absolute_uri( - '%s/' % self.runtime_config.get('ROOT_URL_PATH_PREFIX', ''), - ), - 'ROOT_URL_PATH_PREFIX': '', - } - # Use HTTPS instead of HTTP if SECURE_SSL_REDIRECT is set - if config['DDP_DEFAULT_CONNECTION_URL'].startswith('http:') \ - and settings.SECURE_SSL_REDIRECT: - config['DDP_DEFAULT_CONNECTION_URL'] = 'https:%s' % ( - config['DDP_DEFAULT_CONNECTION_URL'].split(':', 1)[1], - ) - config.update(self.runtime_config) - return HttpResponse( - '__meteor_runtime_config__ = %s;' % dumps(config), - content_type='text/javascript', - ) - try: - file_path, content_type = self.app.url_map[path] - with open(file_path, 'r') as content: - return HttpResponse( - content.read(), - content_type=content_type, - ) - except KeyError: - print(path) - return HttpResponse(self.app.html) - # raise Http404 diff --git a/dddp/server/apps.py b/dddp/views.py similarity index 63% rename from dddp/server/apps.py rename to dddp/views.py index 3065be9..76ebfac 100644 --- a/dddp/server/apps.py +++ b/dddp/views.py @@ -1,20 +1,18 @@ -"""Django DDP Server app config.""" +"""Django DDP Server views.""" from __future__ import print_function, absolute_import, unicode_literals import io import mimetypes import os.path -from django.apps import AppConfig -from django.conf import settings -from django.core.exceptions import ImproperlyConfigured from ejson import dumps, loads +from django.conf import settings +from django.http import HttpResponse +from django.views.generic import View + import pybars -STAR_JSON_SETTING_NAME = 'METEOR_STAR_JSON' - - def read(path, default=None, encoding='utf8'): """Read encoded contents from specified path or return default.""" if not path: @@ -34,12 +32,14 @@ def read_json(path): return loads(json_file.read()) -class ServerConfig(AppConfig): +class MeteorView(View): - """Django config for dddp.server app.""" + """Django DDP Meteor server view.""" - name = 'dddp.server' - verbose_name = 'Django DDP Meteor Web Server' + http_method_names = ['get', 'head'] + + json_path = None + runtime_config = None manifest = None program_json = None @@ -47,8 +47,6 @@ class ServerConfig(AppConfig): runtime_config = None star_json = None # top level layout - server_json = None # web server layout - web_browser_json = None # web.browser (client) layout url_map = None internal_map = None @@ -57,21 +55,21 @@ class ServerConfig(AppConfig): client_map = None # web.browser (client) URL to path map html = '\nDDP App' - def ready(self): - """Configure Django DDP server app.""" - mimetypes.init() # read and process /etc/mime.types + root_url_path_prefix = '' + bundled_js_css_prefix = '/' + + def __init__(self, **kwargs): + """Initialisation for Django DDP server view.""" + # super(...).__init__ assigns kwargs to instance. + super(MeteorView, self).__init__(**kwargs) + + # read and process /etc/mime.types + mimetypes.init() + self.url_map = {} - try: - json_path = getattr(settings, STAR_JSON_SETTING_NAME) - except AttributeError: - raise ImproperlyConfigured( - '%s setting required by dddp.server.view.' % ( - STAR_JSON_SETTING_NAME, - ), - ) - - self.star_json = read_json(json_path) + # process `star_json` + self.star_json = read_json(self.json_path) star_format = self.star_json['format'] if star_format != 'site-archive-pre1': raise ValueError( @@ -82,19 +80,20 @@ class ServerConfig(AppConfig): for program in self.star_json['programs'] } + # process `bundle/programs/server/program.json` from build dir server_json_path = os.path.join( - os.path.dirname(json_path), + os.path.dirname(self.json_path), os.path.dirname(programs['server']['path']), 'program.json', ) - self.server_json = read_json(server_json_path) - server_format = self.server_json['format'] + server_json = read_json(server_json_path) + server_format = server_json['format'] if server_format != 'javascript-image-pre1': raise ValueError( 'Unknown Meteor server format: %r' % server_format, ) self.server_load_map = {} - for item in self.server_json['load']: + for item in server_json['load']: item['path_full'] = os.path.join( os.path.dirname(server_json_path), item['path'], @@ -124,12 +123,13 @@ class ServerConfig(AppConfig): ], ) + # process `bundle/programs/web.browser/program.json` from build dir web_browser_json_path = os.path.join( - os.path.dirname(json_path), + os.path.dirname(self.json_path), programs['web.browser']['path'], ) - self.web_browser_json = read_json(web_browser_json_path) - web_browser_format = self.web_browser_json['format'] + web_browser_json = read_json(web_browser_json_path) + web_browser_format = web_browser_json['format'] if web_browser_format != 'web-program-pre1': raise ValueError( 'Unknown Meteor web.browser format: %r' % ( @@ -138,7 +138,7 @@ class ServerConfig(AppConfig): ) self.client_map = {} self.internal_map = {} - for item in self.web_browser_json['manifest']: + for item in web_browser_json['manifest']: item['path_full'] = os.path.join( os.path.dirname(web_browser_json_path), item['path'], @@ -159,19 +159,19 @@ class ServerConfig(AppConfig): config = { 'css': [ {'url': item['path']} - for item in self.web_browser_json['manifest'] + for item in web_browser_json['manifest'] if item['type'] == 'css' and item['where'] == 'client' ], 'js': [ {'url': item['path']} - for item in self.web_browser_json['manifest'] + for item in web_browser_json['manifest'] if item['type'] == 'js' and item['where'] == 'client' ], 'meteorRuntimeConfig': '"%s"' % ( dumps(self.runtime_config) ), - 'rootUrlPathPrefix': '/app', - 'bundledJsCssPrefix': '/app/', + 'rootUrlPathPrefix': self.root_url_path_prefix, + 'bundledJsCssPrefix': self.bundled_js_css_prefix, 'inlineScriptsAllowed': False, 'inline': None, 'head': read( @@ -187,3 +187,36 @@ class ServerConfig(AppConfig): compiler = pybars.Compiler() tmpl = compiler.compile(tmpl_raw) self.html = '\n%s' % tmpl(config) + + def get(self, request, path): + """Return HTML (or other related content) for Meteor.""" + if path == '/meteor_runtime_config.js': + config = { + 'DDP_DEFAULT_CONNECTION_URL': request.build_absolute_uri('/'), + 'ROOT_URL': request.build_absolute_uri( + '%s/' % self.runtime_config.get('ROOT_URL_PATH_PREFIX', ''), + ), + 'ROOT_URL_PATH_PREFIX': '', + } + # Use HTTPS instead of HTTP if SECURE_SSL_REDIRECT is set + if config['DDP_DEFAULT_CONNECTION_URL'].startswith('http:') \ + and settings.SECURE_SSL_REDIRECT: + config['DDP_DEFAULT_CONNECTION_URL'] = 'https:%s' % ( + config['DDP_DEFAULT_CONNECTION_URL'].split(':', 1)[1], + ) + config.update(self.runtime_config) + return HttpResponse( + '__meteor_runtime_config__ = %s;' % dumps(config), + content_type='text/javascript', + ) + try: + file_path, content_type = self.url_map[path] + with open(file_path, 'r') as content: + return HttpResponse( + content.read(), + content_type=content_type, + ) + except KeyError: + print(path) + return HttpResponse(self.html) + # raise Http404 diff --git a/dddp/websocket.py b/dddp/websocket.py index be4f507..0676363 100644 --- a/dddp/websocket.py +++ b/dddp/websocket.py @@ -106,8 +106,6 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): version = None support = None connection = None - subs = None - request = None remote_ids = None base_handler = BaseHandler() @@ -132,7 +130,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): '{0[REMOTE_ADDR]}:{0[REMOTE_PORT]}'.format( self.ws.environ, ) - self.subs = {} + this.subs = {} self.logger.info('+ %s OPEN', self) self.send('o') self.send('a["{\\"server_id\\":\\"0\\"}"]') @@ -160,32 +158,35 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): try: msgs = ejson.loads(message) except ValueError, err: - raise MeteorError(400, 'Data is not valid EJSON') + self.error(400, 'Data is not valid EJSON') + return if not isinstance(msgs, list): - raise MeteorError(400, 'Invalid EJSON messages') + self.error(400, 'Invalid EJSON messages') + return # process individual messages while msgs: # parse message payload raw = msgs.pop(0) try: - try: - data = ejson.loads(raw) - except ValueError, err: - raise MeteorError(400, 'Data is not valid EJSON') - if not isinstance(data, dict): - self.error(400, 'Invalid EJSON message payload', raw) - continue - try: - msg = data.pop('msg') - except KeyError: - raise MeteorError( - 400, 'Bad request', None, {'offendingMessage': data} - ) - # dispatch message + data = ejson.loads(raw) + except ValueError, err: + self.error(400, 'Data is not valid EJSON') + continue + if not isinstance(data, dict): + self.error(400, 'Invalid EJSON message payload', raw) + continue + try: + msg = data.pop('msg') + except KeyError: + self.error( + 400, 'Bad request', offendingMessage=data, + ) + continue + # dispatch message + try: self.dispatch(msg, data) except MeteorError, err: - traceback.print_exc() self.error(err) except Exception, err: traceback.print_exc() @@ -200,13 +201,16 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): """Dispatch msg to appropriate recv_foo handler.""" # enforce calling 'connect' first if self.connection is None and msg != 'connect': - raise MeteorError(400, 'Must connect first') + self.error(400, 'Must connect first') + return # lookup method handler try: handler = getattr(self, 'recv_%s' % msg) except (AttributeError, UnicodeEncodeError): - raise MeteorError(404, 'Method not found') + print('Method not found: %s %r' % (msg, kwargs)) + self.error(404, 'Method not found', msg='result') + return # validate handler arguments validate_kwargs(handler, kwargs) @@ -216,7 +220,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): handler(**kwargs) except Exception, err: # print stack trace --> pylint: disable=W0703 traceback.print_exc() - self.error(MeteorError(500, 'Internal server error', err)) + self.error(500, 'Internal server error', err) def send(self, data, tx_id=None): """Send `data` (raw string or EJSON payload) to WebSocket client.""" @@ -273,7 +277,7 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): kwargs['msg'] = msg self.send(kwargs) - def error(self, err, reason=None, detail=None, **kwargs): + def error(self, err, reason=None, detail=None, msg='error', **kwargs): """Send EJSON error to remote.""" if isinstance(err, MeteorError): ( @@ -297,12 +301,13 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): if kwargs: data.update(kwargs) self.logger.error('! %s %r', self, data) - self.reply('error', **data) + self.reply(msg, **data) def recv_connect(self, version=None, support=None, session=None): """DDP connect handler.""" if self.connection is not None: self.error( + 400, 'Session already established.', reason='Current session in detail.', detail=self.connection.connection_id, @@ -312,21 +317,11 @@ class DDPWebSocketApplication(geventwebsocket.WebSocketApplication): elif version not in support: self.error('Client version/support mismatch.') else: - self.request = WSGIRequest(self.ws.environ) - # Apply request middleware (so we get request.user and other attrs) - # pylint: disable=protected-access - if self.base_handler._request_middleware is None: - self.base_handler.load_middleware() - for middleware_method in self.base_handler._request_middleware: - response = middleware_method(self.request) - if response: - raise ValueError(response) + this.request = WSGIRequest(self.ws.environ) this.ws = self - this.request = self.request this.send = self.send this.reply = self.reply this.error = self.error - this.request.session.save() from dddp.models import Connection cur = connection.cursor() diff --git a/setup.py b/setup.py index 2494769..a269565 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ from setuptools import setup, find_packages setup( name='django-ddp', - version='0.9.12', + version='0.12.2', description=__doc__, long_description=open('README.rst').read(), author='Tyson Clugg',