postal.js/example/node/index.js
2012-08-28 12:06:19 -04:00

215 lines
6.2 KiB
JavaScript

var _ = require( "underscore" ),
express = require( 'express' ),
app = express.createServer(),
postal = require( "./messaging/postal.js" ),
PostalSocketHost = require( './messaging/postal.socket-host.js' ),
sockets = require( './messaging/socket-io-provider.js' ),
TwitterSearch = require( './twitter-search.js' ),
collectors = require( './stat-collectors.js' ),
BusAdapter = require( './messaging/bus-adapter.js' ),
machina = require( './machina.js' );
postal.addWireTap( function ( data, envelope ) {
if ( envelope.channel === "stats" /*|| envelope.channel === "twittersearch"*/ ) {
var env = _.extend( {}, envelope );
delete env.data;
console.log( JSON.stringify( env ) );
}
else if ( _.include( ["postal.socket", "postal", "app", "app.events"], envelope.channel ) ) {
console.log( JSON.stringify( envelope ) );
}
} );
// wire machina FSMs into postal automagically
require( './messaging/machina.postal.js' )( postal, machina );
var TwitterSocketStats = function ( port, refreshinterval ) {
// Stand up our express app
app.use( "/", express.static( __dirname + '/client' ) );
app.listen( port );
var searchChannel = postal.channel( "twittersearch", "#" ),
statsChannel = postal.channel( "stats", "#" ),
appChannel = postal.channel( "statsApp", "#" ),
fsm;
postal.linkChannels( { channel : "postal.socket", topic : "client.migrated"}, { channel : "statsApp", topic : "client.migrated" } );
fsm = new machina.Fsm( {
namespace : "statsApp",
currentSearch : {
id : null,
searchTerm : "{ Search Not Active }"
},
searchRequests : {},
express : app,
appChannel : appChannel,
searchChannel : searchChannel,
statsChannel : statsChannel,
searchAgent : new BusAdapter( new TwitterSearch( refreshinterval ), searchChannel, searchChannel ),
requestedSearches : [],
stats : collectors.load( searchChannel, statsChannel ),
postal : postal,
bridge : new PostalSocketHost( postal, sockets.getSocketProvider( postal, app ) ),
getAvailableStats : function ( clientId ) {
this.appChannel.publish( {
topic : "available.topics",
correlationId : clientId,
data : {
topics : _.toArray( this.stats ).map( function ( stat ) {
return { channel : "stats", topic : stat.namespace };
} )
}
} );
},
setSearch : function ( correlationId, searchTerm ) {
this.currentSearch = {
id : correlationId,
searchTerm : searchTerm
};
this.searchAgent.search( searchTerm );
this.removeSearchRequest( correlationId, searchTerm );
this.appChannel.publish( {
topic : "search.info",
data : this.currentSearch
} );
},
getSearchInfo : function ( env ) {
this.appChannel.publish( {
topic : "search.info",
correlationId : env.correlationId,
data : this.currentSearch
} );
},
getSearchRequests : function ( env ) {
env || (env = {});
this.appChannel.publish( {
topic : "search.requests",
correlationId : env.correlationId,
data : this.requestedSearches
} );
},
addSearchRequest : function ( correlationId, searchTerm ) {
if ( !_.any( this.searchRequests, function ( item ) {
return item.correlationId === request.correlationId &&
item.searchTerm === request.searchTerm;
} ) ) {
this.requestedSearches.push( { correlationId : correlationId, searchTerm : searchTerm } );
}
},
removeSearchRequest : function ( correlationId, searchTerm ) {
if ( _.any( this.requestedSearches, function ( item ) {
return item.correlationId === correlationId && item.searchTerm === searchTerm;
} ) ) {
this.requestedSearches = _.filter( this.requestedSearches, function ( item ) {
return item.correlationId !== correlationId && item.searchTerm !== searchTerm;
} );
this.getSearchRequests();
}
},
states : {
uninitialized : {
start : function () {
this.transition( "notSearching" );
},
"*" : function () {
this.deferUntilTransition();
}
},
notSearching : {
"search.new.request" : function ( data, envelope ) {
this.setSearch( envelope.correlationId, data.searchTerm );
this.transition( "searching" );
},
"get.search.info" : function ( data, env ) {
this.getSearchInfo( env );
},
"get.available" : function ( data, envelope ) {
this.getAvailableStats( envelope.correlationId );
}
},
searching : {
"search.new.request" : function ( data, envelope ) {
if ( envelope.correlationId === this.currentSearch.id ) {
this.setSearch( envelope.correlationId, data.searchTerm );
}
else {
this.addSearchRequest( envelope.correlationId, data.searchTerm );
this.appChannel.publish( {
topic : "search.new.ask",
data : {
correlationId : envelope.correlationId,
searchTerm : data.searchTerm
}
} );
}
},
"search.new.confirm" : function ( data, envelope ) {
if ( envelope.correlationId === this.currentSearch.id ) {
this.setSearch( data.correlationId, data.searchTerm );
}
},
"get.search.info" : function ( data, env ) {
this.getSearchInfo( env );
},
"get.search.requests" : function ( data, env ) {
this.getSearchRequests( env );
},
"get.available" : function ( data, envelope ) {
this.getAvailableStats( envelope.correlationId );
},
"client.migrated" : function ( data, envelope ) {
if ( data.lastSessionId === this.currentSearch.id ) {
this.currentSearch.id = data.sessionId;
}
},
"client.disconnect" : function ( data ) {
if ( this.currentSearch.id === data.sessionId ) {
if( this.requestedSearches.length ) {
var newSearch = this.requestedSearches.shift();
this.setSearch( newSearch.correlationId, newSearch.searchTerm );
} else {
this.transition("notSearching");
}
}
}
}
}
} );
postal.subscribe( {
channel : "postal.socket",
topic : "client.disconnect",
callback: function( data, env ) {
fsm.handle(env.topic, data );
}
} );
return fsm;
};
var x = module.exports = new TwitterSocketStats( 8002, 7000 );
x.on( "*", function ( evnt, data ) {
console.log( "FSM Event: " + evnt + " - " + JSON.stringify( [].slice.call( arguments, 1 ) ) );
} );
x.handle( "start" );