Added the bindExchangeses call to the API, including initial tests

This commit is contained in:
Jim Cowart 2012-02-26 16:22:54 -05:00
parent a6d5fa01b7
commit 6d3e301cf4
4 changed files with 150 additions and 6 deletions

View file

@ -3,4 +3,5 @@ var express = require('express'),
path = require("path");
app.use("/", express.static(path.resolve(__dirname)));
app.listen(1581);
app.listen(1581);
console.log("Listening on port 1581");

View file

@ -408,6 +408,123 @@ QUnit.specify("postal.js", function(){
assert(sub.context).isNull();
});
});
// TODO: Add test coverage for direct unsubscribe and wire taps
describe("when subscribing and unsubscribing a wire tap", function() {
var wireTapData = [],
wireTapEnvelope = [],
wiretap;
before(function(){
caughtUnsubscribeEvent = false;
wiretap = postal.addWireTap(function(msg, envelope) {
wireTapData.push(msg);
wireTapEnvelope.push(envelope);
});
postal.publish("Oh.Hai.There", { data: "I'm in yer bus, tappin' yer subscriptionz..."});
wiretap();
postal.publish("Oh.Hai.There", { data: "I'm in yer bus, tappin' yer subscriptionz..."});
});
after(function(){
postal.configuration.bus.subscriptions = {};
});
it("wire tap should have been invoked only once", function(){
assert(wireTapData.length).equals(1);
assert(wireTapEnvelope.length).equals(1);
});
it("wireTap data should match expected results", function(){
assert(wireTapData[0].data).equals("I'm in yer bus, tappin' yer subscriptionz...");
});
it("wireTap envelope should match expected results", function() {
assert(wireTapEnvelope[0].exchange).equals(DEFAULT_EXCHANGE);
assert(wireTapEnvelope[0].topic).equals("Oh.Hai.There");
});
});
describe("when binding exchange - one source to one destination", function(){
describe("with only exchange values provided", function(){
var destData = [],
destEnv = [],
linkages;
before(function(){
linkages = postal.bindExchanges({ exchange: "sourceExchange" }, { exchange: "destinationExchange" });
subscription = postal.subscribe("destinationExchange", "Oh.Hai.There", function(data, env) {
destData.push(data);
destEnv.push(env);
});
postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."});
linkages[0].unsubscribe();
postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."});
});
after(function(){
postal.configuration.bus.subscriptions = {};
});
it("linked subscription should only have been invoked once", function(){
assert(destData.length).equals(1);
assert(destEnv.length).equals(1);
});
it("linked subscription data should match expected results", function(){
assert(destData[0].data).equals("I'm in yer bus, linkin' to yer subscriptionz...");
});
it("linked subscription envelope should match expected results", function() {
assert(destEnv[0].exchange).equals("destinationExchange");
assert(destEnv[0].topic).equals("Oh.Hai.There");
});
});
describe("with exchange and static topic values provided", function(){
var destData = [],
destEnv = [],
linkages;
before(function(){
linkages = postal.bindExchanges({ exchange: "sourceExchange", topic: "Oh.Hai.There" }, { exchange: "destinationExchange", topic: "kthxbye" });
subscription = postal.subscribe("destinationExchange", "kthxbye", function(data, env) {
destData.push(data);
destEnv.push(env);
});
postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."});
linkages[0].unsubscribe();
postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."});
});
after(function(){
postal.configuration.bus.subscriptions = {};
});
it("linked subscription should only have been invoked once", function(){
assert(destData.length).equals(1);
assert(destEnv.length).equals(1);
});
it("linked subscription data should match expected results", function(){
assert(destData[0].data).equals("I'm in yer bus, linkin' to yer subscriptionz...");
});
it("linked subscription envelope should match expected results", function() {
assert(destEnv[0].exchange).equals("destinationExchange");
assert(destEnv[0].topic).equals("kthxbye");
});
});
describe("with exchange and topic transform values provided", function(){
var destData = [],
destEnv = [],
linkages;
before(function(){
linkages = postal.bindExchanges({ exchange: "sourceExchange" }, { exchange: "destinationExchange", topic: function(tpc) { return "NewTopic." + tpc; } });
subscription = postal.subscribe("destinationExchange", "NewTopic.Oh.Hai.There", function(data, env) {
destData.push(data);
destEnv.push(env);
});
postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."});
linkages[0].unsubscribe();
postal.publish("sourceExchange", "Oh.Hai.There", { data: "I'm in yer bus, linkin' to yer subscriptionz..."});
});
after(function(){
postal.configuration.bus.subscriptions = {};
});
it("linked subscription should only have been invoked once", function(){
assert(destData.length).equals(1);
assert(destEnv.length).equals(1);
});
it("linked subscription data should match expected results", function(){
assert(destData[0].data).equals("I'm in yer bus, linkin' to yer subscriptionz...");
});
it("linked subscription envelope should match expected results", function() {
assert(destEnv[0].exchange).equals("destinationExchange");
assert(destEnv[0].topic).equals("NewTopic.Oh.Hai.There");
});
});
});
});
});

View file

@ -26,5 +26,30 @@ var postal = {
addWireTap: function(callback) {
return this.configuration.bus.addWireTap(callback);
}
},
bindExchanges: function(sources, destinations) {
var subscriptions = [];
if(!_.isArray(sources)) {
sources = [sources];
}
if(!_.isArray(destinations)) {
destinations = [destinations];
}
_.each(sources, function(source){
var sourceTopic = source.topic || "*";
console.log("SOURCE: " + source.exchange + " | " + sourceTopic);
_.each(destinations, function(destination) {
var destExchange = destination.exchange || DEFAULT_EXCHANGE;
subscriptions.push(
postal.subscribe(source.exchange || DEFAULT_EXCHANGE, source.topic || "*", function(msg, env) {
var destTopic = _.isFunction(destination.topic) ? destination.topic(env.topic) : destination.topic || env.topic;
console.log("DESTINATION: " + destExchange + " | " + destTopic);
postal.publish(destExchange, destTopic, msg);
})
);
});
});
return subscriptions;
}
};

View file

@ -68,11 +68,12 @@ var localBus = {
},
addWireTap: function(callback) {
this.wireTaps.push(callback);
var self = this;
self.wireTaps.push(callback);
return function() {
var idx = this.wireTaps.indexOf(callback);
var idx = self.wireTaps.indexOf(callback);
if(idx !== -1) {
this.wireTaps.splice(idx,1);
self.wireTaps.splice(idx,1);
}
};
}