architectural changes pushed as far as the server starts running again, just limited

This commit is contained in:
Dominic Bosch 2013-11-14 22:27:15 +01:00
parent 11ced9a57e
commit efbb179832
10 changed files with 226 additions and 304 deletions

View file

@ -1,22 +1,18 @@
'use strict';
var path = require('path'), log, config;
var path = require('path'),
log = require('./logging'),
config;
exports = module.exports = function(relPath) {
if(typeof relPath !== 'string') relPath = path.join('config', 'config.json');
loadConfigFile(relPath);
};
exports.init = function(args, cb) {
exports = module.exports = function(args) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
loadConfigFile(path.join('config', 'config.json'));
if(typeof cb === 'function') cb();
log(args);
if(typeof args.relPath === 'string') loadConfigFile(args.relPath);
//TODO check all modules whether they can be loaded without calling the module.exports with args
return module.exports;
};
loadConfigFile(path.join('config', 'config.json'));
function loadConfigFile(relPath) {
try {
@ -73,7 +69,4 @@ exports.getSessionSecret = function() {
return fetchProp('session_secret');
};
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};

View file

@ -12,31 +12,28 @@
// (e.g. action\_module\_probinder).
'use strict';
var log, crypto_key, db,
redis = require('redis'),
crypto = require('crypto');
var redis = require('redis'),
crypto = require('crypto'),
log = require('./logging'),
crypto_key, db;
/**
* Initializes the DB connection. Requires a valid configuration file which contains
* a db port and a crypto key.
* @param args {Object}
* @param cb {function}
*/
exports.init = function(args, cb) {
exports = module.exports = function(args) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
log(args);
var config = require('./config');
config.init(args);
var config = require('./config')(args);
crypto_key = config.getCryptoKey();
db = redis.createClient(config.getDBPort());
db.on("error", function (err) {
err.addInfo = 'message from DB';
log.error('DB', err);
});
if(typeof cb === 'function') cb();
return module.exports;
};
/**
@ -93,7 +90,7 @@ function replyHandler(action) {
* arguments (err, obj)
*/
function getSetRecords(set, funcSingle, callback) {
db.smembers(set, function(err, reply) {
if(db) db.smembers(set, function(err, reply) {
if(err) log.error('DB', 'fetching ' + set + ': ' + err);
else {
if(reply.length === 0) {
@ -124,7 +121,7 @@ function getSetRecords(set, funcSingle, callback) {
// @method shutDown()
// Shuts down the db link.
exports.shutDown = function() { db.quit(); };
exports.shutDown = function() { if(db) db.quit(); };
// ## Action Modules
@ -135,8 +132,10 @@ exports.shutDown = function() { db.quit(); };
* @param {String} data the string representation
*/
exports.storeActionModule = function(id, data) {
db.sadd('action_modules', id, replyHandler('storing action module key ' + id));
db.set('action_module_' + id, data, replyHandler('storing action module ' + id));
if(db) {
db.sadd('action_modules', id, replyHandler('storing action module key ' + id));
db.set('action_module_' + id, data, replyHandler('storing action module ' + id));
}
};
/**
@ -146,7 +145,7 @@ exports.storeActionModule = function(id, data) {
* @param {function} callback the callback to receive the answer (err, obj)
*/
exports.getActionModule = function(id, callback) {
if(callback) db.get('action_module_' + id, callback);
if(callback && db) db.get('action_module_' + id, callback);
};
/**
@ -165,7 +164,7 @@ exports.getActionModules = function(callback) {
* @param {String} data the string representation
*/
exports.storeActionModuleAuth = function(id, data) {
if(data) {
if(data && db) {
db.sadd('action_modules_auth', id, replyHandler('storing action module auth key ' + id));
db.set('action_module_' + id +'_auth', encrypt(data), replyHandler('storing action module auth ' + id));
}
@ -178,7 +177,7 @@ exports.storeActionModuleAuth = function(id, data) {
* @param {function} callback the callback to receive the answer (err, obj)
*/
exports.getActionModuleAuth = function(id, callback) {
if(callback) db.get('action_module_' + id + '_auth', function(err, txt) { callback(err, decrypt(txt)); });
if(callback && db) db.get('action_module_' + id + '_auth', function(err, txt) { callback(err, decrypt(txt)); });
};
// ## Event Modules
@ -190,8 +189,10 @@ exports.getActionModuleAuth = function(id, callback) {
* @param {String} data the string representation
*/
exports.storeEventModule = function(id, data) {
db.sadd('event_modules', id, replyHandler('storing event module key ' + id));
db.set('event_module_' + id, data, replyHandler('storing event module ' + id));
if(db) {
db.sadd('event_modules', id, replyHandler('storing event module key ' + id));
db.set('event_module_' + id, data, replyHandler('storing event module ' + id));
}
};
/**
@ -201,7 +202,7 @@ exports.storeEventModule = function(id, data) {
* @param {function} callback the callback to receive the answer (err, obj)
*/
exports.getEventModule = function(id, callback) {
if(callback) db.get('event_module_' + id, callback);
if(callback && db) db.get('event_module_' + id, callback);
};
/**
@ -220,7 +221,7 @@ exports.getEventModules = function(callback) {
* @param {String} data the string representation
*/
exports.storeEventModuleAuth = function(id, data) {
if(data) {
if(data && db) {
db.sadd('event_modules_auth', id, replyHandler('storing event module auth key ' + id));
db.set('event_module_' + id +'_auth', encrypt(data), replyHandler('storing event module auth ' + id));
}
@ -243,8 +244,10 @@ exports.getEventModuleAuth = function(id, callback) {
// @param {String} id the unique identifier of the rule
// @param {String} data the string representation
exports.storeRule = function(id, data) {
db.sadd('rules', id, replyHandler('storing rule key ' + id));
db.set('rule_' + id, data, replyHandler('storing rule ' + id));
if(db) {
db.sadd('rules', id, replyHandler('storing rule key ' + id));
db.set('rule_' + id, data, replyHandler('storing rule ' + id));
}
};
// @method getRule(id, callback)
@ -253,7 +256,7 @@ exports.storeRule = function(id, data) {
// @param {String} id the rule id
// @param {function} callback the callback to receive the answer (err, obj)
exports.getRule = function(id, callback) {
db.get('rule_' + id, callback);
if(db) db.get('rule_' + id, callback);
};
// @method getRules(callback)
@ -270,13 +273,8 @@ exports.getRules = function(callback) {
* @param {Object} objUser
*/
exports.storeUser = function(cb, objUser) {
if(objUser && objUser.id) {
if(db && objUser && objUser.id) {
db.sadd('users', objUser.id, replyHandler('storing user key ' + objUser.id));
db.set('user:' + objUser.id, data, replyHandler('storing user properties ' + objUser.id));
}
};
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};

View file

@ -1,35 +1,22 @@
'use strict';
var log, ml;
exports.init = function(args, cb) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
ml = require('./module_loader').init(args);
if(typeof cb === 'function') cb();
};
var path = require('path'),
cp = require('child_process'),
poller, db, isRunning = true,
qEvents = new (require('./queue')).Queue(); // export queue into redis
log = require('./logging'),
qEvents = new (require('./queue')).Queue(), //TODO export queue into redis
regex = /\$X\.[\w\.\[\]]*/g, // find properties of $X
listRules = {},
listActionModules = {},
isRunning = true,
actionsLoaded = false,
eventsLoaded = false,
ml, poller, db;
var regex = /\$X\.[\w\.\[\]]*/g, // find properties of $X
listRules = {},
listActionModules = {},
actionsLoaded = false, eventsLoaded = false;
/*
* Initialize the rules engine which initializes the module loader.
* @param {Object} db_link the link to the db, see [db\_interface](db_interface.html)
* @param {String} db_port the db port
* @param {String} crypto_key the key to be used for encryption on the db, max legnth 256
*/
exports.addDBLink = function(db_link) {
db = db_link;
loadActions();
poller = cp.fork(path.resolve(__dirname, 'eventpoller'));
exports = module.exports = function(args) {
args = args || {};
log(args);
ml = require('./module_loader')(args);
poller = cp.fork(path.resolve(__dirname, 'eventpoller'), [log.getLogType()]);
poller.on('message', function(evt) {
if(evt.event === 'ep_finished_loading') {
eventsLoaded = true;
@ -38,10 +25,19 @@ exports.addDBLink = function(db_link) {
});
//start to poll the event queue
pollQueue();
return module.exports;
};
function loadActions() {
db.getActionModules(function(err, obj) {
/*
* Initialize the rules engine which initializes the module loader.
* @param {Object} db_link the link to the db, see [db\_interface](db_interface.html)
* @param {String} db_port the db port
* @param {String} crypto_key the key to be used for encryption on the db, max legnth 256
*/
exports.addDBLink = function(db_link) { db = db_link; };
exports.loadActions = function(cb) {
if(ml && db) db.getActionModules(function(err, obj) {
if(err) log.error('EN', 'retrieving Action Modules from DB!');
else {
if(!obj) {
@ -66,12 +62,13 @@ function loadActions() {
listActionModules[el] = m;
}
}
}
}
});
}
else log.severe('EN', new Error('Module Loader not defined!'));
};
function tryToLoadRules() {
if(eventsLoaded && actionsLoaded) {
if(db && eventsLoaded && actionsLoaded) {
db.getRules(function(err, obj) {
for(var el in obj) exports.loadRule(JSON.parse(obj[el]));
});
@ -252,8 +249,3 @@ exports.shutDown = function() {
if(poller) poller.send('cmd|shutdown');
if(db) db.shutDown();
};
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};

View file

@ -2,63 +2,64 @@
'use strict';
var log, db, ml;
function init() {
log = require('./logging');
if(process.argv.length > 2) log(parseInt(process.argv[2]) || 0);
ml = require('./module_loader').init({ log: log }),
db = require('./db_interface').init({ log: log }, doneInitDB);
if(typeof cb === 'function') cb();
};
var fs = require('fs'),
path = require('path'),
listMessageActions = {},
listAdminCommands = {},
listEventModules = {},
listPoll = {}, //TODO this will change in the future because it could have
//several parameterized (user-specific) instances of each event module
isRunning = true,
eId = 0;
path = require('path'),
log = require('./logging'),
listMessageActions = {},
listAdminCommands = {},
listEventModules = {},
listPoll = {}, //TODO this will change in the future because it could have
//several parameterized (user-specific) instances of each event module
isRunning = true,
eId = 0,
db, ml;
//TODO allow different polling intervals (a wrapper together with settimeout per to be polled could be an easy and solution)
function doneInitDB(err) {
if(!err) {
function init() {
//FIXME ensure eventpoller receives the log method from the engine
console.log('EP receives args:');
console.log('logmeth: ' + parseInt(process.argv[2]) || 0);
console.log(process.argv);
if(process.argv.length > 2) log({ logType: parseInt(process.argv[2]) || 0 });
var args = { logType: log.getLogType() };
ml = require('./module_loader')(args);
db = require('./db_interface')(args);
initAdminCommands();
initMessageActions();
loadEventModules();
pollLoop();
};
function loadEventModules() {
//TODO eventpoller will not load event modules from filesystem, this will be done by
// the moduel manager and the eventpoller receives messages about new/updated active rules
db.getEventModules(function(err, obj) {
if(err) log.error('EP', 'retrieving Event Modules from DB!');
else {
if(!obj) {
log.print('EP', 'No Event Modules found in DB!');
process.send({ event: 'ep_finished_loading' });
} else {
var m, semaphore = 0;
for(var el in obj) {
semaphore++;
m = ml.requireFromString(obj[el], el);
db.getEventModuleAuth(el, function(mod) {
return function(err, obj) {
if(--semaphore === 0) process.send({ event: 'ep_finished_loading' });
if(obj && mod.loadCredentials) mod.loadCredentials(JSON.parse(obj));
};
}(m));
log.print('EP', 'Loading Event Module: ' + el);
listEventModules[el] = m;
}
if(db && ml) db.getEventModules(function(err, obj) {
if(err) log.error('EP', 'retrieving Event Modules from DB!');
else {
if(!obj) {
log.print('EP', 'No Event Modules found in DB!');
process.send({ event: 'ep_finished_loading' });
} else {
var m, semaphore = 0;
for(var el in obj) {
semaphore++;
log.print('EP', 'Loading Event Module: ' + el);
m = ml.requireFromString(obj[el], el);
db.getEventModuleAuth(el, function(mod) {
return function(err, obj) {
if(--semaphore === 0) process.send({ event: 'ep_finished_loading' });
if(obj && mod.loadCredentials) mod.loadCredentials(JSON.parse(obj));
};
}(m));
listEventModules[el] = m;
}
initAdminCommands();
initMessageActions();
}
});
} else {
err.addInfo = 'eventpoller init failed';
log.error('EP', err);
}
}
});
}
function initMessageActions() {
@ -101,22 +102,24 @@ function initMessageActions() {
function initAdminCommands() {
listAdminCommands['loadevent'] = function(args) {
ml.loadModule('mod_events', args[2], loadEventCallback);
if(ml) ml.loadModule('mod_events', args[2], loadEventCallback);
};
listAdminCommands['loadevents'] = function(args) {
ml.loadModules('mod_events', loadEventCallback);
if(ml) ml.loadModules('mod_events', loadEventCallback);
};
listAdminCommands['shutdown'] = function(args) {
log.print('EP', 'Shutting down DB Link');
isRunning = false;
db.shutDown();
if(db) db.shutDown();
};
}
function loadEventCallback(name, data, mod, auth) {
db.storeEventModule(name, data); // store module in db
if(auth) db.storeEventModuleAuth(name, auth);
listEventModules[name] = mod; // store compiled module for polling
if(db) {
db.storeEventModule(name, data); // store module in db
if(auth) db.storeEventModuleAuth(name, auth);
listEventModules[name] = mod; // store compiled module for polling
}
}
function checkRemotes() {
@ -155,11 +158,5 @@ function pollLoop() {
setTimeout(pollLoop, 10000);
}
}
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};
init();
pollLoop();
init();

View file

@ -2,23 +2,25 @@
// Isso
'use strict';
var log, config;
exports.init = function(args, cb) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
config = require('./config').init(args);
if(typeof cb === 'function') cb();
};
var path = require('path'),
express = require('express'),
app = express(),
RedisStore = require('connect-redis')(express),
qs = require('querystring'),
adminHandler, eventHandler, server;
log = require('./logging'),
sess_sec = '#C[>;j`@".TXm2TA;A2Tg)',
db_port, http_port, server,
adminHandler, eventHandler;
exports = module.exports = function(args) {
args = args || {};
log(args);
var config = require('./config')(args);
db_port = config.getDBPort(),
sess_sec = config.getSessionSecret(),
http_port = config.getHttpPort();
return module.exports;
};
exports.addHandlers = function(funcAdminHandler, funcEvtHandler) {
if(!funcEvtHandler) {
@ -37,9 +39,6 @@ exports.addHandlers = function(funcAdminHandler, funcEvtHandler) {
app.use('/rulesforge/', express.static(path.resolve(__dirname, '..', 'webpages', 'rulesforge')));
app.get('/admin', onAdminCommand);
app.post('/pushEvents', onPushEvent);
var db_port = config.getDBPort(),
sess_sec = config.getSessionSecret(),
http_port = config.getHttpPort();
if(db_port) {
app.use(express.session({
store: new RedisStore({
@ -54,11 +53,7 @@ exports.addHandlers = function(funcAdminHandler, funcEvtHandler) {
}));
log.print('HL', 'Added redis DB as session backbone');
} else {
if(sess_sec) app.use(express.session({secret: sess_sec}));
else {
app.use(express.session({ secret: '#C[>;j`@".TXm2TA;A2Tg)' }));
log.print('HL', 'no session secret found?!');
}
app.use(express.session({secret: sess_sec}));
log.print('HL', 'no session backbone');
}
if(http_port) server = app.listen(http_port); // inbound event channel
@ -136,8 +131,3 @@ exports.shutDown = function() {
log.print('HL', 'Shutting down HTTP listener');
process.exit(); // This is a bit brute force...
};
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};

View file

@ -3,21 +3,25 @@
* =======
* Functions to handle logging and errors.
*
* Valid log methods are:
* Valid log types are:
*
* - 0 standard I/O
* - 1 file
* - 2 silent
*/
var logMethods = [ flushToConsole, flushToFile, null],
logMethod = 0, logFile;
var logTypes = [ flushToConsole, flushToFile, null],
logType = 0, logFile;
exports = module.exports = function(logMeth) {
if(logMeth) logMethod = parseInt(logMeth) || 0;
exports = module.exports = function(args) {
args = args || {};
if(args.logType) logType = parseInt(args.logType) || 0;
return module.exports;
};
exports.getLogType = function() { return logType; };
function flush(err, msg) {
if(typeof logMethods[logMethod] === 'function') logMethods[logMethod](err, msg);
if(typeof logTypes[logType] === 'function') logTypes[logType](err, msg);
}
function flushToConsole(err, msg) {

View file

@ -1,16 +1,15 @@
'use strict';
var log;
exports.init = function(args, cb) {
var fs = require('fs'),
path = require('path'),
log = require('./logging');
exports = module.exports = function(args) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
if(typeof cb === 'function') cb();
log(args);
return module.exports;
};
var fs = require('fs'),
path = require('path');
exports.requireFromString = function(src, name, dir) {
if(!dir) dir = __dirname;
//FIXME load modules only into a safe environment with given modules, no access to whole application
@ -71,8 +70,4 @@ exports.loadModules = function(directory, callback) {
});
});
};
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};

View file

@ -9,20 +9,17 @@
'use strict';
var log, ml;
exports.init = function(args, cb) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
ml = require('./module_loader').init(args);
if(typeof cb === 'function') cb();
};
var fs = require('fs'),
path = require('path'),
db = null, funcLoadAction, funcLoadRule;
path = require('path'),
log = require('./logging'),
ml, db, funcLoadAction, funcLoadRule;
exports = module.exports = function(args) {
args = args || {};
log(args);
ml = require('./module_loader')(args);
return module.exports;
};
exports.addHandlers = function(db_link, fLoadAction, fLoadRule) {
db = db_link;
@ -95,18 +92,16 @@ function loadActionCallback(name, data, mod, auth) {
}
exports.loadActionModule = function (args, answHandler) {
if(args && args.name) {
if(ml && args && args.name) {
answHandler.answerSuccess('Loading action module ' + args.name + '...');
ml.loadModule('mod_actions', args.name, loadActionCallback);
}
};
exports.loadActionModules = function(args, answHandler) {
answHandler.answerSuccess('Loading action modules...');
ml.loadModules('mod_actions', loadActionCallback);
};
exports.die = function(cb) {
if(typeof cb === 'function') cb();
if(ml) {
answHandler.answerSuccess('Loading action modules...');
ml.loadModules('mod_actions', loadActionCallback);
}
};

View file

@ -24,85 +24,49 @@ dog's back.
*/
//FIXME server should be started via command line arguments http_port and logging level to allow proper testing
var log = require('./logging'), http_port;
log.print('RS', 'STARTING SERVER');
if(process.argv.length > 2) log(process.argv[2]);
else log.print('RS', 'No log method passed, using stdI/O');
if(process.argv.length > 3) http_port = parseInt(process.argv[3]);
else log.print('RS', 'No HTTP port passed, using standard port from config file');
var fs = require('fs'),
path = require('path'),
procCmds = {
'die': function() { shutDown(); }
};
path = require('path'),
log = require('./logging'),
procCmds = {
'die': function() { shutDown(); }
},
semaphore = 0,
args = {},
http_listener, mm, db, engine, objCmds;
function init() {
log.print('RS', 'STARTING SERVER');
function handleModuleLoad(cb, msg) {
return function(err) {
if(!err) {
if(typeof cb === 'function') cb();
log.print('RS', msg + ' initialized successfully');
} else {
err.addInfo = msg + ' init failed';
log.error('RS', err);
}
if(process.argv.length > 2) {
args.logType = parseInt(process.argv[2]) || 0 ;
log(args);
} else log.print('RS', 'No log method passed, using stdI/O');
if(process.argv.length > 3) args.http_port = parseInt(process.argv[3]);
else log.print('RS', 'No HTTP port passed, using standard port from config file');
engine = require('./engine')(args);
http_listener = require('./http_listener')(args);
mm = require('./module_manager')(args);
db = require('./db_interface')(args);
log.print('RS', 'Initialzing DB');
objCmds = {
'loadrules': mm.loadRulesFile,
'loadaction': mm.loadActionModule,
'loadactions': mm.loadActionModules,
'loadevent': engine.loadEventModule,
'loadevents': engine.loadEventModules,
'shutdown': shutDown
};
engine.addDBLink(db);
log.print('RS', 'Initialzing http listener');
http_listener.addHandlers(handleAdminCommands, engine.pushEvent);
log.print('RS', 'Initialzing module manager');
mm.addHandlers(db, engine.loadActionModule, engine.loadRule);
//FIXME load actions and events, then rules, do this here, visible for everybody on the first glance
//TODO for such events we should forge the architecture more into an event driven one
}
function loadHL() {
http_listener = require('./http_listener').init(log,
handleModuleLoad(loadEN, 'http listener')
);
}
function loadEN(cb) {
engine = require('./engine').init(log,
handleModuleLoad(loadMM, 'engine')
);
}
function loadMM(cb) {
mm = require('./module_manager').init(log,
handleModuleLoad(loadDB, 'module manager')
);
}
function loadDB(cb) {
db = require('./db_interface').init(log,
handleModuleLoad(doneInitDB, 'db interface init failed')
);
}
function doneInitDB(err) {
if(!err) {
objCmds = {
'loadrules': mm.loadRulesFile,
'loadaction': mm.loadActionModule,
'loadactions': mm.loadActionModules,
'loadevent': engine.loadEventModule,
'loadevents': engine.loadEventModules,
'shutdown': shutDown
};
//FIXME engine requires db to be finished with init...
engine.addDBLink(db);
log.print('RS', 'Initialzing http listener');
http_listener.addHandlers(handleAdminCommands, engine.pushEvent);
log.print('RS', 'Initialzing module manager');
mm.addHandlers(db, engine.loadActionModule, engine.loadRule);
}
else {
err.addInfo = err.message;
err.message = 'Not Starting engine!';
log.error(err);
}
}
(function() {
loadHL();
// engine = require('./engine').init(log),
// mm = require('./module_manager').init(log),
// db = require('./db_interface').init(log, doneInitDB), //TODO have a close lok at this special case
})();
function handleAdminCommands(args, answHandler) {
if(args && args.cmd) {
@ -134,8 +98,6 @@ process.on('message', function(cmd) {
else console.error('err with command');
});
log.print('RS', 'Initialzing DB');
//FIXME initialization of all modules should depend on one after the other
// in a transaction style manner
@ -153,3 +115,5 @@ log.print('RS', 'Initialzing DB');
* - init(args, cb)
* - die()
*/
init();

View file

@ -1,20 +1,19 @@
'use strict';
var log;
exports.init = function(args, cb) {
args = args || {};
if(args.log) log = args.log;
else log = args.log = require('./logging');
if(typeof cb === 'function') cb();
};
var log = require('./logging'),
objCmds = {
addUser: addUser,
getUser: getUser,
delUser: delUser,
addRule: addRule,
getRules: getRules,
delRule: delRule
};
var objCmds = {
addUser: addUser,
getUser: getUser,
delUser: delUser,
addRule: addRule,
getRules: getRules,
delRule: delRule
exports = module.exports = function(args) {
args = args || {};
log(args);
return module.exports;
};
exports.handleCommand = function(args, cb) {
@ -80,8 +79,3 @@ function getRule(args, cb) {
function delRule(args, cb) {
}
exports.die = function(cb) {
if(typeof cb === 'function') cb();
};