Core re-write of broker done....more to come...

This commit is contained in:
Jim Cowart 2011-08-09 22:14:22 -04:00
parent 6dabb5795c
commit 6818e7e808
4 changed files with 397 additions and 11 deletions

200
spec/OLDbroker.spec.js Normal file
View file

@ -0,0 +1,200 @@
/*
QUnit.specify("postal.js", function(){
describe("broker", function(){
describe("When publishing a message to a specific one level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
objA = {
messageReceived: false
};
broker.subscribe("Test", function() { objA.messageReceived = true; });
broker.publish("Test", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
});
it("the subscription callback should be invoked on 'b'", function(){
assert(b.messageReceived).isTrue();
});
});
});
describe("When publishing a message to a specific multi-level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
objA = {
messageReceived: false
};
broker.subscribe("Test.Topic", function() { objA.messageReceived = true; });
broker.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
});
it("the subscription callback should be invoked on 'b'", function(){
assert(b.messageReceived).isTrue();
});
});
});
describe("When publishing a wildcard message to a multi-level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
objA = {
messageReceived: false
};
broker.subscribe("Test.*", function() { objA.messageReceived = true; });
broker.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening.*", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.*", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening.Closely", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
});
it("the subscription callback should be invoked on 'b'", function(){
assert(b.messageReceived).isTrue();
});
});
});
describe("When unsubscribing using provided callback", function() {
describe("with one callback", function() {
var broker = new MessageBroker(),
objA = {
messageCount: 0
};
var unsubscribe = broker.subscribe("Test.*", function() { objA.messageCount++; });
broker.publish("Test.Topic", {});
unsubscribe();
broker.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageCount).equals(1);
});
});
describe("with two callbacks", function() {
var broker = new MessageBroker(),
ObjA = function() {
var _unsubscribe;
this.messageCount = 0;
this.unsubscribe = function() {
if(_unsubscribe) {
_unsubscribe();
}
};
_unsubscribe = broker.subscribe("TwoRecipients.Listening.*", function() {
this.messageCount++;
}.bind(this));
},
ObjB = function() {
var _unsubscribe;
this.messageCount = 0;
this.unsubscribe = function() {
if(_unsubscribe) {
_unsubscribe();
}
};
_unsubscribe = broker.subscribe("TwoRecipients.*", function() {
this.messageCount++;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening", {});
a.unsubscribe();
broker.publish("TwoRecipients.Listening", {});
it("First object message count should be 1", function(){
assert(a.messageCount).equals(1);
});
it("Second object message count should be 2", function(){
assert(b.messageCount).equals(2);
});
});
});
});
});*/

View file

@ -2,7 +2,7 @@ QUnit.specify("postal.js", function(){
describe("broker", function(){
describe("When publishing a message to a specific one level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
objA = {
messageReceived: false
};
@ -15,7 +15,7 @@ QUnit.specify("postal.js", function(){
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
ObjA = function() {
this.messageReceived = false;
@ -46,7 +46,7 @@ QUnit.specify("postal.js", function(){
});
describe("When publishing a message to a specific multi-level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
objA = {
messageReceived: false
};
@ -59,7 +59,7 @@ QUnit.specify("postal.js", function(){
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
ObjA = function() {
this.messageReceived = false;
@ -90,7 +90,7 @@ QUnit.specify("postal.js", function(){
});
describe("When publishing a wildcard message to a multi-level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
objA = {
messageReceived: false
};
@ -103,7 +103,7 @@ QUnit.specify("postal.js", function(){
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
ObjA = function() {
this.messageReceived = false;
@ -134,7 +134,7 @@ QUnit.specify("postal.js", function(){
});
describe("When unsubscribing using provided callback", function() {
describe("with one callback", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
objA = {
messageCount: 0
};
@ -149,7 +149,7 @@ QUnit.specify("postal.js", function(){
});
});
describe("with two callbacks", function() {
var broker = new MessageBroker(),
var broker = new Broker(),
ObjA = function() {
var _unsubscribe;
@ -182,9 +182,9 @@ QUnit.specify("postal.js", function(){
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening", {});
broker.publish("TwoRecipients.Listening.Something", {});
a.unsubscribe();
broker.publish("TwoRecipients.Listening", {});
broker.publish("TwoRecipients.Listening.Something", {});
it("First object message count should be 1", function(){
assert(a.messageCount).equals(1);
@ -195,5 +195,23 @@ QUnit.specify("postal.js", function(){
});
});
});
describe("When publishing a message on a specific exchange", function(){
describe("With a valid exchange", function() {
var broker = new Broker(),
objA = {
messageCount: 0
};
var unsubscribe = broker.subscribe("MyExchange", "Test.*", function() { objA.messageCount++; });
broker.publish("MyExchange", "Test.Topic", {});
unsubscribe();
broker.publish("MyExchange", "Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageCount).equals(1);
});
})
});
});
});

View file

@ -5,7 +5,7 @@
<script type="text/javascript" src="../lib/qunit.js"></script>
<script type="text/javascript" src="../lib/pavlov.js"></script>
<script type="text/javascript" src="../src/languageExtensions.js"></script>
<script type="text/javascript" src="../src/broker.js"></script>
<script type="text/javascript" src="../src/broker2.js"></script>
<script type="text/javascript" src="languageExtensions.spec.js"></script>
<script type="text/javascript" src="broker.spec.js"></script>
<link rel="stylesheet" href="../lib/qunit.css" type="text/css" media="screen" />

168
src/broker2.js Normal file
View file

@ -0,0 +1,168 @@
var DEFAULT_EXCHANGE = "/",
_forEachKeyValue = function(object, callback) {
for(var x in object) {
if(object.hasOwnProperty(x)) {
callback(x, object[x]);
}
}
};
var Broker = function() {
var _regexify = function(topic) {
if(!this[topic]) {
this[topic] = topic.replace(".", "\.").replace("*", ".*");
}
return this[topic];
}.bind(this),
_isTopicMatch = function(topic, comparison) {
if(!this[topic + '_' + comparison]) {
this[topic + '_' + comparison] = topic === comparison ||
(comparison.indexOf("*") !== -1 && topic.search(_regexify(comparison)) !== -1) ||
(topic.indexOf("*") !== -1 && comparison.search(_regexify(topic)) !== -1);
}
return this[topic + '_' + comparison];
}.bind(this);
this.subscriptions = {};
this.subscriptions[DEFAULT_EXCHANGE] = {};
/*
options object has the following optional members:
{
once: {true || false (true indicates a fire-only-once subscription},
priority: {integer value - lower value == higher priority},
context: {the "this" context for the callback invocation}
}
*/
this.subscribe = function(exchange, topic, callback, options) {
var _args = slice.call(arguments, 0),
_exchange,
_topicList, // we allow multiple topics to be subscribed in one call.,
_once = false,
_subData = {
callback: function() { /* placeholder noop */ },
priority: 50,
context: null,
onFired: function() { /* noop */ }
},
_idx,
_found;
if(_args.length === 2) { // expecting topic and callback
_exchange = DEFAULT_EXCHANGE;
_topicList = _args[0].split(/\s/);
_subData.callback = _args[1];
}
else if(_args.length === 3 && typeof _args[2] === 'function') { // expecting exchange, topic, callback
_exchange = exchange;
_topicList = _args[1].split(/\s/);
_subData.callback = _args[2];
}
else if(_args.length === 3 && typeof _args[2] === 'object') {
_exchange = DEFAULT_EXCHANGE;
_topicList = _args[0].split(/\s/);
_subData.callback = _args[1];
_subData.priority = _args[2].priority ? _args[2].priority : 50;
_subData.context = _args[2].context ? _args[2].context : null;
_once = _args[2].once ? _args[2].once : false;
}
if(_once) {
_subData.onFired = function() {
this.unsubscribe.apply(this,[_exchange, _topicList.join(' '), _subData.callback]);
}.bind(this);
}
if(!this.subscriptions[_exchange]) {
this.subscriptions[_exchange] = {};
}
_topicList.forEach(function(tpc) {
if(!this.subscriptions[_exchange][tpc]) {
this.subscriptions[_exchange][tpc] = [_subData];
}
else {
_idx = this.subscriptions[_exchange][tpc].length - 1;
if(this.subscriptions[_exchange][tpc].filter(function(sub) { return sub === callback; }).length === 0) {
for(; _idx >= 0; _idx--) {
if(this.subscriptions[_exchange][tpc][_idx].priority <= _subData.priority) {
this.subscriptions[_exchange][tpc].splice(_idx + 1, 0, _subData);
_found = true;
break;
}
}
if(!_found) {
this.subscriptions[_exchange][tpc].unshift(_subData);
}
}
}
}, this);
// return callback for un-subscribing...
return function() {
this.unsubscribe(_exchange, _topicList.join(' '), _subData.callback);
}.bind(this);
};
this.unsubscribe = function(exchange, topic, callback) {
var _args = slice.call(arguments,0),
_exchange,
_topicList, // we allow multiple topics to be unsubscribed in one call.
_callback;
if(_args.length === 2) {
_exchange = DEFAULT_EXCHANGE;
_topicList = _args[0].split(/\s/);
_callback = _args[1];
}
else if(_args.length === 3) {
_exchange = exchange;
_topicList = topic.split(/\s/);
_callback = callback;
}
_topicList.forEach(function(tpc) {
if(this.subscriptions[_exchange][tpc]) {
var _len = this.subscriptions[_exchange][tpc].length,
_idx = 0;
for ( ; _idx < _len; _idx++ ) {
if (this.subscriptions[_exchange][tpc][_idx].callback === callback) {
this.subscriptions[_exchange][tpc].splice( _idx, 1 );
break;
}
}
}
},this);
};
this.publish = function(exchange, topic, data) {
var _args = slice.call(arguments,0),
_exchange,
_topicList,
_data;
if(_args.length == 2) {
_exchange = DEFAULT_EXCHANGE;
_topicList = _args[0].split(/\s/);
_data = _args[1];
}
else {
_exchange = exchange;
_topicList = topic.split(/\s/);
_data = data;
}
_topicList.forEach(function(tpc){
_forEachKeyValue(this.subscriptions[_exchange],function(subTpc, subs) {
if(_isTopicMatch(tpc, subTpc)) {
subs.map(function(sub) { return sub.callback; })
.forEach(function(callback) {
if(typeof callback === 'function') {
callback(data);
}
});
}
});
}, this);
};
};