Event Poller also in coffee, now everything is set up and ready to be thoroughly tested

This commit is contained in:
Dominic Bosch 2014-04-04 23:57:22 +02:00
parent 418d5441fe
commit 21ff603f36
8 changed files with 306 additions and 380 deletions

View file

@ -8,6 +8,7 @@ Dynamic Modules
# **Loads Modules:**
# - [Persistence](persistence.html)
db = require './persistence'
# - Node.js Modules: [vm](http://nodejs.org/api/vm.html) and
@ -53,6 +54,21 @@ exports = module.exports = ( args ) =>
exports.getPublicKey = () =>
@strPublicKey
issueApiCall = ( method, url, credentials, cb ) =>
try
if method is 'get'
func = needle.get
else
func = needle.post
func url, credentials, ( err, resp, body ) =>
if not err
cb body
else
cb()
catch err
@log.info 'DM | Error even before calling!'
###
Try to run a JS module from a string, together with the
given parameters. If it is written in CoffeeScript we
@ -92,10 +108,11 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) =>
params = {}
else
params = {}
sandbox =
id: userId + '.' + modId + '.vm'
params: params
needle: needle
apicall: issueApiCall
log: logFunction userId, ruleId, modId
# debug: console.log
exports: {}

140
coffee/event-poller.coffee Normal file
View file

@ -0,0 +1,140 @@
###
Dynamic Modules
===============
> Compiles CoffeeScript modules and loads JS modules in a VM, together
> with only a few allowed node.js modules.
###
# **Loads Modules:**
# - [Logging](logging.html), [Persistence](persistence.html)
# and [Dynamic Modules](dynamic-modules.html)
logger = require './logging'
db = require './persistence'
dynmod = require './dynamic-modules'
# If we do not receive all required arguments we shut down immediately
if process.argv.length < 7
console.error 'Not all arguments have been passed!'
process.exit()
# Fetch all the command line arguments to the process to init the logger
logconf =
mode: process.argv[ 2 ]
nolog: process.argv[ 6 ]
logconf[ 'io-level' ] = process.argv[ 3 ]
logconf[ 'file-level' ] = process.argv[ 4 ]
logconf[ 'file-path' ] = process.argv[ 5 ]
log = logger.getLogger logconf
log.info 'EP | Event Poller starts up'
# Initialize required modules (should be in cache already)
db logger: log
dynmod logger: log
# Initialize module local variables and
listUserModules = {}
isRunning = true
# Register disconnect action. Since no standalone mode is intended
# the event poller will shut down
process.on 'disconnect', () ->
log.info 'EP | Shutting down Event Poller'
isRunning = false
# very important so the process doesnt linger on when the paren process is killed
process.exit()
# If the process receives a message it is concerning the rules
process.on 'message', ( msg ) ->
# Let's split the event string to find module and function in an array
# A initialization notification or a new rule
if msg.event is 'new' or msg.event is 'init'
fLoadModule msg
# We fetch the module also if the rule was updated
# A rule was deleted
if msg.event is 'del'
delete listUserModules[msg.user][msg.ruleId]
if JSON.stringify( listUserModules[msg.user] ) is "{}"
delete listUserModules[msg.user]
# Loads a module if required
fLoadModule = ( msg ) ->
arrName = msg.rule.event.split ' -> '
fAnonymous = () ->
db.eventPollers.getModule arrName[ 0 ], ( err, obj ) ->
if not obj
log.warn "EP | Strange... no module retrieved: #{ arrName[0] }"
else
# we compile the module and pass:
dynmod.compileString obj.data, # code
msg.user, # userId
msg.rule.id, # ruleId
arrName[0], # moduleId
obj.lang, # script language
db.eventPollers, # the DB interface
( result ) ->
if not result.answ is 200
log.error "EP | Compilation of code failed! #{ msg.user },
#{ msg.rule.id }, #{ arrName[0] }"
# If user is not yet stored, we open a new object
if not listUserModules[msg.user]
listUserModules[msg.user] = {}
# We open up a new object for the rule it
listUserModules[msg.user][msg.rule.id] =
id: msg.rule.event
pollfunc: arrName[1]
module: result.module
log.info "EP | New event module loaded! #{ msg.user },
#{ msg.rule.id }, #{ arrName[0] }"
if msg.event is 'new' or
not listUserModules[msg.user] or
not listUserModules[msg.user][msg.rule.id]
fAnonymous()
###
This function will loop infinitely every 10 seconds until isRunning is set to false
@private pollLoop()
###
pollLoop = () ->
# We only loop if we're running
if isRunning
# Go through all users
for userName, oRules of listUserModules
# Go through each of the users modules
for ruleName, myRule of oRules
# This is the to be polled function
fPoll = myRule.module[myRule.pollfunc]
# We have to register the poll function in belows anonymous function
# because we're fast iterating through the listUserModules and references will
# eventually not be what they are expected to be
fRegisterModuleReference = ( ruleId, userId, eventId ) ->
( obj ) ->
db.pushEvent
event: eventId
eventid: "polled #{ eventId } #{ userId }_#{ ( new Date ).toISOString() }"
payload: obj
try
fPoll fRegisterModuleReference ruleName, userName, myRule.id
catch err
log.info 'EP | ERROR encountered during polling!'
log.info err
setTimeout pollLoop, 10000
# Finally if everything initialized we start polling for new events
pollLoop()

View file

@ -44,6 +44,9 @@ exports = module.exports = ( args ) =>
exports.actionInvokers = new IndexedModules 'action-invoker', @log
exports.initPort args[ 'db-port' ]
exports.getLogger = () =>
@log
exports.initPort = ( port ) =>
@connRefused = false
@db?.quit()

View file

@ -9,7 +9,7 @@ Dynamic Modules
*/
(function() {
var cryptico, cs, db, exports, needle, vm;
var cryptico, cs, db, exports, issueApiCall, needle, vm;
db = require('./persistence');
@ -52,6 +52,29 @@ Dynamic Modules
};
})(this);
issueApiCall = (function(_this) {
return function(method, url, credentials, cb) {
var err, func;
try {
if (method === 'get') {
func = needle.get;
} else {
func = needle.post;
}
return func(url, credentials, function(err, resp, body) {
if (!err) {
return cb(body);
} else {
return cb();
}
});
} catch (_error) {
err = _error;
return _this.log.info('DM | Error even before calling!');
}
};
})(this);
/*
Try to run a JS module from a string, together with the
@ -104,7 +127,7 @@ Dynamic Modules
sandbox = {
id: userId + '.' + modId + '.vm',
params: params,
needle: needle,
apicall: issueApiCall,
log: logFunction(userId, ruleId, modId),
exports: {}
};

View file

@ -1,167 +1,139 @@
// # Event Poller
// Generated by CoffeeScript 1.7.1
'use strict';
/*
var logger = require('./logging'),
listMessageActions = {},
listAdminCommands = {},
listEventModules = {},
listPoll = {}, //TODO this will change in the future because it could have
//several parameterized (user-specific) instances of each event module
log,
isRunning = true,
eId = 0,
db, ml;
Dynamic Modules
===============
> Compiles CoffeeScript modules and loads JS modules in a VM, together
> with only a few allowed node.js modules.
*/
//TODO allow different polling intervals (a wrapper together with settimeout per to be polled could be an easy and solution)
(function() {
var db, dynmod, fLoadModule, isRunning, listUserModules, log, logconf, logger, pollLoop;
// FIXME Eventually we don't even need to pass these arguments because they are anyways cached even over child_processes
logger = require('./logging');
function init() {
if(process.argv.length < 7){
db = require('./persistence');
dynmod = require('./dynamic-modules');
if (process.argv.length < 7) {
console.error('Not all arguments have been passed!');
process.exit();
}
var logconf = {}
logconf['mode'] = process.argv[2]
logconf['io-level'] = process.argv[3]
logconf['file-level'] = process.argv[4]
logconf['file-path'] = process.argv[5]
logconf['nolog'] = process.argv[6]
logconf = {
mode: process.argv[2],
nolog: process.argv[6]
};
logconf['io-level'] = process.argv[3];
logconf['file-level'] = process.argv[4];
logconf['file-path'] = process.argv[5];
log = logger.getLogger(logconf);
var args = { logger: log };
(ml = require('./components-manager'))(args);
(db = require('./persistence'))(args);
initMessageActions();
pollLoop();
log.info('Event Poller instantiated');
};
function shutDown() {
log.info('EP', 'Shutting down DB Link');
isRunning = false;
if(db) db.shutDown();
process.exit();
}
log.info('EP | Event Poller starts up');
function loadEventModule(el, cb) {
if(db && ml) db.getEventModule(el, function(err, obj) {
if(err || !obj) {
if(typeof cb === 'function') cb(new Error('Retrieving Event Module ' + el + ' from DB: ' + err));
else log.error('EP', 'Retrieving Event Module ' + el + ' from DB!');
db({
logger: log
});
dynmod({
logger: log
});
listUserModules = {};
isRunning = true;
process.on('disconnect', function() {
log.info('EP | Shutting down Event Poller');
isRunning = false;
return process.exit();
});
process.on('message', function(msg) {
if (msg.event === 'new' || msg.event === 'init') {
fLoadModule(msg);
}
else {
// log.info('EP', 'Loading Event Module: ' + el);
try {
var m = ml.requireFromString(obj, el);
db.getEventModuleAuth(el, function(mod) {
return function(err, objA) {
//TODO authentication needs to be done differently
if(objA && mod.loadCredentials) mod.loadCredentials(JSON.parse(objA));
};
}(m));
listEventModules[el] = m;
if(typeof cb === 'function') cb(null, m);
} catch(e) {
if(typeof cb === 'function') cb(e);
else log.error(e);
if (msg.event === 'del') {
delete listUserModules[msg.user][msg.ruleId];
if (JSON.stringify(listUserModules[msg.user]) === "{}") {
return delete listUserModules[msg.user];
}
}
});
}
function fetchPollFunctionFromModule(mod, func) {
for(var i = 1; i < func.length; i++) {
if(mod) mod = mod[func[i]];
}
if(mod) {
log.info('EP', 'Found active event module "' + func.join('->') + '", adding it to polling list');
//FIXME change this to [module][prop] = module; because like this identical properties get overwritten
// also add some on a per user basis information because this should go into a user context for the users
// that sat up this rule!
listPoll[func.join('->')] = mod;
} else {
log.info('EP', 'No property "' + func.join('->') + '" found');
}
}
function initMessageActions() {
listMessageActions['event'] = function(args) {
var prop = args[1], arrModule = prop.split('->');
if(arrModule.length > 1){
if(listEventModules[arrModule[0]]) {
fetchPollFunctionFromModule(listEventModules[arrModule[0]], arrModule);
} else {
log.info('EP', 'Event Module ' + arrModule[0] + ' needs to be loaded, doing it now...');
loadEventModule(arrModule[0], function(err, obj) {
if(err || !obj) log.error('EP', 'Event Module "' + arrModule[0] + '" not found: ' + err);
else {
log.info('EP', 'Event Module ' + arrModule[0] + ' found and loaded');
fetchPollFunctionFromModule(obj, arrModule);
}
});
}
fLoadModule = function(msg) {
var arrName, fAnonymous;
arrName = msg.rule.event.split(' -> ');
fAnonymous = function() {
return db.eventPollers.getModule(arrName[0], function(err, obj) {
if (!obj) {
return log.warn("EP | Strange... no module retrieved: " + arrName[0]);
} else {
return dynmod.compileString(obj.data, msg.user, msg.rule.id, arrName[0], obj.lang, db.eventPollers, function(result) {
if (!result.answ === 200) {
log.error("EP | Compilation of code failed! " + msg.user + ", " + msg.rule.id + ", " + arrName[0]);
}
if (!listUserModules[msg.user]) {
listUserModules[msg.user] = {};
}
listUserModules[msg.user][msg.rule.id] = {
id: msg.rule.event,
pollfunc: arrName[1],
module: result.module
};
return log.info("EP | New event module loaded! " + msg.user + ", " + msg.rule.id + ", " + arrName[0]);
});
}
});
};
if (msg.event === 'new' || !listUserModules[msg.user] || !listUserModules[msg.user][msg.rule.id]) {
return fAnonymous();
}
};
/*
This function will loop infinitely every 10 seconds until isRunning is set to false
process.on('message', function( msg ) {
console.log( 'EVENT POLLER GOT MESSAGE!');
console.log( typeof msg);
console.log(msg);
// var arrProps = obj .split('|');
// if(arrProps.length < 2) log.error('EP', 'too few parameter in message!');
// else {
// var func = listMessageActions[arrProps[0]];
// if(func) func(arrProps);
// }
console.log('EP internal event handled');
});
@private pollLoop()
*/
// very important so the process doesnt linger on when the paren process is killed
process.on('disconnect', shutDown);
}
function checkRemotes() {
for(var prop in listPoll) {
try {
listPoll[prop](
/*
* define and immediately call anonymous function with param prop.
* This places the value of prop into the context of the callback
* and thus doesn't change when the for loop keeps iterating over listPoll
*/
(function(p) {
return function(err, obj) {
if(err) {
err.additionalInfo = 'module: ' + p;
log.error('EP', err);
} else {
// FIXME this needs to be pushed into the db not passed to the process!
console.error('eventpoller needs to push event into db queue')
// process.send({
// event: p,
// eventid: 'polled_' + eId++,
// payload: obj
// });
}
pollLoop = function() {
var err, fPoll, fRegisterModuleReference, myRule, oRules, ruleName, userName;
if (isRunning) {
for (userName in listUserModules) {
oRules = listUserModules[userName];
for (ruleName in oRules) {
myRule = oRules[ruleName];
fPoll = myRule.module[myRule.pollfunc];
fRegisterModuleReference = function(ruleId, userId, eventId) {
return function(obj) {
return db.pushEvent({
event: eventId,
eventid: "polled " + eventId + " " + userId + "_" + ((new Date).toISOString()),
payload: obj
});
};
};
})(prop)
);
} catch (e) {
log.error('EP', e);
try {
fPoll(fRegisterModuleReference(ruleName, userName, myRule.id));
} catch (_error) {
err = _error;
log.info('EP | ERROR encountered during polling!');
log.info(err);
}
}
}
return setTimeout(pollLoop, 10000);
}
}
}
};
function pollLoop() {
if(isRunning) {
checkRemotes();
setTimeout(pollLoop, 10000);
}
}
init();
pollLoop();
}).call(this);

View file

@ -50,6 +50,12 @@ Persistence
};
})(this);
exports.getLogger = (function(_this) {
return function() {
return _this.log;
};
})(this);
exports.initPort = (function(_this) {
return function(port) {
var _ref;

View file

@ -1,81 +0,0 @@
'use strict';
var log = require('./logging'),
objCmds = {
addUser: addUser,
getUser: getUser,
delUser: delUser,
addRule: addRule,
getRules: getRules,
delRule: delRule
};
exports = module.exports = function(args) {
args = args || {};
log(args);
return module.exports;
};
exports.handleCommand = function(args, cb) {
if(!args.cmd) {
var e = new Error('No command defined!');
if(typeof cb === 'function') cb(e);
else log.error('US', e);
} else {
objCmds[args.cmd](args, cb);
}
};
/**
*
* @param {Object} args
* @param {function} cb
*/
function addUser(args, cb) {
}
/**
*
* @param {Object} args
* @param {function} cb
*/
function getUser(args, cb) {
}
/**
*
* @param {Object} args
* @param {function} cb
*/
function delUser(args, cb) {
}
/**
*
* @param {Object} args
* @param {function} cb
*/
function addRule(args, cb) {
}
/**
*
* @param {Object} args
* @param {function} cb
*/
function getRule(args, cb) {
}
/**
*
* @param {Object} args
* @param {function} cb
*/
function delRule(args, cb) {
}

View file

@ -1,154 +0,0 @@
// # Event Poller
'use strict';
var fs = require('fs'),
path = require('path'),
log = require('./logging'),
listMessageActions = {},
listAdminCommands = {},
listEventModules = {},
listPoll = {}, //TODO this will change in the future because it could have
//several parameterized (user-specific) instances of each event module
isRunning = true,
eId = 0,
db, ml;
//TODO allow different polling intervals (a wrapper together with settimeout per to be polled could be an easy and solution)
function init() {
if(process.argv.length > 2) log({ logType: parseInt(process.argv[2]) || 0 });
var args = { logType: log.getLogType() };
ml = require('./module_loader')(args);
db = require('./db_interface')(args);
initAdminCommands();
initMessageActions();
pollLoop();
};
function loadEventModule(el, cb) {
if(db && ml) db.getEventModule(el, function(err, obj) {
if(err || !obj) {
if(typeof cb === 'function') cb(new Error('Retrieving Event Module ' + el + ' from DB: ' + err));
else log.error('EP', 'Retrieving Event Module ' + el + ' from DB!');
}
else {
// log.print('EP', 'Loading Event Module: ' + el);
try {
var m = ml.requireFromString(obj, el);
db.getEventModuleAuth(el, function(mod) {
return function(err, objA) {
//TODO authentication needs to be done differently
if(objA && mod.loadCredentials) mod.loadCredentials(JSON.parse(objA));
};
}(m));
listEventModules[el] = m;
if(typeof cb === 'function') cb(null, m);
} catch(e) {
if(typeof cb === 'function') cb(e);
else log.error(e);
}
}
});
}
function fetchPollFunctionFromModule(mod, func) {
for(var i = 1; i < func.length; i++) {
if(mod) mod = mod[func[i]];
}
if(mod) {
log.print('EP', 'Found active event module "' + func.join('->') + '", adding it to polling list');
//FIXME change this to [module][prop] = module; because like this identical properties get overwritten
// also add some on a per user basis information because this should go into a user context for the users
// that sat up this rule!
listPoll[func.join('->')] = mod;
} else {
log.print('EP', 'No property "' + func.join('->') + '" found');
}
}
function initMessageActions() {
listMessageActions['event'] = function(args) {
var prop = args[1], arrModule = prop.split('->');
if(arrModule.length > 1){
if(listEventModules[arrModule[0]]) {
fetchPollFunctionFromModule(listEventModules[arrModule[0]], arrModule);
} else {
log.print('EP', 'Event Module ' + arrModule[0] + ' needs to be loaded, doing it now...');
loadEventModule(arrModule[0], function(err, obj) {
if(err || !obj) log.error('EP', 'Event Module "' + arrModule[0] + '" not found: ' + err);
else {
log.print('EP', 'Event Module ' + arrModule[0] + ' found and loaded');
fetchPollFunctionFromModule(obj, arrModule);
}
});
}
}
};
//TODO this goes into module_manager, this will receive notification about
// new loaded/stored event modules and fetch them from the db
listMessageActions['cmd'] = function(args) {
var func = listAdminCommands[args[1]];
if(typeof(func) === 'function') func(args);
};
process.on('message', function(strProps) {
var arrProps = strProps.split('|');
if(arrProps.length < 2) log.error('EP', 'too few parameter in message!');
else {
var func = listMessageActions[arrProps[0]];
if(func) func(arrProps);
}
});
}
function initAdminCommands() {
listAdminCommands['shutdown'] = function(args) {
log.print('EP', 'Shutting down DB Link');
isRunning = false;
if(db) db.shutDown();
};
}
function checkRemotes() {
for(var prop in listPoll) {
try {
listPoll[prop](
/*
* define and immediately call anonymous function with param prop.
* This places the value of prop into the context of the callback
* and thus doesn't change when the for loop keeps iterating over listPoll
* TODO add this example to the documentation and elaborate
*/
(function(p) {
return function(err, obj) {
if(err) {
err.additionalInfo = 'module: ' + p;
log.error('EP', err);
} else {
process.send({
event: p,
eventid: 'polled_' + eId++,
payload: obj
});
}
};
})(prop)
);
} catch (e) {
log.error('EP', e);
}
}
}
function pollLoop() {
if(isRunning) {
checkRemotes();
setTimeout(pollLoop, 10000);
}
}
init();