diff --git a/src/Bus.new.js b/src/Bus.new.js index 6241246..d72534e 100644 --- a/src/Bus.new.js +++ b/src/Bus.new.js @@ -1,33 +1,61 @@ var DEFAULT_EXCHANGE = "/", DEFAULT_PRIORITY = 50, DEFAULT_DISPOSEAFTER = 0, - NO_OP = function() { }; + NO_OP = function() { }, + bus; -var bus = { - subscriptions: {} +var bindingsResolver = { + cache: { }, + + compare: function(binding, topic) { + var rgx = new RegExp("^" + this.regexify(binding) + "$"); // match from start to end of string + return rgx.test(topic); + }, + + regexify: function(binding) { + return binding.replace(/\./g,"\\.") // escape actual periods + .replace(/\*/g, ".*") // asterisks match any value + .replace(/#/g, "[A-Z,a-z,0-9]*"); // hash matches any alpha-numeric 'word' + } }; + +var localBus = { + subscriptions: {}, -var postal = { - exchange: function(exchange) { - return new ChannelDefinition(exchange); - }, + publish: function(envelope) { + _.each(this.wireTaps,function(tap) { + tap({ + exchange: envelope.exchange, + topic: envelope.topic, + data: envelope.data, + timeStamp: new Date() + }); + }); - topic: function(topic) { - return new ChannelDefinition(undefined, topic); - }, - publish: function(config) { - console.log("PUBLISH: " + JSON.stringify(config)); + _.each(this.subscriptions[envelope.exchange], function(topic) { + _.each(topic, function(binding){ + if(postal.config.bindingsResolver.compare(binding.topic, envelope.topic)) { + if(typeof binding.callback === 'function') { + binding.callback.apply(binding.context, [envelope.data]); + binding.onHandled(); + } + } + }); + }); }, subscribe: function(config) { - console.log("SUBSCRIBE: " + JSON.stringify(config)); - var idx, found; if(config.disposeAfter && config.disposeAfter > 0) { + var fn = config.onHandled, + dispose = _.after(config.disposeAfter, _.bind(function() { + this.unsubscribe(config); + }, this)); + config.onHandled = function() { - config.onHandled(); - _.after(config.disposeAfter, _.bind(function() { this.unsubscribe(config); }, this)); + fn.apply(config.context, arguments); + dispose(); } } @@ -51,6 +79,7 @@ var postal = { if(!found) { bus.subscriptions[config.exchange][config.topic].unshift(config); } + console.log("SUBSCRIBE: " + JSON.stringify(config)); } } @@ -58,24 +87,61 @@ var postal = { }, unsubscribe: function(config) { - console.log("UNSUBSCRIBE: " + JSON.stringify(config)); if(bus.subscriptions[config.exchange][config.topic]) { var len = bus.subscriptions[config.exchange][config.topic].length, idx = 0; for ( ; idx < len; idx++ ) { if (bus.subscriptions[config.exchange][config.topic][idx] === config) { bus.subscriptions[config.exchange][config.topic].splice( idx, 1 ); + console.log("UNSUBSCRIBE: " + JSON.stringify(config)); break; } } } }, - + addWireTap: function(callback) { console.log("WIRETAP: " + JSON.stringify(callback)); } }; +var postal = { + + config: { + setBusBehavior: function(behavior) { + bus = behavior; + }, + + bindingsResolver: bindingsResolver + }, + + exchange: function(exchange) { + return new ChannelDefinition(exchange); + }, + + topic: function(topic) { + return new ChannelDefinition(undefined, topic); + }, + + publish: function(config) { + bus.publish(config); + }, + + subscribe: function(config) { + return bus.subscribe(config); + }, + + unsubscribe: function(config) { + bus.unsubscribe(config); + }, + + addWireTap: function(callback) { + bus.addWireTap(callback); + } +}; + +postal.config.setBusBehavior(localBus); + var ChannelDefinition = function(exchange, topic) { this.configuration = { exchange: exchange || DEFAULT_EXCHANGE, @@ -100,6 +166,11 @@ ChannelDefinition.prototype = { return this; }, + defer: function() { + this.configuration.defer = true; + return this; + }, + disposeAfter: function(receiveCount) { if(_.isNaN(receiveCount)) { throw "The value provided to disposeAfter (receiveCount) must be a number"; @@ -141,6 +212,22 @@ ChannelDefinition.prototype = { return this; }, + withDebounce: function(milliseconds) { + if(_.isNaN(milliseconds)) { + throw "Milliseconds must be a number"; + } + this.configuration.debounce = milliseconds; + return this; + }, + + withDelay: function(milliseconds) { + if(_.isNaN(milliseconds)) { + throw "Milliseconds must be a number"; + } + this.configuration.delay = milliseconds; + return this; + }, + withPriority: function(priority) { if(_.isNaN(priority)) { throw "Priority must be a number"; @@ -149,6 +236,14 @@ ChannelDefinition.prototype = { return this; }, + withThrottle: function(milliseconds) { + if(_.isNaN(milliseconds)) { + throw "Milliseconds must be a number"; + } + this.configuration.throttle = milliseconds; + return this; + }, + subscribe: function(callback) { this.configuration.callback = callback || NO_OP; return postal.subscribe(this.configuration);