shifted module loading from fs to module manager, eventpoller now only fetches activated polling functions from the DB

This commit is contained in:
Dominic Bosch 2013-11-18 10:55:40 +01:00
parent a6c3dd4510
commit b3de695b07
6 changed files with 114 additions and 122 deletions

View file

@ -17,10 +17,11 @@ var redis = require('redis'),
log = require('./logging'),
crypto_key, db;
/**
* Initializes the DB connection. Requires a valid configuration file which contains
* a db port and a crypto key.
* @param args {Object}
*
*/
exports = module.exports = function(args) {
args = args || {};
@ -123,6 +124,7 @@ function getSetRecords(set, funcSingle, callback) {
// Shuts down the db link.
exports.shutDown = function() { if(db) db.quit(); };
// ## Action Modules
/**

View file

@ -56,12 +56,12 @@ exports.addDBLinkAndLoadActionsAndRules = function(db_link) {
}
}
});
else log.severe('EN', new Error('Module Loader not defined!'));
else log.severe('EN', new Error('Module Loader or DB not defined!'));
};
function loadRulesFromDB() {
if(db) db.getRules(function(err, obj) {
for(var el in obj) exports.loadRule(JSON.parse(obj[el]));
for(var el in obj) exports.addRule(JSON.parse(obj[el]));
});
}
@ -75,10 +75,10 @@ exports.loadActionModule = function(name, objModule) {
};
/**
* Insert a rule into the eca rules repository
* Add a rule into the working memory
* @param {Object} objRule the rule object
*/
exports.loadRule = function(objRule) {
exports.addRule = function(objRule) {
//TODO validate rule
log.print('EN', 'Loading Rule: ' + objRule.id);
if(listRules[objRule.id]) log.print('EN', 'Replacing rule: ' + objRule.id);
@ -221,18 +221,6 @@ function preprocessActionArguments(evt, act, res) {
}
}
exports.loadEventModule = function(args, answHandler) {
if(args && args.name) {
answHandler.answerSuccess('Loading event module ' + args.name + '...');
poller.send('cmd|loadevent|'+args.name);
} else if(args) answHandler.answerError(args.name + ' not found');
};
exports.loadEventModules = function(args, answHandler) {
answHandler.answerSuccess('Loading event moules...');
poller.send('cmd|loadevents');
};
exports.shutDown = function() {
log.print('EN', 'Shutting down Poller and DB Link');
isRunning = false;

View file

@ -18,64 +18,67 @@ var fs = require('fs'),
function init() {
//FIXME ensure eventpoller receives the log method from the engine
if(process.argv.length > 2) log({ logType: parseInt(process.argv[2]) || 0 });
var args = { logType: log.getLogType() };
ml = require('./module_loader')(args);
db = require('./db_interface')(args);
initAdminCommands();
initMessageActions();
loadEventModules();
pollLoop();
};
function loadEventModules() {
//TODO eventpoller will not load event modules from db on init, this will be done
// when receiving messages about new/updated active rules
if(db && ml) db.getEventModules(function(err, obj) {
if(err) log.error('EP', 'retrieving Event Modules from DB!');
function loadEventModule(el, cb) {
if(db && ml) db.getEventModule(el, function(err, obj) {
if(err || !obj) {
if(typeof cb === 'function') cb(new Error('Retrieving Event Module ' + el + ' from DB: ' + err));
else log.error('EP', 'Retrieving Event Module ' + el + ' from DB!');
}
else {
if(!obj) {
log.print('EP', 'No Event Modules found in DB!');
//process.send({ event: 'ep_finished_loading' });
} else {
var m, semaphore = 0;
for(var el in obj) {
semaphore++;
log.print('EP', 'Loading Event Module: ' + el);
m = ml.requireFromString(obj[el], el);
db.getEventModuleAuth(el, function(mod) {
return function(err, obj) {
//if(--semaphore === 0) process.send({ event: 'ep_finished_loading' });
if(obj && mod.loadCredentials) mod.loadCredentials(JSON.parse(obj));
};
}(m));
listEventModules[el] = m;
}
}
// log.print('EP', 'Loading Event Module: ' + el);
var m = ml.requireFromString(obj, el);
db.getEventModuleAuth(el, function(mod) {
return function(err, objA) {
//TODO authentication needs to be done differently
if(objA && mod.loadCredentials) mod.loadCredentials(JSON.parse(objA));
};
}(m));
listEventModules[el] = m;
if(typeof cb === 'function') cb(null, m);
}
});
}
function fetchPollFunctionFromModule(mod, func) {
for(var i = 1; i < func.length; i++) {
if(mod) mod = mod[func[i]];
}
if(mod) {
log.print('EP', 'Found active event module "' + func.join('->') + '", adding it to polling list');
//FIXME change this to [module][prop] = module; because like this identical properties get overwritten
// also add some on a per user basis information because this should go into a user context for the users
// that sat up this rule!
listPoll[func.join('->')] = mod;
} else {
log.print('EP', 'No property "' + func.join('->') + '" found');
}
}
function initMessageActions() {
listMessageActions['event'] = function(args) {
var prop = args[1], arrModule = prop.split('->');
// var arrModule = obj.module.split('->');
if(arrModule.length > 1){
var module = listEventModules[arrModule[0]];
for(var i = 1; i < arrModule.length; i++) {
if(module) module = module[arrModule[i]];
}
if(module) {
log.print('EP', 'Found active event module "' + prop + '", adding it to polling list');
//FIXME change this to [module][prop] = module; because like this identical properties get overwritten
// also add some on a per user basis information because this should go into a user context for the users
// that sat up this rule!
listPoll[prop] = module;
if(listEventModules[arrModule[0]]) {
fetchPollFunctionFromModule(listEventModules[arrModule[0]], arrModule);
} else {
log.print('EP', 'No property "' + prop + '" found');
log.print('EP', 'Event Module ' + arrModule[0] + ' needs to be loaded, doing it now...');
loadEventModule(arrModule[0], function(err, obj) {
if(err || !obj) log.error('EP', 'Event Module "' + arrModule[0] + '" not found: ' + err);
else {
log.print('EP', 'Event Module ' + arrModule[0] + ' found and loaded');
fetchPollFunctionFromModule(obj, arrModule);
}
});
}
}
};
@ -96,15 +99,8 @@ function initMessageActions() {
}
});
}
//FIXME the eventpoller doesn't do the loading! this is done by the module manager,
// the ep only gets notified of new rules that require active polling!
function initAdminCommands() {
listAdminCommands['loadevent'] = function(args) {
if(ml) ml.loadModule('mod_events', args[2], loadEventCallback);
};
listAdminCommands['loadevents'] = function(args) {
if(ml) ml.loadModules('mod_events', loadEventCallback);
};
listAdminCommands['shutdown'] = function(args) {
log.print('EP', 'Shutting down DB Link');
isRunning = false;
@ -112,14 +108,6 @@ function initAdminCommands() {
};
}
function loadEventCallback(name, data, mod, auth) {
if(db) {
db.storeEventModule(name, data); // store module in db
if(auth) db.storeEventModuleAuth(name, auth);
listEventModules[name] = mod; // store compiled module for polling
}
}
function checkRemotes() {
for(var prop in listPoll) {
try {

View file

@ -9,12 +9,14 @@
* - 1 file
* - 2 silent
*/
var logTypes = [ flushToConsole, flushToFile, null],
logType = 0, logFile;
var fs = require('fs'),
logTypes = [ flushToConsole, flushToFile, null],
logType = 0, logFile = './server.log';
exports = module.exports = function(args) {
args = args || {};
if(args.logType) logType = parseInt(args.logType) || 0;
if(logType == 1) fs.truncateSync(logFile, 0);
return module.exports;
};
@ -30,7 +32,7 @@ function flushToConsole(err, msg) {
}
function flushToFile(err, msg) {
fs.appendFile('./server.log', msg, function (err) {});
fs.appendFile(logFile, msg + '\n', function (err) {});
}
// @function print(module, msg)
@ -51,11 +53,11 @@ exports.print = function(module, msg) {
*/
function printError(module, err, isSevere) {
var ts = (new Date()).toISOString() + ' | ', ai = '';
if(typeof err === 'string') {
var e = new Error();
if(module) flush(true, ts + module + ' | ERROR AND BAD HANDLING: ' + err + '\n' + e.stack);
else flush(true, ts + '!N/A! | ERROR, BAD HANDLING AND NO MODULE NAME: ' + err + '\n' + e.stack);
} else if(err) {
if(!err) err = new Error('Unexpected error');
if(typeof err === 'string') err = new Error(err);
// if(module) flush(true, ts + module + ' | ERROR AND BAD HANDLING: ' + err + '\n' + e.stack);
// else flush(true, ts + '!N/A! | ERROR, BAD HANDLING AND NO MODULE NAME: ' + err + '\n' + e.stack);
// } else if(err) {
if(err.addInfo) ai = ' (' + err.addInfo + ')';
if(!err.message) err.message = 'UNKNOWN REASON!\n' + err.stack;
if(module) {
@ -63,10 +65,10 @@ function printError(module, err, isSevere) {
if(isSevere) msg += '\n' + err.stack;
flush(true, msg);
} else flush(true, ts + '!N/A! | ERROR AND NO MODULE NAME'+ai+': ' + err.message + '\n' + err.stack);
} else {
var e = new Error('Unexpected error');
flush(true, e.message + ': \n' + e.stack);
}
// } else {
// var e = new Error('Unexpected error');
// flush(true, e.message + ': \n' + e.stack);
// }
};
/**

View file

@ -4,7 +4,7 @@
> phase and on user request.
> Event and Action modules are loaded as strings and stored in the database,
> then compiled into node modules and and rules
> then compiled into node modules and rules
*/
'use strict';
@ -26,33 +26,12 @@ exports.addHandlers = function(db_link, fLoadAction, fLoadRule) {
funcLoadAction = fLoadAction;
funcLoadRule = fLoadRule;
};
/*
# A First Level Header
A Second Level Header
---------------------
Now is the time for all good men to come to
the aid of their country. This is just a
regular paragraph.
The quick brown fox jumped over the lazy
dog's back.
### Header 3
> This is a blockquote.
>
> This is the second paragraph in the blockquote.
>
> ## This is an H2 in a blockquote
This is the function documentation
@param {Object} [args] the optional arguments
@param {String} [args.name] the optional name in the arguments
* Load Rules from fs
* ------------------
*/
exports.loadRulesFile = function(args, answHandler) {
exports.loadRulesFromFS = function(args, answHandler) {
if(!args) args = {};
if(!args.name) args.name = 'rules';
if(!funcLoadRule) log.error('ML', 'no rule loader function available');
@ -78,6 +57,11 @@ exports.loadRulesFile = function(args, answHandler) {
}
};
/*
* Load Action Modules from fs
* ---------------------------
*/
/**
*
* @param {Object} name
@ -87,21 +71,49 @@ exports.loadRulesFile = function(args, answHandler) {
*/
function loadActionCallback(name, data, mod, auth) {
db.storeActionModule(name, data); // store module in db
funcLoadAction(name, mod); // hand compiled module back
funcLoadAction(name, mod); // hand back compiled module
if(auth) db.storeActionModuleAuth(name, auth);
}
exports.loadActionModule = function (args, answHandler) {
if(ml && args && args.name) {
answHandler.answerSuccess('Loading action module ' + args.name + '...');
ml.loadModule('mod_actions', args.name, loadActionCallback);
exports.loadActionModuleFromFS = function (args, answHandler) {
if(ml) {
if(args && args.name) {
answHandler.answerSuccess('Loading action module ' + args.name + '...');
ml.loadModule('mod_actions', args.name, loadActionCallback);
} else log.error('MM', 'Action Module name not provided!');
}
};
exports.loadActionModules = function(args, answHandler) {
exports.loadActionModulesFromFS = function(args, answHandler) {
if(ml) {
answHandler.answerSuccess('Loading action modules...');
ml.loadModules('mod_actions', loadActionCallback);
}
};
/*
* Load Event Modules from fs
* --------------------------
*/
function loadEventCallback(name, data, mod, auth) {
if(db) {
db.storeEventModule(name, data); // store module in db
if(auth) db.storeEventModuleAuth(name, auth);
}
}
exports.loadEventModuleFromFS = function(args, answHandler) {
if(ml) {
if(args && args.name) {
answHandler.answerSuccess('Loading event module ' + args.name + '...');
ml.loadModule('mod_events', args.name, loadEventCallback);
} else log.error('MM', 'Event Module name not provided!');
}
};
exports.loadEventModulesFromFS = function(args, answHandler) {
answHandler.answerSuccess('Loading event moules...');
ml.loadModules('mod_actions', loadEventCallback);
};

View file

@ -51,11 +51,11 @@ function init() {
log.print('RS', 'Initialzing DB');
db = require('./db_interface')(args);
objCmds = {
'loadrules': mm.loadRulesFile,
'loadaction': mm.loadActionModule,
'loadactions': mm.loadActionModules,
'loadevent': engine.loadEventModule,
'loadevents': engine.loadEventModules,
'loadrules': mm.loadRulesFromFS,
'loadaction': mm.loadActionModuleFromFS,
'loadactions': mm.loadActionModulesFromFS,
'loadevent': mm.loadEventModuleFromFS,
'loadevents': mm.loadEventModulesFromFS,
'shutdown': shutDown
};
log.print('RS', 'Initialzing engine');
@ -63,7 +63,7 @@ function init() {
log.print('RS', 'Initialzing http listener');
http_listener.addHandlers(handleAdminCommands, engine.pushEvent);
log.print('RS', 'Initialzing module manager');
mm.addHandlers(db, engine.loadActionModule, engine.loadRule);
mm.addHandlers(db, engine.loadActionModule, engine.addRule);
//FIXME load actions and events, then rules, do this here, visible for everybody on the first glance
//TODO for such events we should forge the architecture more into an event driven one
}