Lots of refactoring to core pub sub engine, and also added message capture and replay...

This commit is contained in:
Jim Cowart 2011-08-10 17:59:18 -04:00
parent 6818e7e808
commit e860d8700b
13 changed files with 827 additions and 309 deletions

View file

@ -1,2 +1,5 @@
src/languageExtensions.js
src/broker.js
src/CapturedMessageBatch.js
src/MessageCaptor.js
src/ReplayContext.js
src/Postal.js

100
lib/amplify.core.js Normal file
View file

@ -0,0 +1,100 @@
/*!
* Amplify Core @VERSION
*
* Copyright 2011 appendTo LLC. (http://appendto.com/team)
* Dual licensed under the MIT or GPL licenses.
* http://appendto.com/open-source-licenses
*
* http://amplifyjs.com
*/
(function( global, undefined ) {
var slice = [].slice,
subscriptions = {};
var amplify = global.amplify = {
publish: function( topic ) {
var args = slice.call( arguments, 1 ),
subscription,
length,
i = 0,
ret;
if ( !subscriptions[ topic ] ) {
return true;
}
for ( length = subscriptions[ topic ].length; i < length; i++ ) {
subscription = subscriptions[ topic ][ i ];
ret = subscription.callback.apply( subscription.context, args );
if ( ret === false ) {
break;
}
}
return ret !== false;
},
subscribe: function( topic, context, callback, priority ) {
if ( arguments.length === 3 && typeof callback === "number" ) {
priority = callback;
callback = context;
context = null;
}
if ( arguments.length === 2 ) {
callback = context;
context = null;
}
priority = priority || 10;
var topicIndex = 0,
topics = topic.split( /\s/ ),
topicLength = topics.length,
added;
for ( ; topicIndex < topicLength; topicIndex++ ) {
topic = topics[ topicIndex ];
added = false;
if ( !subscriptions[ topic ] ) {
subscriptions[ topic ] = [];
}
var i = subscriptions[ topic ].length - 1,
subscriptionInfo = {
callback: callback,
context: context,
priority: priority
};
for ( ; i >= 0; i-- ) {
if ( subscriptions[ topic ][ i ].priority <= priority ) {
subscriptions[ topic ].splice( i + 1, 0, subscriptionInfo );
added = true;
break;
}
}
if ( !added ) {
subscriptions[ topic ].unshift( subscriptionInfo );
}
}
return callback;
},
unsubscribe: function( topic, callback ) {
if ( !subscriptions[ topic ] ) {
return;
}
var length = subscriptions[ topic ].length,
i = 0;
for ( ; i < length; i++ ) {
if ( subscriptions[ topic ][ i ].callback === callback ) {
subscriptions[ topic ].splice( i, 1 );
break;
}
}
}
};
}( this ) );

277
lib/amplify.store.js Normal file
View file

@ -0,0 +1,277 @@
/*!
* Amplify Store - Persistent Client-Side Storage @VERSION
*
* Copyright 2011 appendTo LLC. (http://appendto.com/team)
* Dual licensed under the MIT or GPL licenses.
* http://appendto.com/open-source-licenses
*
* http://amplifyjs.com
*/
(function( amplify, undefined ) {
var store = amplify.store = function( key, value, options, type ) {
var type = store.type;
if ( options && options.type && options.type in store.types ) {
type = options.type;
}
return store.types[ type ]( key, value, options || {} );
};
store.types = {};
store.type = null;
store.addType = function( type, storage ) {
if ( !store.type ) {
store.type = type;
}
store.types[ type ] = storage;
store[ type ] = function( key, value, options ) {
options = options || {};
options.type = type;
return store( key, value, options );
};
}
store.error = function() {
return "amplify.store quota exceeded";
};
var rprefix = /^__amplify__/;
function createFromStorageInterface( storageType, storage ) {
store.addType( storageType, function( key, value, options ) {
var storedValue, parsed, i, remove,
ret = value,
now = (new Date()).getTime();
if ( !key ) {
ret = {};
remove = [];
i = 0;
try {
// accessing the length property works around a localStorage bug
// in Firefox 4.0 where the keys don't update cross-page
// we assign to key just to avoid Closure Compiler from removing
// the access as "useless code"
// https://bugzilla.mozilla.org/show_bug.cgi?id=662511
key = storage.length;
while ( key = storage.key( i++ ) ) {
if ( rprefix.test( key ) ) {
parsed = JSON.parse( storage.getItem( key ) );
if ( parsed.expires && parsed.expires <= now ) {
remove.push( key );
} else {
ret[ key.replace( rprefix, "" ) ] = parsed.data;
}
}
}
while ( key = remove.pop() ) {
storage.removeItem( key );
}
} catch ( error ) {}
return ret;
}
// protect against name collisions with direct storage
key = "__amplify__" + key;
if ( value === undefined ) {
storedValue = storage.getItem( key );
parsed = storedValue ? JSON.parse( storedValue ) : { expires: -1 };
if ( parsed.expires && parsed.expires <= now ) {
storage.removeItem( key );
} else {
return parsed.data;
}
} else {
if ( value === null ) {
storage.removeItem( key );
} else {
parsed = JSON.stringify({
data: value,
expires: options.expires ? now + options.expires : null
});
try {
storage.setItem( key, parsed );
// quota exceeded
} catch( error ) {
// expire old data and try again
store[ storageType ]();
try {
storage.setItem( key, parsed );
} catch( error ) {
throw store.error();
}
}
}
}
return ret;
});
}
// localStorage + sessionStorage
// IE 8+, Firefox 3.5+, Safari 4+, Chrome 4+, Opera 10.5+, iPhone 2+, Android 2+
for ( var webStorageType in { localStorage: 1, sessionStorage: 1 } ) {
// try/catch for file protocol in Firefox
try {
if ( window[ webStorageType ].getItem ) {
createFromStorageInterface( webStorageType, window[ webStorageType ] );
}
} catch( e ) {}
}
// globalStorage
// non-standard: Firefox 2+
// https://developer.mozilla.org/en/dom/storage#globalStorage
if ( window.globalStorage ) {
// try/catch for file protocol in Firefox
try {
createFromStorageInterface( "globalStorage",
window.globalStorage[ window.location.hostname ] );
// Firefox 2.0 and 3.0 have sessionStorage and globalStorage
// make sure we default to globalStorage
// but don't default to globalStorage in 3.5+ which also has localStorage
if ( store.type === "sessionStorage" ) {
store.type = "globalStorage";
}
} catch( e ) {}
}
// userData
// non-standard: IE 5+
// http://msdn.microsoft.com/en-us/library/ms531424(v=vs.85).aspx
(function() {
// IE 9 has quirks in userData that are a huge pain
// rather than finding a way to detect these quirks
// we just don't register userData if we have localStorage
if ( store.types.localStorage ) {
return;
}
// append to html instead of body so we can do this from the head
var div = document.createElement( "div" ),
attrKey = "amplify";
div.style.display = "none";
document.getElementsByTagName( "head" )[ 0 ].appendChild( div );
if ( div.addBehavior ) {
div.addBehavior( "#default#userdata" );
store.addType( "userData", function( key, value, options ) {
div.load( attrKey );
var attr, parsed, prevValue, i, remove,
ret = value,
now = (new Date()).getTime();
if ( !key ) {
ret = {};
remove = [];
i = 0;
while ( attr = div.XMLDocument.documentElement.attributes[ i++ ] ) {
parsed = JSON.parse( attr.value );
if ( parsed.expires && parsed.expires <= now ) {
remove.push( attr.name );
} else {
ret[ attr.name ] = parsed.data;
}
}
while ( key = remove.pop() ) {
div.removeAttribute( key );
}
div.save( attrKey );
return ret;
}
// convert invalid characters to dashes
// http://www.w3.org/TR/REC-xml/#NT-Name
// simplified to assume the starting character is valid
// also removed colon as it is invalid in HTML attribute names
key = key.replace( /[^-._0-9A-Za-z\xb7\xc0-\xd6\xd8-\xf6\xf8-\u037d\u37f-\u1fff\u200c-\u200d\u203f\u2040\u2070-\u218f]/g, "-" );
if ( value === undefined ) {
attr = div.getAttribute( key );
parsed = attr ? JSON.parse( attr ) : { expires: -1 };
if ( parsed.expires && parsed.expires <= now ) {
div.removeAttribute( key );
} else {
return parsed.data;
}
} else {
if ( value === null ) {
div.removeAttribute( key );
} else {
// we need to get the previous value in case we need to rollback
prevValue = div.getAttribute( key );
parsed = JSON.stringify({
data: value,
expires: (options.expires ? (now + options.expires) : null)
});
div.setAttribute( key, parsed );
}
}
try {
div.save( attrKey );
// quota exceeded
} catch ( error ) {
// roll the value back to the previous value
if ( prevValue === null ) {
div.removeAttribute( key );
} else {
div.setAttribute( key, prevValue );
}
// expire old data and try again
store.userData();
try {
div.setAttribute( key, parsed );
div.save( attrKey );
} catch ( error ) {
// roll the value back to the previous value
if ( prevValue === null ) {
div.removeAttribute( key );
} else {
div.setAttribute( key, prevValue );
}
throw store.error();
}
}
return ret;
});
}
}() );
// in-memory storage
// fallback for all browsers to enable the API even if we can't persist data
(function() {
var memory = {};
function copy( obj ) {
return obj === undefined ? undefined : JSON.parse( JSON.stringify( obj ) );
}
store.addType( "memory", function( key, value, options ) {
if ( !key ) {
return copy( memory );
}
if ( value === undefined ) {
return copy( memory[ key ] );
}
if ( value === null ) {
delete memory[ key ];
return null;
}
memory[ key ] = value;
if ( options.expires ) {
setTimeout(function() {
delete memory[ key ];
}, options.expires );
}
return value;
});
}() );
}( this.amplify = this.amplify || {} ) );

View file

@ -1,200 +0,0 @@
/*
QUnit.specify("postal.js", function(){
describe("broker", function(){
describe("When publishing a message to a specific one level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
objA = {
messageReceived: false
};
broker.subscribe("Test", function() { objA.messageReceived = true; });
broker.publish("Test", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
});
it("the subscription callback should be invoked on 'b'", function(){
assert(b.messageReceived).isTrue();
});
});
});
describe("When publishing a message to a specific multi-level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
objA = {
messageReceived: false
};
broker.subscribe("Test.Topic", function() { objA.messageReceived = true; });
broker.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
});
it("the subscription callback should be invoked on 'b'", function(){
assert(b.messageReceived).isTrue();
});
});
});
describe("When publishing a wildcard message to a multi-level topic", function() {
describe("with one recipient", function() {
var broker = new MessageBroker(),
objA = {
messageReceived: false
};
broker.subscribe("Test.*", function() { objA.messageReceived = true; });
broker.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new MessageBroker(),
ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening.*", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.*", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening.Closely", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
});
it("the subscription callback should be invoked on 'b'", function(){
assert(b.messageReceived).isTrue();
});
});
});
describe("When unsubscribing using provided callback", function() {
describe("with one callback", function() {
var broker = new MessageBroker(),
objA = {
messageCount: 0
};
var unsubscribe = broker.subscribe("Test.*", function() { objA.messageCount++; });
broker.publish("Test.Topic", {});
unsubscribe();
broker.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageCount).equals(1);
});
});
describe("with two callbacks", function() {
var broker = new MessageBroker(),
ObjA = function() {
var _unsubscribe;
this.messageCount = 0;
this.unsubscribe = function() {
if(_unsubscribe) {
_unsubscribe();
}
};
_unsubscribe = broker.subscribe("TwoRecipients.Listening.*", function() {
this.messageCount++;
}.bind(this));
},
ObjB = function() {
var _unsubscribe;
this.messageCount = 0;
this.unsubscribe = function() {
if(_unsubscribe) {
_unsubscribe();
}
};
_unsubscribe = broker.subscribe("TwoRecipients.*", function() {
this.messageCount++;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening", {});
a.unsubscribe();
broker.publish("TwoRecipients.Listening", {});
it("First object message count should be 1", function(){
assert(a.messageCount).equals(1);
});
it("Second object message count should be 2", function(){
assert(b.messageCount).equals(2);
});
});
});
});
});*/

View file

@ -1,39 +1,41 @@
QUnit.specify("postal.js", function(){
describe("broker", function(){
describe("When publishing a message to a specific one level topic", function() {
describe("with one recipient", function() {
var broker = new Broker(),
objA = {
postal = new Postal();
var objA = {
messageReceived: false
};
broker.subscribe("Test", function() { objA.messageReceived = true; });
broker.publish("Test", {});
postal.subscribe("Test", function() { objA.messageReceived = true; });
postal.publish("Test", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new Broker(),
ObjA = function() {
postal = new Postal();
var ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients", function() {
postal.subscribe("TwoRecipients", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients", function() {
postal.subscribe("TwoRecipients", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients", {});
postal.publish("TwoRecipients", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
@ -46,38 +48,38 @@ QUnit.specify("postal.js", function(){
});
describe("When publishing a message to a specific multi-level topic", function() {
describe("with one recipient", function() {
var broker = new Broker(),
objA = {
postal = new Postal();
var objA = {
messageReceived: false
};
broker.subscribe("Test.Topic", function() { objA.messageReceived = true; });
broker.publish("Test.Topic", {});
postal.subscribe("Test.Topic", function() { objA.messageReceived = true; });
postal.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new Broker(),
ObjA = function() {
postal = new Postal();
var ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening", function() {
postal.subscribe("TwoRecipients.Listening", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening", function() {
postal.subscribe("TwoRecipients.Listening", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening", {});
postal.publish("TwoRecipients.Listening", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
@ -90,38 +92,38 @@ QUnit.specify("postal.js", function(){
});
describe("When publishing a wildcard message to a multi-level topic", function() {
describe("with one recipient", function() {
var broker = new Broker(),
objA = {
postal = new Postal();
var objA = {
messageReceived: false
};
broker.subscribe("Test.*", function() { objA.messageReceived = true; });
broker.publish("Test.Topic", {});
postal.subscribe("Test.*", function() { objA.messageReceived = true; });
postal.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageReceived).isTrue();
});
});
describe("with two recipients", function() {
var broker = new Broker(),
ObjA = function() {
postal = new Postal();
var ObjA = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.Listening.*", function() {
postal.subscribe("TwoRecipients.Listening.*", function() {
this.messageReceived = true;
}.bind(this));
},
ObjB = function() {
this.messageReceived = false;
broker.subscribe("TwoRecipients.*", function() {
postal.subscribe("TwoRecipients.*", function() {
this.messageReceived = true;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening.Closely", {});
postal.publish("TwoRecipients.Listening.Closely", {});
it("the subscription callback should be invoked on 'a'", function(){
assert(a.messageReceived).isTrue();
@ -134,23 +136,23 @@ QUnit.specify("postal.js", function(){
});
describe("When unsubscribing using provided callback", function() {
describe("with one callback", function() {
var broker = new Broker(),
objA = {
postal = new Postal();
var objA = {
messageCount: 0
};
var unsubscribe = broker.subscribe("Test.*", function() { objA.messageCount++; });
broker.publish("Test.Topic", {});
var unsubscribe = postal.subscribe("Test.*", function() { objA.messageCount++; });
postal.publish("Test.Topic", {});
unsubscribe();
broker.publish("Test.Topic", {});
postal.publish("Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageCount).equals(1);
});
});
describe("with two callbacks", function() {
var broker = new Broker(),
ObjA = function() {
postal = new Postal();
var ObjA = function() {
var _unsubscribe;
this.messageCount = 0;
@ -161,7 +163,7 @@ QUnit.specify("postal.js", function(){
}
};
_unsubscribe = broker.subscribe("TwoRecipients.Listening.*", function() {
_unsubscribe = postal.subscribe("TwoRecipients.Listening.*", function() {
this.messageCount++;
}.bind(this));
},
@ -176,15 +178,15 @@ QUnit.specify("postal.js", function(){
}
};
_unsubscribe = broker.subscribe("TwoRecipients.*", function() {
_unsubscribe = postal.subscribe("TwoRecipients.*", function() {
this.messageCount++;
}.bind(this));
};
var a = new ObjA(),
b = new ObjB();
broker.publish("TwoRecipients.Listening.Something", {});
postal.publish("TwoRecipients.Listening.Something", {});
a.unsubscribe();
broker.publish("TwoRecipients.Listening.Something", {});
postal.publish("TwoRecipients.Listening.Something", {});
it("First object message count should be 1", function(){
assert(a.messageCount).equals(1);
@ -198,20 +200,237 @@ QUnit.specify("postal.js", function(){
describe("When publishing a message on a specific exchange", function(){
describe("With a valid exchange", function() {
var broker = new Broker(),
objA = {
postal = new Postal();
var objA = {
messageCount: 0
};
var unsubscribe = broker.subscribe("MyExchange", "Test.*", function() { objA.messageCount++; });
broker.publish("MyExchange", "Test.Topic", {});
var unsubscribe = postal.subscribe("MyExchange", "Test.*", function() { objA.messageCount++; });
postal.publish("MyExchange", "Test.Topic", {});
unsubscribe();
broker.publish("MyExchange", "Test.Topic", {});
postal.publish("MyExchange", "Test.Topic", {});
it("the subscription callback should be invoked", function(){
assert(objA.messageCount).equals(1);
});
})
});
describe("With an invalid exchange", function() {
postal = new Postal();
var objA = {
messageCount: 0
};
var unsubscribe = postal.subscribe("MyExchange", "Test.*", function() { objA.messageCount++; });
postal.publish("WrongExchange", "Test.Topic", {});
unsubscribe();
postal.publish("WrongExchange", "Test.Topic", {});
it("the subscription callback should not be invoked", function(){
assert(objA.messageCount).equals(0);
});
});
describe("With multiple active exchanges", function() {
describe("Publishing only to one exchange", function(){
postal = new Postal();
var objA = {
messageCount: 0
};
var objB = {
messageCount: 0
};
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
var unsubscribeB = postal.subscribe("MyExchangeB", "Test.*", function() { objB.messageCount++; });
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish("MyExchangeA", "Test.Topic", {});
unsubscribeA();
unsubscribeB();
it("the subscription callback for objA should be invoked", function(){
assert(objA.messageCount).equals(2);
});
it("the subscription callback for objB should not be invoked", function(){
assert(objB.messageCount).equals(0);
});
});
describe("Publishing to multiple exchanges", function(){
postal = new Postal();
var objA = {
messageCount: 0
};
var objB = {
messageCount: 0
};
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
var unsubscribeB = postal.subscribe("MyExchangeB", "Test.*", function() { objB.messageCount++; });
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish("MyExchangeB", "Test.Topic", {});
postal.publish("MyExchangeB", "Test.Topic", {});
postal.publish("MyExchangeB", "Test.Topic", {});
unsubscribeA();
unsubscribeB();
it("the subscription callback for objA should be invoked", function(){
assert(objA.messageCount).equals(2);
});
it("the subscription callback for objB should be invoked", function(){
assert(objB.messageCount).equals(3);
});
});
});
});
describe("With Mode Change Messages", function(){
describe("Change To Replay", function() {
postal = new Postal();
var objA = {
messageCount: 0
},
mode;
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: REPLAY_MODE });
mode = postal.getMode();
postal.publish("MyExchangeA", "Test.Topic", {});
unsubscribeA();
it("the subscription callback for objA should be invoked only once", function(){
assert(objA.messageCount).equals(1);
});
it("broker should report replay mode", function() {
assert(mode).equals(REPLAY_MODE);
});
});
describe("Change To Replay & Back to Normal", function() {
postal = new Postal();
var mode,
objA = {
messageCount: 0
},
mode2;
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: REPLAY_MODE });
postal.publish("MyExchangeA", "Test.Topic", {});
mode = postal.getMode();
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: NORMAL_MODE });
mode2 = postal.getMode();
postal.publish("MyExchangeA", "Test.Topic", {});
unsubscribeA();
it("the subscription callback for objA should be invoked only twice", function(){
assert(objA.messageCount).equals(2);
});
it("broker should report replay mode", function() {
assert(mode).equals(REPLAY_MODE);
});
it("broker should report normal mode", function() {
assert(mode2).equals(NORMAL_MODE);
});
});
describe("Change To Replay & Then to Capture", function() {
postal = new Postal();
var mode,
objA = {
messageCount: 0
},
mode2;
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: REPLAY_MODE });
postal.publish("MyExchangeA", "Test.Topic", {});
mode = postal.getMode();
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: CAPTURE_MODE });
mode2 = postal.getMode();
postal.publish("MyExchangeA", "Test.Topic", {});
unsubscribeA();
it("the subscription callback for objA should be invoked only twice", function(){
assert(objA.messageCount).equals(2);
});
it("broker should report replay mode", function() {
assert(mode).equals(REPLAY_MODE);
});
it("broker should report capture mode", function() {
assert(mode2).equals(CAPTURE_MODE);
});
});
describe("Change To Capture & Then to Normal", function() {
postal = new Postal();
var mode,
mode2,
objA = {
messageCount: 0
};
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: CAPTURE_MODE });
postal.publish("MyExchangeA", "Test.Topic", {});
mode = postal.getMode();
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: NORMAL_MODE });
mode2 = postal.getMode();
postal.publish("MyExchangeA", "Test.Topic", {});
unsubscribeA();
it("the subscription callback for objA should be invoked only 3x", function(){
assert(objA.messageCount).equals(3);
});
it("broker should report replay mode", function() {
assert(mode).equals(CAPTURE_MODE);
});
it("broker should report capture mode", function() {
assert(mode2).equals(NORMAL_MODE);
});
});
describe("Change To Capture", function() {
postal = new Postal();
var mode,
savedBatch,
objA = {
messageCount: 0
},
objB = {
messageCount: 0
};
var unsubscribeA = postal.subscribe("MyExchangeA", "Test.*", function() { objA.messageCount++; });
var unsubscribeB = postal.subscribe("MyExchangeB", "Test.*", function() { objB.messageCount++; });
postal.publish(SYSTEM_EXCHANGE, "mode.set", { mode: CAPTURE_MODE });
mode = postal.getMode();
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish("MyExchangeA", "Test.Topic", {});
postal.publish("MyExchangeB", "Test.Topic", {});
postal.publish("MyExchangeB", "Test.Topic", {});
unsubscribeA();
unsubscribeB();
postal.publish(SYSTEM_EXCHANGE, "captor.save", { batchId: "MyMsgBatch", description: "Just a Test" });
savedBatch = amplify.store(POSTAL_MSG_STORE_KEY)["MyMsgBatch"];
it("the subscription callback for objA should be invoked only twice", function(){
assert(objA.messageCount).equals(2);
});
it("broker should report replay mode", function() {
assert(mode).equals(CAPTURE_MODE);
});
it("captured message batch should exist", function() {
assert(savedBatch !== undefined).isTrue();
})
it("captured message batch should have 4 messages", function() {
});
});
});
});
});

View file

@ -1,42 +0,0 @@
QUnit.specify("postal.js", function(){
describe("LanguageExtensions", function(){
describe("Object forEach extension", function() {
describe("When testing an empty object", function() {
var emptyObj = {};
var counter = 0;
var testClosure = function() {
counter++;
};
it("should not invoke the callback when no members exist to iterate over", function(){
emptyObj.forEach(testClosure);
assert(counter).equals(0);
});
});
describe("When testing an object with one member", function() {
var emptyObj = { test: "Test Value"};
var counter = 0;
var testClosure = function() {
counter++;
};
it("should invoke the callback once", function(){
emptyObj.forEach(testClosure);
assert(counter).equals(1);
});
});
describe("When testing an object with two members", function() {
var emptyObj = { test: "Test Value", other: "Moar"};
var counter = 0;
var testClosure = function() {
counter++;
};
it("should invoke the callback twice", function(){
emptyObj.forEach(testClosure);
assert(counter).equals(2);
});
});
});
});
});

View file

@ -4,9 +4,13 @@
<script type="text/javascript" src="../lib/jquery-1.5.2.js"></script>
<script type="text/javascript" src="../lib/qunit.js"></script>
<script type="text/javascript" src="../lib/pavlov.js"></script>
<script type="text/javascript" src="../lib/amplify.core.js"></script>
<script type="text/javascript" src="../lib/amplify.store.js"></script>
<script type="text/javascript" src="../src/languageExtensions.js"></script>
<script type="text/javascript" src="../src/broker2.js"></script>
<script type="text/javascript" src="languageExtensions.spec.js"></script>
<script type="text/javascript" src="../src/Postal.js"></script>
<script type="text/javascript" src="../src/ReplayContext.js"></script>
<script type="text/javascript" src="../src/CapturedMessageBatch.js"></script>
<script type="text/javascript" src="../src/MessageCaptor.js"></script>
<script type="text/javascript" src="broker.spec.js"></script>
<link rel="stylesheet" href="../lib/qunit.css" type="text/css" media="screen" />
</head>

View file

@ -0,0 +1,5 @@
var CapturedMessageBatch = function(batchId, description, messages) {
this.batchId = "";
this.description = description || "Captured Message Batch";
this.messages = messages || [];
};

27
src/MessageCaptor.js Normal file
View file

@ -0,0 +1,27 @@
var MessageCaptor = function(plugUp, unPlug) {
var _grabMsg = function(data) {
// We need to ignore system messages, since they could involve captures, replays, etc.
if(data.exchange !== SYSTEM_EXCHANGE) {
this.messages.push(data);
}
}.bind(this);
plugUp(_grabMsg);
this.messages = [];
this.save = function(batchId, description) {
unPlug(_grabMsg);
var captureStore = amplify.store(POSTAL_MSG_STORE_KEY);
if(!captureStore) {
captureStore = {};
}
captureStore[batchId] = new CapturedMessageBatch(batchId, description, this.messages);
amplify.store(POSTAL_MSG_STORE_KEY, captureStore);
};
postal.subscribe(SYSTEM_EXCHANGE, "captor.save", function(data) {
this.save(data.batchId || new Date().toString(),
data.description || "Captured Message Batch");
}.bind(this));
};

View file

@ -1,4 +1,9 @@
var DEFAULT_EXCHANGE = "/",
SYSTEM_EXCHANGE = "postal",
NORMAL_MODE = "Normal",
CAPTURE_MODE = "Capture",
REPLAY_MODE = "Replay",
POSTAL_MSG_STORE_KEY = "postal.captured",
_forEachKeyValue = function(object, callback) {
for(var x in object) {
if(object.hasOwnProperty(x)) {
@ -7,7 +12,7 @@ var DEFAULT_EXCHANGE = "/",
}
};
var Broker = function() {
var Postal = function() {
var _regexify = function(topic) {
if(!this[topic]) {
this[topic] = topic.replace(".", "\.").replace("*", ".*");
@ -21,7 +26,14 @@ var Broker = function() {
(topic.indexOf("*") !== -1 && comparison.search(_regexify(topic)) !== -1);
}
return this[topic + '_' + comparison];
}.bind(this);
}.bind(this),
_mode = NORMAL_MODE,
_replayContext,
_captor;
this.getMode = function() { return _mode; };
this.wireTaps = [];
this.subscriptions = {};
@ -59,7 +71,7 @@ var Broker = function() {
_topicList = _args[1].split(/\s/);
_subData.callback = _args[2];
}
else if(_args.length === 3 && typeof _args[2] === 'object') {
else if(_args.length === 3 && typeof _args[2] === 'object') { // expecting topic, callback and options
_exchange = DEFAULT_EXCHANGE;
_topicList = _args[0].split(/\s/);
_subData.callback = _args[1];
@ -67,6 +79,14 @@ var Broker = function() {
_subData.context = _args[2].context ? _args[2].context : null;
_once = _args[2].once ? _args[2].once : false;
}
else {
_exchange = exchange;
_topicList = topic.split(/\s/);
_subData.callback = callback;
_subData.priority = options.priority ? options.priority : 50;
_subData.context = options.context ? options.context : null;
_once = options.once ? options.once : false;
}
if(_once) {
_subData.onFired = function() {
@ -144,25 +164,68 @@ var Broker = function() {
if(_args.length == 2) {
_exchange = DEFAULT_EXCHANGE;
_topicList = _args[0].split(/\s/);
_data = _args[1];
_data = _args[1] || {};
}
else {
_exchange = exchange;
_topicList = topic.split(/\s/);
_data = data;
_data = data || {};
}
if(_mode !== REPLAY_MODE || (_mode === REPLAY_MODE && _exchange === SYSTEM_EXCHANGE)) {
_topicList.forEach(function(tpc){
_forEachKeyValue(this.subscriptions[_exchange],function(subTpc, subs) {
if(_isTopicMatch(tpc, subTpc)) {
subs.map(function(sub) { return sub.callback; })
.forEach(function(callback) {
if(typeof callback === 'function') {
callback(data);
}
});
}
});
}, this);
_topicList.forEach(function(tpc){
this.wireTaps.forEach(function(tap) {
tap({
exchange: _exchange,
topic: tpc,
data: _data,
timeStamp: new Date()
});
});
_forEachKeyValue(this.subscriptions[_exchange],function(subTpc, subs) {
if(_isTopicMatch(tpc, subTpc)) {
subs.map(function(sub) { return sub.callback; })
.forEach(function(callback) {
if(typeof callback === 'function') {
callback(_data);
}
});
}
});
}, this);
}
};
this.subscribe(SYSTEM_EXCHANGE, "mode.set", function(data) {
if(data.mode) {
switch(data.mode) {
case REPLAY_MODE:
_mode = REPLAY_MODE;
_replayContext = new ReplayContext(this.publish.bind(this), this.subscribe.bind(this));
_captor = undefined;
break;
case CAPTURE_MODE:
_mode = CAPTURE_MODE;
_captor = new MessageCaptor(function(callback){
this.wireTaps.push(callback);
}.bind(this),
function(callback) {
var idx = this.wireTaps.indexOf(callback);
if(idx !== -1) {
this.wireTaps.splice(idx,1);
}
}.bind(this));
break;
default:
_mode = NORMAL_MODE;
_replayContext = undefined;
_captor = undefined;
break;
}
}
}.bind(this));
};
var postal = new Postal();

61
src/ReplayContext.js Normal file
View file

@ -0,0 +1,61 @@
var ReplayContext = function (publish, subscribe) {
var _batch,
_continue = true,
_loadMessages = function(batchId) {
var msgStore = amplify.store(POSTAL_MSG_STORE_KEY),
targetBatch = msgStore[batchId];
if(targetBatch) {
_batch = targetBatch;
}
},
_replayImmediate = function() {
while(_batch.messages.length > 0) {
if(_continue) {
_advanceNext();
}
else {
break;
}
}
},
_advanceNext = function() {
var msg = _batch.messages.shift();
publish(msg.exchange, msg.topic, msg.data);
},
_replayRealTime = function() {
var lastTime = _batch.messages[0].timeStamp;
while(_batch.messages.length > 0) {
if(_continue) {
_advanceNext();
setTimeout(publish(SYSTEM_EXCHANGE, "replay.realTime"), _batch.messages[0].timeStamp - lastTime);
}
else {
break;
}
}
};
subscribe(SYSTEM_EXCHANGE, "replay.load", function(data) {
_continue = false;
_loadMessages(data);
});
subscribe(SYSTEM_EXCHANGE, "replay.immediate", function() {
_continue = true;
_replayImmediate();
});
subscribe(SYSTEM_EXCHANGE, "replay.advanceNext", function() {
_continue = true;
_advanceNext();
});
subscribe(SYSTEM_EXCHANGE, "replay.realTime", function() {
_continue = true;
_replayRealTime();
});
subscribe(SYSTEM_EXCHANGE, "replay.stop", function() {
_continue = false;
});
};

View file

@ -1,3 +1,4 @@
/*
var MessageBroker = function() {
var subscriptions = {},
regexify = function(topic) {
@ -66,4 +67,4 @@ var MessageBroker = function() {
}
}
};
};
};*/

View file

@ -1,4 +1,4 @@
if(!Object.prototype.forEach) {
/*if(!Object.prototype.forEach) {
Object.prototype.forEach = function (callback) {
var self = this;
for(var x in self) {
@ -18,7 +18,7 @@ if(!Object.prototype.forEachKeyValue) {
}
}
};
};
};*/
var isArray = function(value) {
var s = typeof value;