fix for #36: account for nested .publish() invocations and dequeue all queued .unsubscribe() calls which occurred during that time.

This commit is contained in:
Ger Hobbelt 2013-06-14 23:16:37 +02:00
parent e909bbad08
commit 38cf3c7c5f
4 changed files with 94 additions and 86 deletions

View file

@ -42,7 +42,7 @@
};
var DistinctPredicate = function () {
var previous = [];
return function ( data ) {
var isDistinct = !_.any( previous, function ( p ) {
if ( _.isObject( data ) || _.isArray( data ) ) {
@ -59,13 +59,13 @@
var ChannelDefinition = function ( channelName ) {
this.channel = channelName || DEFAULT_CHANNEL;
};
ChannelDefinition.prototype.subscribe = function () {
return arguments.length === 1 ?
new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) :
new SubscriptionDefinition( this.channel, arguments[0], arguments[1] );
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ?
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
@ -90,7 +90,7 @@
} );
postal.configuration.bus.subscribe( this );
};
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
@ -105,7 +105,7 @@
}
} );
},
defer : function () {
var fn = this.callback;
this.callback = function ( data ) {
@ -115,7 +115,7 @@
};
return this;
},
disposeAfter : function ( maxCalls ) {
if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
@ -124,29 +124,29 @@
var dispose = _.after( maxCalls, _.bind( function () {
this.unsubscribe();
}, this ) );
this.callback = function () {
fn.apply( this.context, arguments );
dispose();
};
return this;
},
distinctUntilChanged : function () {
this.withConstraint( new ConsecutiveDistinctPredicate() );
return this;
},
distinct : function () {
this.withConstraint( new DistinctPredicate() );
return this;
},
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
if ( !_.isFunction( predicate ) ) {
throw "Predicate constraint must be a function";
@ -154,7 +154,7 @@
this.constraints.push( predicate );
return this;
},
withConstraints : function ( predicates ) {
var self = this;
if ( _.isArray( predicates ) ) {
@ -164,12 +164,12 @@
}
return self;
},
withContext : function ( context ) {
this.context = context;
return this;
},
withDebounce : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -178,7 +178,7 @@
this.callback = _.debounce( fn, milliseconds );
return this;
},
withDelay : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -191,7 +191,7 @@
};
return this;
},
withThrottle : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -200,7 +200,7 @@
this.callback = _.throttle( fn, milliseconds );
return this;
},
subscribe : function ( callback ) {
this.callback = callback;
return this;
@ -209,7 +209,7 @@
var bindingsResolver = {
cache : {},
regex : {},
compare : function ( binding, topic ) {
var pattern, rgx, prevSegment, result = (this.cache[topic] && this.cache[topic][binding]);
if(typeof result !== "undefined") {
@ -237,7 +237,7 @@
this.cache[topic][binding] = result = rgx.test( topic );
return result;
},
reset : function () {
this.cache = {};
this.regex = {};
@ -254,15 +254,15 @@
}
}
};
var pubInProgress = false;
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -274,9 +274,9 @@
}
};
},
publish : function ( envelope ) {
pubInProgress = true;
++pubInProgress;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
@ -291,10 +291,12 @@
}
} );
}
pubInProgress = false;
if (--pubInProgress == 0) {
clearUnSubQueue();
}
return envelope;
},
reset : function () {
if ( this.subscriptions ) {
_.each( this.subscriptions, function ( channel ) {
@ -307,7 +309,7 @@
this.subscriptions = {};
}
},
subscribe : function ( subDef ) {
var idx, found, fn, channel = this.subscriptions[subDef.channel], subs;
if ( !channel ) {
@ -320,13 +322,13 @@
subs.push( subDef );
return subDef;
},
subscriptions : {},
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
if (pubInProgress) {
unSubQueue.push(config);
return;
}
@ -351,27 +353,27 @@
DEFAULT_CHANNEL : DEFAULT_CHANNEL,
SYSTEM_CHANNEL : SYSTEM_CHANNEL
},
ChannelDefinition : ChannelDefinition,
SubscriptionDefinition : SubscriptionDefinition,
channel : function ( channelName ) {
return new ChannelDefinition( channelName );
},
subscribe : function ( options ) {
return new SubscriptionDefinition( options.channel || DEFAULT_CHANNEL, options.topic, options.callback );
},
publish : function ( envelope ) {
envelope.channel = envelope.channel || DEFAULT_CHANNEL;
return postal.configuration.bus.publish( envelope );
},
addWireTap : function ( callback ) {
return this.configuration.bus.addWireTap( callback );
},
linkChannels : function ( sources, destinations ) {
var result = [];
sources = !_.isArray( sources ) ? [sources] : sources;
@ -397,7 +399,7 @@
} );
return result;
},
utils : {
getSubscribersFor : function () {
var channel = arguments[ 0 ],
@ -412,7 +414,7 @@
}
return [];
},
reset : function () {
postal.configuration.bus.reset();
postal.configuration.resolver.reset();

View file

@ -255,7 +255,7 @@
}
};
var pubInProgress = false;
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
@ -276,7 +276,7 @@
},
publish : function ( envelope ) {
pubInProgress = true;
++pubInProgress;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
@ -291,7 +291,9 @@
}
} );
}
pubInProgress = false;
if (--pubInProgress == 0) {
clearUnSubQueue();
}
return envelope;
},
@ -326,7 +328,7 @@
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
if (pubInProgress) {
unSubQueue.push(config);
return;
}

View file

@ -42,7 +42,7 @@
};
var DistinctPredicate = function () {
var previous = [];
return function ( data ) {
var isDistinct = !_.any( previous, function ( p ) {
if ( _.isObject( data ) || _.isArray( data ) ) {
@ -59,13 +59,13 @@
var ChannelDefinition = function ( channelName ) {
this.channel = channelName || DEFAULT_CHANNEL;
};
ChannelDefinition.prototype.subscribe = function () {
return arguments.length === 1 ?
new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) :
new SubscriptionDefinition( this.channel, arguments[0], arguments[1] );
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ?
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
@ -90,7 +90,7 @@
} );
postal.configuration.bus.subscribe( this );
};
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
@ -105,7 +105,7 @@
}
} );
},
defer : function () {
var fn = this.callback;
this.callback = function ( data ) {
@ -115,7 +115,7 @@
};
return this;
},
disposeAfter : function ( maxCalls ) {
if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
@ -124,29 +124,29 @@
var dispose = _.after( maxCalls, _.bind( function () {
this.unsubscribe();
}, this ) );
this.callback = function () {
fn.apply( this.context, arguments );
dispose();
};
return this;
},
distinctUntilChanged : function () {
this.withConstraint( new ConsecutiveDistinctPredicate() );
return this;
},
distinct : function () {
this.withConstraint( new DistinctPredicate() );
return this;
},
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
if ( !_.isFunction( predicate ) ) {
throw "Predicate constraint must be a function";
@ -154,7 +154,7 @@
this.constraints.push( predicate );
return this;
},
withConstraints : function ( predicates ) {
var self = this;
if ( _.isArray( predicates ) ) {
@ -164,12 +164,12 @@
}
return self;
},
withContext : function ( context ) {
this.context = context;
return this;
},
withDebounce : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -178,7 +178,7 @@
this.callback = _.debounce( fn, milliseconds );
return this;
},
withDelay : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -191,7 +191,7 @@
};
return this;
},
withThrottle : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -200,7 +200,7 @@
this.callback = _.throttle( fn, milliseconds );
return this;
},
subscribe : function ( callback ) {
this.callback = callback;
return this;
@ -209,7 +209,7 @@
var bindingsResolver = {
cache : {},
regex : {},
compare : function ( binding, topic ) {
var pattern, rgx, prevSegment, result = (this.cache[topic] && this.cache[topic][binding]);
if(typeof result !== "undefined") {
@ -237,7 +237,7 @@
this.cache[topic][binding] = result = rgx.test( topic );
return result;
},
reset : function () {
this.cache = {};
this.regex = {};
@ -254,15 +254,15 @@
}
}
};
var pubInProgress = false;
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -274,9 +274,9 @@
}
};
},
publish : function ( envelope ) {
pubInProgress = true;
++pubInProgress;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
@ -291,10 +291,12 @@
}
} );
}
pubInProgress = false;
if (--pubInProgress == 0) {
clearUnSubQueue();
}
return envelope;
},
reset : function () {
if ( this.subscriptions ) {
_.each( this.subscriptions, function ( channel ) {
@ -307,7 +309,7 @@
this.subscriptions = {};
}
},
subscribe : function ( subDef ) {
var idx, found, fn, channel = this.subscriptions[subDef.channel], subs;
if ( !channel ) {
@ -320,13 +322,13 @@
subs.push( subDef );
return subDef;
},
subscriptions : {},
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
if (pubInProgress) {
unSubQueue.push(config);
return;
}
@ -351,27 +353,27 @@
DEFAULT_CHANNEL : DEFAULT_CHANNEL,
SYSTEM_CHANNEL : SYSTEM_CHANNEL
},
ChannelDefinition : ChannelDefinition,
SubscriptionDefinition : SubscriptionDefinition,
channel : function ( channelName ) {
return new ChannelDefinition( channelName );
},
subscribe : function ( options ) {
return new SubscriptionDefinition( options.channel || DEFAULT_CHANNEL, options.topic, options.callback );
},
publish : function ( envelope ) {
envelope.channel = envelope.channel || DEFAULT_CHANNEL;
return postal.configuration.bus.publish( envelope );
},
addWireTap : function ( callback ) {
return this.configuration.bus.addWireTap( callback );
},
linkChannels : function ( sources, destinations ) {
var result = [];
sources = !_.isArray( sources ) ? [sources] : sources;
@ -397,7 +399,7 @@
} );
return result;
},
utils : {
getSubscribersFor : function () {
var channel = arguments[ 0 ],
@ -412,7 +414,7 @@
}
return [];
},
reset : function () {
postal.configuration.bus.reset();
postal.configuration.resolver.reset();

View file

@ -10,7 +10,7 @@ var fireSub = function(subDef, envelope) {
}
};
var pubInProgress = false;
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
@ -31,7 +31,7 @@ var localBus = {
},
publish : function ( envelope ) {
pubInProgress = true;
++pubInProgress;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
@ -46,7 +46,9 @@ var localBus = {
}
} );
}
pubInProgress = false;
if (--pubInProgress == 0) {
clearUnSubQueue();
}
return envelope;
},
@ -81,7 +83,7 @@ var localBus = {
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
if (pubInProgress) {
unSubQueue.push(config);
return;
}