mirror of
https://github.com/Hopiu/postal.js.git
synced 2026-03-16 22:20:23 +00:00
215 lines
6.2 KiB
JavaScript
215 lines
6.2 KiB
JavaScript
var _ = require( "underscore" ),
|
|
express = require( 'express' ),
|
|
app = express.createServer(),
|
|
postal = require( "./messaging/postal.js" ),
|
|
PostalSocketHost = require( './messaging/postal.socket-host.js' ),
|
|
sockets = require( './messaging/socket-io-provider.js' ),
|
|
TwitterSearch = require( './twitter-search.js' ),
|
|
collectors = require( './stat-collectors.js' ),
|
|
BusAdapter = require( './messaging/bus-adapter.js' ),
|
|
machina = require( './machina.js' );
|
|
|
|
postal.addWireTap( function ( data, envelope ) {
|
|
if ( envelope.channel === "stats" /*|| envelope.channel === "twittersearch"*/ ) {
|
|
var env = _.extend( {}, envelope );
|
|
delete env.data;
|
|
console.log( JSON.stringify( env ) );
|
|
}
|
|
else if ( _.include( ["postal.socket", "postal", "app", "app.events"], envelope.channel ) ) {
|
|
console.log( JSON.stringify( envelope ) );
|
|
}
|
|
} );
|
|
|
|
// wire machina FSMs into postal automagically
|
|
require( './messaging/machina.postal.js' )( postal, machina );
|
|
|
|
var TwitterSocketStats = function ( port, refreshinterval ) {
|
|
// Stand up our express app
|
|
app.use( "/", express.static( __dirname + '/client' ) );
|
|
app.listen( port );
|
|
var searchChannel = postal.channel( "twittersearch", "#" ),
|
|
statsChannel = postal.channel( "stats", "#" ),
|
|
appChannel = postal.channel( "statsApp", "#" ),
|
|
fsm;
|
|
|
|
postal.linkChannels( { channel : "postal.socket", topic : "client.migrated"}, { channel : "statsApp", topic : "client.migrated" } );
|
|
|
|
fsm = new machina.Fsm( {
|
|
|
|
namespace : "statsApp",
|
|
|
|
currentSearch : {
|
|
id : null,
|
|
searchTerm : "{ Search Not Active }"
|
|
},
|
|
|
|
searchRequests : {},
|
|
|
|
express : app,
|
|
|
|
appChannel : appChannel,
|
|
|
|
searchChannel : searchChannel,
|
|
|
|
statsChannel : statsChannel,
|
|
|
|
searchAgent : new BusAdapter( new TwitterSearch( refreshinterval ), searchChannel, searchChannel ),
|
|
|
|
requestedSearches : [],
|
|
|
|
stats : collectors.load( searchChannel, statsChannel ),
|
|
|
|
postal : postal,
|
|
|
|
bridge : new PostalSocketHost( postal, sockets.getSocketProvider( postal, app ) ),
|
|
|
|
getAvailableStats : function ( clientId ) {
|
|
this.appChannel.publish( {
|
|
topic : "available.topics",
|
|
correlationId : clientId,
|
|
data : {
|
|
topics : _.toArray( this.stats ).map( function ( stat ) {
|
|
return { channel : "stats", topic : stat.namespace };
|
|
} )
|
|
}
|
|
} );
|
|
},
|
|
|
|
setSearch : function ( correlationId, searchTerm ) {
|
|
this.currentSearch = {
|
|
id : correlationId,
|
|
searchTerm : searchTerm
|
|
};
|
|
this.searchAgent.search( searchTerm );
|
|
this.removeSearchRequest( correlationId, searchTerm );
|
|
this.appChannel.publish( {
|
|
topic : "search.info",
|
|
data : this.currentSearch
|
|
} );
|
|
},
|
|
|
|
getSearchInfo : function ( env ) {
|
|
this.appChannel.publish( {
|
|
topic : "search.info",
|
|
correlationId : env.correlationId,
|
|
data : this.currentSearch
|
|
} );
|
|
},
|
|
|
|
getSearchRequests : function ( env ) {
|
|
env || (env = {});
|
|
this.appChannel.publish( {
|
|
topic : "search.requests",
|
|
correlationId : env.correlationId,
|
|
data : this.requestedSearches
|
|
} );
|
|
},
|
|
|
|
addSearchRequest : function ( correlationId, searchTerm ) {
|
|
if ( !_.any( this.searchRequests, function ( item ) {
|
|
return item.correlationId === request.correlationId &&
|
|
item.searchTerm === request.searchTerm;
|
|
} ) ) {
|
|
this.requestedSearches.push( { correlationId : correlationId, searchTerm : searchTerm } );
|
|
}
|
|
},
|
|
|
|
removeSearchRequest : function ( correlationId, searchTerm ) {
|
|
if ( _.any( this.requestedSearches, function ( item ) {
|
|
return item.correlationId === correlationId && item.searchTerm === searchTerm;
|
|
} ) ) {
|
|
this.requestedSearches = _.filter( this.requestedSearches, function ( item ) {
|
|
return item.correlationId !== correlationId && item.searchTerm !== searchTerm;
|
|
} );
|
|
this.getSearchRequests();
|
|
}
|
|
},
|
|
|
|
states : {
|
|
uninitialized : {
|
|
start : function () {
|
|
this.transition( "notSearching" );
|
|
},
|
|
"*" : function () {
|
|
this.deferUntilTransition();
|
|
}
|
|
},
|
|
notSearching : {
|
|
"search.new.request" : function ( data, envelope ) {
|
|
this.setSearch( envelope.correlationId, data.searchTerm );
|
|
this.transition( "searching" );
|
|
},
|
|
"get.search.info" : function ( data, env ) {
|
|
this.getSearchInfo( env );
|
|
},
|
|
"get.available" : function ( data, envelope ) {
|
|
this.getAvailableStats( envelope.correlationId );
|
|
}
|
|
},
|
|
searching : {
|
|
"search.new.request" : function ( data, envelope ) {
|
|
if ( envelope.correlationId === this.currentSearch.id ) {
|
|
this.setSearch( envelope.correlationId, data.searchTerm );
|
|
}
|
|
else {
|
|
this.addSearchRequest( envelope.correlationId, data.searchTerm );
|
|
this.appChannel.publish( {
|
|
topic : "search.new.ask",
|
|
data : {
|
|
correlationId : envelope.correlationId,
|
|
searchTerm : data.searchTerm
|
|
}
|
|
} );
|
|
}
|
|
},
|
|
"search.new.confirm" : function ( data, envelope ) {
|
|
if ( envelope.correlationId === this.currentSearch.id ) {
|
|
this.setSearch( data.correlationId, data.searchTerm );
|
|
}
|
|
},
|
|
"get.search.info" : function ( data, env ) {
|
|
this.getSearchInfo( env );
|
|
},
|
|
"get.search.requests" : function ( data, env ) {
|
|
this.getSearchRequests( env );
|
|
},
|
|
"get.available" : function ( data, envelope ) {
|
|
this.getAvailableStats( envelope.correlationId );
|
|
},
|
|
"client.migrated" : function ( data, envelope ) {
|
|
if ( data.lastSessionId === this.currentSearch.id ) {
|
|
this.currentSearch.id = data.sessionId;
|
|
}
|
|
},
|
|
"client.disconnect" : function ( data ) {
|
|
if ( this.currentSearch.id === data.sessionId ) {
|
|
if( this.requestedSearches.length ) {
|
|
var newSearch = this.requestedSearches.shift();
|
|
this.setSearch( newSearch.correlationId, newSearch.searchTerm );
|
|
} else {
|
|
this.transition("notSearching");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
} );
|
|
|
|
postal.subscribe( {
|
|
channel : "postal.socket",
|
|
topic : "client.disconnect",
|
|
callback: function( data, env ) {
|
|
fsm.handle(env.topic, data );
|
|
}
|
|
} );
|
|
|
|
return fsm;
|
|
};
|
|
|
|
var x = module.exports = new TwitterSocketStats( 8002, 7000 );
|
|
|
|
x.on( "*", function ( evnt, data ) {
|
|
console.log( "FSM Event: " + evnt + " - " + JSON.stringify( [].slice.call( arguments, 1 ) ) );
|
|
} );
|
|
|
|
x.handle( "start" );
|