Fixed bug reported by @tsgautier in Issue #34. The publish call was incorrectly handling a mutating array of subscribers as it iterated over them. I also fixed the once() call to return the instance.

This commit is contained in:
ifandelse 2013-05-01 00:42:09 -04:00
parent 3f7d51e55c
commit e909bbad08
9 changed files with 192 additions and 97 deletions

View file

@ -93,6 +93,7 @@
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
@ -143,6 +144,7 @@
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
@ -242,7 +244,7 @@
}
};
var fireSub = function(subDef, envelope) {
if ( postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
@ -253,6 +255,14 @@
}
};
var pubInProgress = false;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -266,20 +276,22 @@
},
publish : function ( envelope ) {
pubInProgress = true;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
} );
if ( this.subscriptions[envelope.channel] ) {
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while(idx < len) {
if( subDef = subscribers[idx++] ){
fireSub(subDef, envelope);
}
}
} );
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while ( idx < len ) {
if ( subDef = subscribers[idx++] ) {
fireSub( subDef, envelope );
}
}
} );
}
pubInProgress = false;
return envelope;
},
@ -314,14 +326,19 @@
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
unSubQueue.push(config);
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
for ( ; idx < len; idx++ ) {
while(idx < len) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;
}
idx += 1;
}
}
}

File diff suppressed because one or more lines are too long

View file

@ -93,6 +93,7 @@
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
@ -143,6 +144,7 @@
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
@ -242,7 +244,7 @@
}
};
var fireSub = function(subDef, envelope) {
if ( postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
@ -253,6 +255,14 @@
}
};
var pubInProgress = false;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -266,20 +276,22 @@
},
publish : function ( envelope ) {
pubInProgress = true;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
} );
if ( this.subscriptions[envelope.channel] ) {
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while(idx < len) {
if( subDef = subscribers[idx++] ){
fireSub(subDef, envelope);
}
}
} );
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while ( idx < len ) {
if ( subDef = subscribers[idx++] ) {
fireSub( subDef, envelope );
}
}
} );
}
pubInProgress = false;
return envelope;
},
@ -314,14 +326,19 @@
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
unSubQueue.push(config);
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
for ( ; idx < len; idx++ ) {
while(idx < len) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;
}
idx += 1;
}
}
}

File diff suppressed because one or more lines are too long

View file

@ -93,6 +93,7 @@
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
@ -143,6 +144,7 @@
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {
@ -242,7 +244,7 @@
}
};
var fireSub = function(subDef, envelope) {
if ( postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
@ -253,6 +255,14 @@
}
};
var pubInProgress = false;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -266,20 +276,22 @@
},
publish : function ( envelope ) {
pubInProgress = true;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
} );
if ( this.subscriptions[envelope.channel] ) {
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while(idx < len) {
if( subDef = subscribers[idx++] ){
fireSub(subDef, envelope);
}
}
} );
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while ( idx < len ) {
if ( subDef = subscribers[idx++] ) {
fireSub( subDef, envelope );
}
}
} );
}
pubInProgress = false;
return envelope;
},
@ -314,14 +326,19 @@
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
unSubQueue.push(config);
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
for ( ; idx < len; idx++ ) {
while(idx < len) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;
}
idx += 1;
}
}
}

2
lib/postal.min.js vendored

File diff suppressed because one or more lines are too long

View file

@ -50,41 +50,68 @@ describe( "Postal", function () {
} );
} );
describe( "When unsubscribing", function () {
var subExistsBefore = false,
subExistsAfter = true;
var systemSubscription = {};
before( function () {
systemSubscription = postal.subscribe( {
channel : "postal",
topic : "subscription.*",
callback : function ( data, env ) {
if ( data.event &&
data.event == "subscription.removed" &&
data.channel == "MyChannel" &&
data.topic == "MyTopic" ) {
caughtUnsubscribeEvent = true;
describe( "With a single subscription", function(){
var subExistsBefore = false,
subExistsAfter = true;
var systemSubscription = {};
before( function () {
systemSubscription = postal.subscribe( {
channel : "postal",
topic : "subscription.*",
callback : function ( data, env ) {
if ( data.event &&
data.event == "subscription.removed" &&
data.channel == "MyChannel" &&
data.topic == "MyTopic" ) {
caughtUnsubscribeEvent = true;
}
}
;
}
} );
subscription = postal.channel( "MyChannel" ).subscribe( "MyTopic" , function () {} );
subExistsBefore = postal.configuration.bus.subscriptions.MyChannel.MyTopic[0] !== undefined;
subscription.unsubscribe();
subExistsAfter = postal.configuration.bus.subscriptions.MyChannel.MyTopic.length !== 0;
} );
subscription = postal.channel( "MyChannel" ).subscribe( "MyTopic" , function () {} );
subExistsBefore = postal.configuration.bus.subscriptions.MyChannel.MyTopic[0] !== undefined;
subscription.unsubscribe();
subExistsAfter = postal.configuration.bus.subscriptions.MyChannel.MyTopic.length !== 0;
} );
after( function () {
systemSubscription.unsubscribe();
postal.utils.reset();
} );
it( "subscription should exist before unsubscribe", function () {
expect( subExistsBefore ).to.be.ok();
} );
it( "subscription should not exist after unsubscribe", function () {
expect( subExistsAfter ).to.not.be.ok()
} );
it( "should have captured unsubscription creation event", function () {
expect( caughtUnsubscribeEvent ).to.be.ok();
} );
after( function () {
systemSubscription.unsubscribe();
postal.utils.reset();
} );
it( "subscription should exist before unsubscribe", function () {
expect( subExistsBefore ).to.be.ok();
} );
it( "subscription should not exist after unsubscribe", function () {
expect( subExistsAfter ).to.not.be.ok()
} );
it( "should have captured unsubscription creation event", function () {
expect( caughtUnsubscribeEvent ).to.be.ok();
} );
});
describe( "With multiple subscribers on one channel", function() {
var subscription1, subscription2, results = [];
before( function () {
channel = postal.channel();
subscription1 = channel.subscribe('test', function() {
results.push('1 received message');
}).once();
subscription2 = channel.subscribe('test', function() {
results.push('2 received message');
});
channel.publish('test');
channel.publish('test');
});
after( function () {
subscription2.unsubscribe();
postal.utils.reset();
});
it( "should produce expected messages", function() {
expect( results.length ).to.be(3);
expect( results[0] ).to.be("1 received message");
expect( results[1] ).to.be("2 received message");
expect( results[2] ).to.be("2 received message");
});
});
} );
describe( "When publishing a message", function () {
var msgReceivedCnt = 0,
@ -131,27 +158,27 @@ describe( "Postal", function () {
expect( msgReceivedCnt ).to.be( 5 );
} );
} );
describe( "When subscribing with once()", function () {
var msgReceivedCnt = 0;
before( function () {
channel = postal.channel( "MyChannel" );
subscription = channel.subscribe( "MyTopic", function ( data ) {
msgReceivedCnt++;
} ).once();
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
} );
after( function () {
postal.utils.reset();
} );
it( "subscription callback should be invoked 1 time", function () {
expect( msgReceivedCnt ).to.be( 1 );
} );
} );
describe( "When subscribing with once()", function () {
var msgReceivedCnt = 0;
before( function () {
channel = postal.channel( "MyChannel" );
subscription = channel.subscribe( "MyTopic",function ( data ) {
msgReceivedCnt++;
} ).once();
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
channel.publish( "MyTopic", "Testing123" );
} );
after( function () {
postal.utils.reset();
} );
it( "subscription callback should be invoked 1 time", function () {
expect( msgReceivedCnt ).to.be( 1 );
} );
} );
describe( "When subscribing and ignoring duplicates", function () {
var subInvokedCnt = 0;
before( function () {

View file

@ -1,5 +1,5 @@
var fireSub = function(subDef, envelope) {
if ( postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( !subDef.inactive && postal.configuration.resolver.compare( subDef.topic, envelope.topic ) ) {
if ( _.all( subDef.constraints, function ( constraint ) {
return constraint.call( subDef.context, envelope.data, envelope );
} ) ) {
@ -10,6 +10,14 @@ var fireSub = function(subDef, envelope) {
}
};
var pubInProgress = false;
var unSubQueue = [];
var clearUnSubQueue = function() {
while(unSubQueue.length) {
unSubQueue.shift().unsubscribe();
}
};
var localBus = {
addWireTap : function ( callback ) {
var self = this;
@ -23,20 +31,22 @@ var localBus = {
},
publish : function ( envelope ) {
pubInProgress = true;
envelope.timeStamp = new Date();
_.each( this.wireTaps, function ( tap ) {
tap( envelope.data, envelope );
} );
if ( this.subscriptions[envelope.channel] ) {
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while(idx < len) {
if( subDef = subscribers[idx++] ){
fireSub(subDef, envelope);
}
}
} );
_.each( this.subscriptions[envelope.channel], function ( subscribers ) {
var idx = 0, len = subscribers.length, subDef;
while ( idx < len ) {
if ( subDef = subscribers[idx++] ) {
fireSub( subDef, envelope );
}
}
} );
}
pubInProgress = false;
return envelope;
},
@ -71,14 +81,19 @@ var localBus = {
wireTaps : [],
unsubscribe : function ( config ) {
if(pubInProgress) {
unSubQueue.push(config);
return;
}
if ( this.subscriptions[config.channel][config.topic] ) {
var len = this.subscriptions[config.channel][config.topic].length,
idx = 0;
for ( ; idx < len; idx++ ) {
while(idx < len) {
if ( this.subscriptions[config.channel][config.topic][idx] === config ) {
this.subscriptions[config.channel][config.topic].splice( idx, 1 );
break;
}
idx += 1;
}
}
}

View file

@ -18,6 +18,7 @@ var SubscriptionDefinition = function ( channel, topic, callback ) {
SubscriptionDefinition.prototype = {
unsubscribe : function () {
this.inactive = true;
postal.configuration.bus.unsubscribe( this );
postal.configuration.bus.publish( {
channel : SYSTEM_CHANNEL,
@ -68,6 +69,7 @@ SubscriptionDefinition.prototype = {
once : function () {
this.disposeAfter( 1 );
return this;
},
withConstraint : function ( predicate ) {