Merged and tweaked PR from @GerHobbelt, which fixed issue with nested publishes breaking the unsubscribe queue behavior

This commit is contained in:
ifandelse 2013-08-10 12:19:34 -04:00
parent d163f597dc
commit 25425c0cda
10 changed files with 240 additions and 189 deletions

View file

@ -42,7 +42,7 @@
};
var DistinctPredicate = function () {
var previous = [];
return function ( data ) {
var isDistinct = !_.any( previous, function ( p ) {
if ( _.isObject( data ) || _.isArray( data ) ) {
@ -59,17 +59,19 @@
var ChannelDefinition = function ( channelName ) {
this.channel = channelName || DEFAULT_CHANNEL;
};
ChannelDefinition.prototype.subscribe = function () {
return arguments.length === 1 ?
new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) :
new SubscriptionDefinition( this.channel, arguments[0], arguments[1] );
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ?
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } : arguments[0]) : { topic : arguments[0], data : arguments[1] };
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } :
arguments[0]) :
{ topic : arguments[0], data : arguments[1] };
envelope.channel = this.channel;
return postal.configuration.bus.publish( envelope );
};
@ -90,22 +92,24 @@
} );
postal.configuration.bus.subscribe( this );
};
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
if(!this.inactive) {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
}
},
defer : function () {
var fn = this.callback;
this.callback = function ( data ) {
@ -115,7 +119,7 @@
};
return this;
},
disposeAfter : function ( maxCalls ) {
if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
@ -124,29 +128,29 @@
var dispose = _.after( maxCalls, _.bind( function () {
this.unsubscribe();
}, this ) );
this.callback = function () {
fn.apply( this.context, arguments );
dispose();
};
return this;
},
distinctUntilChanged : function () {
this.withConstraint( new ConsecutiveDistinctPredicate() );
return this;
},
distinct : function () {
this.withConstraint( new DistinctPredicate() );
return this;
},
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
if ( !_.isFunction( predicate ) ) {
throw "Predicate constraint must be a function";
@ -154,7 +158,7 @@
this.constraints.push( predicate );
return this;
},
withConstraints : function ( predicates ) {
var self = this;
if ( _.isArray( predicates ) ) {
@ -164,12 +168,12 @@
}
return self;
},
withContext : function ( context ) {
this.context = context;
return this;
},
withDebounce : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -178,7 +182,7 @@
this.callback = _.debounce( fn, milliseconds );
return this;
},
withDelay : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -191,7 +195,7 @@
};
return this;
},
withThrottle : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -200,7 +204,7 @@
this.callback = _.throttle( fn, milliseconds );
return this;
},
subscribe : function ( callback ) {
this.callback = callback;
return this;
@ -209,7 +213,7 @@
var bindingsResolver = {
cache : {},
regex : {},
compare : function ( binding, topic ) {
var pattern, rgx, prevSegment, result = (this.cache[topic] && this.cache[topic][binding]);
if(typeof result !== "undefined") {
@ -237,32 +241,32 @@
this.cache[topic][binding] = result = rgx.test( topic );
return result;
},
reset : function () {
this.cache = {};
this.regex = {};
}
};
var fireSub = function(subDef, envelope) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
var fireSub = function ( subDef, envelope ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
};
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
var clearUnSubQueue = function () {
while ( unSubQueue.length ) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -274,7 +278,7 @@
}
};
},
publish : function ( envelope ) {
++pubInProgress;
envelope.timeStamp = new Date();
@ -291,12 +295,12 @@
}
} );
}
if (--pubInProgress == 0) {
if ( --pubInProgress === 0 ) {
clearUnSubQueue();
}
return envelope;
},
reset : function () {
if ( this.subscriptions ) {
_.each( this.subscriptions, function ( channel ) {
@ -309,7 +313,7 @@
this.subscriptions = {};
}
},
subscribe : function ( subDef ) {
var idx, found, fn, channel = this.subscriptions[subDef.channel], subs;
if ( !channel ) {
@ -322,20 +326,20 @@
subs.push( subDef );
return subDef;
},
subscriptions : {},
wireTaps : [],
unsubscribe : function ( config ) {
if (pubInProgress) {
unSubQueue.push(config);
if ( pubInProgress ) {
unSubQueue.push( config );
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
while(idx < len) {
while ( idx < len ) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;
@ -353,27 +357,27 @@
DEFAULT_CHANNEL : DEFAULT_CHANNEL,
SYSTEM_CHANNEL : SYSTEM_CHANNEL
},
ChannelDefinition : ChannelDefinition,
SubscriptionDefinition : SubscriptionDefinition,
channel : function ( channelName ) {
return new ChannelDefinition( channelName );
},
subscribe : function ( options ) {
return new SubscriptionDefinition( options.channel || DEFAULT_CHANNEL, options.topic, options.callback );
},
publish : function ( envelope ) {
envelope.channel = envelope.channel || DEFAULT_CHANNEL;
return postal.configuration.bus.publish( envelope );
},
addWireTap : function ( callback ) {
return this.configuration.bus.addWireTap( callback );
},
linkChannels : function ( sources, destinations ) {
var result = [];
sources = !_.isArray( sources ) ? [sources] : sources;
@ -399,7 +403,7 @@
} );
return result;
},
utils : {
getSubscribersFor : function () {
var channel = arguments[ 0 ],
@ -414,7 +418,7 @@
}
return [];
},
reset : function () {
postal.configuration.bus.reset();
postal.configuration.resolver.reset();

File diff suppressed because one or more lines are too long

View file

@ -68,8 +68,10 @@
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ?
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } : arguments[0]) : { topic : arguments[0], data : arguments[1] };
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } :
arguments[0]) :
{ topic : arguments[0], data : arguments[1] };
envelope.channel = this.channel;
return postal.configuration.bus.publish( envelope );
};
@ -93,17 +95,19 @@
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
if(!this.inactive) {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
}
},
defer : function () {
@ -243,22 +247,22 @@
this.regex = {};
}
};
var fireSub = function(subDef, envelope) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
var fireSub = function ( subDef, envelope ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
};
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
var clearUnSubQueue = function () {
while ( unSubQueue.length ) {
unSubQueue.shift().unsubscribe();
}
};
@ -291,7 +295,7 @@
}
} );
}
if (--pubInProgress == 0) {
if ( --pubInProgress === 0 ) {
clearUnSubQueue();
}
return envelope;
@ -328,14 +332,14 @@
wireTaps : [],
unsubscribe : function ( config ) {
if (pubInProgress) {
unSubQueue.push(config);
if ( pubInProgress ) {
unSubQueue.push( config );
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
while(idx < len) {
while ( idx < len ) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;

File diff suppressed because one or more lines are too long

View file

@ -42,7 +42,7 @@
};
var DistinctPredicate = function () {
var previous = [];
return function ( data ) {
var isDistinct = !_.any( previous, function ( p ) {
if ( _.isObject( data ) || _.isArray( data ) ) {
@ -59,17 +59,19 @@
var ChannelDefinition = function ( channelName ) {
this.channel = channelName || DEFAULT_CHANNEL;
};
ChannelDefinition.prototype.subscribe = function () {
return arguments.length === 1 ?
new SubscriptionDefinition( this.channel, arguments[0].topic, arguments[0].callback ) :
new SubscriptionDefinition( this.channel, arguments[0], arguments[1] );
};
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ?
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } : arguments[0]) : { topic : arguments[0], data : arguments[1] };
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } :
arguments[0]) :
{ topic : arguments[0], data : arguments[1] };
envelope.channel = this.channel;
return postal.configuration.bus.publish( envelope );
};
@ -90,22 +92,24 @@
} );
postal.configuration.bus.subscribe( this );
};
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
if(!this.inactive) {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
}
},
defer : function () {
var fn = this.callback;
this.callback = function ( data ) {
@ -115,7 +119,7 @@
};
return this;
},
disposeAfter : function ( maxCalls ) {
if ( _.isNaN( maxCalls ) || maxCalls <= 0 ) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
@ -124,29 +128,29 @@
var dispose = _.after( maxCalls, _.bind( function () {
this.unsubscribe();
}, this ) );
this.callback = function () {
fn.apply( this.context, arguments );
dispose();
};
return this;
},
distinctUntilChanged : function () {
this.withConstraint( new ConsecutiveDistinctPredicate() );
return this;
},
distinct : function () {
this.withConstraint( new DistinctPredicate() );
return this;
},
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
if ( !_.isFunction( predicate ) ) {
throw "Predicate constraint must be a function";
@ -154,7 +158,7 @@
this.constraints.push( predicate );
return this;
},
withConstraints : function ( predicates ) {
var self = this;
if ( _.isArray( predicates ) ) {
@ -164,12 +168,12 @@
}
return self;
},
withContext : function ( context ) {
this.context = context;
return this;
},
withDebounce : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -178,7 +182,7 @@
this.callback = _.debounce( fn, milliseconds );
return this;
},
withDelay : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -191,7 +195,7 @@
};
return this;
},
withThrottle : function ( milliseconds ) {
if ( _.isNaN( milliseconds ) ) {
throw "Milliseconds must be a number";
@ -200,7 +204,7 @@
this.callback = _.throttle( fn, milliseconds );
return this;
},
subscribe : function ( callback ) {
this.callback = callback;
return this;
@ -209,7 +213,7 @@
var bindingsResolver = {
cache : {},
regex : {},
compare : function ( binding, topic ) {
var pattern, rgx, prevSegment, result = (this.cache[topic] && this.cache[topic][binding]);
if(typeof result !== "undefined") {
@ -237,32 +241,32 @@
this.cache[topic][binding] = result = rgx.test( topic );
return result;
},
reset : function () {
this.cache = {};
this.regex = {};
}
};
var fireSub = function(subDef, envelope) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
var fireSub = function ( subDef, envelope ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
};
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
var clearUnSubQueue = function () {
while ( unSubQueue.length ) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -274,7 +278,7 @@
}
};
},
publish : function ( envelope ) {
++pubInProgress;
envelope.timeStamp = new Date();
@ -291,12 +295,12 @@
}
} );
}
if (--pubInProgress == 0) {
if ( --pubInProgress === 0 ) {
clearUnSubQueue();
}
return envelope;
},
reset : function () {
if ( this.subscriptions ) {
_.each( this.subscriptions, function ( channel ) {
@ -309,7 +313,7 @@
this.subscriptions = {};
}
},
subscribe : function ( subDef ) {
var idx, found, fn, channel = this.subscriptions[subDef.channel], subs;
if ( !channel ) {
@ -322,20 +326,20 @@
subs.push( subDef );
return subDef;
},
subscriptions : {},
wireTaps : [],
unsubscribe : function ( config ) {
if (pubInProgress) {
unSubQueue.push(config);
if ( pubInProgress ) {
unSubQueue.push( config );
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
while(idx < len) {
while ( idx < len ) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;
@ -353,27 +357,27 @@
DEFAULT_CHANNEL : DEFAULT_CHANNEL,
SYSTEM_CHANNEL : SYSTEM_CHANNEL
},
ChannelDefinition : ChannelDefinition,
SubscriptionDefinition : SubscriptionDefinition,
channel : function ( channelName ) {
return new ChannelDefinition( channelName );
},
subscribe : function ( options ) {
return new SubscriptionDefinition( options.channel || DEFAULT_CHANNEL, options.topic, options.callback );
},
publish : function ( envelope ) {
envelope.channel = envelope.channel || DEFAULT_CHANNEL;
return postal.configuration.bus.publish( envelope );
},
addWireTap : function ( callback ) {
return this.configuration.bus.addWireTap( callback );
},
linkChannels : function ( sources, destinations ) {
var result = [];
sources = !_.isArray( sources ) ? [sources] : sources;
@ -399,7 +403,7 @@
} );
return result;
},
utils : {
getSubscribersFor : function () {
var channel = arguments[ 0 ],
@ -414,7 +418,7 @@
}
return [];
},
reset : function () {
postal.configuration.bus.reset();
postal.configuration.resolver.reset();

2
lib/postal.min.js vendored

File diff suppressed because one or more lines are too long

View file

@ -112,6 +112,41 @@ describe( "Postal", function () {
expect( results[2] ).to.be("2 received message");
});
});
describe( "With nested publishing", function() {
var subscription1, subscription2, sysub, results = [];
before( function () {
channel = postal.channel();
sysub = postal.subscribe({
channel: postal.configuration.SYSTEM_CHANNEL,
topic : "subscription.removed",
callback : function(d, e) {
results.push("unsubscribed");
}
});
subscription1 = channel.subscribe('nest.test', function() {
results.push('1 received message');
channel.publish("nest.test2", "Hai");
}).once();
subscription2 = channel.subscribe('nest.test2', function() {
results.push('2 received message');
});
channel.publish('nest.test');
channel.publish('nest.test');
});
after( function () {
//subscription2.unsubscribe();
sysub.unsubscribe();
postal.utils.reset();
});
it( "should produce expected messages", function() {
console.log(results);
expect( results.length ).to.be(3);
expect( results[0] ).to.be("1 received message");
expect( results[1] ).to.be("2 received message");
expect( results[2] ).to.be("unsubscribed");
});
});
} );
describe( "When publishing a message", function () {
var msgReceivedCnt = 0,

View file

@ -10,8 +10,10 @@ ChannelDefinition.prototype.subscribe = function () {
ChannelDefinition.prototype.publish = function () {
var envelope = arguments.length === 1 ?
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } : arguments[0]) : { topic : arguments[0], data : arguments[1] };
(Object.prototype.toString.call(arguments[0]) === '[object String]' ?
{ topic: arguments[0] } :
arguments[0]) :
{ topic : arguments[0], data : arguments[1] };
envelope.channel = this.channel;
return postal.configuration.bus.publish( envelope );
};

View file

@ -1,19 +1,19 @@
var fireSub = function(subDef, envelope) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
var fireSub = function ( subDef, envelope ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
if ( typeof subDef.callback === 'function' ) {
subDef.callback.call( subDef.context, envelope.data, envelope );
}
}
}
};
var pubInProgress = 0;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
var clearUnSubQueue = function () {
while ( unSubQueue.length ) {
unSubQueue.shift().unsubscribe();
}
};
@ -46,7 +46,7 @@ var localBus = {
}
} );
}
if (--pubInProgress == 0) {
if ( --pubInProgress === 0 ) {
clearUnSubQueue();
}
return envelope;
@ -83,14 +83,14 @@ var localBus = {
wireTaps : [],
unsubscribe : function ( config ) {
if (pubInProgress) {
unSubQueue.push(config);
if ( pubInProgress ) {
unSubQueue.push( config );
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
while(idx < len) {
while ( idx < len ) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;

View file

@ -18,17 +18,19 @@ var SubscriptionDefinition = function ( channel, topic, callback ) {
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
if(!this.inactive) {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
topic : "subscription.removed",
data : {
event : "subscription.removed",
channel : this.channel,
topic : this.topic
}
} );
}
},
defer : function () {