Updated Conduit code, slimmed down basic version

This commit is contained in:
ifandelse 2014-01-29 02:25:51 -05:00
parent 0cd75e6ddc
commit 7647d6b21e
18 changed files with 269 additions and 348 deletions

View file

@ -21,80 +21,17 @@
}(this, function (_, global, undefined) {
var _postal;
var prevPostal = global.postal;
var Strategy = function (options) {
var _target = options.owner[options.prop];
if (typeof _target !== "function") {
throw new Error("Strategies can only target methods.");
}
var _strategies = [];
var _context = options.context || options.owner;
var strategy = function () {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call(arguments, 0);
var thisIdx = idx;
var strategy;
idx += 1;
if (thisIdx < _strategies.length) {
strategy = _strategies[thisIdx];
strategy.fn.apply(strategy.context || _context, [next].concat(args));
} else {
_target.apply(_context, args);
}
};
next.apply(this, arguments);
};
strategy.target = function () {
return _target;
};
strategy.context = function (ctx) {
if (arguments.length === 0) {
return _context;
} else {
_context = ctx;
}
};
strategy.strategies = function () {
return _strategies;
};
// TODO: add option to shift or push
strategy.useStrategy = function (strategy) {
var idx = 0,
exists = false;
while (idx < _strategies.length) {
if (_strategies[idx].name === strategy.name) {
_strategies[idx] = strategy;
exists = true;
break;
}
idx += 1;
}
if (!exists) {
_strategies.push(strategy);
}
};
strategy.reset = function () {
_strategies = [];
};
if (options.lazyInit) {
_target.useStrategy = function () {
options.owner[options.prop] = strategy;
strategy.useStrategy.apply(strategy, arguments);
};
_target.context = function () {
options.owner[options.prop] = strategy;
return strategy.context.apply(strategy, arguments);
};
return _target;
} else {
return strategy;
}
};
var ChannelDefinition = function (channelName) {
this.channel = channelName || _postal.configuration.DEFAULT_CHANNEL;
this.initialize();
};
ChannelDefinition.prototype.initialize = function () {};
ChannelDefinition.prototype.subscribe = function () {
return _postal.subscribe(arguments.length === 1 ? new SubscriptionDefinition(this.channel, arguments[0].topic, arguments[0].callback) : new SubscriptionDefinition(this.channel, arguments[0], arguments[1]));
return _postal.subscribe({
channel: this.channel,
topic: (arguments.length === 1 ? arguments[0].topic : arguments[0]),
callback: (arguments.length === 1 ? arguments[0].callback : arguments[1])
});
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ? (Object.prototype.toString.call(arguments[0]) === "[object String]" ? {
@ -104,7 +41,7 @@
data: arguments[1]
};
envelope.channel = this.channel;
return _postal.publish(envelope);
_postal.publish(envelope);
};
var SubscriptionDefinition = function (channel, topic, callback) {
if (arguments.length !== 3) {
@ -124,20 +61,12 @@
_postal.unsubscribe(this);
}
},
// Move strat optimization here....
subscribe: function (callback) {
this.callback = callback;
this.callback = new Strategy({
owner: this,
prop: "callback",
context: this,
// TODO: is this the best option?
lazyInit: true
});
return this;
},
withContext: function (context) {
this.callback.context(context);
this.context = context;
return this;
}
};
@ -178,13 +107,7 @@
};
var fireSub = function (subDef, envelope) {
if (!subDef.inactive && _postal.configuration.resolver.compare(subDef.topic, envelope.topic)) {
if (_.all(subDef.constraints, function (constraint) {
return constraint.call(subDef.context, envelope.data, envelope);
})) {
if (typeof subDef.callback === "function") {
subDef.callback.call(subDef.context, envelope.data, envelope);
}
}
subDef.callback.call(subDef.context || this, envelope.data, envelope);
}
};
var pubInProgress = 0;
@ -252,7 +175,6 @@
if (--pubInProgress === 0) {
clearUnSubQueue();
}
return envelope;
},
unsubscribe: function (subDef) {
if (pubInProgress) {

File diff suppressed because one or more lines are too long

View file

@ -21,6 +21,107 @@
}(this, function (_, global, undefined) {
var _postal;
var prevPostal = global.postal;
var Conduit = function (options) {
if (typeof options.target !== "function") {
throw new Error("You can only make functions into Conduits.");
}
var _steps = {
pre: options.pre || [],
post: options.post || [],
all: []
};
var _defaultContext = options.context;
var _targetStep = {
isTarget: true,
fn: function (next) {
var args = Array.prototype.slice.call(arguments, 1);
options.target.apply(_defaultContext, args);
next.apply(this, args);
}
};
var _genPipeline = function () {
_steps.all = _steps.pre.concat([_targetStep].concat(_steps.post));
};
_genPipeline();
var conduit = function () {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call(arguments, 0);
var thisIdx = idx;
var step;
idx += 1;
if (thisIdx < _steps.all.length) {
step = _steps.all[thisIdx];
step.fn.apply(step.context || _defaultContext, [next].concat(args));
}
};
next.apply(this, arguments);
};
conduit.steps = function () {
return _steps.all;
};
conduit.context = function (ctx) {
if (arguments.length === 0) {
return _defaultContext;
} else {
_defaultContext = ctx;
}
};
conduit.before = function (step, options) {
step = typeof step === "function" ? {
fn: step
} : step;
options = options || {};
if (options.prepend) {
_steps.pre.unshift(step);
} else {
_steps.pre.push(step);
}
_genPipeline();
};
conduit.after = function (step, options) {
step = typeof step === "function" ? {
fn: step
} : step;
options = options || {};
if (options.prepend) {
_steps.post.unshift(step);
} else {
_steps.post.push(step);
}
_genPipeline();
};
conduit.clear = function () {
_steps = {
pre: [],
post: [],
all: []
};
};
return conduit;
};
var ChannelDefinition = function (channelName) {
this.channel = channelName || _postal.configuration.DEFAULT_CHANNEL;
this.initialize();
};
ChannelDefinition.prototype.initialize = function () {};
ChannelDefinition.prototype.subscribe = function () {
return _postal.subscribe({
channel: this.channel,
topic: (arguments.length === 1 ? arguments[0].topic : arguments[0]),
callback: (arguments.length === 1 ? arguments[0].callback : arguments[1])
});
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ? (Object.prototype.toString.call(arguments[0]) === "[object String]" ? {
topic: arguments[0]
} : arguments[0]) : {
topic: arguments[0],
data: arguments[1]
};
envelope.channel = this.channel;
_postal.publish(envelope);
};
var SubscriptionDefinition = function (channel, topic, callback) {
if (arguments.length !== 3) {
throw new Error("You must provide a channel, topic and callback when creating a SubscriptionDefinition instance.");
@ -39,92 +140,15 @@
_postal.unsubscribe(this);
}
},
// Move strat optimization here....
subscribe: function (callback) {
this.callback = callback;
this.callback = new Strategy({
owner: this,
prop: "callback",
context: this,
// TODO: is this the best option?
lazyInit: true
});
return this;
},
withContext: function (context) {
this.callback.context(context);
this.context = context;
return this;
}
};
var Strategy = function (options) {
var _target = options.owner[options.prop];
if (typeof _target !== "function") {
throw new Error("Strategies can only target methods.");
}
var _strategies = [];
var _context = options.context || options.owner;
var strategy = function () {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call(arguments, 0);
var thisIdx = idx;
var strategy;
idx += 1;
if (thisIdx < _strategies.length) {
strategy = _strategies[thisIdx];
strategy.fn.apply(strategy.context || _context, [next].concat(args));
} else {
_target.apply(_context, args);
}
};
next.apply(this, arguments);
};
strategy.target = function () {
return _target;
};
strategy.context = function (ctx) {
if (arguments.length === 0) {
return _context;
} else {
_context = ctx;
}
};
strategy.strategies = function () {
return _strategies;
};
// TODO: add option to shift or push
strategy.useStrategy = function (strategy) {
var idx = 0,
exists = false;
while (idx < _strategies.length) {
if (_strategies[idx].name === strategy.name) {
_strategies[idx] = strategy;
exists = true;
break;
}
idx += 1;
}
if (!exists) {
_strategies.push(strategy);
}
};
strategy.reset = function () {
_strategies = [];
};
if (options.lazyInit) {
_target.useStrategy = function () {
options.owner[options.prop] = strategy;
strategy.useStrategy.apply(strategy, arguments);
};
_target.context = function () {
options.owner[options.prop] = strategy;
return strategy.context.apply(strategy, arguments);
};
return _target;
} else {
return strategy;
}
};
var ConsecutiveDistinctPredicate = function () {
var previous;
return function (data) {
@ -237,22 +261,22 @@
}
};
SubscriptionDefinition.prototype.defer = function () {
this.callback.useStrategy(strats.defer());
this.callback.before(strats.defer());
return this;
};
SubscriptionDefinition.prototype.disposeAfter = function (maxCalls) {
var self = this;
self.callback.useStrategy(strats.stopAfter(maxCalls, function () {
self.callback.before(strats.stopAfter(maxCalls, function () {
self.unsubscribe.call(self);
}));
return self;
};
SubscriptionDefinition.prototype.distinctUntilChanged = function () {
this.callback.useStrategy(strats.distinct());
this.callback.before(strats.distinct());
return this;
};
SubscriptionDefinition.prototype.distinct = function () {
this.callback.useStrategy(strats.distinct({
this.callback.before(strats.distinct({
all: true
}));
return this;
@ -262,36 +286,31 @@
return this;
};
SubscriptionDefinition.prototype.withConstraint = function (predicate) {
this.callback.useStrategy(strats.withConstraint(predicate));
this.callback.before(strats.withConstraint(predicate));
return this;
};
SubscriptionDefinition.prototype.withDebounce = function (milliseconds, immediate) {
this.callback.useStrategy(strats.withDebounce(milliseconds, immediate));
this.callback.before(strats.withDebounce(milliseconds, immediate));
return this;
};
SubscriptionDefinition.prototype.withDelay = function (milliseconds) {
this.callback.useStrategy(strats.withDelay(milliseconds));
this.callback.before(strats.withDelay(milliseconds));
return this;
};
SubscriptionDefinition.prototype.withThrottle = function (milliseconds) {
this.callback.useStrategy(strats.withThrottle(milliseconds));
this.callback.before(strats.withThrottle(milliseconds));
return this;
};
var ChannelDefinition = function (channelName) {
this.channel = channelName || _postal.configuration.DEFAULT_CHANNEL;
SubscriptionDefinition.prototype.subscribe = function (callback) {
this.callback = new Conduit({
target: callback,
context: this
});
return this;
};
ChannelDefinition.prototype.subscribe = function () {
return _postal.subscribe(arguments.length === 1 ? new SubscriptionDefinition(this.channel, arguments[0].topic, arguments[0].callback) : new SubscriptionDefinition(this.channel, arguments[0], arguments[1]));
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ? (Object.prototype.toString.call(arguments[0]) === "[object String]" ? {
topic: arguments[0]
} : arguments[0]) : {
topic: arguments[0],
data: arguments[1]
};
envelope.channel = this.channel;
return _postal.publish(envelope);
SubscriptionDefinition.prototype.withContext = function (context) {
this.callback.context(context);
return this;
};
var bindingsResolver = {
cache: {},
@ -330,13 +349,7 @@
};
var fireSub = function (subDef, envelope) {
if (!subDef.inactive && _postal.configuration.resolver.compare(subDef.topic, envelope.topic)) {
if (_.all(subDef.constraints, function (constraint) {
return constraint.call(subDef.context, envelope.data, envelope);
})) {
if (typeof subDef.callback === "function") {
subDef.callback.call(subDef.context, envelope.data, envelope);
}
}
subDef.callback.call(subDef.context || this, envelope.data, envelope);
}
};
var pubInProgress = 0;
@ -404,7 +417,6 @@
if (--pubInProgress === 0) {
clearUnSubQueue();
}
return envelope;
},
unsubscribe: function (subDef) {
if (pubInProgress) {

2
lib/postal.min.js vendored

File diff suppressed because one or more lines are too long

View file

@ -134,22 +134,22 @@
}
};
SubscriptionDefinition.prototype.defer = function () {
this.callback.useStrategy(strats.defer());
this.callback.before(strats.defer());
return this;
};
SubscriptionDefinition.prototype.disposeAfter = function (maxCalls) {
var self = this;
self.callback.useStrategy(strats.stopAfter(maxCalls, function () {
self.callback.before(strats.stopAfter(maxCalls, function () {
self.unsubscribe.call(self);
}));
return self;
};
SubscriptionDefinition.prototype.distinctUntilChanged = function () {
this.callback.useStrategy(strats.distinct());
this.callback.before(strats.distinct());
return this;
};
SubscriptionDefinition.prototype.distinct = function () {
this.callback.useStrategy(strats.distinct({
this.callback.before(strats.distinct({
all: true
}));
return this;
@ -159,19 +159,30 @@
return this;
};
SubscriptionDefinition.prototype.withConstraint = function (predicate) {
this.callback.useStrategy(strats.withConstraint(predicate));
this.callback.before(strats.withConstraint(predicate));
return this;
};
SubscriptionDefinition.prototype.withDebounce = function (milliseconds, immediate) {
this.callback.useStrategy(strats.withDebounce(milliseconds, immediate));
this.callback.before(strats.withDebounce(milliseconds, immediate));
return this;
};
SubscriptionDefinition.prototype.withDelay = function (milliseconds) {
this.callback.useStrategy(strats.withDelay(milliseconds));
this.callback.before(strats.withDelay(milliseconds));
return this;
};
SubscriptionDefinition.prototype.withThrottle = function (milliseconds) {
this.callback.useStrategy(strats.withThrottle(milliseconds));
this.callback.before(strats.withThrottle(milliseconds));
return this;
};
SubscriptionDefinition.prototype.subscribe = function (callback) {
this.callback = new Conduit({
target: callback,
context: this
});
return this;
};
SubscriptionDefinition.prototype.withContext = function (context) {
this.callback.context(context);
return this;
};
}(postal.SubscriptionDefinition));

View file

@ -5,4 +5,4 @@
* Url: http://github.com/postaljs/postal.js
* License(s): MIT, GPL
*/
(function(t,n){"object"==typeof module&&module.exports?module.exports=function(t){return n(t,this)}:"function"==typeof define&&define.amd?define(["postal"],function(e){return n(e,t)}):t.postal=n(t.postal,t)})(this,function(t){return function(t){var n=function(){var t;return function(n){var e=!1;return _.isString(n)?(e=n===t,t=n):(e=_.isEqual(n,t),t=_.clone(n)),!e}},e=function(){var t=[];return function(n){var e=!_.any(t,function(t){return _.isObject(n)||_.isArray(n)?_.isEqual(n,t):n===t});return e&&t.push(n),e}},i={withDelay:function(t){if(_.isNaN(t))throw"Milliseconds must be a number";return{name:"withDelay",fn:function(n,e,i){setTimeout(function(){n(e,i)},t)}}},defer:function(){return this.withDelay(0)},stopAfter:function(t,n){if(_.isNaN(t)||0>=t)throw"The value provided to disposeAfter (maxCalls) must be a number greater than zero.";var e=_.after(t,n);return{name:"stopAfter",fn:function(t,n,i){e(),t(n,i)}}},withThrottle:function(t){if(_.isNaN(t))throw"Milliseconds must be a number";return{name:"withThrottle",fn:_.throttle(function(t,n,e){t(n,e)},t)}},withDebounce:function(t,n){if(_.isNaN(t))throw"Milliseconds must be a number";return{name:"debounce",fn:_.debounce(function(t,n,e){t(n,e)},t,!!n)}},withConstraint:function(t){if(!_.isFunction(t))throw"Predicate constraint must be a function";return{name:"withConstraint",fn:function(n,e,i){t.call(this,e,i)&&n.call(this,e,i)}}},distinct:function(t){t=t||{};var i=function(t){return t[0]},r=t.all?new e(i):new n(i);return{name:"distinct",fn:function(t,n,e){r(n)&&t(n,e)}}}};t.prototype.defer=function(){return this.callback.useStrategy(i.defer()),this},t.prototype.disposeAfter=function(t){var n=this;return n.callback.useStrategy(i.stopAfter(t,function(){n.unsubscribe.call(n)})),n},t.prototype.distinctUntilChanged=function(){return this.callback.useStrategy(i.distinct()),this},t.prototype.distinct=function(){return this.callback.useStrategy(i.distinct({all:!0})),this},t.prototype.once=function(){return this.disposeAfter(1),this},t.prototype.withConstraint=function(t){return this.callback.useStrategy(i.withConstraint(t)),this},t.prototype.withDebounce=function(t,n){return this.callback.useStrategy(i.withDebounce(t,n)),this},t.prototype.withDelay=function(t){return this.callback.useStrategy(i.withDelay(t)),this},t.prototype.withThrottle=function(t){return this.callback.useStrategy(i.withThrottle(t)),this}}(t.SubscriptionDefinition),t});
(function(t,n){"object"==typeof module&&module.exports?module.exports=function(t){return n(t,this)}:"function"==typeof define&&define.amd?define(["postal"],function(e){return n(e,t)}):t.postal=n(t.postal,t)})(this,function(t){return function(t){var n=function(){var t;return function(n){var e=!1;return _.isString(n)?(e=n===t,t=n):(e=_.isEqual(n,t),t=_.clone(n)),!e}},e=function(){var t=[];return function(n){var e=!_.any(t,function(t){return _.isObject(n)||_.isArray(n)?_.isEqual(n,t):n===t});return e&&t.push(n),e}},i={withDelay:function(t){if(_.isNaN(t))throw"Milliseconds must be a number";return{name:"withDelay",fn:function(n,e,i){setTimeout(function(){n(e,i)},t)}}},defer:function(){return this.withDelay(0)},stopAfter:function(t,n){if(_.isNaN(t)||0>=t)throw"The value provided to disposeAfter (maxCalls) must be a number greater than zero.";var e=_.after(t,n);return{name:"stopAfter",fn:function(t,n,i){e(),t(n,i)}}},withThrottle:function(t){if(_.isNaN(t))throw"Milliseconds must be a number";return{name:"withThrottle",fn:_.throttle(function(t,n,e){t(n,e)},t)}},withDebounce:function(t,n){if(_.isNaN(t))throw"Milliseconds must be a number";return{name:"debounce",fn:_.debounce(function(t,n,e){t(n,e)},t,!!n)}},withConstraint:function(t){if(!_.isFunction(t))throw"Predicate constraint must be a function";return{name:"withConstraint",fn:function(n,e,i){t.call(this,e,i)&&n.call(this,e,i)}}},distinct:function(t){t=t||{};var i=function(t){return t[0]},r=t.all?new e(i):new n(i);return{name:"distinct",fn:function(t,n,e){r(n)&&t(n,e)}}}};t.prototype.defer=function(){return this.callback.before(i.defer()),this},t.prototype.disposeAfter=function(t){var n=this;return n.callback.before(i.stopAfter(t,function(){n.unsubscribe.call(n)})),n},t.prototype.distinctUntilChanged=function(){return this.callback.before(i.distinct()),this},t.prototype.distinct=function(){return this.callback.before(i.distinct({all:!0})),this},t.prototype.once=function(){return this.disposeAfter(1),this},t.prototype.withConstraint=function(t){return this.callback.before(i.withConstraint(t)),this},t.prototype.withDebounce=function(t,n){return this.callback.before(i.withDebounce(t,n)),this},t.prototype.withDelay=function(t){return this.callback.before(i.withDelay(t)),this},t.prototype.withThrottle=function(t){return this.callback.before(i.withThrottle(t)),this},t.prototype.subscribe=function(t){return this.callback=new Conduit({target:t,context:this}),this},t.prototype.withContext=function(t){return this.callback.context(t),this}}(t.SubscriptionDefinition),t});

View file

@ -18,7 +18,6 @@
mocha.setup({ ui: 'bdd', timeout: 60000 });
</script>
<script type="text/javascript" src="../lib/postal.js"></script>
<script type="text/javascript" src="../lib/strategies-add-on/postal.strategies.js"></script>
<script type="text/javascript" src="postaljs.spec.js"></script>
<script type="text/javascript" src="utils.spec.js"></script>
<script type="text/javascript" src="linkedChannels.spec.js"></script>

View file

@ -63,9 +63,6 @@
it( "should have set subscription topic value", function () {
expect( sub.topic ).to.be( "MyTopic" );
} );
it( "should have defaulted the subscription context value", function () {
expect( sub.callback.context() ).to.be( sub );
} );
it( "should have captured subscription creation event", function () {
expect( caughtSubscribeEvent ).to.be.ok();
} );
@ -162,8 +159,7 @@
channel = postal.channel( "ContextChannel" );
subscription = channel.subscribe( "MyTopic", function ( data ) {
this.increment();
} )
.withContext( obj );
}).withContext( obj );
channel.publish( "MyTopic", "Testing123" );
} );
after( function () {
@ -201,9 +197,6 @@
it( "should have set subscription topic value", function () {
expect( sub.topic ).to.be( "MyTopic" );
} );
it( "should have defaulted the subscription context value", function () {
expect( sub.callback.context() ).to.be( sub );
} );
} );
});
@ -231,16 +224,6 @@
expect( msgData ).to.be( "Testing123" );
} );
} );
describe( "When publishing on a channel where no subscribers exist", function () {
it( "should return expected results for MyChannel/MyTopic", function () {
var env = postal.publish( {
channel : "NoOneIsUsingThisOne",
topic : "This.Is.A.Lonely.Topic",
data : "Y U NO SUBSCRIBE TO ME?"
} );
expect( !_.isEmpty( env ) ).to.be( true );
} );
} );
describe( "When using global publish api", function () {
var msgReceivedCnt = 0,
msgData;

View file

@ -9,7 +9,7 @@
var caughtSubscribeEvent = false;
var caughtUnsubscribeEvent = false;
describe("Subscription Creation - Strategies", function(){
describe("Subscription Creation - Pipeline Steps", function(){
describe( "When subscribing and ignoring duplicates", function () {
var subInvokedCnt = 0;
before( function () {
@ -186,10 +186,10 @@
recvd = true;
} )
.withConstraint(function () {
return false;
return true;
})
.withConstraint(function () {
return false;
return true;
})
.withConstraint(function () {
return true;
@ -200,8 +200,8 @@
postal.reset();
recvd = false;
} );
it( "should overwrite constraint with last one passed in", function () {
expect( subscription.callback.strategies().length ).to.be( 1 );
it( "should show all 4 constraints added", function () {
expect( subscription.callback.steps().length ).to.be( 4 );
} );
it( "should have invoked the callback", function () {
expect( recvd ).to.be.ok();
@ -224,7 +224,7 @@
recvd = false;
} );
it( "should have a constraint on the subscription", function () {
expect( subscription.callback.strategies()[0].name ).to.be( "withConstraint" );
expect( subscription.callback.steps()[0].name ).to.be( "withConstraint" );
} );
it( "should not have invoked the subscription callback", function () {
expect( recvd ).to.not.be.ok();
@ -247,7 +247,7 @@
recvd = false;
} );
it( "should have a constraint on the subscription", function () {
expect( subscription.callback.strategies()[0].name ).to.be( "withConstraint" );
expect( subscription.callback.steps()[0].name ).to.be( "withConstraint" );
} );
it( "should have invoked the subscription callback", function () {
expect( recvd ).to.be.ok();

View file

@ -19,9 +19,6 @@
it( "should set the topic to SubDefTestTopic", function () {
expect( sDef.topic ).to.be( "SubDefTestTopic" );
} );
it( "should set the callback", function () {
expect( sDef.callback ).to.be( NO_OP );
} );
it( "should default the context", function () {
expect( sDef.context ).to.be( undefined );
} );
@ -45,15 +42,5 @@
expect( name ).to.be( "Rose" );
} );
} );
describe( "When calling subscribe to set the callback", function () {
var sDefe = new SubscriptionDefinition( "TestChannel", "TestTopic", NO_OP ),
fn = function () {};
sDefe.subscribe( fn );
it( "Should set the callback", function () {
expect( sDefe.callback ).to.be( fn );
} );
} );
} );
}());

View file

@ -2,13 +2,7 @@
/*jshint -W020 */
var fireSub = function ( subDef, envelope ) {
if ( !subDef.inactive && _postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === "function" ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
subDef.callback.call( subDef.context || this, envelope.data, envelope );
}
};
var pubInProgress = 0;
@ -78,7 +72,6 @@ _postal = {
if ( --pubInProgress === 0 ) {
clearUnSubQueue();
}
return envelope;
},
unsubscribe: function( subDef ) {

View file

@ -1,12 +1,16 @@
/* global _postal, SubscriptionDefinition */
var ChannelDefinition = function ( channelName ) {
this.channel = channelName || _postal.configuration.DEFAULT_CHANNEL;
this.initialize();
};
ChannelDefinition.prototype.initialize = function() {};
ChannelDefinition.prototype.subscribe = function () {
return _postal.subscribe(arguments.length === 1 ?
new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) :
new SubscriptionDefinition( this.channel, arguments[0], arguments[1] ));
return _postal.subscribe({
channel: this.channel,
topic: (arguments.length === 1 ? arguments[0].topic : arguments[0]),
callback: (arguments.length === 1 ? arguments[0].callback : arguments[1])
});
};
ChannelDefinition.prototype.publish = function () {
@ -16,5 +20,5 @@ ChannelDefinition.prototype.publish = function () {
arguments[0] ) :
{ topic : arguments[0], data : arguments[1] };
envelope.channel = this.channel;
return _postal.publish( envelope );
_postal.publish( envelope );
};

View file

@ -20,20 +20,13 @@ SubscriptionDefinition.prototype = {
}
},
// Move strat optimization here....
subscribe : function ( callback ) {
this.callback = callback;
this.callback = new Strategy({
owner : this,
prop : "callback",
context : this, // TODO: is this the best option?
lazyInit : true
});
return this;
},
withContext : function ( context ) {
this.callback.context(context);
this.context = context;
return this;
}
};

75
src/conduit.js Normal file
View file

@ -0,0 +1,75 @@
var Conduit = function( options ) {
if ( typeof options.target !== "function" ) {
throw new Error( "You can only make functions into Conduits." );
}
var _steps = {
pre : options.pre || [],
post: options.post || [],
all : []
};
var _defaultContext = options.context;
var _targetStep = {
isTarget: true,
fn: function(next) {
var args = Array.prototype.slice.call( arguments, 1 );
options.target.apply( _defaultContext, args);
next.apply(this, args);
}
};
var _genPipeline = function() {
_steps.all = _steps.pre.concat([_targetStep].concat(_steps.post));
};
_genPipeline();
var conduit = function() {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call( arguments, 0 );
var thisIdx = idx;
var step;
idx += 1;
if (thisIdx < _steps.all.length ) {
step = _steps.all[thisIdx];
step.fn.apply( step.context || _defaultContext, [next].concat( args ) );
}
};
next.apply( this, arguments );
};
conduit.steps = function() {
return _steps.all;
};
conduit.context = function( ctx ) {
if ( arguments.length === 0 ) {
return _defaultContext;
} else {
_defaultContext = ctx;
}
};
conduit.before = function(step, options) {
step = typeof step === "function" ? { fn: step } : step;
options = options || {};
if(options.prepend) {
_steps.pre.unshift( step );
} else {
_steps.pre.push( step );
}
_genPipeline();
};
conduit.after = function(step, options) {
step = typeof step === "function" ? { fn: step } : step;
options = options || {};
if(options.prepend) {
_steps.post.unshift( step );
} else {
_steps.post.push( step );
}
_genPipeline();
};
conduit.clear = function() {
_steps = {
pre : [],
post: [],
all : []
};
};
return conduit;
};

View file

@ -17,7 +17,6 @@
var _postal;
var prevPostal = global.postal;
//import("strategy.js");
//import("ChannelDefinition.js");
//import("SubscriptionDefinition.js");
//import("AmqpBindingsResolver.js");

View file

@ -17,10 +17,10 @@
var _postal;
var prevPostal = global.postal;
//import("conduit.js");
//import("ChannelDefinition.js");
//import("SubscriptionDefinition.js");
//import("strategy.js");
//import("strategies.js");
//import("ChannelDefinition.js");
//import("AmqpBindingsResolver.js");
//import("Api.js");

View file

@ -1,4 +1,4 @@
/* global DistinctPredicate,ConsecutiveDistinctPredicate,SubscriptionDefinition */
/* global DistinctPredicate,ConsecutiveDistinctPredicate,SubscriptionDefinition,Conduit */
/*jshint -W098 */
var ConsecutiveDistinctPredicate = function () {
var previous;
@ -117,25 +117,25 @@ var strats = {
};
SubscriptionDefinition.prototype.defer = function () {
this.callback.useStrategy(strats.defer());
this.callback.before(strats.defer());
return this;
};
SubscriptionDefinition.prototype.disposeAfter = function ( maxCalls ) {
var self = this;
self.callback.useStrategy(strats.stopAfter(maxCalls, function() {
self.callback.before(strats.stopAfter(maxCalls, function() {
self.unsubscribe.call(self);
}));
return self;
};
SubscriptionDefinition.prototype.distinctUntilChanged = function () {
this.callback.useStrategy(strats.distinct());
this.callback.before(strats.distinct());
return this;
};
SubscriptionDefinition.prototype.distinct = function () {
this.callback.useStrategy(strats.distinct({ all : true }));
this.callback.before(strats.distinct({ all : true }));
return this;
};
@ -145,21 +145,34 @@ SubscriptionDefinition.prototype.once = function () {
};
SubscriptionDefinition.prototype.withConstraint = function ( predicate ) {
this.callback.useStrategy(strats.withConstraint(predicate));
this.callback.before(strats.withConstraint(predicate));
return this;
};
SubscriptionDefinition.prototype.withDebounce = function ( milliseconds, immediate ) {
this.callback.useStrategy(strats.withDebounce(milliseconds, immediate));
this.callback.before(strats.withDebounce(milliseconds, immediate));
return this;
};
SubscriptionDefinition.prototype.withDelay = function ( milliseconds ) {
this.callback.useStrategy(strats.withDelay(milliseconds));
this.callback.before(strats.withDelay(milliseconds));
return this;
};
SubscriptionDefinition.prototype.withThrottle = function ( milliseconds ) {
this.callback.useStrategy(strats.withThrottle(milliseconds));
this.callback.before(strats.withThrottle(milliseconds));
return this;
};
SubscriptionDefinition.prototype.subscribe = function ( callback ) {
this.callback = new Conduit({
target : callback,
context : this
});
return this;
};
SubscriptionDefinition.prototype.withContext = function ( context ) {
this.callback.context(context);
return this;
};

View file

@ -1,70 +0,0 @@
var Strategy = function( options ) {
var _target = options.owner[options.prop];
if ( typeof _target !== "function" ) {
throw new Error( "Strategies can only target methods." );
}
var _strategies = [];
var _context = options.context || options.owner;
var strategy = function() {
var idx = 0;
var next = function next() {
var args = Array.prototype.slice.call( arguments, 0 );
var thisIdx = idx;
var strategy;
idx += 1;
if ( thisIdx < _strategies.length ) {
strategy = _strategies[thisIdx];
strategy.fn.apply( strategy.context || _context, [next].concat( args ) );
} else {
_target.apply( _context, args );
}
};
next.apply( this, arguments );
};
strategy.target = function() {
return _target;
};
strategy.context = function( ctx ) {
if ( arguments.length === 0 ) {
return _context;
} else {
_context = ctx;
}
};
strategy.strategies = function() {
return _strategies;
};
// TODO: add option to shift or push
strategy.useStrategy = function( strategy ) {
var idx = 0,
exists = false;
while ( idx < _strategies.length ) {
if ( _strategies[idx].name === strategy.name ) {
_strategies[idx] = strategy;
exists = true;
break;
}
idx += 1;
}
if ( !exists ) {
_strategies.push( strategy );
}
};
strategy.reset = function() {
_strategies = [];
};
if ( options.lazyInit ) {
_target.useStrategy = function() {
options.owner[options.prop] = strategy;
strategy.useStrategy.apply( strategy, arguments );
};
_target.context = function() {
options.owner[options.prop] = strategy;
return strategy.context.apply( strategy, arguments );
};
return _target;
} else {
return strategy;
}
};