diff --git a/nodetesthost.js b/nodetesthost.js index 88ac18d..7781db5 100644 --- a/nodetesthost.js +++ b/nodetesthost.js @@ -3,4 +3,5 @@ var express = require('express'), path = require("path"); app.use("/", express.static(path.resolve(__dirname))); -app.listen(1581); \ No newline at end of file +app.listen(1581); +console.log("Listening on port 1581"); \ No newline at end of file diff --git a/spec/Postal.spec.js b/spec/Postal.spec.js index a178ace..95c3d84 100644 --- a/spec/Postal.spec.js +++ b/spec/Postal.spec.js @@ -408,6 +408,123 @@ QUnit.specify("postal.js", function(){ assert(sub.context).isNull(); }); }); - // TODO: Add test coverage for direct unsubscribe and wire taps + describe("when subscribing and unsubscribing a wire tap", function() { + var wireTapData = [], + wireTapEnvelope = [], + wiretap; + before(function(){ + caughtUnsubscribeEvent = false; + wiretap = postal.addWireTap(function(msg, envelope) { + wireTapData.push(msg); + wireTapEnvelope.push(envelope); + }); + postal.publish("Oh.Hai.There", { data: "I'm in yer bus, tappin' yer subscriptionz..."}); + wiretap(); + postal.publish("Oh.Hai.There", { data: "I'm in yer bus, tappin' yer subscriptionz..."}); + }); + after(function(){ + postal.configuration.bus.subscriptions = {}; + }); + it("wire tap should have been invoked only once", function(){ + assert(wireTapData.length).equals(1); + assert(wireTapEnvelope.length).equals(1); + }); + it("wireTap data should match expected results", function(){ + assert(wireTapData[0].data).equals("I'm in yer bus, tappin' yer subscriptionz..."); + }); + it("wireTap envelope should match expected results", function() { + assert(wireTapEnvelope[0].exchange).equals(DEFAULT_EXCHANGE); + assert(wireTapEnvelope[0].topic).equals("Oh.Hai.There"); + }); + }); + describe("when binding exchange - one source to one destination", function(){ + describe("with only exchange values provided", function(){ + var destData = [], + destEnv = [], + linkages; + before(function(){ + linkages = postal.bindExchanges({ exchange: "sourceExchange" }, { exchange: "destinationExchange" }); + subscription = postal.subscribe("destinationExchange", "Oh.Hai.There", function(data, env) { + destData.push(data); + destEnv.push(env); + }); + postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."}); + linkages[0].unsubscribe(); + postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."}); + }); + after(function(){ + postal.configuration.bus.subscriptions = {}; + }); + it("linked subscription should only have been invoked once", function(){ + assert(destData.length).equals(1); + assert(destEnv.length).equals(1); + }); + it("linked subscription data should match expected results", function(){ + assert(destData[0].data).equals("I'm in yer bus, linkin' to yer subscriptionz..."); + }); + it("linked subscription envelope should match expected results", function() { + assert(destEnv[0].exchange).equals("destinationExchange"); + assert(destEnv[0].topic).equals("Oh.Hai.There"); + }); + }); + describe("with exchange and static topic values provided", function(){ + var destData = [], + destEnv = [], + linkages; + before(function(){ + linkages = postal.bindExchanges({ exchange: "sourceExchange", topic: "Oh.Hai.There" }, { exchange: "destinationExchange", topic: "kthxbye" }); + subscription = postal.subscribe("destinationExchange", "kthxbye", function(data, env) { + destData.push(data); + destEnv.push(env); + }); + postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."}); + linkages[0].unsubscribe(); + postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."}); + }); + after(function(){ + postal.configuration.bus.subscriptions = {}; + }); + it("linked subscription should only have been invoked once", function(){ + assert(destData.length).equals(1); + assert(destEnv.length).equals(1); + }); + it("linked subscription data should match expected results", function(){ + assert(destData[0].data).equals("I'm in yer bus, linkin' to yer subscriptionz..."); + }); + it("linked subscription envelope should match expected results", function() { + assert(destEnv[0].exchange).equals("destinationExchange"); + assert(destEnv[0].topic).equals("kthxbye"); + }); + }); + describe("with exchange and topic transform values provided", function(){ + var destData = [], + destEnv = [], + linkages; + before(function(){ + linkages = postal.bindExchanges({ exchange: "sourceExchange" }, { exchange: "destinationExchange", topic: function(tpc) { return "NewTopic." + tpc; } }); + subscription = postal.subscribe("destinationExchange", "NewTopic.Oh.Hai.There", function(data, env) { + destData.push(data); + destEnv.push(env); + }); + postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."}); + linkages[0].unsubscribe(); + postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."}); + }); + after(function(){ + postal.configuration.bus.subscriptions = {}; + }); + it("linked subscription should only have been invoked once", function(){ + assert(destData.length).equals(1); + assert(destEnv.length).equals(1); + }); + it("linked subscription data should match expected results", function(){ + assert(destData[0].data).equals("I'm in yer bus, linkin' to yer subscriptionz..."); + }); + it("linked subscription envelope should match expected results", function() { + assert(destEnv[0].exchange).equals("destinationExchange"); + assert(destEnv[0].topic).equals("NewTopic.Oh.Hai.There"); + }); + }); + }); }); }); \ No newline at end of file diff --git a/src/main/Api.js b/src/main/Api.js index d787f1a..555bc42 100644 --- a/src/main/Api.js +++ b/src/main/Api.js @@ -26,5 +26,30 @@ var postal = { addWireTap: function(callback) { return this.configuration.bus.addWireTap(callback); - } + }, + + bindExchanges: function(sources, destinations) { + var subscriptions = []; + if(!_.isArray(sources)) { + sources = [sources]; + } + if(!_.isArray(destinations)) { + destinations = [destinations]; + } + _.each(sources, function(source){ + var sourceTopic = source.topic || "*"; + console.log("SOURCE: " + source.exchange + " | " + sourceTopic); + _.each(destinations, function(destination) { + var destExchange = destination.exchange || DEFAULT_EXCHANGE; + subscriptions.push( + postal.subscribe(source.exchange || DEFAULT_EXCHANGE, source.topic || "*", function(msg, env) { + var destTopic = _.isFunction(destination.topic) ? destination.topic(env.topic) : destination.topic || env.topic; + console.log("DESTINATION: " + destExchange + " | " + destTopic); + postal.publish(destExchange, destTopic, msg); + }) + ); + }); + }); + return subscriptions; + } }; diff --git a/src/main/LocalBus.js b/src/main/LocalBus.js index 2a4734e..f140207 100644 --- a/src/main/LocalBus.js +++ b/src/main/LocalBus.js @@ -68,11 +68,12 @@ var localBus = { }, addWireTap: function(callback) { - this.wireTaps.push(callback); + var self = this; + self.wireTaps.push(callback); return function() { - var idx = this.wireTaps.indexOf(callback); + var idx = self.wireTaps.indexOf(callback); if(idx !== -1) { - this.wireTaps.splice(idx,1); + self.wireTaps.splice(idx,1); } }; }