Added strategy/pipeline behavior

This commit is contained in:
ifandelse 2014-01-25 01:27:33 -05:00
parent f8cdebd019
commit 72388c6ae8
10 changed files with 408 additions and 238 deletions

View file

@ -9,10 +9,7 @@
(function (root, factory) {
if (typeof module === "object" && module.exports) {
// Node, or CommonJS-Like environments
module.exports = function (_) {
_ = _ || require("underscore");
return factory(_, this);
};
module.exports = factory(require("underscore"), this);
} else if (typeof define === "function" && define.amd) {
// AMD. Register as an anonymous module.
define(["underscore"], function (_) {
@ -27,6 +24,138 @@
var postal;
var prevPostal = global.postal;
var Strategy = function (options) {
var _target = options.owner[options.prop];
if (typeof _target !== "function") {
throw new Error("Strategies can only target methods.");
}
var _strategies = [];
var _context = options.context || options.owner;
var strategy = function () {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call(arguments, 0);
var thisIdx = idx;
var strategy;
idx += 1;
if (thisIdx < _strategies.length) {
strategy = _strategies[thisIdx];
strategy.fn.apply(strategy.context || _context, [next].concat(args));
} else {
_target.apply(_context, args);
}
};
next.apply(this, arguments);
};
strategy.target = function () {
return _target;
};
strategy.context = function (ctx) {
if (arguments.length === 0) {
return _context;
} else {
_context = ctx;
}
};
strategy.strategies = function () {
return _strategies;
};
strategy.useStrategy = function (strategy) {
var idx = 0,
exists = false;
while (idx < _strategies.length) {
if (_strategies[idx].name === strategy.name) {
_strategies[idx] = strategy;
exists = true;
break;
}
idx += 1;
}
if (!exists) {
_strategies.push(strategy);
}
};
strategy.reset = function () {
_strategies = [];
};
if (options.lazyInit) {
_target.useStrategy = function () {
options.owner[options.prop] = strategy;
strategy.useStrategy.apply(strategy, arguments);
};
_target.context = function () {
options.owner[options.prop] = strategy;
return strategy.context.apply(strategy, arguments);
};
return _target;
} else {
return strategy;
}
};
var strats = {
setTimeout: function (ms) {
return {
name: "setTimeout",
fn: function (next, data, envelope) {
setTimeout(function () {
next(data, envelope);
}, ms);
}
};
},
after: function (maxCalls, callback) {
var dispose = _.after(maxCalls, callback);
return {
name: "after",
fn: function (next, data, envelope) {
dispose();
next(data, envelope);
}
};
},
throttle: function (ms) {
return {
name: "throttle",
fn: _.throttle(function (next, data, envelope) {
next(data, envelope);
}, ms)
};
},
debounce: function (ms, immediate) {
return {
name: "debounce",
fn: _.debounce(function (next, data, envelope) {
next(data, envelope);
}, ms, !! immediate)
};
},
predicate: function (pred) {
return {
name: "predicate",
fn: function (next, data, envelope) {
if (pred.call(this, data, envelope)) {
next.call(this, data, envelope);
}
}
};
},
distinct: function (options) {
options = options || {};
var accessor = function (args) {
return args[0];
};
var check = options.all ? new DistinctPredicate(accessor) : new ConsecutiveDistinctPredicate(accessor);
return {
name: "distinct",
fn: function (next, data, envelope) {
if (check(data)) {
next(data, envelope);
}
}
};
}
};
var ConsecutiveDistinctPredicate = function () {
var previous;
@ -90,9 +219,7 @@
}
this.channel = channel;
this.topic = topic;
this.callback = callback;
this.constraints = [];
this.context = null;
this.subscribe(callback);
};
SubscriptionDefinition.prototype = {
@ -104,13 +231,7 @@
},
defer: function () {
var self = this;
var fn = this.callback;
this.callback = function (data, env) {
setTimeout(function () {
fn.call(self.context, data, env);
}, 0);
};
this.callback.useStrategy(postal.configuration.strategies.setTimeout(0));
return this;
},
@ -119,25 +240,21 @@
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
}
var self = this;
var fn = this.callback;
var dispose = _.after(maxCalls, _.bind(function () {
this.unsubscribe();
}, this));
this.callback = function () {
fn.apply(self.context, arguments);
dispose();
};
return this;
self.callback.useStrategy(postal.configuration.strategies.after(maxCalls, function () {
self.unsubscribe.call(self);
}));
return self;
},
distinctUntilChanged: function () {
this.withConstraint(new ConsecutiveDistinctPredicate());
this.callback.useStrategy(postal.configuration.strategies.distinct());
return this;
},
distinct: function () {
this.withConstraint(new DistinctPredicate());
this.callback.useStrategy(postal.configuration.strategies.distinct({
all: true
}));
return this;
},
@ -150,22 +267,12 @@
if (!_.isFunction(predicate)) {
throw "Predicate constraint must be a function";
}
this.constraints.push(predicate);
this.callback.useStrategy(postal.configuration.strategies.predicate(predicate));
return this;
},
withConstraints: function (predicates) {
var self = this;
if (_.isArray(predicates)) {
_.each(predicates, function (predicate) {
self.withConstraint(predicate);
});
}
return self;
},
withContext: function (context) {
this.context = context;
this.callback.context(context);
return this;
},
@ -182,28 +289,27 @@
if (_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var self = this;
var fn = this.callback;
this.callback = function (data, env) {
setTimeout(function () {
fn.call(self.context, data, env);
}, milliseconds);
};
this.callback.useStrategy(postal.configuration.strategies.setTimeout(milliseconds));
return this;
},
withThrottle: function (milliseconds, options) {
options = options || {};
withThrottle: function (milliseconds) {
if (_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.throttle(fn, milliseconds, options);
this.callback.useStrategy(postal.configuration.strategies.throttle(milliseconds));
return this;
},
subscribe: function (callback) {
this.callback = callback;
this.callback = new Strategy({
owner: this,
prop: "callback",
context: this,
// TODO: is this the best option?
lazyInit: true
});
return this;
}
};
@ -358,7 +464,8 @@
bus: localBus,
resolver: bindingsResolver,
DEFAULT_CHANNEL: "/",
SYSTEM_CHANNEL: "postal"
SYSTEM_CHANNEL: "postal",
strategies: strats
},
ChannelDefinition: ChannelDefinition,

2
lib/postal.min.js vendored

File diff suppressed because one or more lines are too long

View file

@ -63,11 +63,8 @@
it( "should have set subscription topic value", function () {
expect( sub.topic ).to.be( "MyTopic" );
} );
it( "should have defaulted the subscription constraints array", function () {
expect( sub.constraints.length ).to.be( 0 );
} );
it( "should have defaulted the subscription context value", function () {
expect( sub.context ).to.be( null );
expect( sub.callback.context() ).to.be( sub );
} );
it( "should have captured subscription creation event", function () {
expect( caughtSubscribeEvent ).to.be.ok();
@ -92,8 +89,8 @@
postal.utils.reset();
subInvokedCnt = 0;
} );
it( "should have a constraint on the subscription", function () {
expect( postal.configuration.bus.subscriptions.MyChannel.MyTopic[0].constraints.length ).to.be( 1 );
it( "callback should be a strategy", function () {
expect( typeof postal.configuration.bus.subscriptions.MyChannel.MyTopic[0].callback.context ).to.be( "function" );
} );
it( "subscription callback should be invoked once", function () {
expect( subInvokedCnt ).to.be( 1 );
@ -329,57 +326,28 @@
subscription = channel.subscribe( "MyTopic", function ( data ) {
recvd = true;
} )
.withConstraints( [function () {
.withConstraint(function () {
return false;
})
.withConstraint(function () {
return false;
})
.withConstraint(function () {
return true;
},
function () {
return true;
},
function () {
return true;
}] );
});
channel.publish( "MyTopic", "Testing123" );
} );
after( function () {
postal.utils.reset();
recvd = false;
} );
it( "should have a constraint on the subscription", function () {
expect( postal.configuration.bus.subscriptions.MyChannel.MyTopic[0].constraints.length ).to.be( 3 );
it( "should overwrite constraint with last one passed in", function () {
expect( subscription.callback.strategies().length ).to.be( 1 );
} );
it( "should have invoked the callback", function () {
expect( recvd ).to.be.ok();
} );
} );
describe( "When subscribing with multiple constraints and one returning false", function () {
var recvd = false;
before( function () {
channel = postal.channel( "MyChannel" );
subscription = channel.subscribe( "MyTopic", function ( data ) {
recvd = true;
} )
.withConstraints( [function () {
return true;
},
function () {
return false;
},
function () {
return true;
}] );
channel.publish( "MyTopic", "Testing123" );
} );
after( function () {
postal.utils.reset();
recvd = false;
} );
it( "should have a constraint on the subscription", function () {
expect( postal.configuration.bus.subscriptions.MyChannel.MyTopic[0].constraints.length ).to.be( 3 );
} );
it( "should not have invoked the callback", function () {
expect( recvd ).to.not.be.ok()
} );
} );
describe( "When subscribing with one constraint returning false", function () {
var recvd = false;
before( function () {
@ -397,7 +365,7 @@
recvd = false;
} );
it( "should have a constraint on the subscription", function () {
expect( postal.configuration.bus.subscriptions.MyChannel.MyTopic[0].constraints.length ).to.be( 1 );
expect( subscription.callback.strategies()[0].name ).to.be( "predicate" );
} );
it( "should not have invoked the subscription callback", function () {
expect( recvd ).to.not.be.ok();
@ -420,7 +388,7 @@
recvd = false;
} );
it( "should have a constraint on the subscription", function () {
expect( postal.configuration.bus.subscriptions.MyChannel.MyTopic[0].constraints.length ).to.be( 1 );
expect( subscription.callback.strategies()[0].name ).to.be( "predicate" );
} );
it( "should have invoked the subscription callback", function () {
expect( recvd ).to.be.ok();
@ -506,11 +474,8 @@
it( "should have set subscription topic value", function () {
expect( sub.topic ).to.be( "MyTopic" );
} );
it( "should have defaulted the subscription constraints array", function () {
expect( sub.constraints.length ).to.be( 0 );
} );
it( "should have defaulted the subscription context value", function () {
expect( sub.context ).to.be( null );
expect( sub.callback.context() ).to.be( sub );
} );
} );
});
@ -655,16 +620,16 @@
results.push( "unsubscribed" );
}
} );
subscription1 = channel.subscribe( 'nest.test',function () {
results.push( '1 received message' );
subscription1 = channel.subscribe( "nest.test",function () {
results.push( "1 received message" );
channel.publish( "nest.test2", "Hai" );
} ).once();
subscription2 = channel.subscribe( 'nest.test2', function () {
results.push( '2 received message' );
subscription2 = channel.subscribe( "nest.test2", function () {
results.push( "2 received message" );
} );
channel.publish( 'nest.test' );
channel.publish( 'nest.test' );
channel.publish( "nest.test" );
channel.publish( "nest.test" );
} );
after( function () {
//subscription2.unsubscribe();
@ -673,9 +638,9 @@
} );
it( "should produce expected messages", function () {
expect( results.length ).to.be( 3 );
expect( results[0] ).to.be( "1 received message" );
expect( results[1] ).to.be( "2 received message" );
expect( results[2] ).to.be( "unsubscribed" );
expect( results[0] ).to.be( "unsubscribed" );
expect( results[1] ).to.be( "1 received message" );
expect( results[2] ).to.be( "2 received message" );
} );
} );
});

View file

@ -22,42 +22,26 @@
it( "should set the callback", function () {
expect( sDef.callback ).to.be( NO_OP );
} );
it( "should default the constraints", function () {
expect( sDef.constraints.length ).to.be( 0 );
} );
it( "should default the context", function () {
expect( sDef.context ).to.be( null );
expect( sDef.context ).to.be( undefined );
} );
} );
describe( "When setting distinctUntilChanged", function () {
var sDefa = new SubscriptionDefinition( "TestChannel", "TestTopic", NO_OP ).distinctUntilChanged();
it( "Should add a DistinctPredicate constraint to the configuration constraints", function () {
expect( sDefa.constraints.length ).to.be( 1 );
} );
it( "callback should be a strategy", function () {
expect( typeof sDefa.callback.context ).to.be( "function" );
});
} );
describe( "When adding a constraint", function () {
var sDefb = new SubscriptionDefinition( "TestChannel", "TestTopic", NO_OP ).withConstraint( function () {
} );
});
it( "Should add a constraint", function () {
expect( sDefb.constraints.length ).to.be( 1 );
} );
} );
describe( "When adding multiple constraints", function () {
var sDefc = new SubscriptionDefinition( "TestChannel", "TestTopic", NO_OP )
.withConstraints( [
function () {},
function () {},
function () {}
]);
it( "Should add a constraint", function () {
expect( sDefc.constraints.length ).to.be( 3 );
} );
it( "callback should be a strategy", function () {
expect( typeof sDefb.callback.context ).to.be( "function" );
});
} );
describe( "When setting the context", function () {
@ -69,9 +53,13 @@
name = this.name;
return true;
} );
sDefd.callback({ channel : "TestChannel", topic : "TestTopic", data : "Oh, hai"}, "Oh, hai");
it( "Should set context", function () {
expect( sDefd.context ).to.be( obj );
expect( sDefd.callback.context() ).to.be( obj );
} );
it( "Should apply context to predicate/constraint", function () {
expect( name ).to.be( "Rose" );
} );
} );

View file

@ -48,8 +48,7 @@
it( "should have created a subscription definition", function () {
expect( sub.channel ).to.be( "MyChannel" );
expect( sub.topic ).to.be( "MyTopic" );
expect( sub.constraints.length ).to.be( 0 );
expect( sub.context ).to.be( null );
expect( sub.context ).to.be( undefined );
} );
it( "should have created a resolver cache entry", function () {
expect( _.isEmpty( resolver ) ).to.not.be.ok()

View file

@ -5,7 +5,8 @@ postal = {
bus : localBus,
resolver : bindingsResolver,
DEFAULT_CHANNEL : "/",
SYSTEM_CHANNEL : "postal"
SYSTEM_CHANNEL : "postal",
strategies : strats
},
ChannelDefinition : ChannelDefinition,

View file

@ -7,11 +7,9 @@ var SubscriptionDefinition = function ( channel, topic, callback ) {
if(topic.length === 0) {
throw new Error("Topics cannot be empty");
}
this.channel = channel;
this.topic = topic;
this.callback = callback;
this.constraints = [];
this.context = null;
this.channel = channel;
this.topic = topic;
this.subscribe(callback);
};
SubscriptionDefinition.prototype = {
@ -22,107 +20,83 @@ SubscriptionDefinition.prototype = {
}
},
defer : function () {
var self = this;
var fn = this.callback;
this.callback = function ( data, env ) {
setTimeout( function () {
fn.call( self.context, data, env );
}, 0 );
};
return this;
},
defer : function () {
this.callback.useStrategy(postal.configuration.strategies.setTimeout(0));
return this;
},
disposeAfter : function ( maxCalls ) {
if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
}
var self = this;
var fn = this.callback;
var dispose = _.after( maxCalls, _.bind( function () {
this.unsubscribe();
}, this ) );
disposeAfter : function ( maxCalls ) {
if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
}
var self = this;
self.callback.useStrategy(postal.configuration.strategies.after(maxCalls, function() {
self.unsubscribe.call(self);
}));
return self;
},
this.callback = function () {
fn.apply( self.context, arguments );
dispose();
};
return this;
},
distinctUntilChanged : function () {
this.callback.useStrategy(postal.configuration.strategies.distinct());
return this;
},
distinctUntilChanged : function () {
this.withConstraint( new ConsecutiveDistinctPredicate() );
return this;
},
distinct : function () {
this.callback.useStrategy(postal.configuration.strategies.distinct({ all: true }));
return this;
},
distinct : function () {
this.withConstraint( new DistinctPredicate() );
return this;
},
once : function () {
this.disposeAfter( 1 );
return this;
},
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
if ( !_.isFunction( predicate ) ) {
throw "Predicate constraint must be a function";
}
this.callback.useStrategy(postal.configuration.strategies.predicate(predicate));
return this;
},
withConstraint : function ( predicate ) {
if ( !_.isFunction( predicate ) ) {
throw "Predicate constraint must be a function";
}
this.constraints.push( predicate );
return this;
},
withContext : function ( context ) {
this.callback.context(context);
return this;
},
withConstraints : function ( predicates ) {
var self = this;
if ( _.isArray( predicates ) ) {
_.each( predicates, function ( predicate ) {
self.withConstraint( predicate );
} );
}
return self;
},
withDebounce : function ( milliseconds, immediate ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.debounce( fn, milliseconds, !!immediate );
return this;
},
withContext : function ( context ) {
this.context = context;
return this;
},
withDelay : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
}
this.callback.useStrategy(postal.configuration.strategies.setTimeout(milliseconds));
return this;
},
withDebounce : function ( milliseconds, immediate ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.debounce( fn, milliseconds, !!immediate );
return this;
},
withThrottle : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
}
this.callback.useStrategy(postal.configuration.strategies.throttle(milliseconds));
return this;
},
withDelay : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
}
var self = this;
var fn = this.callback;
this.callback = function ( data, env ) {
setTimeout( function () {
fn.call( self.context, data, env );
}, milliseconds );
};
return this;
},
withThrottle : function ( milliseconds, options ) {
options = options || { };
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.throttle( fn, milliseconds, options );
return this;
},
subscribe : function ( callback ) {
this.callback = callback;
return this;
}
subscribe : function ( callback ) {
this.callback = callback;
this.callback = new Strategy({
owner : this,
prop : "callback",
context : this, // TODO: is this the best option?
lazyInit : true
});
return this;
}
};

View file

@ -17,7 +17,9 @@
var postal;
var prevPostal = global.postal;
//import("ConsecutiveDistinctPredicate.js");
//import("strategy.js");
//import("strategies.js");
//import("ConsecutiveDistinctPredicate.js");
//import("DistinctPredicate.js");
//import("ChannelDefinition.js");
//import("SubscriptionDefinition.js");

66
src/strategies.js Normal file
View file

@ -0,0 +1,66 @@
/* global DistinctPredicate,ConsecutiveDistinctPredicate */
var strats = {
setTimeout: function(ms) {
return {
name: "setTimeout",
fn: function (next, data, envelope) {
setTimeout(function () {
next(data, envelope);
}, ms);
}
};
},
after: function(maxCalls, callback) {
var dispose = _.after(maxCalls, callback);
return {
name: "after",
fn: function (next, data, envelope) {
dispose();
next(data, envelope);
}
};
},
throttle : function(ms) {
return {
name: "throttle",
fn: _.throttle(function(next, data, envelope) {
next(data, envelope);
}, ms)
};
},
debounce: function(ms, immediate) {
return {
name: "debounce",
fn: _.debounce(function(next, data, envelope) {
next(data, envelope);
}, ms, !!immediate)
};
},
predicate: function(pred) {
return {
name: "predicate",
fn: function(next, data, envelope) {
if(pred.call(this, data, envelope)) {
next.call(this, data, envelope);
}
}
};
},
distinct : function(options) {
options = options || {};
var accessor = function(args) {
return args[0];
};
var check = options.all ?
new DistinctPredicate(accessor) :
new ConsecutiveDistinctPredicate(accessor);
return {
name : "distinct",
fn : function(next, data, envelope) {
if(check(data)) {
next(data, envelope);
}
}
};
}
};

68
src/strategy.js Normal file
View file

@ -0,0 +1,68 @@
var Strategy = function( options ) {
var _target = options.owner[options.prop];
if ( typeof _target !== "function" ) {
throw new Error( "Strategies can only target methods." );
}
var _strategies = [];
var _context = options.context || options.owner;
var strategy = function() {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call( arguments, 0 );
var thisIdx = idx;
var strategy;
idx += 1;
if ( thisIdx < _strategies.length ) {
strategy = _strategies[thisIdx];
strategy.fn.apply( strategy.context || _context, [next].concat( args ) );
} else {
_target.apply( _context, args );
}
};
next.apply( this, arguments );
};
strategy.target = function() {
return _target;
};
strategy.context = function( ctx ) {
if ( arguments.length === 0 ) {
return _context;
} else {
_context = ctx;
}
};
strategy.strategies = function() {
return _strategies;
};
strategy.useStrategy = function( strategy ) {
var idx = 0,
exists = false;
while ( idx < _strategies.length ) {
if ( _strategies[idx].name === strategy.name ) {
_strategies[idx] = strategy;
exists = true;
break;
}
idx += 1;
}
if ( !exists ) {
_strategies.push( strategy );
}
};
strategy.reset = function() {
_strategies = [];
};
if ( options.lazyInit ) {
_target.useStrategy = function() {
options.owner[options.prop] = strategy;
strategy.useStrategy.apply( strategy, arguments );
};
_target.context = function() {
options.owner[options.prop] = strategy;
return strategy.context.apply( strategy, arguments );
};
return _target;
} else {
return strategy;
}
};