Added bindingsResolver and completed publish method on localBus. Added additional fluent config options that will be implemented next.

This commit is contained in:
Jim Cowart 2011-09-07 05:11:24 -04:00
parent edb2920c9b
commit 7c8c5d7754

View file

@ -1,33 +1,61 @@
var DEFAULT_EXCHANGE = "/",
DEFAULT_PRIORITY = 50,
DEFAULT_DISPOSEAFTER = 0,
NO_OP = function() { };
NO_OP = function() { },
bus;
var bus = {
subscriptions: {}
var bindingsResolver = {
cache: { },
compare: function(binding, topic) {
var rgx = new RegExp("^" + this.regexify(binding) + "$"); // match from start to end of string
return rgx.test(topic);
},
regexify: function(binding) {
return binding.replace(/\./g,"\\.") // escape actual periods
.replace(/\*/g, ".*") // asterisks match any value
.replace(/#/g, "[A-Z,a-z,0-9]*"); // hash matches any alpha-numeric 'word'
}
};
var localBus = {
subscriptions: {},
var postal = {
exchange: function(exchange) {
return new ChannelDefinition(exchange);
},
publish: function(envelope) {
_.each(this.wireTaps,function(tap) {
tap({
exchange: envelope.exchange,
topic: envelope.topic,
data: envelope.data,
timeStamp: new Date()
});
});
topic: function(topic) {
return new ChannelDefinition(undefined, topic);
},
publish: function(config) {
console.log("PUBLISH: " + JSON.stringify(config));
_.each(this.subscriptions[envelope.exchange], function(topic) {
_.each(topic, function(binding){
if(postal.config.bindingsResolver.compare(binding.topic, envelope.topic)) {
if(typeof binding.callback === 'function') {
binding.callback.apply(binding.context, [envelope.data]);
binding.onHandled();
}
}
});
});
},
subscribe: function(config) {
console.log("SUBSCRIBE: " + JSON.stringify(config));
var idx, found;
if(config.disposeAfter && config.disposeAfter > 0) {
var fn = config.onHandled,
dispose = _.after(config.disposeAfter, _.bind(function() {
this.unsubscribe(config);
}, this));
config.onHandled = function() {
config.onHandled();
_.after(config.disposeAfter, _.bind(function() { this.unsubscribe(config); }, this));
fn.apply(config.context, arguments);
dispose();
}
}
@ -51,6 +79,7 @@ var postal = {
if(!found) {
bus.subscriptions[config.exchange][config.topic].unshift(config);
}
console.log("SUBSCRIBE: " + JSON.stringify(config));
}
}
@ -58,24 +87,61 @@ var postal = {
},
unsubscribe: function(config) {
console.log("UNSUBSCRIBE: " + JSON.stringify(config));
if(bus.subscriptions[config.exchange][config.topic]) {
var len = bus.subscriptions[config.exchange][config.topic].length,
idx = 0;
for ( ; idx < len; idx++ ) {
if (bus.subscriptions[config.exchange][config.topic][idx] === config) {
bus.subscriptions[config.exchange][config.topic].splice( idx, 1 );
console.log("UNSUBSCRIBE: " + JSON.stringify(config));
break;
}
}
}
},
addWireTap: function(callback) {
console.log("WIRETAP: " + JSON.stringify(callback));
}
};
var postal = {
config: {
setBusBehavior: function(behavior) {
bus = behavior;
},
bindingsResolver: bindingsResolver
},
exchange: function(exchange) {
return new ChannelDefinition(exchange);
},
topic: function(topic) {
return new ChannelDefinition(undefined, topic);
},
publish: function(config) {
bus.publish(config);
},
subscribe: function(config) {
return bus.subscribe(config);
},
unsubscribe: function(config) {
bus.unsubscribe(config);
},
addWireTap: function(callback) {
bus.addWireTap(callback);
}
};
postal.config.setBusBehavior(localBus);
var ChannelDefinition = function(exchange, topic) {
this.configuration = {
exchange: exchange || DEFAULT_EXCHANGE,
@ -100,6 +166,11 @@ ChannelDefinition.prototype = {
return this;
},
defer: function() {
this.configuration.defer = true;
return this;
},
disposeAfter: function(receiveCount) {
if(_.isNaN(receiveCount)) {
throw "The value provided to disposeAfter (receiveCount) must be a number";
@ -141,6 +212,22 @@ ChannelDefinition.prototype = {
return this;
},
withDebounce: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
this.configuration.debounce = milliseconds;
return this;
},
withDelay: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
this.configuration.delay = milliseconds;
return this;
},
withPriority: function(priority) {
if(_.isNaN(priority)) {
throw "Priority must be a number";
@ -149,6 +236,14 @@ ChannelDefinition.prototype = {
return this;
},
withThrottle: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
this.configuration.throttle = milliseconds;
return this;
},
subscribe: function(callback) {
this.configuration.callback = callback || NO_OP;
return postal.subscribe(this.configuration);