initial pass for perf cleanup

This commit is contained in:
Jim Cowart 2012-03-01 03:40:47 -05:00
parent c0f25ea4b4
commit e59d3ab07a
19 changed files with 171 additions and 167 deletions

View file

@ -315,15 +315,15 @@ var postal = {
},
channel: function(exchange, topic) {
var exch = arguments.length === 2 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 2 ? topic : exchange;
var exch = topic ? exchange : DEFAULT_EXCHANGE,
tpc = topic || exchange;
return new ChannelDefinition(exch, tpc);
},
subscribe: function(exchange, topic, callback) {
var exch = arguments.length === 3 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 3 ? topic : exchange,
callbk = arguments.length === 3 ? callback : topic;
var callbk = callback || topic,
tpc = callback ? topic : exchange,
exch = callback ? exchange : DEFAULT_EXCHANGE;
var channel = this.channel(exch, tpc);
return channel.subscribe(callbk);
},

View file

@ -315,15 +315,15 @@ var postal = {
},
channel: function(exchange, topic) {
var exch = arguments.length === 2 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 2 ? topic : exchange;
var exch = topic ? exchange : DEFAULT_EXCHANGE,
tpc = topic || exchange;
return new ChannelDefinition(exch, tpc);
},
subscribe: function(exchange, topic, callback) {
var exch = arguments.length === 3 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 3 ? topic : exchange,
callbk = arguments.length === 3 ? callback : topic;
var callbk = callback || topic,
tpc = callback ? topic : exchange,
exch = callback ? exchange : DEFAULT_EXCHANGE;
var channel = this.channel(exch, tpc);
return channel.subscribe(callbk);
},

View file

@ -315,15 +315,15 @@ var postal = {
},
channel: function(exchange, topic) {
var exch = arguments.length === 2 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 2 ? topic : exchange;
var exch = topic ? exchange : DEFAULT_EXCHANGE,
tpc = topic || exchange;
return new ChannelDefinition(exch, tpc);
},
subscribe: function(exchange, topic, callback) {
var exch = arguments.length === 3 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 3 ? topic : exchange,
callbk = arguments.length === 3 ? callback : topic;
var callbk = callback || topic,
tpc = callback ? topic : exchange,
exch = callback ? exchange : DEFAULT_EXCHANGE;
var channel = this.channel(exch, tpc);
return channel.subscribe(callbk);
},

Binary file not shown.

File diff suppressed because one or more lines are too long

View file

@ -315,15 +315,15 @@ var postal = {
},
channel: function(exchange, topic) {
var exch = arguments.length === 2 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 2 ? topic : exchange;
var exch = topic ? exchange : DEFAULT_EXCHANGE,
tpc = topic || exchange;
return new ChannelDefinition(exch, tpc);
},
subscribe: function(exchange, topic, callback) {
var exch = arguments.length === 3 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 3 ? topic : exchange,
callbk = arguments.length === 3 ? callback : topic;
var callbk = callback || topic,
tpc = callback ? topic : exchange,
exch = callback ? exchange : DEFAULT_EXCHANGE;
var channel = this.channel(exch, tpc);
return channel.subscribe(callbk);
},

File diff suppressed because one or more lines are too long

View file

@ -315,15 +315,15 @@ var postal = {
},
channel: function(exchange, topic) {
var exch = arguments.length === 2 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 2 ? topic : exchange;
var exch = topic ? exchange : DEFAULT_EXCHANGE,
tpc = topic || exchange;
return new ChannelDefinition(exch, tpc);
},
subscribe: function(exchange, topic, callback) {
var exch = arguments.length === 3 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 3 ? topic : exchange,
callbk = arguments.length === 3 ? callback : topic;
var callbk = callback || topic,
tpc = callback ? topic : exchange,
exch = callback ? exchange : DEFAULT_EXCHANGE;
var channel = this.channel(exch, tpc);
return channel.subscribe(callbk);
},

View file

@ -1,6 +1,6 @@
QUnit.specify("postal.js", function(){
describe("bindingsResolver", function(){
describe("When calling regexify", function() {
/*describe("When calling regexify", function() {
describe("With a topic containing no special escape chars", function() {
var result = bindingsResolver.regexify("CoolTopic");
it("Should equal 'CoolTopic'", function(){
@ -43,7 +43,7 @@ QUnit.specify("postal.js", function(){
assert(result).equals("Top\\..*\\.Bottom");
});
});
});
});*/
describe("When calling compare", function(){
describe("With topic Top.Middle.Bottom and binding Top.Middle.Bottom", function(){
var result = bindingsResolver.compare("Top.Middle.Bottom", "Top.Middle.Bottom"),

View file

@ -10,16 +10,21 @@ QUnit.specify("postal.js", function(){
var systemSubscription = {};
before(function(){
systemSubscription = postal.subscribe( "postal", "subscription.created", function(x){
console.log("on subscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.created" &&
x.exchange == "MyExchange" &&
x.topic == "MyTopic") {
caughtSubscribeEvent = true;
};
systemSubscription = postal.subscribe({
exchange: "postal",
topic: "subscription.created",
callback: function(x){
console.log("on subscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.created" &&
x.exchange == "MyExchange" &&
x.topic == "MyTopic") {
caughtSubscribeEvent = true;
};
}
});
subscription = postal.channel("MyExchange","MyTopic")
subscription = postal.channel({ exchange: "MyExchange", topic: "MyTopic" })
.subscribe(function() { });
sub = postal.configuration.bus.subscriptions.MyExchange.MyTopic[0];
});
@ -60,16 +65,20 @@ QUnit.specify("postal.js", function(){
subExistsAfter = true;
var systemSubscription = {};
before(function(){
systemSubscription = postal.subscribe( "postal", "subscription.*", function(x){
console.log("on unsubscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.removed" &&
x.exchange == "MyExchange" &&
x.topic == "MyTopic") {
caughtUnsubscribeEvent = true;
};
systemSubscription = postal.subscribe({
exchange: "postal",
topic: "subscription.*",
callback: function(x){
console.log("on unsubscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.removed" &&
x.exchange == "MyExchange" &&
x.topic == "MyTopic") {
caughtUnsubscribeEvent = true;
};
}
});
subscription = postal.channel("MyExchange","MyTopic")
subscription = postal.channel({ exchange: "MyExchange", topic: "MyTopic" })
.subscribe(function() { });
subExistsBefore = postal.configuration.bus.subscriptions.MyExchange.MyTopic[0] !== undefined;
subscription.unsubscribe();
@ -93,7 +102,7 @@ QUnit.specify("postal.js", function(){
var msgReceivedCnt = 0,
msgData;
before(function(){
channel = postal.channel("MyExchange","MyTopic")
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" })
subscription = channel.subscribe(function(data) { msgReceivedCnt++; msgData = data;});
channel.publish("Testing123");
subscription.unsubscribe();
@ -112,7 +121,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with a disposeAfter of 5", function(){
var msgReceivedCnt = 0;
before(function(){
channel = postal.channel("MyExchange","MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { msgReceivedCnt++; })
.disposeAfter(5);
channel.publish("Testing123");
@ -132,7 +141,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing and ignoring duplicates", function(){
var subInvokedCnt = 0;
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { subInvokedCnt++; })
.ignoreDuplicates();
channel.publish("Testing123");
@ -156,7 +165,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing and passing onHandled callback", function(){
var whte = false;
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { })
.whenHandledThenExecute(function() { whte = true; });
channel.publish("Testing123");
@ -175,7 +184,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with one constraint returning true", function(){
var recvd = false;
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { recvd= true; })
.withConstraint(function() { return true; });
channel.publish("Testing123");
@ -194,7 +203,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with one constraint returning false", function(){
var recvd = false;
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { recvd= true; })
.withConstraint(function() { return false; });
channel.publish("Testing123");
@ -213,7 +222,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with multiple constraints returning true", function(){
var recvd = false;
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { recvd= true; })
.withConstraints([function() { return true; },
function() { return true; },
@ -234,7 +243,7 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with multiple constraints and one returning false", function(){
var recvd = false;
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { recvd= true; })
.withConstraints([function() { return true; },
function() { return false; },
@ -260,7 +269,7 @@ QUnit.specify("postal.js", function(){
}
};
before(function(){
channel = postal.channel("MyExchange", "MyTopic");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { this.increment(); })
.withContext(obj);
channel.publish("Testing123");
@ -275,9 +284,9 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with a hierarchical binding, no wildcards", function(){
var count = 0, channelB, channelC;
before(function(){
channel = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic");
channelB = postal.channel("MyExchange", "MyTopic.MiddleTopic");
channelC = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic.YetAnother");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic" });
channelB = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic" });
channelC = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic.YetAnother" });
subscription = channel.subscribe(function(data) { count++; });
channel.publish("Testing123");
channelB.publish("Testing123");
@ -294,10 +303,10 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with a hierarchical binding, using #", function(){
var count = 0, channelB, channelC, channelD;
before(function(){
channel = postal.channel("MyExchange", "MyTopic.#.SubTopic");
channelB = postal.channel("MyExchange", "MyTopic.MiddleTopic");
channelC = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic");
channelD = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic.YetAnother");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic.#.SubTopic" });
channelB = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic" });
channelC = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic" });
channelD = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic.YetAnother" });
subscription = channel.subscribe(function(data) { count++; });
channelC.publish({exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic", data: "Testing123"});
channelB.publish({exchange: "MyExchange", topic: "MyTopic.MiddleTopic", data: "Testing123"});
@ -314,10 +323,10 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with a hierarchical binding, using *", function(){
var count = 0, channelB, channelC, channelD;
before(function(){
channel = postal.channel("MyExchange", "MyTopic.MiddleTopic.*");
channelB = postal.channel("MyExchange", "MyTopic.MiddleTopic");
channelC = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic");
channelD = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic.YetAnother");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.*" });
channelB = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic" });
channelC = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic" });
channelD = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic.YetAnother" });
subscription = channel.subscribe(function(data) { count++; });
channelC.publish("Testing123");
@ -335,11 +344,11 @@ QUnit.specify("postal.js", function(){
describe("When subscribing with a hierarchical binding, using # and *", function(){
var count = 0, channelB, channelC, channelD, channelE;
before(function(){
channel = postal.channel("MyExchange", "MyTopic.#.*");
channelB = postal.channel("MyExchange", "MyTopic.MiddleTopic");
channelC = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic");
channelD = postal.channel("MyExchange", "MyTopic.MiddleTopic.SubTopic.YetAnother");
channelE = postal.channel("MyExchange", "OtherTopic.MiddleTopic.SubTopic.YetAnother");
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic.#.*" });
channelB = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic" });
channelC = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic" });
channelD = postal.channel({ exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic.YetAnother" });
channelE = postal.channel({ exchange: "MyExchange", topic: "OtherTopic.MiddleTopic.SubTopic.YetAnother" });
subscription = channel.subscribe(function(data) { count++; });
channelC.publish({exchange: "MyExchange", topic: "MyTopic.MiddleTopic.SubTopic", data: "Testing123"});
@ -359,7 +368,7 @@ QUnit.specify("postal.js", function(){
var msgReceivedCnt = 0,
msgData;
before(function(){
channel = postal.channel("MyExchange","MyTopic")
channel = postal.channel({ exchange: "MyExchange", topic: "MyTopic" });
subscription = channel.subscribe(function(data) { msgReceivedCnt++; msgData = data;});
postal.publish("MyExchange", "MyTopic", "Testing123");
subscription.unsubscribe();
@ -377,7 +386,11 @@ QUnit.specify("postal.js", function(){
});
describe("When using shortcut subscribe api", function(){
before(function(){
subscription = postal.subscribe("MyExchange", "MyTopic", function() { });
subscription = postal.subscribe({
exchange: "MyExchange",
topic: "MyTopic",
callback: function() { }
});
sub = postal.configuration.bus.subscriptions.MyExchange.MyTopic[0];
});
after(function(){
@ -409,18 +422,20 @@ QUnit.specify("postal.js", function(){
});
});
describe("when subscribing and unsubscribing a wire tap", function() {
var wireTapData = [],
wireTapEnvelope = [],
var wireTapData,
wireTapEnvelope,
wiretap;
before(function(){
caughtUnsubscribeEvent = false;
wiretap = postal.addWireTap(function(msg, envelope) {
wireTapData = [];
wireTapEnvelope = [];
wiretap = postal.addWireTap(function(envelope, msg) {
wireTapData.push(msg);
wireTapEnvelope.push(envelope);
});
postal.publish("Oh.Hai.There", { data: "I'm in yer bus, tappin' yer subscriptionz..."});
postal.publish({ topic: "Oh.Hai.There" }, { data: "I'm in yer bus, tappin' yer subscriptionz..."});
wiretap();
postal.publish("Oh.Hai.There", { data: "I'm in yer bus, tappin' yer subscriptionz..."});
postal.publish({ topic: "Oh.Hai.There" }, { data: "I'm in yer bus, tappin' yer subscriptionz..."});
});
after(function(){
postal.configuration.bus.subscriptions = {};
@ -433,6 +448,7 @@ QUnit.specify("postal.js", function(){
assert(wireTapData[0].data).equals("I'm in yer bus, tappin' yer subscriptionz...");
});
it("wireTap envelope should match expected results", function() {
console.log("ONOES! " + JSON.stringify(wireTapEnvelope));
assert(wireTapEnvelope[0].exchange).equals(DEFAULT_EXCHANGE);
assert(wireTapEnvelope[0].topic).equals("Oh.Hai.There");
});
@ -444,7 +460,7 @@ QUnit.specify("postal.js", function(){
linkages;
before(function(){
linkages = postal.bindExchanges({ exchange: "sourceExchange" }, { exchange: "destinationExchange" });
subscription = postal.subscribe("destinationExchange", "Oh.Hai.There", function(data, env) {
subscription = postal.subscribe({ exchange: "destinationExchange", topic: "Oh.Hai.There", callback: function(data, env) {
destData.push(data);
destEnv.push(env);
});
@ -473,7 +489,7 @@ QUnit.specify("postal.js", function(){
linkages;
before(function(){
linkages = postal.bindExchanges({ exchange: "sourceExchange", topic: "Oh.Hai.There" }, { exchange: "destinationExchange", topic: "kthxbye" });
subscription = postal.subscribe("destinationExchange", "kthxbye", function(data, env) {
subscription = postal.subscribe({ exchange: "destinationExchange", topic: "kthxbye", callback: function(data, env) {
destData.push(data);
destEnv.push(env);
});
@ -502,7 +518,7 @@ QUnit.specify("postal.js", function(){
linkages;
before(function(){
linkages = postal.bindExchanges({ exchange: "sourceExchange" }, { exchange: "destinationExchange", topic: function(tpc) { return "NewTopic." + tpc; } });
subscription = postal.subscribe("destinationExchange", "NewTopic.Oh.Hai.There", function(data, env) {
subscription = postal.subscribe({ exchange: "destinationExchange", topic: "NewTopic.Oh.Hai.There", callback: function(data, env) {
destData.push(data);
destEnv.push(env);
});

View file

@ -1,27 +1,39 @@
var publishPicker = {
"2" : function(envelope, payload) {
if(!envelope.exchange) {
envelope.exchange = DEFAULT_EXCHANGE;
}
postal.configuration.bus.publish(envelope, payload);
},
"3" : function(exchange, topic, payload) {
postal.configuration.bus.publish({ exchange: exchange, topic: topic }, payload);
}
};
var postal = {
configuration: {
bus: localBus,
resolver: bindingsResolver
},
channel: function(exchange, topic) {
var exch = arguments.length === 2 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 2 ? topic : exchange;
channel: function(options) {
var exch = options.exchange || DEFAULT_EXCHANGE,
tpc = options.topic;
return new ChannelDefinition(exch, tpc);
},
subscribe: function(exchange, topic, callback) {
var exch = arguments.length === 3 ? exchange : DEFAULT_EXCHANGE,
tpc = arguments.length === 3 ? topic : exchange,
callbk = arguments.length === 3 ? callback : topic;
var channel = this.channel(exch, tpc);
return channel.subscribe(callbk);
subscribe: function(options) {
var callback = options.callback,
topic = options.topic,
exchange = options.exchange || DEFAULT_EXCHANGE;
return new ChannelDefinition(exchange, topic).subscribe(callback);
},
publish: function(exchange, topic, payload, envelopeOptions) {
var parsedArgs = parsePublishArgs([].slice.call(arguments,0));
var channel = this.channel(parsedArgs.envelope.exchange, parsedArgs.envelope.topic);
channel.publish(parsedArgs.payload, parsedArgs.envelope);
publish: function() {
var len = arguments.length;
if(publishPicker[len]) {
publishPicker[len].apply(this, arguments);
}
},
addWireTap: function(callback) {
@ -29,21 +41,28 @@ var postal = {
},
bindExchanges: function(sources, destinations) {
var subscriptions = [];
var subscriptions;
if(!_.isArray(sources)) {
sources = [sources];
}
if(!_.isArray(destinations)) {
destinations = [destinations];
}
subscriptions = new Array(sources.length * destinations.length);
_.each(sources, function(source){
var sourceTopic = source.topic || "*";
_.each(destinations, function(destination) {
var destExchange = destination.exchange || DEFAULT_EXCHANGE;
subscriptions.push(
postal.subscribe(source.exchange || DEFAULT_EXCHANGE, source.topic || "*", function(msg, env) {
var destTopic = _.isFunction(destination.topic) ? destination.topic(env.topic) : destination.topic || env.topic;
postal.publish(destExchange, destTopic, msg);
postal.subscribe({
exchange: source.exchange || DEFAULT_EXCHANGE,
topic: source.topic || "*",
callback : function(msg, env) {
var newEnv = env;
newEnv.topic = _.isFunction(destination.topic) ? destination.topic(env.topic) : destination.topic || env.topic;
newEnv.exchange = destExchange;
postal.publish(newEnv, msg);
}
})
);
});

View file

@ -5,7 +5,10 @@ var bindingsResolver = {
if(this.cache[topic] && this.cache[topic][binding]) {
return true;
}
var rgx = new RegExp("^" + this.regexify(binding) + "$"), // match from start to end of string
// binding.replace(/\./g,"\\.") // escape actual periods
// .replace(/\*/g, ".*") // asterisks match any value
// .replace(/#/g, "[A-Z,a-z,0-9]*"); // hash matches any alpha-numeric 'word'
var rgx = new RegExp("^" + binding.replace(/\./g,"\\.").replace(/\*/g, ".*").replace(/#/g, "[A-Z,a-z,0-9]*") + "$"),
result = rgx.test(topic);
if(result) {
if(!this.cache[topic]) {
@ -14,11 +17,5 @@ var bindingsResolver = {
this.cache[topic][binding] = true;
}
return result;
},
regexify: function(binding) {
return binding.replace(/\./g,"\\.") // escape actual periods
.replace(/\*/g, ".*") // asterisks match any value
.replace(/#/g, "[A-Z,a-z,0-9]*"); // hash matches any alpha-numeric 'word'
}
};

View file

@ -11,11 +11,10 @@ ChannelDefinition.prototype = {
},
publish: function(data, envelope) {
var env = _.extend({
exchange: this.exchange,
timeStamp: new Date(),
topic: this.topic
}, envelope);
postal.configuration.bus.publish(data, env);
var env = envelope || {};
env.exchange = this.exchange;
env.timeStamp = new Date();
env.topic = this.topic;
postal.configuration.bus.publish(env, data);
}
};

View file

@ -2,36 +2,4 @@ var DEFAULT_EXCHANGE = "/",
DEFAULT_PRIORITY = 50,
DEFAULT_DISPOSEAFTER = 0,
SYSTEM_EXCHANGE = "postal",
NO_OP = function() { },
parsePublishArgs = function(args) {
var parsed = { envelope: { } }, env;
switch(args.length) {
case 3:
if(typeof args[1] === "Object" && typeof args[2] === "Object") {
parsed.envelope.exchange = DEFAULT_EXCHANGE;
parsed.envelope.topic = args[0];
parsed.payload = args[1];
env = parsed.envelope;
parsed.envelope = _.extend(env, args[2]);
}
else {
parsed.envelope.exchange = args[0];
parsed.envelope.topic = args[1];
parsed.payload = args[2];
}
break;
case 4:
parsed.envelope.exchange = args[0];
parsed.envelope.topic = args[1];
parsed.payload = args[2];
env = parsed.envelope;
parsed.envelope = _.extend(env, args[3]);
break;
default:
parsed.envelope.exchange = DEFAULT_EXCHANGE;
parsed.envelope.topic = args[0];
parsed.payload = args[1];
break;
}
return parsed;
};
NO_OP = function() { };

View file

@ -2,10 +2,12 @@ var localBus = {
subscriptions: {},
wireTaps: [],
wireTaps: new Array(0),
publish: function(data, envelope) {
this.notifyTaps(data, envelope);
publish: function(envelope, data) {
_.each(this.wireTaps,function(tap) {
tap(envelope, data);
});
_.each(this.subscriptions[envelope.exchange], function(topic) {
_.each(topic, function(binding){
@ -22,10 +24,15 @@ var localBus = {
},
subscribe: function(subDef) {
var idx, found, fn, exch, subs;
var idx, found, fn, exch = this.subscriptions[subDef.exchange], subs;
exch = this.subscriptions[subDef.exchange] = this.subscriptions[subDef.exchange] || {};
subs = this.subscriptions[subDef.exchange][subDef.topic] = this.subscriptions[subDef.exchange][subDef.topic] || [];
if(!exch) {
exch = this.subscriptions[subDef.exchange] = {};
}
subs = this.subscriptions[subDef.exchange][subDef.topic]
if(!subs) {
subs = this.subscriptions[subDef.exchange][subDef.topic] = new Array(0);
}
idx = subs.length - 1;
//if(!_.any(subs, function(cfg) { return cfg === subDef; })) {
@ -42,12 +49,6 @@ var localBus = {
//}
},
notifyTaps: function(data, envelope) {
_.each(this.wireTaps,function(tap) {
tap(data, envelope);
});
},
unsubscribe: function(config) {
if(this.subscriptions[config.exchange][config.topic]) {
var len = this.subscriptions[config.exchange][config.topic].length,

View file

@ -3,25 +3,29 @@ var SubscriptionDefinition = function(exchange, topic, callback) {
this.topic = topic;
this.callback = callback;
this.priority = DEFAULT_PRIORITY;
this.constraints = [];
this.constraints = new Array(0);
this.maxCalls = DEFAULT_DISPOSEAFTER;
this.onHandled = NO_OP;
this.context = null;
_.defer(function() {
postal.publish(SYSTEM_EXCHANGE, "subscription.created",
{
event: "subscription.created",
exchange: exchange,
topic: topic
})
});
postal.publish({
exchange: SYSTEM_EXCHANGE,
topic: "subscription.created"
},
{
event: "subscription.created",
exchange: exchange,
topic: topic
});
};
SubscriptionDefinition.prototype = {
unsubscribe: function() {
postal.configuration.bus.unsubscribe(this);
postal.publish(SYSTEM_EXCHANGE, "subscription.removed",
postal.publish({
exchange: SYSTEM_EXCHANGE,
topic: "subscription.removed"
},
{
event: "subscription.removed",
exchange: this.exchange,