From 21ff603f3653067acb937fb1fed62c08efc65876 Mon Sep 17 00:00:00 2001 From: Dominic Bosch Date: Fri, 4 Apr 2014 23:57:22 +0200 Subject: [PATCH] Event Poller also in coffee, now everything is set up and ready to be thoroughly tested --- coffee/dynamic-modules.coffee | 19 ++- coffee/event-poller.coffee | 140 +++++++++++++++++++ coffee/persistence.coffee | 3 + js-coffee/dynamic-modules.js | 27 +++- js-coffee/event-poller.js | 256 +++++++++++++++------------------- js-coffee/persistence.js | 6 + js-coffee/users.js | 81 ----------- js/eventpoller.js | 154 -------------------- 8 files changed, 306 insertions(+), 380 deletions(-) create mode 100644 coffee/event-poller.coffee delete mode 100644 js-coffee/users.js diff --git a/coffee/dynamic-modules.coffee b/coffee/dynamic-modules.coffee index 100b6f1..0d09fd2 100644 --- a/coffee/dynamic-modules.coffee +++ b/coffee/dynamic-modules.coffee @@ -8,6 +8,7 @@ Dynamic Modules # **Loads Modules:** +# - [Persistence](persistence.html) db = require './persistence' # - Node.js Modules: [vm](http://nodejs.org/api/vm.html) and @@ -53,6 +54,21 @@ exports = module.exports = ( args ) => exports.getPublicKey = () => @strPublicKey + +issueApiCall = ( method, url, credentials, cb ) => + try + if method is 'get' + func = needle.get + else + func = needle.post + + func url, credentials, ( err, resp, body ) => + if not err + cb body + else + cb() + catch err + @log.info 'DM | Error even before calling!' ### Try to run a JS module from a string, together with the given parameters. If it is written in CoffeeScript we @@ -92,10 +108,11 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => params = {} else params = {} + sandbox = id: userId + '.' + modId + '.vm' params: params - needle: needle + apicall: issueApiCall log: logFunction userId, ruleId, modId # debug: console.log exports: {} diff --git a/coffee/event-poller.coffee b/coffee/event-poller.coffee new file mode 100644 index 0000000..636b3ce --- /dev/null +++ b/coffee/event-poller.coffee @@ -0,0 +1,140 @@ +### + +Dynamic Modules +=============== +> Compiles CoffeeScript modules and loads JS modules in a VM, together +> with only a few allowed node.js modules. +### + +# **Loads Modules:** + +# - [Logging](logging.html), [Persistence](persistence.html) +# and [Dynamic Modules](dynamic-modules.html) +logger = require './logging' +db = require './persistence' +dynmod = require './dynamic-modules' + +# If we do not receive all required arguments we shut down immediately +if process.argv.length < 7 + console.error 'Not all arguments have been passed!' + process.exit() + +# Fetch all the command line arguments to the process to init the logger +logconf = + mode: process.argv[ 2 ] + nolog: process.argv[ 6 ] +logconf[ 'io-level' ] = process.argv[ 3 ] +logconf[ 'file-level' ] = process.argv[ 4 ] +logconf[ 'file-path' ] = process.argv[ 5 ] +log = logger.getLogger logconf +log.info 'EP | Event Poller starts up' + +# Initialize required modules (should be in cache already) +db logger: log +dynmod logger: log + +# Initialize module local variables and +listUserModules = {} +isRunning = true + +# Register disconnect action. Since no standalone mode is intended +# the event poller will shut down +process.on 'disconnect', () -> + log.info 'EP | Shutting down Event Poller' + isRunning = false + # very important so the process doesnt linger on when the paren process is killed + process.exit() + +# If the process receives a message it is concerning the rules +process.on 'message', ( msg ) -> + + # Let's split the event string to find module and function in an array + + # A initialization notification or a new rule + if msg.event is 'new' or msg.event is 'init' + fLoadModule msg + # We fetch the module also if the rule was updated + + # A rule was deleted + if msg.event is 'del' + delete listUserModules[msg.user][msg.ruleId] + if JSON.stringify( listUserModules[msg.user] ) is "{}" + delete listUserModules[msg.user] + +# Loads a module if required +fLoadModule = ( msg ) -> + arrName = msg.rule.event.split ' -> ' + fAnonymous = () -> + db.eventPollers.getModule arrName[ 0 ], ( err, obj ) -> + if not obj + log.warn "EP | Strange... no module retrieved: #{ arrName[0] }" + else + # we compile the module and pass: + dynmod.compileString obj.data, # code + msg.user, # userId + msg.rule.id, # ruleId + arrName[0], # moduleId + obj.lang, # script language + db.eventPollers, # the DB interface + ( result ) -> + if not result.answ is 200 + log.error "EP | Compilation of code failed! #{ msg.user }, + #{ msg.rule.id }, #{ arrName[0] }" + + # If user is not yet stored, we open a new object + if not listUserModules[msg.user] + listUserModules[msg.user] = {} + + # We open up a new object for the rule it + listUserModules[msg.user][msg.rule.id] = + id: msg.rule.event + pollfunc: arrName[1] + module: result.module + + log.info "EP | New event module loaded! #{ msg.user }, + #{ msg.rule.id }, #{ arrName[0] }" + + if msg.event is 'new' or + not listUserModules[msg.user] or + not listUserModules[msg.user][msg.rule.id] + fAnonymous() + + +### +This function will loop infinitely every 10 seconds until isRunning is set to false + +@private pollLoop() +### +pollLoop = () -> + # We only loop if we're running + if isRunning + + # Go through all users + for userName, oRules of listUserModules + + # Go through each of the users modules + for ruleName, myRule of oRules + + # This is the to be polled function + fPoll = myRule.module[myRule.pollfunc] + + # We have to register the poll function in belows anonymous function + # because we're fast iterating through the listUserModules and references will + # eventually not be what they are expected to be + fRegisterModuleReference = ( ruleId, userId, eventId ) -> + ( obj ) -> + db.pushEvent + event: eventId + eventid: "polled #{ eventId } #{ userId }_#{ ( new Date ).toISOString() }" + payload: obj + + try + fPoll fRegisterModuleReference ruleName, userName, myRule.id + catch err + log.info 'EP | ERROR encountered during polling!' + log.info err + setTimeout pollLoop, 10000 + + +# Finally if everything initialized we start polling for new events +pollLoop() \ No newline at end of file diff --git a/coffee/persistence.coffee b/coffee/persistence.coffee index 44abdf0..076ea26 100644 --- a/coffee/persistence.coffee +++ b/coffee/persistence.coffee @@ -44,6 +44,9 @@ exports = module.exports = ( args ) => exports.actionInvokers = new IndexedModules 'action-invoker', @log exports.initPort args[ 'db-port' ] +exports.getLogger = () => + @log + exports.initPort = ( port ) => @connRefused = false @db?.quit() diff --git a/js-coffee/dynamic-modules.js b/js-coffee/dynamic-modules.js index 19ae4a0..543f47f 100644 --- a/js-coffee/dynamic-modules.js +++ b/js-coffee/dynamic-modules.js @@ -9,7 +9,7 @@ Dynamic Modules */ (function() { - var cryptico, cs, db, exports, needle, vm; + var cryptico, cs, db, exports, issueApiCall, needle, vm; db = require('./persistence'); @@ -52,6 +52,29 @@ Dynamic Modules }; })(this); + issueApiCall = (function(_this) { + return function(method, url, credentials, cb) { + var err, func; + try { + if (method === 'get') { + func = needle.get; + } else { + func = needle.post; + } + return func(url, credentials, function(err, resp, body) { + if (!err) { + return cb(body); + } else { + return cb(); + } + }); + } catch (_error) { + err = _error; + return _this.log.info('DM | Error even before calling!'); + } + }; + })(this); + /* Try to run a JS module from a string, together with the @@ -104,7 +127,7 @@ Dynamic Modules sandbox = { id: userId + '.' + modId + '.vm', params: params, - needle: needle, + apicall: issueApiCall, log: logFunction(userId, ruleId, modId), exports: {} }; diff --git a/js-coffee/event-poller.js b/js-coffee/event-poller.js index 12cd80f..9b4121e 100644 --- a/js-coffee/event-poller.js +++ b/js-coffee/event-poller.js @@ -1,167 +1,139 @@ -// # Event Poller +// Generated by CoffeeScript 1.7.1 -'use strict'; +/* -var logger = require('./logging'), - listMessageActions = {}, - listAdminCommands = {}, - listEventModules = {}, - listPoll = {}, //TODO this will change in the future because it could have - //several parameterized (user-specific) instances of each event module - log, - isRunning = true, - eId = 0, - db, ml; +Dynamic Modules +=============== +> Compiles CoffeeScript modules and loads JS modules in a VM, together +> with only a few allowed node.js modules. + */ -//TODO allow different polling intervals (a wrapper together with settimeout per to be polled could be an easy and solution) +(function() { + var db, dynmod, fLoadModule, isRunning, listUserModules, log, logconf, logger, pollLoop; -// FIXME Eventually we don't even need to pass these arguments because they are anyways cached even over child_processes + logger = require('./logging'); -function init() { - if(process.argv.length < 7){ + db = require('./persistence'); + + dynmod = require('./dynamic-modules'); + + if (process.argv.length < 7) { console.error('Not all arguments have been passed!'); process.exit(); } - var logconf = {} - logconf['mode'] = process.argv[2] - logconf['io-level'] = process.argv[3] - logconf['file-level'] = process.argv[4] - logconf['file-path'] = process.argv[5] - logconf['nolog'] = process.argv[6] + logconf = { + mode: process.argv[2], + nolog: process.argv[6] + }; + + logconf['io-level'] = process.argv[3]; + + logconf['file-level'] = process.argv[4]; + + logconf['file-path'] = process.argv[5]; log = logger.getLogger(logconf); - var args = { logger: log }; - (ml = require('./components-manager'))(args); - (db = require('./persistence'))(args); - initMessageActions(); - pollLoop(); - log.info('Event Poller instantiated'); -}; -function shutDown() { - log.info('EP', 'Shutting down DB Link'); - isRunning = false; - if(db) db.shutDown(); - process.exit(); -} + log.info('EP | Event Poller starts up'); -function loadEventModule(el, cb) { - if(db && ml) db.getEventModule(el, function(err, obj) { - if(err || !obj) { - if(typeof cb === 'function') cb(new Error('Retrieving Event Module ' + el + ' from DB: ' + err)); - else log.error('EP', 'Retrieving Event Module ' + el + ' from DB!'); + db({ + logger: log + }); + + dynmod({ + logger: log + }); + + listUserModules = {}; + + isRunning = true; + + process.on('disconnect', function() { + log.info('EP | Shutting down Event Poller'); + isRunning = false; + return process.exit(); + }); + + process.on('message', function(msg) { + if (msg.event === 'new' || msg.event === 'init') { + fLoadModule(msg); } - else { - // log.info('EP', 'Loading Event Module: ' + el); - try { - var m = ml.requireFromString(obj, el); - db.getEventModuleAuth(el, function(mod) { - return function(err, objA) { - //TODO authentication needs to be done differently - if(objA && mod.loadCredentials) mod.loadCredentials(JSON.parse(objA)); - }; - }(m)); - listEventModules[el] = m; - if(typeof cb === 'function') cb(null, m); - } catch(e) { - if(typeof cb === 'function') cb(e); - else log.error(e); + if (msg.event === 'del') { + delete listUserModules[msg.user][msg.ruleId]; + if (JSON.stringify(listUserModules[msg.user]) === "{}") { + return delete listUserModules[msg.user]; } } }); -} -function fetchPollFunctionFromModule(mod, func) { - for(var i = 1; i < func.length; i++) { - if(mod) mod = mod[func[i]]; - } - if(mod) { - log.info('EP', 'Found active event module "' + func.join('->') + '", adding it to polling list'); - //FIXME change this to [module][prop] = module; because like this identical properties get overwritten - // also add some on a per user basis information because this should go into a user context for the users - // that sat up this rule! - listPoll[func.join('->')] = mod; - } else { - log.info('EP', 'No property "' + func.join('->') + '" found'); - } -} - -function initMessageActions() { - listMessageActions['event'] = function(args) { - var prop = args[1], arrModule = prop.split('->'); - if(arrModule.length > 1){ - if(listEventModules[arrModule[0]]) { - fetchPollFunctionFromModule(listEventModules[arrModule[0]], arrModule); - } else { - log.info('EP', 'Event Module ' + arrModule[0] + ' needs to be loaded, doing it now...'); - loadEventModule(arrModule[0], function(err, obj) { - if(err || !obj) log.error('EP', 'Event Module "' + arrModule[0] + '" not found: ' + err); - else { - log.info('EP', 'Event Module ' + arrModule[0] + ' found and loaded'); - fetchPollFunctionFromModule(obj, arrModule); - } - }); - } + fLoadModule = function(msg) { + var arrName, fAnonymous; + arrName = msg.rule.event.split(' -> '); + fAnonymous = function() { + return db.eventPollers.getModule(arrName[0], function(err, obj) { + if (!obj) { + return log.warn("EP | Strange... no module retrieved: " + arrName[0]); + } else { + return dynmod.compileString(obj.data, msg.user, msg.rule.id, arrName[0], obj.lang, db.eventPollers, function(result) { + if (!result.answ === 200) { + log.error("EP | Compilation of code failed! " + msg.user + ", " + msg.rule.id + ", " + arrName[0]); + } + if (!listUserModules[msg.user]) { + listUserModules[msg.user] = {}; + } + listUserModules[msg.user][msg.rule.id] = { + id: msg.rule.event, + pollfunc: arrName[1], + module: result.module + }; + return log.info("EP | New event module loaded! " + msg.user + ", " + msg.rule.id + ", " + arrName[0]); + }); + } + }); + }; + if (msg.event === 'new' || !listUserModules[msg.user] || !listUserModules[msg.user][msg.rule.id]) { + return fAnonymous(); } }; + + + /* + This function will loop infinitely every 10 seconds until isRunning is set to false - - process.on('message', function( msg ) { - console.log( 'EVENT POLLER GOT MESSAGE!'); - console.log( typeof msg); - console.log(msg); - // var arrProps = obj .split('|'); - // if(arrProps.length < 2) log.error('EP', 'too few parameter in message!'); - // else { - // var func = listMessageActions[arrProps[0]]; - // if(func) func(arrProps); - // } - console.log('EP internal event handled'); - }); + @private pollLoop() + */ - // very important so the process doesnt linger on when the paren process is killed - process.on('disconnect', shutDown); -} - - -function checkRemotes() { - for(var prop in listPoll) { - try { - listPoll[prop]( - /* - * define and immediately call anonymous function with param prop. - * This places the value of prop into the context of the callback - * and thus doesn't change when the for loop keeps iterating over listPoll - */ - (function(p) { - return function(err, obj) { - if(err) { - err.additionalInfo = 'module: ' + p; - log.error('EP', err); - } else { - // FIXME this needs to be pushed into the db not passed to the process! - console.error('eventpoller needs to push event into db queue') - // process.send({ - // event: p, - // eventid: 'polled_' + eId++, - // payload: obj - // }); - } + pollLoop = function() { + var err, fPoll, fRegisterModuleReference, myRule, oRules, ruleName, userName; + if (isRunning) { + for (userName in listUserModules) { + oRules = listUserModules[userName]; + for (ruleName in oRules) { + myRule = oRules[ruleName]; + fPoll = myRule.module[myRule.pollfunc]; + fRegisterModuleReference = function(ruleId, userId, eventId) { + return function(obj) { + return db.pushEvent({ + event: eventId, + eventid: "polled " + eventId + " " + userId + "_" + ((new Date).toISOString()), + payload: obj + }); + }; }; - })(prop) - ); - } catch (e) { - log.error('EP', e); + try { + fPoll(fRegisterModuleReference(ruleName, userName, myRule.id)); + } catch (_error) { + err = _error; + log.info('EP | ERROR encountered during polling!'); + log.info(err); + } + } + } + return setTimeout(pollLoop, 10000); } - } -} + }; -function pollLoop() { - if(isRunning) { - checkRemotes(); - setTimeout(pollLoop, 10000); - } -} - -init(); \ No newline at end of file + pollLoop(); + +}).call(this); diff --git a/js-coffee/persistence.js b/js-coffee/persistence.js index d60fd30..22d1d58 100644 --- a/js-coffee/persistence.js +++ b/js-coffee/persistence.js @@ -50,6 +50,12 @@ Persistence }; })(this); + exports.getLogger = (function(_this) { + return function() { + return _this.log; + }; + })(this); + exports.initPort = (function(_this) { return function(port) { var _ref; diff --git a/js-coffee/users.js b/js-coffee/users.js deleted file mode 100644 index 813b71f..0000000 --- a/js-coffee/users.js +++ /dev/null @@ -1,81 +0,0 @@ -'use strict'; - -var log = require('./logging'), - objCmds = { - addUser: addUser, - getUser: getUser, - delUser: delUser, - addRule: addRule, - getRules: getRules, - delRule: delRule - }; - -exports = module.exports = function(args) { - args = args || {}; - log(args); - return module.exports; -}; - -exports.handleCommand = function(args, cb) { - if(!args.cmd) { - var e = new Error('No command defined!'); - if(typeof cb === 'function') cb(e); - else log.error('US', e); - } else { - objCmds[args.cmd](args, cb); - } -}; - -/** - * - * @param {Object} args - * @param {function} cb - */ -function addUser(args, cb) { - -} - -/** - * - * @param {Object} args - * @param {function} cb - */ -function getUser(args, cb) { - -} - -/** - * - * @param {Object} args - * @param {function} cb - */ -function delUser(args, cb) { - -} - -/** - * - * @param {Object} args - * @param {function} cb - */ -function addRule(args, cb) { - -} - -/** - * - * @param {Object} args - * @param {function} cb - */ -function getRule(args, cb) { - -} - -/** - * - * @param {Object} args - * @param {function} cb - */ -function delRule(args, cb) { - -} diff --git a/js/eventpoller.js b/js/eventpoller.js index 0be62b9..e69de29 100644 --- a/js/eventpoller.js +++ b/js/eventpoller.js @@ -1,154 +0,0 @@ -// # Event Poller - -'use strict'; - -var fs = require('fs'), - path = require('path'), - log = require('./logging'), - listMessageActions = {}, - listAdminCommands = {}, - listEventModules = {}, - listPoll = {}, //TODO this will change in the future because it could have - //several parameterized (user-specific) instances of each event module - isRunning = true, - eId = 0, - db, ml; - -//TODO allow different polling intervals (a wrapper together with settimeout per to be polled could be an easy and solution) - - -function init() { - if(process.argv.length > 2) log({ logType: parseInt(process.argv[2]) || 0 }); - var args = { logType: log.getLogType() }; - ml = require('./module_loader')(args); - db = require('./db_interface')(args); - initAdminCommands(); - initMessageActions(); - pollLoop(); -}; - - -function loadEventModule(el, cb) { - if(db && ml) db.getEventModule(el, function(err, obj) { - if(err || !obj) { - if(typeof cb === 'function') cb(new Error('Retrieving Event Module ' + el + ' from DB: ' + err)); - else log.error('EP', 'Retrieving Event Module ' + el + ' from DB!'); - } - else { - // log.print('EP', 'Loading Event Module: ' + el); - try { - var m = ml.requireFromString(obj, el); - db.getEventModuleAuth(el, function(mod) { - return function(err, objA) { - //TODO authentication needs to be done differently - if(objA && mod.loadCredentials) mod.loadCredentials(JSON.parse(objA)); - }; - }(m)); - listEventModules[el] = m; - if(typeof cb === 'function') cb(null, m); - } catch(e) { - if(typeof cb === 'function') cb(e); - else log.error(e); - } - } - }); -} - -function fetchPollFunctionFromModule(mod, func) { - for(var i = 1; i < func.length; i++) { - if(mod) mod = mod[func[i]]; - } - if(mod) { - log.print('EP', 'Found active event module "' + func.join('->') + '", adding it to polling list'); - //FIXME change this to [module][prop] = module; because like this identical properties get overwritten - // also add some on a per user basis information because this should go into a user context for the users - // that sat up this rule! - listPoll[func.join('->')] = mod; - } else { - log.print('EP', 'No property "' + func.join('->') + '" found'); - } -} - -function initMessageActions() { - listMessageActions['event'] = function(args) { - var prop = args[1], arrModule = prop.split('->'); - if(arrModule.length > 1){ - if(listEventModules[arrModule[0]]) { - fetchPollFunctionFromModule(listEventModules[arrModule[0]], arrModule); - } else { - log.print('EP', 'Event Module ' + arrModule[0] + ' needs to be loaded, doing it now...'); - loadEventModule(arrModule[0], function(err, obj) { - if(err || !obj) log.error('EP', 'Event Module "' + arrModule[0] + '" not found: ' + err); - else { - log.print('EP', 'Event Module ' + arrModule[0] + ' found and loaded'); - fetchPollFunctionFromModule(obj, arrModule); - } - }); - } - } - }; - - //TODO this goes into module_manager, this will receive notification about - // new loaded/stored event modules and fetch them from the db - listMessageActions['cmd'] = function(args) { - var func = listAdminCommands[args[1]]; - if(typeof(func) === 'function') func(args); - }; - - process.on('message', function(strProps) { - var arrProps = strProps.split('|'); - if(arrProps.length < 2) log.error('EP', 'too few parameter in message!'); - else { - var func = listMessageActions[arrProps[0]]; - if(func) func(arrProps); - } - }); -} - -function initAdminCommands() { - listAdminCommands['shutdown'] = function(args) { - log.print('EP', 'Shutting down DB Link'); - isRunning = false; - if(db) db.shutDown(); - }; -} - -function checkRemotes() { - for(var prop in listPoll) { - try { - listPoll[prop]( - /* - * define and immediately call anonymous function with param prop. - * This places the value of prop into the context of the callback - * and thus doesn't change when the for loop keeps iterating over listPoll - * TODO add this example to the documentation and elaborate - */ - (function(p) { - return function(err, obj) { - if(err) { - err.additionalInfo = 'module: ' + p; - log.error('EP', err); - } else { - process.send({ - event: p, - eventid: 'polled_' + eId++, - payload: obj - }); - } - }; - })(prop) - ); - } catch (e) { - log.error('EP', e); - } - } -} - -function pollLoop() { - if(isRunning) { - checkRemotes(); - setTimeout(pollLoop, 10000); - } -} - -init(); \ No newline at end of file