Add gevent thread local support, Alea PRNG generator, refactor EJSON message generation, add ObjectMapping model.

This commit is contained in:
Tyson Clugg 2015-03-04 18:29:07 +11:00
parent 29b9acf8e4
commit ab0d3ba161
5 changed files with 284 additions and 36 deletions

View file

@ -1,7 +1,9 @@
"""Django/PostgreSQL implementation of the Meteor DDP service."""
import os.path
from pkg_resources import get_distribution, DistributionNotFound
from django.utils.module_loading import autodiscover_modules
from gevent.local import local
from dddp import alea
try:
_dist = get_distribution('django-ddp')
@ -14,3 +16,28 @@ else:
__version__ = _dist.version
default_app_config = 'dddp.apps.DjangoDDPConfig'
class ThreadLocal(local):
_init_done = False
def __init__(self, **default_factories):
if self._init_done:
raise SystemError('__init__ called too many times')
self._init_done = True
self._default_factories = default_factories
def __getattr__(self, name):
try:
factory = self._default_factories[name]
except KeyError:
raise AttributeError
obj = factory()
setattr(self, name, obj)
return obj
def get(self, name, factory, *factory_args, **factory_kwargs):
if not hasattr(self, name):
return setattr(self, name, factory(*factory_args, **factory_kwargs))
return getattr(self, name)
THREAD_LOCAL = ThreadLocal(alea_random=alea.Alea)

160
dddp/alea.py Executable file
View file

@ -0,0 +1,160 @@
#!/usr/bin/env python
"""
Alea PRNG.
This implementation of Alea defaults to a more secure initial internal state.
>>> r1, r2 = Alea(), Alea()
>>> assert r1.state != r2.state, 'r1: %r, r2: %r' % (r1.state, r2.state)
>>> random = Alea("my", 3, "seeds")
>>> (random.s0, random.s1, random.s2)
(0.23922116006724536, 0.6147655111271888, 0.3493568613193929)
>>> random()
0.30802189325913787
>>> random()
0.5190450621303171
>>> random()
0.43635262292809784
>>> random = Alea("my", 3, "seeds")
>>> random()
0.30802189325913787
>>> random = Alea("my", 3, "seeds")
>>> random.random_string(17, UNMISTAKABLE)
'JYRduBwQtjpeCkqP7'
>>> random.random_string(17, UNMISTAKABLE)
'HLxYtpZBtSain84zj'
>>> random.random_string(17, UNMISTAKABLE)
's9XrbWaDC4yCL5NCW'
>>> random.random_string(17, UNMISTAKABLE)
'SCiymgNnZpwda9vSH'
>>> random.random_string(17, UNMISTAKABLE)
'hui3ThSoZrFrdFDTT'
>>> random = Alea("my", 3, "seeds")
>>> random.random_string(43, BASE64)
'tHBM5k8z4TZOmU0zgsv9H4ZIl4CJSXic_T3iF2KFJnm'
"""
from math import floor
import os
import random
import time
UNMISTAKABLE = '23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz'
BASE64 = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-_'
class Mash(object):
"""
`Mash` hasing algorithm.
>>> mash = Mash()
>>> mash(' ')
0.8633289230056107
>>> mash(' ')
0.15019597788341343
>>> mash(' ')
0.9176952994894236
"""
def __init__(self):
"""Initialise state."""
self.n = 0xefc8249d
def __call__(self, data):
"""Return mash, updating internal state."""
data = bytes(data)
for byte in bytes(data):
self.n += ord(byte)
h = 0.02519603282416938 * self.n
self.n = floor(h)
h -= self.n
h *= self.n
self.n = floor(h)
h -= self.n
self.n += h * 0x100000000
res = self.n * 2.3283064365386963e-10 # 2^-32
return res
class Alea(object):
"""Alea stateful PRNG."""
c = None
s0 = None
s1 = None
s2 = None
def __init__(self, *args):
"""Initialise Alea state from seeds (args)."""
self.seed(args)
def seed(self, values):
"""Seed internal state from supplied values."""
if not values:
# Meteor uses epoch seconds as the seed if no args supplied, we use
# a much more secure seed by default to avoid hash collisions.
seed_ids = [int, str, random, self, values, self.__class__]
random.shuffle(seed_ids)
values = map(id, seed_ids) + [time.time(), os.urandom(512)]
mash = Mash()
self.c = 1
self.s0 = mash(' ')
self.s1 = mash(' ')
self.s2 = mash(' ')
for val in values:
self.s0 -= mash(val)
if self.s0 < 0:
self.s0 += 1
self.s1 -= mash(val)
if self.s1 < 0:
self.s1 += 1
self.s2 -= mash(val)
if self.s2 < 0:
self.s2 += 1
@property
def state(self):
"""Return internal state, useful for testing."""
return {'c': self.c, 's0': self.s0, 's1': self.s1, 's2': self.s2}
def __call__(self):
"""Get the next psuedo random number, updating state."""
t = 2091639 * self.s0 + self.c * 2.3283064365386963e-10 # 2^-32
self.c = floor(t)
self.s0 = self.s1
self.s1 = self.s2
self.s2 = t - self.c
return self.s2
def choice(self, seq):
"""Choose an element from the sequence `seq`."""
return seq[int(self() * len(seq))]
def random_string(self, length, alphabet):
"""Return string of `length` elements chosen from `alphabet`."""
return ''.join(
self.choice(alphabet) for n in range(length)
)
if __name__ == '__main__':
import doctest
doctest.testmod()

View file

@ -2,50 +2,42 @@
from __future__ import print_function
import uuid
import ejson
from django.apps import AppConfig
from django.core import serializers
from django.db import connections
from django.db.models import signals
from dddp.msg import obj_change_as_msg
from dddp.notify import send_notify
def on_save(sender, **kwargs):
"""Post-save signal handler."""
send_notify(
model=sender,
obj=kwargs['instance'],
msg=kwargs['created'] and 'added' or 'changed',
using=kwargs['using'],
)
def on_delete(sender, **kwargs):
"""Post-delete signal handler."""
send_notify(
model=sender,
obj=kwargs['instance'],
msg='removed',
using=kwargs['using'],
)
class DjangoDDPConfig(AppConfig):
"""Django app config for django-ddp."""
name = 'dddp'
verbose_name = 'Django DDP'
serializer = serializers.get_serializer('python')()
def ready(self):
for (signal, handler) in [
(signals.post_save, self.on_save),
(signals.post_delete, self.on_delete),
]:
signal.connect(handler)
def on_save(self, sender, **kwargs):
self.send_notify(
model=sender,
obj=kwargs['instance'],
msg=kwargs['created'] and 'added' or 'changed',
using=kwargs['using'],
)
def on_delete(self, sender, **kwargs):
self.send_notify(
model=sender,
obj=kwargs['instance'],
msg='removed',
using=kwargs['using'],
)
def send_notify(self, model, obj, msg, using):
name, payload = obj_change_as_msg(obj, msg)
cursor = connections[using].cursor()
cursor.execute(
'NOTIFY "%s", %%s' % name,
[
ejson.dumps(payload),
],
)
"""Initialisation for django-ddp (setup signal handlers)."""
signals.post_save.connect(on_save)
signals.post_delete.connect(on_delete)

51
dddp/models.py Normal file
View file

@ -0,0 +1,51 @@
"""Django DDP models."""
from django.db import models
from django.contrib.contenttypes.fields import GenericForeignKey
from django.contrib.contenttypes.models import ContentType
from dddp import THREAD_LOCAL
METEOR_ID_CHARS = '23456789ABCDEFGHJKLMNPQRSTWXYZabcdefghijkmnopqrstuvwxyz'
class AleaIdField(models.CharField):
"""CharField that generates its own values using Alea PRNG before INSERT."""
def __init__(self, *args, **kwargs):
"""Assume max_length of 17 to match Meteor implementation."""
kwargs.setdefault('max_length', 17)
super(AleaIdField, self).__init__(*args, **kwargs)
def pre_save(self, model_instance, add):
"""Generate value if not set during INSERT."""
if add and not getattr(model_instance, self.attname):
value = THREAD_LOCAL.alea_random.random_string(
self.max_length, METEOR_ID_CHARS,
)
setattr(model_instance, self.attname, value)
return value
else:
return super(AleaIdField, self).pre_save(self, model_instance, add)
class ObjectMapping(models.Model):
"""Mapping from regular Django model primary keys to Meteor object IDs."""
meteor_id = AleaIdField()
content_type = models.ForeignKey(ContentType, db_index=True)
object_id = models.PositiveIntegerField()
content_object = GenericForeignKey('content_type', 'object_id')
class Meta(object):
"""Meta info for ObjectMapping model."""
unique_together = [
['content_type', 'meteor_id'],
]
index_together = [
['content_type', 'object_id'],
['content_type', 'meteor_id'],
]

18
dddp/notify.py Normal file
View file

@ -0,0 +1,18 @@
"""Django DDP notification support."""
import ejson
from django.db import connections
from dddp.msg import obj_change_as_msg
def send_notify(model, obj, msg, using):
"""Dispatch PostgreSQL async NOTIFY."""
name, payload = obj_change_as_msg(obj, msg)
cursor = connections[using].cursor()
cursor.execute(
'NOTIFY "%s", %%s' % name,
[
ejson.dumps(payload),
],
)