Merge pull request #3 from arobson/76c0cf337b5320be72aa97457458d1cbf818b204

Publish subscription 'events' as messages to 'postal' exchange
This commit is contained in:
Jim Cowart 2012-02-26 11:43:11 -08:00
commit a5223ea482
4 changed files with 54 additions and 4 deletions

View file

@ -2,14 +2,29 @@ QUnit.specify("postal.js", function(){
describe("Postal", function(){
var subscription,
sub,
channel;
channel,
caughtSubscribeEvent = false,
caughtUnsubscribeEvent = false;
describe("when creating basic subscription", function() {
var systemSubscription = {};
before(function(){
systemSubscription = postal.subscribe( "postal", "subscription.created", function(x){
console.log("on subscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.created" &&
x.exchange == "MyExchange" &&
x.topic == "MyTopic") {
caughtSubscribeEvent = true;
};
});
subscription = postal.channel("MyExchange","MyTopic")
.subscribe(function() { });
sub = postal.configuration.bus.subscriptions.MyExchange.MyTopic[0];
});
after(function(){
systemSubscription.unsubscribe();
postal.configuration.bus.subscriptions = {};
});
it("should create an exchange called MyExchange", function(){
@ -36,11 +51,24 @@ QUnit.specify("postal.js", function(){
it("should have defaulted the subscription context value", function() {
assert(sub.context).isNull();
});
it("should have captured subscription creation event in wire-tap", function() {
assert(caughtSubscribeEvent).isTrue();
});
});
describe("when unsubscribing", function() {
var subExistsBefore = false,
subExistsAfter = true;
var systemSubscription = {};
before(function(){
systemSubscription = postal.subscribe( "postal", "subscription.*", function(x){
console.log("on unsubscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.removed" &&
x.exchange == "MyExchange" &&
x.topic == "MyTopic") {
caughtUnsubscribeEvent = true;
};
});
subscription = postal.channel("MyExchange","MyTopic")
.subscribe(function() { });
subExistsBefore = postal.configuration.bus.subscriptions.MyExchange.MyTopic[0] !== undefined;
@ -48,6 +76,7 @@ QUnit.specify("postal.js", function(){
subExistsAfter = postal.configuration.bus.subscriptions.MyExchange.MyTopic.length !== 0;
});
after(function(){
systemSubscription.unsubscribe();
postal.configuration.bus.subscriptions = {};
});
it("subscription should exist before unsubscribe", function(){
@ -56,6 +85,9 @@ QUnit.specify("postal.js", function(){
it("subscription should not exist after unsubscribe", function(){
assert(subExistsAfter).isFalse();
});
it("should have captured unsubscription creation event in wire-tap", function() {
assert(caughtUnsubscribeEvent).isTrue();
});
});
describe("When publishing a message", function(){
var msgReceivedCnt = 0,

View file

@ -1,6 +1,7 @@
var DEFAULT_EXCHANGE = "/",
DEFAULT_PRIORITY = 50,
DEFAULT_DISPOSEAFTER = 0,
SYSTEM_EXCHANGE = "postal",
NO_OP = function() { },
parsePublishArgs = function(args) {
var parsed = { envelope: { } }, env;

View file

@ -5,9 +5,7 @@ var localBus = {
wireTaps: [],
publish: function(data, envelope) {
_.each(this.wireTaps,function(tap) {
tap(data, envelope);
});
this.notifyTaps(data, envelope);
_.each(this.subscriptions[envelope.exchange], function(topic) {
_.each(topic, function(binding){
@ -50,6 +48,12 @@ var localBus = {
return _.bind(function() { this.unsubscribe(subDef); }, this);
},
notifyTaps: function(data, envelope) {
_.each(this.wireTaps,function(tap) {
tap(data, envelope);
});
},
unsubscribe: function(config) {
if(this.subscriptions[config.exchange][config.topic]) {
var len = this.subscriptions[config.exchange][config.topic].length,

View file

@ -7,11 +7,24 @@ var SubscriptionDefinition = function(exchange, topic, callback) {
this.maxCalls = DEFAULT_DISPOSEAFTER;
this.onHandled = NO_OP;
this.context = null;
postal.publish(SYSTEM_EXCHANGE, "subscription.created",
{
event: "subscription.created",
exchange: exchange,
topic: topic
});
};
SubscriptionDefinition.prototype = {
unsubscribe: function() {
postal.configuration.bus.unsubscribe(this);
postal.publish(SYSTEM_EXCHANGE, "subscription.removed",
{
event: "subscription.removed",
exchange: this.exchange,
topic: this.topic
});
},
defer: function() {