Updated API to consistently use (data, envelope) signature wherever suscribers and publishers are involved

This commit is contained in:
Jim Cowart 2012-03-19 23:01:38 -04:00
parent bc29519b4c
commit e542269058
7 changed files with 147 additions and 143 deletions

View file

@ -93,10 +93,10 @@ dupSubscription.unsubscribe();
## How can I extend it?
There are two main ways you can extend Postal:
* First, you can write an entirely new bus implementation (want to tie into a real broker like AMQP, and wrap it with Postal's API? This is how you'd do it.). If you want to do this, look over the `localBus` implementation to see how the core version works. Then, you can simply swap the bus implementation out by calling: `postal.configuration.bus = myWayBetterBusImplementation`.
* First, you can write an entirely new bus implementation (want to tie into a real broker like RabbitMQ by hitting the [experimental] JSON RPC endpoints and wrap it with Postal's API? This is how you'd do it.). If you want to do this, look over the `localBus` implementation to see how the core version works. Then, you can simply swap the bus implementation out by calling: `postal.configuration.bus = myWayBetterBusImplementation`.
* The second way you can extend Postal is to change how the `bindingResolver` works. You may not care for the RabbitMQ-style bindings functionality. No problem! Write your own resolver object that implements a `compare` method and swap the core version out with your implementation by calling: `postal.configuration.resolver = myWayBetterResolver`.
It's also possible to extend the monitoring of messages passing through Postal by adding a "wire tap". A wire tap is a callback that will get invoked for any published message (even if no actual subscriptions would bind to the message's topic). Wire taps should _not_ be used in lieu of an actual subscription - but instead should be used for diagnostics, logging, forwarding or other concerns that fall along those lines.
It's also possible to extend the monitoring of messages passing through Postal by adding a "wire tap". A wire tap is a callback that will get invoked for any published message (even if no actual subscriptions would bind to the message's topic). Wire taps should _not_ be used in lieu of an actual subscription - but instead should be used for diagnostics, logging, forwarding (to a websocket publisher, for example) or other concerns that fall along those lines.
## Can I contribute?
Please - by all means! While I hope the API is relatively stable, I'm open to pull requests. (Hint - if you want a feature implemented, a pull request gives it a much higher probability of being included than simply asking me.) As I said, pull requests are most certainly welcome - but please include tests for your additions. Otherwise, it will disappear into the ether.

View file

@ -9,21 +9,19 @@ QUnit.specify("postal.js", function(){
describe("when creating basic subscription", function() {
var systemSubscription = {};
before(function(){
systemSubscription = postal.subscribe({
channel: "postal",
topic: "subscription.created",
callback: function(x){
console.log("on subscription " + JSON.stringify(x));
if( x.event &&
x.event == "subscription.created" &&
x.channel == "MyChannel" &&
x.topic == "MyTopic") {
callback: function(data, env){
console.log("on subscription " + JSON.stringify(data));
if( data.event &&
data.event == "subscription.created" &&
data.channel == "MyChannel" &&
data.topic == "MyTopic") {
caughtSubscribeEvent = true;
}
}
});
subscription = postal.channel({ channel: "MyChannel", topic: "MyTopic" })
.subscribe(function() { });
sub = postal.configuration.bus.subscriptions.MyChannel.MyTopic[0];
@ -56,7 +54,7 @@ QUnit.specify("postal.js", function(){
it("should have defaulted the subscription context value", function() {
assert(sub.context).isNull();
});
it("should have captured subscription creation event in wire-tap", function() {
it("should have captured subscription creation event", function() {
assert(caughtSubscribeEvent).isTrue();
});
});
@ -68,11 +66,11 @@ QUnit.specify("postal.js", function(){
systemSubscription = postal.subscribe({
channel: "postal",
topic: "subscription.*",
callback: function(x){
if( x.event &&
x.event == "subscription.removed" &&
x.channel == "MyChannel" &&
x.topic == "MyTopic") {
callback: function(data, env){
if( data.event &&
data.event == "subscription.removed" &&
data.channel == "MyChannel" &&
data.topic == "MyTopic") {
caughtUnsubscribeEvent = true;
};
}
@ -93,7 +91,7 @@ QUnit.specify("postal.js", function(){
it("subscription should not exist after unsubscribe", function(){
assert(subExistsAfter).isFalse();
});
it("should have captured unsubscription creation event in wire-tap", function() {
it("should have captured unsubscription creation event", function() {
assert(caughtUnsubscribeEvent).isTrue();
});
});
@ -180,7 +178,7 @@ QUnit.specify("postal.js", function(){
assert(whte).isTrue();
});
});
describe("When subscribing with one constraint returning true", function(){
describe("When subscribing with one constraint returning true", function(){
var recvd = false;
before(function(){
channel = postal.channel({ channel: "MyChannel", topic: "MyTopic" });
@ -428,13 +426,13 @@ QUnit.specify("postal.js", function(){
caughtUnsubscribeEvent = false;
wireTapData = [];
wireTapEnvelope = [];
wiretap = postal.addWireTap(function(envelope, msg) {
wiretap = postal.addWireTap(function(msg, envelope) {
wireTapData.push(msg);
wireTapEnvelope.push(envelope);
});
postal.publish({ topic: "Oh.Hai.There" }, { data: "I'm in yer bus, tappin' yer subscriptionz..."});
postal.publish({ data: "I'm in yer bus, tappin' yer subscriptionz..."}, { topic: "Oh.Hai.There" });
wiretap();
postal.publish({ topic: "Oh.Hai.There" }, { data: "I'm in yer bus, tappin' yer subscriptionz..."});
postal.publish({ data: "I'm in yer bus, tappin' yer subscriptionz..."}, { topic: "Oh.Hai.There" });
});
after(function(){
postal.configuration.bus.subscriptions = {};

View file

@ -28,4 +28,6 @@
<h2 id="qunit-userAgent"></h2>
<ol id="qunit-tests"></ol>
</body>
</html>
</html>

View file

@ -1,12 +1,12 @@
var publishPicker = {
"2" : function(envelope, payload) {
"2" : function(data, envelope) {
if(!envelope.channel) {
envelope.channel = DEFAULT_CHANNEL;
}
postal.configuration.bus.publish(envelope, payload);
postal.configuration.bus.publish(data, envelope);
},
"3" : function(channel, topic, payload) {
postal.configuration.bus.publish({ channel: channel, topic: topic }, payload);
postal.configuration.bus.publish(payload, { channel: channel, topic: topic });
}
};
@ -63,7 +63,7 @@ var postal = {
var newEnv = env;
newEnv.topic = _.isFunction(destination.topic) ? destination.topic(env.topic) : destination.topic || env.topic;
newEnv.channel = destChannel;
postal.publish(newEnv, msg);
postal.publish(msg, newEnv);
}
})
);

View file

@ -4,8 +4,14 @@ var ChannelDefinition = function(channelName, defaultTopic) {
};
ChannelDefinition.prototype = {
subscribe: function(callback) {
return new SubscriptionDefinition(this.channel, this.topic, callback);
subscribe: function() {
var len = arguments.length;
if(len === 1) {
return new SubscriptionDefinition(this.channel, this.topic, arguments[0]);
}
else if (len === 2) {
return new SubscriptionDefinition(this.channel, arguments[0], arguments[1]);
}
},
publish: function(data, envelope) {
@ -13,6 +19,6 @@ ChannelDefinition.prototype = {
env.channel = this.channel;
env.timeStamp = new Date();
env.topic = env.topic || this.topic;
postal.configuration.bus.publish(env, data);
postal.configuration.bus.publish(data, env);
}
};

View file

@ -4,9 +4,9 @@ var localBus = {
wireTaps: new Array(0),
publish: function(envelope, data) {
publish: function(data, envelope) {
_.each(this.wireTaps,function(tap) {
tap(envelope, data);
tap(data, envelope);
});
_.each(this.subscriptions[envelope.channel], function(topic) {

View file

@ -1,133 +1,131 @@
var SubscriptionDefinition = function(channel, topic, callback) {
this.channel = channel;
this.topic = topic;
this.callback = callback;
this.priority = DEFAULT_PRIORITY;
this.constraints = new Array(0);
this.maxCalls = DEFAULT_DISPOSEAFTER;
this.onHandled = NO_OP;
this.context = null;
this.channel = channel;
this.topic = topic;
this.callback = callback;
this.priority = DEFAULT_PRIORITY;
this.constraints = new Array(0);
this.maxCalls = DEFAULT_DISPOSEAFTER;
this.onHandled = NO_OP;
this.context = null;
postal.publish({
event: "subscription.created",
channel: channel,
topic: topic
},{
channel: SYSTEM_CHANNEL,
topic: "subscription.created"
});
postal.configuration.bus.subscribe(this);
postal.publish({
channel: SYSTEM_CHANNEL,
topic: "subscription.created"
},
{
event: "subscription.created",
channel: channel,
topic: topic
});
};
SubscriptionDefinition.prototype = {
unsubscribe: function() {
postal.configuration.bus.unsubscribe(this);
postal.publish({
channel: SYSTEM_CHANNEL,
topic: "subscription.removed"
},
{
event: "subscription.removed",
channel: this.channel,
topic: this.topic
});
},
unsubscribe: function() {
postal.configuration.bus.unsubscribe(this);
postal.publish({
event: "subscription.removed",
channel: this.channel,
topic: this.topic
},{
channel: SYSTEM_CHANNEL,
topic: "subscription.removed"
});
},
defer: function() {
var fn = this.callback;
this.callback = function(data) {
setTimeout(fn,0,data);
};
return this;
},
defer: function() {
var fn = this.callback;
this.callback = function(data) {
setTimeout(fn,0,data);
};
return this;
},
disposeAfter: function(maxCalls) {
if(_.isNaN(maxCalls) || maxCalls <= 0) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
}
disposeAfter: function(maxCalls) {
if(_.isNaN(maxCalls) || maxCalls <= 0) {
throw "The value provided to disposeAfter (maxCalls) must be a number greater than zero.";
}
var fn = this.onHandled;
var dispose = _.after(maxCalls, _.bind(function() {
this.unsubscribe(this);
}, this));
var fn = this.onHandled;
var dispose = _.after(maxCalls, _.bind(function() {
this.unsubscribe(this);
}, this));
this.onHandled = function() {
fn.apply(this.context, arguments);
dispose();
};
return this;
},
this.onHandled = function() {
fn.apply(this.context, arguments);
dispose();
};
return this;
},
ignoreDuplicates: function() {
this.withConstraint(new DistinctPredicate());
return this;
},
ignoreDuplicates: function() {
this.withConstraint(new DistinctPredicate());
return this;
},
whenHandledThenExecute: function(callback) {
if(! _.isFunction(callback)) {
throw "Value provided to 'whenHandledThenExecute' must be a function";
}
this.onHandled = callback;
return this;
},
whenHandledThenExecute: function(callback) {
if(! _.isFunction(callback)) {
throw "Value provided to 'whenHandledThenExecute' must be a function";
}
this.onHandled = callback;
return this;
},
withConstraint: function(predicate) {
if(! _.isFunction(predicate)) {
throw "Predicate constraint must be a function";
}
this.constraints.push(predicate);
return this;
},
withConstraint: function(predicate) {
if(! _.isFunction(predicate)) {
throw "Predicate constraint must be a function";
}
this.constraints.push(predicate);
return this;
},
withConstraints: function(predicates) {
var self = this;
if(_.isArray(predicates)) {
_.each(predicates, function(predicate) { self.withConstraint(predicate); } );
}
return self;
},
withConstraints: function(predicates) {
var self = this;
if(_.isArray(predicates)) {
_.each(predicates, function(predicate) { self.withConstraint(predicate); } );
}
return self;
},
withContext: function(context) {
this.context = context;
return this;
},
withContext: function(context) {
this.context = context;
return this;
},
withDebounce: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.debounce(fn, milliseconds);
return this;
},
withDebounce: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.debounce(fn, milliseconds);
return this;
},
withDelay: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = function(data) {
setTimeout(fn, milliseconds, data);
};
return this;
},
withDelay: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = function(data) {
setTimeout(fn, milliseconds, data);
};
return this;
},
withPriority: function(priority) {
if(_.isNaN(priority)) {
throw "Priority must be a number";
}
this.priority = priority;
return this;
},
withPriority: function(priority) {
if(_.isNaN(priority)) {
throw "Priority must be a number";
}
this.priority = priority;
return this;
},
withThrottle: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.throttle(fn, milliseconds);
return this;
}
withThrottle: function(milliseconds) {
if(_.isNaN(milliseconds)) {
throw "Milliseconds must be a number";
}
var fn = this.callback;
this.callback = _.throttle(fn, milliseconds);
return this;
}
};