diff --git a/README.md b/README.md index 5d3fc8c..b5005c8 100644 --- a/README.md +++ b/README.md @@ -93,10 +93,10 @@ dupSubscription.unsubscribe(); ## How can I extend it? There are two main ways you can extend Postal: -* First, you can write an entirely new bus implementation (want to tie into a real broker like AMQP, and wrap it with Postal's API? This is how you'd do it.). If you want to do this, look over the `localBus` implementation to see how the core version works. Then, you can simply swap the bus implementation out by calling: `postal.configuration.bus = myWayBetterBusImplementation`. +* First, you can write an entirely new bus implementation (want to tie into a real broker like RabbitMQ by hitting the [experimental] JSON RPC endpoints and wrap it with Postal's API? This is how you'd do it.). If you want to do this, look over the `localBus` implementation to see how the core version works. Then, you can simply swap the bus implementation out by calling: `postal.configuration.bus = myWayBetterBusImplementation`. * The second way you can extend Postal is to change how the `bindingResolver` works. You may not care for the RabbitMQ-style bindings functionality. No problem! Write your own resolver object that implements a `compare` method and swap the core version out with your implementation by calling: `postal.configuration.resolver = myWayBetterResolver`. -It's also possible to extend the monitoring of messages passing through Postal by adding a "wire tap". A wire tap is a callback that will get invoked for any published message (even if no actual subscriptions would bind to the message's topic). Wire taps should _not_ be used in lieu of an actual subscription - but instead should be used for diagnostics, logging, forwarding or other concerns that fall along those lines. +It's also possible to extend the monitoring of messages passing through Postal by adding a "wire tap". A wire tap is a callback that will get invoked for any published message (even if no actual subscriptions would bind to the message's topic). Wire taps should _not_ be used in lieu of an actual subscription - but instead should be used for diagnostics, logging, forwarding (to a websocket publisher, for example) or other concerns that fall along those lines. ## Can I contribute? Please - by all means! While I hope the API is relatively stable, I'm open to pull requests. (Hint - if you want a feature implemented, a pull request gives it a much higher probability of being included than simply asking me.) As I said, pull requests are most certainly welcome - but please include tests for your additions. Otherwise, it will disappear into the ether. diff --git a/spec/Postal.spec.js b/spec/Postal.spec.js index f2e9b47..23fbaca 100644 --- a/spec/Postal.spec.js +++ b/spec/Postal.spec.js @@ -9,21 +9,19 @@ QUnit.specify("postal.js", function(){ describe("when creating basic subscription", function() { var systemSubscription = {}; before(function(){ - systemSubscription = postal.subscribe({ channel: "postal", topic: "subscription.created", - callback: function(x){ - console.log("on subscription " + JSON.stringify(x)); - if( x.event && - x.event == "subscription.created" && - x.channel == "MyChannel" && - x.topic == "MyTopic") { + callback: function(data, env){ + console.log("on subscription " + JSON.stringify(data)); + if( data.event && + data.event == "subscription.created" && + data.channel == "MyChannel" && + data.topic == "MyTopic") { caughtSubscribeEvent = true; } } }); - subscription = postal.channel({ channel: "MyChannel", topic: "MyTopic" }) .subscribe(function() { }); sub = postal.configuration.bus.subscriptions.MyChannel.MyTopic[0]; @@ -56,7 +54,7 @@ QUnit.specify("postal.js", function(){ it("should have defaulted the subscription context value", function() { assert(sub.context).isNull(); }); - it("should have captured subscription creation event in wire-tap", function() { + it("should have captured subscription creation event", function() { assert(caughtSubscribeEvent).isTrue(); }); }); @@ -68,11 +66,11 @@ QUnit.specify("postal.js", function(){ systemSubscription = postal.subscribe({ channel: "postal", topic: "subscription.*", - callback: function(x){ - if( x.event && - x.event == "subscription.removed" && - x.channel == "MyChannel" && - x.topic == "MyTopic") { + callback: function(data, env){ + if( data.event && + data.event == "subscription.removed" && + data.channel == "MyChannel" && + data.topic == "MyTopic") { caughtUnsubscribeEvent = true; }; } @@ -93,7 +91,7 @@ QUnit.specify("postal.js", function(){ it("subscription should not exist after unsubscribe", function(){ assert(subExistsAfter).isFalse(); }); - it("should have captured unsubscription creation event in wire-tap", function() { + it("should have captured unsubscription creation event", function() { assert(caughtUnsubscribeEvent).isTrue(); }); }); @@ -180,7 +178,7 @@ QUnit.specify("postal.js", function(){ assert(whte).isTrue(); }); }); - describe("When subscribing with one constraint returning true", function(){ + describe("When subscribing with one constraint returning true", function(){ var recvd = false; before(function(){ channel = postal.channel({ channel: "MyChannel", topic: "MyTopic" }); @@ -428,13 +426,13 @@ QUnit.specify("postal.js", function(){ caughtUnsubscribeEvent = false; wireTapData = []; wireTapEnvelope = []; - wiretap = postal.addWireTap(function(envelope, msg) { + wiretap = postal.addWireTap(function(msg, envelope) { wireTapData.push(msg); wireTapEnvelope.push(envelope); }); - postal.publish({ topic: "Oh.Hai.There" }, { data: "I'm in yer bus, tappin' yer subscriptionz..."}); + postal.publish({ data: "I'm in yer bus, tappin' yer subscriptionz..."}, { topic: "Oh.Hai.There" }); wiretap(); - postal.publish({ topic: "Oh.Hai.There" }, { data: "I'm in yer bus, tappin' yer subscriptionz..."}); + postal.publish({ data: "I'm in yer bus, tappin' yer subscriptionz..."}, { topic: "Oh.Hai.There" }); }); after(function(){ postal.configuration.bus.subscriptions = {}; diff --git a/spec/runner.html b/spec/runner.html index 8d0e2fa..4a8919d 100644 --- a/spec/runner.html +++ b/spec/runner.html @@ -28,4 +28,6 @@

    - \ No newline at end of file + + + diff --git a/src/main/Api.js b/src/main/Api.js index b571f94..ed4819a 100644 --- a/src/main/Api.js +++ b/src/main/Api.js @@ -1,12 +1,12 @@ var publishPicker = { - "2" : function(envelope, payload) { + "2" : function(data, envelope) { if(!envelope.channel) { envelope.channel = DEFAULT_CHANNEL; } - postal.configuration.bus.publish(envelope, payload); + postal.configuration.bus.publish(data, envelope); }, "3" : function(channel, topic, payload) { - postal.configuration.bus.publish({ channel: channel, topic: topic }, payload); + postal.configuration.bus.publish(payload, { channel: channel, topic: topic }); } }; @@ -63,7 +63,7 @@ var postal = { var newEnv = env; newEnv.topic = _.isFunction(destination.topic) ? destination.topic(env.topic) : destination.topic || env.topic; newEnv.channel = destChannel; - postal.publish(newEnv, msg); + postal.publish(msg, newEnv); } }) ); diff --git a/src/main/ChannelDefinition.js b/src/main/ChannelDefinition.js index de8f578..6267cee 100644 --- a/src/main/ChannelDefinition.js +++ b/src/main/ChannelDefinition.js @@ -4,8 +4,14 @@ var ChannelDefinition = function(channelName, defaultTopic) { }; ChannelDefinition.prototype = { - subscribe: function(callback) { - return new SubscriptionDefinition(this.channel, this.topic, callback); + subscribe: function() { + var len = arguments.length; + if(len === 1) { + return new SubscriptionDefinition(this.channel, this.topic, arguments[0]); + } + else if (len === 2) { + return new SubscriptionDefinition(this.channel, arguments[0], arguments[1]); + } }, publish: function(data, envelope) { @@ -13,6 +19,6 @@ ChannelDefinition.prototype = { env.channel = this.channel; env.timeStamp = new Date(); env.topic = env.topic || this.topic; - postal.configuration.bus.publish(env, data); + postal.configuration.bus.publish(data, env); } }; diff --git a/src/main/LocalBus.js b/src/main/LocalBus.js index ab66bb6..704aa9f 100644 --- a/src/main/LocalBus.js +++ b/src/main/LocalBus.js @@ -4,9 +4,9 @@ var localBus = { wireTaps: new Array(0), - publish: function(envelope, data) { + publish: function(data, envelope) { _.each(this.wireTaps,function(tap) { - tap(envelope, data); + tap(data, envelope); }); _.each(this.subscriptions[envelope.channel], function(topic) { diff --git a/src/main/SubscriptionDefinition.js b/src/main/SubscriptionDefinition.js index e7a60de..54015b4 100644 --- a/src/main/SubscriptionDefinition.js +++ b/src/main/SubscriptionDefinition.js @@ -1,133 +1,131 @@ var SubscriptionDefinition = function(channel, topic, callback) { - this.channel = channel; - this.topic = topic; - this.callback = callback; - this.priority = DEFAULT_PRIORITY; - this.constraints = new Array(0); - this.maxCalls = DEFAULT_DISPOSEAFTER; - this.onHandled = NO_OP; - this.context = null; + this.channel = channel; + this.topic = topic; + this.callback = callback; + this.priority = DEFAULT_PRIORITY; + this.constraints = new Array(0); + this.maxCalls = DEFAULT_DISPOSEAFTER; + this.onHandled = NO_OP; + this.context = null; + postal.publish({ + event: "subscription.created", + channel: channel, + topic: topic + },{ + channel: SYSTEM_CHANNEL, + topic: "subscription.created" + }); postal.configuration.bus.subscribe(this); - postal.publish({ - channel: SYSTEM_CHANNEL, - topic: "subscription.created" - }, - { - event: "subscription.created", - channel: channel, - topic: topic - }); }; SubscriptionDefinition.prototype = { - unsubscribe: function() { - postal.configuration.bus.unsubscribe(this); - postal.publish({ - channel: SYSTEM_CHANNEL, - topic: "subscription.removed" - }, - { - event: "subscription.removed", - channel: this.channel, - topic: this.topic - }); - }, + unsubscribe: function() { + postal.configuration.bus.unsubscribe(this); + postal.publish({ + event: "subscription.removed", + channel: this.channel, + topic: this.topic + },{ + channel: SYSTEM_CHANNEL, + topic: "subscription.removed" + }); + }, - defer: function() { - var fn = this.callback; - this.callback = function(data) { - setTimeout(fn,0,data); - }; - return this; - }, + defer: function() { + var fn = this.callback; + this.callback = function(data) { + setTimeout(fn,0,data); + }; + return this; + }, - disposeAfter: function(maxCalls) { - if(_.isNaN(maxCalls) || maxCalls <= 0) { - throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero."; - } + disposeAfter: function(maxCalls) { + if(_.isNaN(maxCalls) || maxCalls <= 0) { + throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero."; + } - var fn = this.onHandled; - var dispose = _.after(maxCalls, _.bind(function() { - this.unsubscribe(this); - }, this)); + var fn = this.onHandled; + var dispose = _.after(maxCalls, _.bind(function() { + this.unsubscribe(this); + }, this)); - this.onHandled = function() { - fn.apply(this.context, arguments); - dispose(); - }; - return this; - }, + this.onHandled = function() { + fn.apply(this.context, arguments); + dispose(); + }; + return this; + }, - ignoreDuplicates: function() { - this.withConstraint(new DistinctPredicate()); - return this; - }, + ignoreDuplicates: function() { + this.withConstraint(new DistinctPredicate()); + return this; + }, - whenHandledThenExecute: function(callback) { - if(! _.isFunction(callback)) { - throw "Value provided to 'whenHandledThenExecute' must be a function"; - } - this.onHandled = callback; - return this; - }, + whenHandledThenExecute: function(callback) { + if(! _.isFunction(callback)) { + throw "Value provided to 'whenHandledThenExecute' must be a function"; + } + this.onHandled = callback; + return this; + }, - withConstraint: function(predicate) { - if(! _.isFunction(predicate)) { - throw "Predicate constraint must be a function"; - } - this.constraints.push(predicate); - return this; - }, + withConstraint: function(predicate) { + if(! _.isFunction(predicate)) { + throw "Predicate constraint must be a function"; + } + this.constraints.push(predicate); + return this; + }, - withConstraints: function(predicates) { - var self = this; - if(_.isArray(predicates)) { - _.each(predicates, function(predicate) { self.withConstraint(predicate); } ); - } - return self; - }, + withConstraints: function(predicates) { + var self = this; + if(_.isArray(predicates)) { + _.each(predicates, function(predicate) { self.withConstraint(predicate); } ); + } + return self; + }, - withContext: function(context) { - this.context = context; - return this; - }, + withContext: function(context) { + this.context = context; + return this; + }, - withDebounce: function(milliseconds) { - if(_.isNaN(milliseconds)) { - throw "Milliseconds must be a number"; - } - var fn = this.callback; - this.callback = _.debounce(fn, milliseconds); - return this; - }, + withDebounce: function(milliseconds) { + if(_.isNaN(milliseconds)) { + throw "Milliseconds must be a number"; + } + var fn = this.callback; + this.callback = _.debounce(fn, milliseconds); + return this; + }, - withDelay: function(milliseconds) { - if(_.isNaN(milliseconds)) { - throw "Milliseconds must be a number"; - } - var fn = this.callback; - this.callback = function(data) { - setTimeout(fn, milliseconds, data); - }; - return this; - }, + withDelay: function(milliseconds) { + if(_.isNaN(milliseconds)) { + throw "Milliseconds must be a number"; + } + var fn = this.callback; + this.callback = function(data) { + setTimeout(fn, milliseconds, data); + }; + return this; + }, - withPriority: function(priority) { - if(_.isNaN(priority)) { - throw "Priority must be a number"; - } - this.priority = priority; - return this; - }, + withPriority: function(priority) { + if(_.isNaN(priority)) { + throw "Priority must be a number"; + } + this.priority = priority; + return this; + }, - withThrottle: function(milliseconds) { - if(_.isNaN(milliseconds)) { - throw "Milliseconds must be a number"; - } - var fn = this.callback; - this.callback = _.throttle(fn, milliseconds); - return this; - } + withThrottle: function(milliseconds) { + if(_.isNaN(milliseconds)) { + throw "Milliseconds must be a number"; + } + var fn = this.callback; + this.callback = _.throttle(fn, milliseconds); + return this; + } };