diff --git a/example/amd/js/libs/postal/postal.js b/example/amd/js/libs/postal/postal.js index e96e762..cf48133 100755 --- a/example/amd/js/libs/postal/postal.js +++ b/example/amd/js/libs/postal/postal.js @@ -42,7 +42,7 @@ }; var DistinctPredicate = function () { var previous = []; - + return function ( data ) { var isDistinct = !_.any( previous, function ( p ) { if ( _.isObject( data ) || _.isArray( data ) ) { @@ -59,13 +59,13 @@ var ChannelDefinition = function ( channelName ) { this.channel = channelName || DEFAULT_CHANNEL; }; - + ChannelDefinition.prototype.subscribe = function () { return arguments.length === 1 ? new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) : new SubscriptionDefinition( this.channel, arguments[0], arguments[1] ); }; - + ChannelDefinition.prototype.publish = function () { var envelope = arguments.length === 1 ? (Object.prototype.toString.call(arguments[0]) === '[object String]' ? @@ -90,7 +90,7 @@ } ); postal.configuration.bus.subscribe( this ); }; - + SubscriptionDefinition.prototype = { unsubscribe : function () { this.inactive = true; @@ -105,7 +105,7 @@ } } ); }, - + defer : function () { var fn = this.callback; this.callback = function ( data ) { @@ -115,7 +115,7 @@ }; return this; }, - + disposeAfter : function ( maxCalls ) { if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) { throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero."; @@ -124,29 +124,29 @@ var dispose = _.after( maxCalls, _.bind( function () { this.unsubscribe(); }, this ) ); - + this.callback = function () { fn.apply( this.context, arguments ); dispose(); }; return this; }, - + distinctUntilChanged : function () { this.withConstraint( new ConsecutiveDistinctPredicate() ); return this; }, - + distinct : function () { this.withConstraint( new DistinctPredicate() ); return this; }, - + once : function () { this.disposeAfter( 1 ); return this; }, - + withConstraint : function ( predicate ) { if ( !_.isFunction( predicate ) ) { throw "Predicate constraint must be a function"; @@ -154,7 +154,7 @@ this.constraints.push( predicate ); return this; }, - + withConstraints : function ( predicates ) { var self = this; if ( _.isArray( predicates ) ) { @@ -164,12 +164,12 @@ } return self; }, - + withContext : function ( context ) { this.context = context; return this; }, - + withDebounce : function ( milliseconds ) { if ( _.isNaN( milliseconds ) ) { throw "Milliseconds must be a number"; @@ -178,7 +178,7 @@ this.callback = _.debounce( fn, milliseconds ); return this; }, - + withDelay : function ( milliseconds ) { if ( _.isNaN( milliseconds ) ) { throw "Milliseconds must be a number"; @@ -191,7 +191,7 @@ }; return this; }, - + withThrottle : function ( milliseconds ) { if ( _.isNaN( milliseconds ) ) { throw "Milliseconds must be a number"; @@ -200,7 +200,7 @@ this.callback = _.throttle( fn, milliseconds ); return this; }, - + subscribe : function ( callback ) { this.callback = callback; return this; @@ -209,7 +209,7 @@ var bindingsResolver = { cache : {}, regex : {}, - + compare : function ( binding, topic ) { var pattern, rgx, prevSegment, result = (this.cache[topic] && this.cache[topic][binding]); if(typeof result !== "undefined") { @@ -237,7 +237,7 @@ this.cache[topic][binding] = result = rgx.test( topic ); return result; }, - + reset : function () { this.cache = {}; this.regex = {}; @@ -254,15 +254,15 @@ } } }; - - var pubInProgress = false; + + var pubInProgress = 0; var unSubQueue = []; var clearUnSubQueue = function() { while(unSubQueue.length) { unSubQueue.shift().unsubscribe(); } }; - + var localBus = { addWireTap : function ( callback ) { var self = this; @@ -274,9 +274,9 @@ } }; }, - + publish : function ( envelope ) { - pubInProgress = true; + ++pubInProgress; envelope.timeStamp = new Date(); _.each( this.wireTaps, function ( tap ) { tap( envelope.data, envelope ); @@ -291,10 +291,12 @@ } } ); } - pubInProgress = false; + if (--pubInProgress == 0) { + clearUnSubQueue(); + } return envelope; }, - + reset : function () { if ( this.subscriptions ) { _.each( this.subscriptions, function ( channel ) { @@ -307,7 +309,7 @@ this.subscriptions = {}; } }, - + subscribe : function ( subDef ) { var idx, found, fn, channel = this.subscriptions[subDef.channel], subs; if ( !channel ) { @@ -320,13 +322,13 @@ subs.push( subDef ); return subDef; }, - + subscriptions : {}, - + wireTaps : [], - + unsubscribe : function ( config ) { - if(pubInProgress) { + if (pubInProgress) { unSubQueue.push(config); return; } @@ -351,27 +353,27 @@ DEFAULT_CHANNEL : DEFAULT_CHANNEL, SYSTEM_CHANNEL : SYSTEM_CHANNEL }, - + ChannelDefinition : ChannelDefinition, SubscriptionDefinition : SubscriptionDefinition, - + channel : function ( channelName ) { return new ChannelDefinition( channelName ); }, - + subscribe : function ( options ) { return new SubscriptionDefinition( options.channel || DEFAULT_CHANNEL, options.topic, options.callback ); }, - + publish : function ( envelope ) { envelope.channel = envelope.channel || DEFAULT_CHANNEL; return postal.configuration.bus.publish( envelope ); }, - + addWireTap : function ( callback ) { return this.configuration.bus.addWireTap( callback ); }, - + linkChannels : function ( sources, destinations ) { var result = []; sources = !_.isArray( sources ) ? [sources] : sources; @@ -397,7 +399,7 @@ } ); return result; }, - + utils : { getSubscribersFor : function () { var channel = arguments[ 0 ], @@ -412,7 +414,7 @@ } return []; }, - + reset : function () { postal.configuration.bus.reset(); postal.configuration.resolver.reset(); diff --git a/example/standard/js/postal.js b/example/standard/js/postal.js index e96e762..e4a4051 100755 --- a/example/standard/js/postal.js +++ b/example/standard/js/postal.js @@ -255,7 +255,7 @@ } }; - var pubInProgress = false; + var pubInProgress = 0; var unSubQueue = []; var clearUnSubQueue = function() { while(unSubQueue.length) { @@ -276,7 +276,7 @@ }, publish : function ( envelope ) { - pubInProgress = true; + ++pubInProgress; envelope.timeStamp = new Date(); _.each( this.wireTaps, function ( tap ) { tap( envelope.data, envelope ); @@ -291,7 +291,9 @@ } } ); } - pubInProgress = false; + if (--pubInProgress == 0) { + clearUnSubQueue(); + } return envelope; }, @@ -326,7 +328,7 @@ wireTaps : [], unsubscribe : function ( config ) { - if(pubInProgress) { + if (pubInProgress) { unSubQueue.push(config); return; } diff --git a/lib/postal.js b/lib/postal.js index e96e762..cf48133 100755 --- a/lib/postal.js +++ b/lib/postal.js @@ -42,7 +42,7 @@ }; var DistinctPredicate = function () { var previous = []; - + return function ( data ) { var isDistinct = !_.any( previous, function ( p ) { if ( _.isObject( data ) || _.isArray( data ) ) { @@ -59,13 +59,13 @@ var ChannelDefinition = function ( channelName ) { this.channel = channelName || DEFAULT_CHANNEL; }; - + ChannelDefinition.prototype.subscribe = function () { return arguments.length === 1 ? new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) : new SubscriptionDefinition( this.channel, arguments[0], arguments[1] ); }; - + ChannelDefinition.prototype.publish = function () { var envelope = arguments.length === 1 ? (Object.prototype.toString.call(arguments[0]) === '[object String]' ? @@ -90,7 +90,7 @@ } ); postal.configuration.bus.subscribe( this ); }; - + SubscriptionDefinition.prototype = { unsubscribe : function () { this.inactive = true; @@ -105,7 +105,7 @@ } } ); }, - + defer : function () { var fn = this.callback; this.callback = function ( data ) { @@ -115,7 +115,7 @@ }; return this; }, - + disposeAfter : function ( maxCalls ) { if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) { throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero."; @@ -124,29 +124,29 @@ var dispose = _.after( maxCalls, _.bind( function () { this.unsubscribe(); }, this ) ); - + this.callback = function () { fn.apply( this.context, arguments ); dispose(); }; return this; }, - + distinctUntilChanged : function () { this.withConstraint( new ConsecutiveDistinctPredicate() ); return this; }, - + distinct : function () { this.withConstraint( new DistinctPredicate() ); return this; }, - + once : function () { this.disposeAfter( 1 ); return this; }, - + withConstraint : function ( predicate ) { if ( !_.isFunction( predicate ) ) { throw "Predicate constraint must be a function"; @@ -154,7 +154,7 @@ this.constraints.push( predicate ); return this; }, - + withConstraints : function ( predicates ) { var self = this; if ( _.isArray( predicates ) ) { @@ -164,12 +164,12 @@ } return self; }, - + withContext : function ( context ) { this.context = context; return this; }, - + withDebounce : function ( milliseconds ) { if ( _.isNaN( milliseconds ) ) { throw "Milliseconds must be a number"; @@ -178,7 +178,7 @@ this.callback = _.debounce( fn, milliseconds ); return this; }, - + withDelay : function ( milliseconds ) { if ( _.isNaN( milliseconds ) ) { throw "Milliseconds must be a number"; @@ -191,7 +191,7 @@ }; return this; }, - + withThrottle : function ( milliseconds ) { if ( _.isNaN( milliseconds ) ) { throw "Milliseconds must be a number"; @@ -200,7 +200,7 @@ this.callback = _.throttle( fn, milliseconds ); return this; }, - + subscribe : function ( callback ) { this.callback = callback; return this; @@ -209,7 +209,7 @@ var bindingsResolver = { cache : {}, regex : {}, - + compare : function ( binding, topic ) { var pattern, rgx, prevSegment, result = (this.cache[topic] && this.cache[topic][binding]); if(typeof result !== "undefined") { @@ -237,7 +237,7 @@ this.cache[topic][binding] = result = rgx.test( topic ); return result; }, - + reset : function () { this.cache = {}; this.regex = {}; @@ -254,15 +254,15 @@ } } }; - - var pubInProgress = false; + + var pubInProgress = 0; var unSubQueue = []; var clearUnSubQueue = function() { while(unSubQueue.length) { unSubQueue.shift().unsubscribe(); } }; - + var localBus = { addWireTap : function ( callback ) { var self = this; @@ -274,9 +274,9 @@ } }; }, - + publish : function ( envelope ) { - pubInProgress = true; + ++pubInProgress; envelope.timeStamp = new Date(); _.each( this.wireTaps, function ( tap ) { tap( envelope.data, envelope ); @@ -291,10 +291,12 @@ } } ); } - pubInProgress = false; + if (--pubInProgress == 0) { + clearUnSubQueue(); + } return envelope; }, - + reset : function () { if ( this.subscriptions ) { _.each( this.subscriptions, function ( channel ) { @@ -307,7 +309,7 @@ this.subscriptions = {}; } }, - + subscribe : function ( subDef ) { var idx, found, fn, channel = this.subscriptions[subDef.channel], subs; if ( !channel ) { @@ -320,13 +322,13 @@ subs.push( subDef ); return subDef; }, - + subscriptions : {}, - + wireTaps : [], - + unsubscribe : function ( config ) { - if(pubInProgress) { + if (pubInProgress) { unSubQueue.push(config); return; } @@ -351,27 +353,27 @@ DEFAULT_CHANNEL : DEFAULT_CHANNEL, SYSTEM_CHANNEL : SYSTEM_CHANNEL }, - + ChannelDefinition : ChannelDefinition, SubscriptionDefinition : SubscriptionDefinition, - + channel : function ( channelName ) { return new ChannelDefinition( channelName ); }, - + subscribe : function ( options ) { return new SubscriptionDefinition( options.channel || DEFAULT_CHANNEL, options.topic, options.callback ); }, - + publish : function ( envelope ) { envelope.channel = envelope.channel || DEFAULT_CHANNEL; return postal.configuration.bus.publish( envelope ); }, - + addWireTap : function ( callback ) { return this.configuration.bus.addWireTap( callback ); }, - + linkChannels : function ( sources, destinations ) { var result = []; sources = !_.isArray( sources ) ? [sources] : sources; @@ -397,7 +399,7 @@ } ); return result; }, - + utils : { getSubscribersFor : function () { var channel = arguments[ 0 ], @@ -412,7 +414,7 @@ } return []; }, - + reset : function () { postal.configuration.bus.reset(); postal.configuration.resolver.reset(); diff --git a/src/LocalBus.js b/src/LocalBus.js index ccaceae..fccdfc6 100644 --- a/src/LocalBus.js +++ b/src/LocalBus.js @@ -10,7 +10,7 @@ var fireSub = function(subDef, envelope) { } }; -var pubInProgress = false; +var pubInProgress = 0; var unSubQueue = []; var clearUnSubQueue = function() { while(unSubQueue.length) { @@ -31,7 +31,7 @@ var localBus = { }, publish : function ( envelope ) { - pubInProgress = true; + ++pubInProgress; envelope.timeStamp = new Date(); _.each( this.wireTaps, function ( tap ) { tap( envelope.data, envelope ); @@ -46,7 +46,9 @@ var localBus = { } } ); } - pubInProgress = false; + if (--pubInProgress == 0) { + clearUnSubQueue(); + } return envelope; }, @@ -81,7 +83,7 @@ var localBus = { wireTaps : [], unsubscribe : function ( config ) { - if(pubInProgress) { + if (pubInProgress) { unSubQueue.push(config); return; }