diff --git a/coffee/components-manager.coffee b/coffee/components-manager.coffee index 420d991..2045f77 100644 --- a/coffee/components-manager.coffee +++ b/coffee/components-manager.coffee @@ -54,15 +54,23 @@ exports.addRuleListener = ( eh ) => fGoThroughUsers = ( user, rules ) -> # Fetch the rules object for each rule in each user - fFetchRule = ( rule ) -> - db.getRule rule, ( err, oRule ) => - eventEmitter.emit 'rule', - event: 'init' - user: user - rule: JSON.parse oRule + fFetchRule = ( userName ) -> + ( rule ) -> + db.getRule rule, ( err, strRule ) => + try + oRule = JSON.parse strRule + db.resetLog userName, oRule.id + db.appendLog userName, oRule.id, "INIT", "Rule '#{ oRule.id }' initialized" + + eventEmitter.emit 'rule', + event: 'init' + user: userName + rule: oRule + catch err + @log.warn "CM | There's an invalid rule in the system: #{ strRule }" # Go through all rules for each user - fFetchRule rule for rule in rules + fFetchRule( user ) rule for rule in rules # Go through each user fGoThroughUsers user, rules for user, rules of objUsers @@ -171,7 +179,7 @@ forgeModule = ( user, oPayload, dbMod, callback ) => funcs.push name for name, id of cm.module @log.info "CM | Storing new module with functions #{ funcs.join() }" answ.message = - "Event Poller module successfully stored! Found following function(s): #{ funcs }" + " Module #{ oPayload.id } successfully stored! Found following function(s): #{ funcs }" oPayload.functions = JSON.stringify funcs dbMod.storeModule user.username, oPayload if oPayload.public is 'true' @@ -184,24 +192,62 @@ commandFunctions = code: 200 message: dynmod.getPublicKey() +# EVENT POLLERS +# ------------- get_event_pollers: ( user, oPayload, callback ) -> getModules user, oPayload, db.eventPollers, callback - get_action_invokers: ( user, oPayload, callback ) -> - getModules user, oPayload, db.actionInvokers, callback + get_full_event_poller: ( user, oPayload, callback ) -> + db.eventPollers.getModule oPayload.id, ( err, obj ) -> + callback + code: 200 + message: JSON.stringify obj get_event_poller_params: ( user, oPayload, callback ) -> getModuleParams user, oPayload, db.eventPollers, callback - - get_action_invoker_params: ( user, oPayload, callback ) -> - getModuleParams user, oPayload, db.actionInvokers, callback - + forge_event_poller: ( user, oPayload, callback ) -> forgeModule user, oPayload, db.eventPollers, callback + + delete_event_poller: ( user, oPayload, callback ) -> + answ = hasRequiredParams [ 'id' ], oPayload + if answ.code isnt 200 + callback answ + else + db.eventPollers.deleteModule oPayload.id + callback + code: 200 + message: 'OK!' + +# ACTION INVOKERS +# --------------- + get_action_invokers: ( user, oPayload, callback ) -> + getModules user, oPayload, db.actionInvokers, callback + + get_full_action_invoker: ( user, oPayload, callback ) -> + db.actionInvokers.getModule oPayload.id, ( err, obj ) -> + callback + code: 200 + message: JSON.stringify obj + + get_action_invoker_params: ( user, oPayload, callback ) -> + getModuleParams user, oPayload, db.actionInvokers, callback forge_action_invoker: ( user, oPayload, callback ) -> forgeModule user, oPayload, db.actionInvokers, callback + delete_action_invoker: ( user, oPayload, callback ) -> + answ = hasRequiredParams [ 'id' ], oPayload + if answ.code isnt 200 + callback answ + else + db.actionInvokers.deleteModule oPayload.id + callback + code: 200 + message: 'OK!' + +# RULES +# ----- get_rules: ( user, oPayload, callback ) -> db.getUserLinkedRules user.username, ( err, obj ) -> callback @@ -218,21 +264,6 @@ commandFunctions = code: 200 message: obj - delete_rule: ( user, oPayload, callback ) -> - answ = hasRequiredParams [ 'id' ], oPayload - if answ.code isnt 200 - callback answ - else - db.deleteRule oPayload.id - eventEmitter.emit 'rule', - event: 'del' - user: user.username - rule: null - ruleId: oPayload.id - callback - code: 200 - message: 'OK!' - # A rule needs to be in following format: # - id @@ -264,6 +295,8 @@ commandFunctions = db.eventPollers.storeUserParams epModId, user.username, oPayload.event_params arrParams = oPayload.action_params db.actionInvokers.storeUserParams id, user.username, JSON.stringify params for id, params of arrParams + db.resetLog user.username, rule.id + db.appendLog user.username, rule.id, "INIT", "Rule '#{ rule.id }' initialized" eventEmitter.emit 'rule', event: 'new' user: user.username @@ -271,4 +304,19 @@ commandFunctions = answ = code: 200 message: "Rule '#{ rule.id }' stored and activated!" - callback answ \ No newline at end of file + callback answ + + delete_rule: ( user, oPayload, callback ) -> + answ = hasRequiredParams [ 'id' ], oPayload + if answ.code isnt 200 + callback answ + else + db.deleteRule oPayload.id + eventEmitter.emit 'rule', + event: 'del' + user: user.username + rule: null + ruleId: oPayload.id + callback + code: 200 + message: 'OK!' diff --git a/coffee/dynamic-modules.coffee b/coffee/dynamic-modules.coffee index 770ced8..ab7605f 100644 --- a/coffee/dynamic-modules.coffee +++ b/coffee/dynamic-modules.coffee @@ -48,20 +48,26 @@ exports.getPublicKey = () => @strPublicKey -issueApiCall = ( method, url, credentials, cb ) => - try - if method is 'get' - func = needle.get - else - func = needle.post - - func url, credentials, ( err, resp, body ) => - if not err - cb body +issueApiCall = ( logger ) -> + ( method, url, credentials, cb ) -> + try + if method is 'get' + func = needle.get else - cb() - catch err - @log.info 'DM | Error even before calling!' + func = needle.post + + func url, credentials, ( err, resp, body ) => + try + cb err, resp, body + catch err + logger 'Error during apicall! ' + err.message + catch err + logger 'Error before apicall! ' + err.message + +logFunction = ( uId, rId, mId ) -> + ( msg ) -> + db.appendLog uId, rId, mId, msg + ### Try to run a JS module from a string, together with the given parameters. If it is written in CoffeeScript we @@ -86,11 +92,6 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => answ.message = 'Compilation of CoffeeScript failed at line ' + err.location.first_line - logFunction = ( uId, rId, mId ) -> - ( msg ) -> - db.appendLog uId, rId, mId, msg - db.resetLog userId, ruleId - fTryToLoad = ( params ) => if params try @@ -98,15 +99,18 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => params = JSON.parse oDecrypted.plaintext catch err @log.warn "DM | Error during parsing of user defined params for #{ userId }, #{ ruleId }, #{ modId }" + @log.warn err params = {} else params = {} + logFunc = logFunction userId, ruleId, modId sandbox = id: userId + '.' + modId + '.vm' params: params - apicall: issueApiCall - log: logFunction userId, ruleId, modId + apicall: issueApiCall logFunc + # needle: needle + log: logFunc # debug: console.log exports: {} @@ -119,12 +123,12 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => # TODO We should investigate memory usage and garbage collection (global.gc())? # Start Node with the flags —nouse_idle_notification and —expose_gc, and then when you want to run the GC, just call global.gc(). catch err - console.log err answ.code = 400 answ.message = 'Loading Module failed: ' + err.message cb answ: answ module: sandbox.exports + logger: sandbox.log if dbMod dbMod.getUserParams modId, userId, ( err, obj ) -> diff --git a/coffee/engine.coffee b/coffee/engine.coffee index c250ccc..48de951 100644 --- a/coffee/engine.coffee +++ b/coffee/engine.coffee @@ -16,7 +16,7 @@ db = require './persistence' dynmod = require './dynamic-modules' # - External Modules: -# [js-select](https://www.npmjs.org/package/js-select) +# [js-select](https://github.com/harthur/js-select) jsonQuery = require 'js-select' ### @@ -51,11 +51,10 @@ Initializes the Engine and starts polling the event queue for new events. ### exports = module.exports = ( args ) => if not isRunning - isRunning = true @log = args.logger db args dynmod args - setTimeout pollQueue, 10 # Very important, this forks a token for the poll task + setTimeout exports.startEngine, 10 # Very important, this forks a token for the poll task module.exports @@ -70,6 +69,11 @@ modules are loaded correctly exports.getListUserRules = () -> listUserRules +# We need this so we can shut it down after the module unit tests +exports.startEngine = () -> + if not isRunning + isRunning = true + pollQueue() ### An event associated to rules happened and is captured here. Such events @@ -157,13 +161,14 @@ updateActionModules = ( updatedRuleId ) -> # load all required modules for all users fAddRequired userName, oUser for userName, oUser of listUserRules - +semaphore = 0 pollQueue = () -> if isRunning db.popEvent ( err, obj ) -> if not err and obj processEvent obj - setTimeout pollQueue, 50 #TODO adapt to load + semaphore-- + setTimeout pollQueue, 20 * semaphore #FIXME right wayx to adapt to load? ### Checks whether all conditions of the rule are met by the event. @@ -179,6 +184,7 @@ validConditions = ( evt, rule ) -> return false if jsonQuery( evt, prop ).nodes().length is 0 return true +semaphore = 0 ### Handles retrieved events. @@ -186,18 +192,21 @@ Handles retrieved events. @param {Object} evt ### processEvent = ( evt ) => - - fSearchAndInvokeAction = ( node, arrPath, evt, depth ) -> + fSearchAndInvokeAction = ( node, arrPath, funcName, evt, depth ) -> if not node @log.error "EN | Didn't find property in user rule list: " + arrPath.join ', ' + " at depth " + depth return if depth is arrPath.length try - node evt.payload + semaphore++ + node[funcName] evt.payload catch err @log.info "EN | ERROR IN ACTION INVOKER: " + err.message + node.logger err.message + if semaphore-- % 100 is 0 + @log.warn "EN | The system is producing too many tokens! Currently: #{ semaphore }" else - fSearchAndInvokeAction node[arrPath[depth]], arrPath, evt, depth + 1 + fSearchAndInvokeAction node[arrPath[depth]], arrPath, funcName, evt, depth + 1 @log.info 'EN | processing event: ' + evt.event + '(' + evt.eventid + ')' for userName, oUser of listUserRules @@ -206,7 +215,7 @@ processEvent = ( evt ) => @log.info 'EN | EVENT FIRED: ' + evt.event + '(' + evt.eventid + ') for rule ' + ruleName for action in oMyRule.rule.actions arr = action.split ' -> ' - fSearchAndInvokeAction listUserRules, [ userName, ruleName, 'actions', arr[0], arr[1]], evt, 0 + fSearchAndInvokeAction listUserRules, [ userName, ruleName, 'actions', arr[0]], arr[1], evt, 0 exports.shutDown = () -> isRunning = false \ No newline at end of file diff --git a/coffee/event-poller.coffee b/coffee/event-poller.coffee index 636b3ce..93840a5 100644 --- a/coffee/event-poller.coffee +++ b/coffee/event-poller.coffee @@ -15,7 +15,7 @@ db = require './persistence' dynmod = require './dynamic-modules' # If we do not receive all required arguments we shut down immediately -if process.argv.length < 7 +if process.argv.length < 8 console.error 'Not all arguments have been passed!' process.exit() @@ -31,7 +31,9 @@ log.info 'EP | Event Poller starts up' # Initialize required modules (should be in cache already) db logger: log -dynmod logger: log +dynmod + logger: log + keygen: process.argv[ 7 ] # Initialize module local variables and listUserModules = {} @@ -90,6 +92,7 @@ fLoadModule = ( msg ) -> id: msg.rule.event pollfunc: arrName[1] module: result.module + logger: result.logger log.info "EP | New event module loaded! #{ msg.user }, #{ msg.rule.id }, #{ arrName[0] }" @@ -115,24 +118,24 @@ pollLoop = () -> # Go through each of the users modules for ruleName, myRule of oRules - # This is the to be polled function - fPoll = myRule.module[myRule.pollfunc] + # # This is the to be polled function + # fPoll = myRule.module[myRule.pollfunc] # We have to register the poll function in belows anonymous function # because we're fast iterating through the listUserModules and references will # eventually not be what they are expected to be - fRegisterModuleReference = ( ruleId, userId, eventId ) -> - ( obj ) -> - db.pushEvent - event: eventId - eventid: "polled #{ eventId } #{ userId }_#{ ( new Date ).toISOString() }" - payload: obj + fCallFunction = ( oRule, ruleId, userId ) -> + try + oRule.module[oRule.pollfunc] ( obj ) -> + db.pushEvent + event: oRule.id + eventid: "polled #{ oRule.id } #{ userId }_#{ ( new Date ).toISOString() }" + payload: obj + catch err + log.info "EP | ERROR in module when polled: #{ oRule.id } #{ userId }: #{err.message}" + oRule.logger err.message - try - fPoll fRegisterModuleReference ruleName, userName, myRule.id - catch err - log.info 'EP | ERROR encountered during polling!' - log.info err + fCallFunction myRule, ruleName, userName setTimeout pollLoop, 10000 diff --git a/coffee/logging.coffee b/coffee/logging.coffee index 35a73c1..8abf6f0 100644 --- a/coffee/logging.coffee +++ b/coffee/logging.coffee @@ -35,7 +35,9 @@ exports.getLogger = ( args ) => # `args` holds the configuration settings for the logging, see either CLI arguments # in [webapi-eca](webapi-eca.html) or the configuration parameters in [config](config.html). args = args ? {} - if args.nolog + # We need to check for string 'true' also since the cliArgs passed to + # the event-poller will be strings + if args.nolog is true or args.nolog is 'true' # if the user doesn't want to have a log at all (e.g. during tests), it can be omitted with # the nolog flag emptylog diff --git a/coffee/webapi-eca.coffee b/coffee/webapi-eca.coffee index 153412c..74b7fba 100644 --- a/coffee/webapi-eca.coffee +++ b/coffee/webapi-eca.coffee @@ -83,7 +83,7 @@ opt = 'p': alias : 'log-file-path', describe: 'Specify the path to the log file within the "logs" folder' -# `-n`, `--nolog`: Set this if no output shall be generated +# `-n`, `--nolog`: Set this true if no output shall be generated 'n': alias : 'nolog', describe: 'Set this if no output shall be generated' @@ -111,7 +111,7 @@ if argv.f if argv.p logconf[ 'file-path' ] = argv.p if argv.n - logconf[ 'nolog' ] = argv.n + logconf[ 'nolog' ] = true try fs.unlinkSync path.resolve __dirname, '..', 'logs', logconf[ 'file-path' ] @log = logger.getLogger logconf @@ -163,8 +163,10 @@ init = => args.logconf[ 'file-level' ] # - the optional path to the log file args.logconf[ 'file-path' ] - # - whether a log file shall be written at all: null else + # - whether a log file shall be written at all [true|false] args.logconf[ 'nolog' ] + # - The keygen phrase, this has to be handled differently in the future! + args[ 'keygen' ] ] poller = cp.fork path.resolve( __dirname, nameEP ), cliArgs diff --git a/config/system.json b/config/system.json index 832d046..ca43219 100644 --- a/config/system.json +++ b/config/system.json @@ -2,6 +2,7 @@ "http-port": 8125, "db-port": 6379, "log": { + "nolog": "false", "mode": "development", "io-level": "info", "file-level": "info", diff --git a/js/components-manager.js b/js/components-manager.js index 25d800f..a9a992d 100644 --- a/js/components-manager.js +++ b/js/components-manager.js @@ -57,21 +57,32 @@ Components Manager var fGoThroughUsers, rules, user, _results; fGoThroughUsers = function(user, rules) { var fFetchRule, rule, _i, _len, _results; - fFetchRule = function(rule) { - return db.getRule(rule, (function(_this) { - return function(err, oRule) { - return eventEmitter.emit('rule', { - event: 'init', - user: user, - rule: JSON.parse(oRule) - }); - }; - })(this)); + fFetchRule = function(userName) { + return function(rule) { + return db.getRule(rule, (function(_this) { + return function(err, strRule) { + var oRule; + try { + oRule = JSON.parse(strRule); + db.resetLog(userName, oRule.id); + db.appendLog(userName, oRule.id, "INIT", "Rule '" + oRule.id + "' initialized"); + return eventEmitter.emit('rule', { + event: 'init', + user: userName, + rule: oRule + }); + } catch (_error) { + err = _error; + return _this.log.warn("CM | There's an invalid rule in the system: " + strRule); + } + }; + })(this)); + }; }; _results = []; for (_i = 0, _len = rules.length; _i < _len; _i++) { rule = rules[_i]; - _results.push(fFetchRule(rule)); + _results.push(fFetchRule(user)(rule)); } return _results; }; @@ -236,7 +247,7 @@ Components Manager funcs.push(name); } _this.log.info("CM | Storing new module with functions " + (funcs.join())); - answ.message = "Event Poller module successfully stored! Found following function(s): " + funcs; + answ.message = " Module " + oPayload.id + " successfully stored! Found following function(s): " + funcs; oPayload.functions = JSON.stringify(funcs); dbMod.storeModule(user.username, oPayload); if (oPayload["public"] === 'true') { @@ -261,21 +272,63 @@ Components Manager get_event_pollers: function(user, oPayload, callback) { return getModules(user, oPayload, db.eventPollers, callback); }, - get_action_invokers: function(user, oPayload, callback) { - return getModules(user, oPayload, db.actionInvokers, callback); + get_full_event_poller: function(user, oPayload, callback) { + return db.eventPollers.getModule(oPayload.id, function(err, obj) { + return callback({ + code: 200, + message: JSON.stringify(obj) + }); + }); }, get_event_poller_params: function(user, oPayload, callback) { return getModuleParams(user, oPayload, db.eventPollers, callback); }, - get_action_invoker_params: function(user, oPayload, callback) { - return getModuleParams(user, oPayload, db.actionInvokers, callback); - }, forge_event_poller: function(user, oPayload, callback) { return forgeModule(user, oPayload, db.eventPollers, callback); }, + delete_event_poller: function(user, oPayload, callback) { + var answ; + answ = hasRequiredParams(['id'], oPayload); + if (answ.code !== 200) { + return callback(answ); + } else { + db.eventPollers.deleteModule(oPayload.id); + return callback({ + code: 200, + message: 'OK!' + }); + } + }, + get_action_invokers: function(user, oPayload, callback) { + return getModules(user, oPayload, db.actionInvokers, callback); + }, + get_full_action_invoker: function(user, oPayload, callback) { + return db.actionInvokers.getModule(oPayload.id, function(err, obj) { + return callback({ + code: 200, + message: JSON.stringify(obj) + }); + }); + }, + get_action_invoker_params: function(user, oPayload, callback) { + return getModuleParams(user, oPayload, db.actionInvokers, callback); + }, forge_action_invoker: function(user, oPayload, callback) { return forgeModule(user, oPayload, db.actionInvokers, callback); }, + delete_action_invoker: function(user, oPayload, callback) { + var answ; + answ = hasRequiredParams(['id'], oPayload); + if (answ.code !== 200) { + return callback(answ); + } else { + db.actionInvokers.deleteModule(oPayload.id); + return callback({ + code: 200, + message: 'OK!' + }); + } + }, get_rules: function(user, oPayload, callback) { return db.getUserLinkedRules(user.username, function(err, obj) { return callback({ @@ -298,25 +351,6 @@ Components Manager }); } }, - delete_rule: function(user, oPayload, callback) { - var answ; - answ = hasRequiredParams(['id'], oPayload); - if (answ.code !== 200) { - return callback(answ); - } else { - db.deleteRule(oPayload.id); - eventEmitter.emit('rule', { - event: 'del', - user: user.username, - rule: null, - ruleId: oPayload.id - }); - return callback({ - code: 200, - message: 'OK!' - }); - } - }, forge_rule: function(user, oPayload, callback) { var answ; answ = hasRequiredParams(['id', 'event', 'conditions', 'actions'], oPayload); @@ -350,6 +384,8 @@ Components Manager params = arrParams[id]; db.actionInvokers.storeUserParams(id, user.username, JSON.stringify(params)); } + db.resetLog(user.username, rule.id); + db.appendLog(user.username, rule.id, "INIT", "Rule '" + rule.id + "' initialized"); eventEmitter.emit('rule', { event: 'new', user: user.username, @@ -363,6 +399,25 @@ Components Manager return callback(answ); }); } + }, + delete_rule: function(user, oPayload, callback) { + var answ; + answ = hasRequiredParams(['id'], oPayload); + if (answ.code !== 200) { + return callback(answ); + } else { + db.deleteRule(oPayload.id); + eventEmitter.emit('rule', { + event: 'del', + user: user.username, + rule: null, + ruleId: oPayload.id + }); + return callback({ + code: 200, + message: 'OK!' + }); + } } }; diff --git a/js/dynamic-modules.js b/js/dynamic-modules.js index 543f47f..bf227d8 100644 --- a/js/dynamic-modules.js +++ b/js/dynamic-modules.js @@ -9,7 +9,7 @@ Dynamic Modules */ (function() { - var cryptico, cs, db, exports, issueApiCall, needle, vm; + var cryptico, cs, db, exports, issueApiCall, logFunction, needle, vm; db = require('./persistence'); @@ -52,7 +52,7 @@ Dynamic Modules }; })(this); - issueApiCall = (function(_this) { + issueApiCall = function(logger) { return function(method, url, credentials, cb) { var err, func; try { @@ -61,19 +61,28 @@ Dynamic Modules } else { func = needle.post; } - return func(url, credentials, function(err, resp, body) { - if (!err) { - return cb(body); - } else { - return cb(); - } - }); + return func(url, credentials, (function(_this) { + return function(err, resp, body) { + try { + return cb(err, resp, body); + } catch (_error) { + err = _error; + return logger('Error during apicall! ' + err.message); + } + }; + })(this)); } catch (_error) { err = _error; - return _this.log.info('DM | Error even before calling!'); + return logger('Error before apicall! ' + err.message); } }; - })(this); + }; + + logFunction = function(uId, rId, mId) { + return function(msg) { + return db.appendLog(uId, rId, mId, msg); + }; + }; /* @@ -90,7 +99,7 @@ Dynamic Modules exports.compileString = (function(_this) { return function(src, userId, ruleId, modId, lang, dbMod, cb) { - var answ, err, fTryToLoad, logFunction; + var answ, err, fTryToLoad; answ = { code: 200, message: 'Successfully compiled' @@ -104,14 +113,8 @@ Dynamic Modules answ.message = 'Compilation of CoffeeScript failed at line ' + err.location.first_line; } } - logFunction = function(uId, rId, mId) { - return function(msg) { - return db.appendLog(uId, rId, mId, msg); - }; - }; - db.resetLog(userId, ruleId); fTryToLoad = function(params) { - var oDecrypted, sandbox; + var logFunc, oDecrypted, sandbox; if (params) { try { oDecrypted = cryptico.decrypt(params, _this.oPrivateRSAkey); @@ -119,29 +122,31 @@ Dynamic Modules } catch (_error) { err = _error; _this.log.warn("DM | Error during parsing of user defined params for " + userId + ", " + ruleId + ", " + modId); + _this.log.warn(err); params = {}; } } else { params = {}; } + logFunc = logFunction(userId, ruleId, modId); sandbox = { id: userId + '.' + modId + '.vm', params: params, - apicall: issueApiCall, - log: logFunction(userId, ruleId, modId), + apicall: issueApiCall(logFunc), + log: logFunc, exports: {} }; try { vm.runInNewContext(src, sandbox, sandbox.id); } catch (_error) { err = _error; - console.log(err); answ.code = 400; answ.message = 'Loading Module failed: ' + err.message; } return cb({ answ: answ, - module: sandbox.exports + module: sandbox.exports, + logger: sandbox.log }); }; if (dbMod) { diff --git a/js/engine.js b/js/engine.js index ae7cea1..9430c60 100644 --- a/js/engine.js +++ b/js/engine.js @@ -10,7 +10,7 @@ Engine */ (function() { - var db, dynmod, exports, isRunning, jsonQuery, listUserRules, pollQueue, processEvent, updateActionModules, validConditions; + var db, dynmod, exports, isRunning, jsonQuery, listUserRules, pollQueue, processEvent, semaphore, updateActionModules, validConditions; db = require('./persistence'); @@ -56,11 +56,10 @@ Engine exports = module.exports = (function(_this) { return function(args) { if (!isRunning) { - isRunning = true; _this.log = args.logger; db(args); dynmod(args); - setTimeout(pollQueue, 10); + setTimeout(exports.startEngine, 10); return module.exports; } }; @@ -78,6 +77,13 @@ Engine return listUserRules; }; + exports.startEngine = function() { + if (!isRunning) { + isRunning = true; + return pollQueue(); + } + }; + /* An event associated to rules happened and is captured here. Such events @@ -190,14 +196,17 @@ Engine return _results; }; + semaphore = 0; + pollQueue = function() { if (isRunning) { - return db.popEvent(function(err, obj) { + db.popEvent(function(err, obj) { if (!err && obj) { processEvent(obj); } - return setTimeout(pollQueue, 50); + return semaphore--; }); + return setTimeout(pollQueue, 20 * semaphore); } }; @@ -225,6 +234,8 @@ Engine return true; }; + semaphore = 0; + /* Handles retrieved events. @@ -236,7 +247,7 @@ Engine processEvent = (function(_this) { return function(evt) { var action, arr, fSearchAndInvokeAction, oMyRule, oUser, ruleName, userName, _results; - fSearchAndInvokeAction = function(node, arrPath, evt, depth) { + fSearchAndInvokeAction = function(node, arrPath, funcName, evt, depth) { var err; if (!node) { this.log.error("EN | Didn't find property in user rule list: " + arrPath.join(', ' + " at depth " + depth)); @@ -244,13 +255,18 @@ Engine } if (depth === arrPath.length) { try { - return node(evt.payload); + semaphore++; + node[funcName](evt.payload); } catch (_error) { err = _error; - return this.log.info("EN | ERROR IN ACTION INVOKER: " + err.message); + this.log.info("EN | ERROR IN ACTION INVOKER: " + err.message); + node.logger(err.message); + } + if (semaphore-- % 100 === 0) { + return this.log.warn("EN | The system is producing too many tokens! Currently: " + semaphore); } } else { - return fSearchAndInvokeAction(node[arrPath[depth]], arrPath, evt, depth + 1); + return fSearchAndInvokeAction(node[arrPath[depth]], arrPath, funcName, evt, depth + 1); } }; _this.log.info('EN | processing event: ' + evt.event + '(' + evt.eventid + ')'); @@ -271,7 +287,7 @@ Engine for (_i = 0, _len = _ref.length; _i < _len; _i++) { action = _ref[_i]; arr = action.split(' -> '); - _results2.push(fSearchAndInvokeAction(listUserRules, [userName, ruleName, 'actions', arr[0], arr[1]], evt, 0)); + _results2.push(fSearchAndInvokeAction(listUserRules, [userName, ruleName, 'actions', arr[0]], arr[1], evt, 0)); } return _results2; })()); diff --git a/js/event-poller.js b/js/event-poller.js index 9b4121e..1954c93 100644 --- a/js/event-poller.js +++ b/js/event-poller.js @@ -17,7 +17,7 @@ Dynamic Modules dynmod = require('./dynamic-modules'); - if (process.argv.length < 7) { + if (process.argv.length < 8) { console.error('Not all arguments have been passed!'); process.exit(); } @@ -42,7 +42,8 @@ Dynamic Modules }); dynmod({ - logger: log + logger: log, + keygen: process.argv[7] }); listUserModules = {}; @@ -85,7 +86,8 @@ Dynamic Modules listUserModules[msg.user][msg.rule.id] = { id: msg.rule.event, pollfunc: arrName[1], - module: result.module + module: result.module, + logger: result.logger }; return log.info("EP | New event module loaded! " + msg.user + ", " + msg.rule.id + ", " + arrName[0]); }); @@ -105,29 +107,29 @@ Dynamic Modules */ pollLoop = function() { - var err, fPoll, fRegisterModuleReference, myRule, oRules, ruleName, userName; + var fCallFunction, myRule, oRules, ruleName, userName; if (isRunning) { for (userName in listUserModules) { oRules = listUserModules[userName]; for (ruleName in oRules) { myRule = oRules[ruleName]; - fPoll = myRule.module[myRule.pollfunc]; - fRegisterModuleReference = function(ruleId, userId, eventId) { - return function(obj) { - return db.pushEvent({ - event: eventId, - eventid: "polled " + eventId + " " + userId + "_" + ((new Date).toISOString()), - payload: obj + fCallFunction = function(oRule, ruleId, userId) { + var err; + try { + return oRule.module[oRule.pollfunc](function(obj) { + return db.pushEvent({ + event: oRule.id, + eventid: "polled " + oRule.id + " " + userId + "_" + ((new Date).toISOString()), + payload: obj + }); }); - }; + } catch (_error) { + err = _error; + log.info("EP | ERROR in module when polled: " + oRule.id + " " + userId + ": " + err.message); + return oRule.logger(err.message); + } }; - try { - fPoll(fRegisterModuleReference(ruleName, userName, myRule.id)); - } catch (_error) { - err = _error; - log.info('EP | ERROR encountered during polling!'); - log.info(err); - } + fCallFunction(myRule, ruleName, userName); } } return setTimeout(pollLoop, 10000); diff --git a/js/logging.js b/js/logging.js index 9c57fad..73137ae 100644 --- a/js/logging.js +++ b/js/logging.js @@ -28,7 +28,7 @@ fatal: function() {} }; args = args != null ? args : {}; - if (args.nolog) { + if (args.nolog === true || args.nolog === 'true') { return emptylog; } else { try { diff --git a/js/webapi-eca.js b/js/webapi-eca.js index a0eaa22..86d72c9 100644 --- a/js/webapi-eca.js +++ b/js/webapi-eca.js @@ -118,7 +118,7 @@ WebAPI-ECA Engine } if (argv.n) { - logconf['nolog'] = argv.n; + logconf['nolog'] = true; } try { @@ -157,7 +157,7 @@ WebAPI-ECA Engine _this.log.info('RS | Initialzing engine'); engine(args); _this.log.info('RS | Forking a child process for the event poller'); - cliArgs = [args.logconf['mode'], args.logconf['io-level'], args.logconf['file-level'], args.logconf['file-path'], args.logconf['nolog']]; + cliArgs = [args.logconf['mode'], args.logconf['io-level'], args.logconf['file-level'], args.logconf['file-path'], args.logconf['nolog'], args['keygen']]; poller = cp.fork(path.resolve(__dirname, nameEP), cliArgs); _this.log.info('RS | Initialzing module manager'); cm(args); diff --git a/testing/test_dynamic-modules.coffee b/testing/test_dynamic-modules.coffee index 8bd7a92..59259a6 100644 --- a/testing/test_dynamic-modules.coffee +++ b/testing/test_dynamic-modules.coffee @@ -91,7 +91,8 @@ exports.testCorrectUserParams = ( test ) -> db.getLog oUser.username, oRuleReal.id, ( err, data ) -> try logged = data.split( '] ' )[1] - test.strictEqual logged, "{#{ oAi.id }} " + pw + "\n", 'Did not log the right thing' + logged = logged.split( "\n" )[0] + test.strictEqual logged, "{#{ oAi.id }} " + pw, 'Did not log the right thing' catch e test.ok false, 'Parsing log failed' @@ -100,6 +101,7 @@ exports.testCorrectUserParams = ( test ) -> user: oUser.username rule: null ruleId: oRuleReal.id + engine.shutDown() setTimeout test.done, 200 setTimeout fWaitAgain, 200 diff --git a/testing/test_engine.coffee b/testing/test_engine.coffee index 95e8d70..c845121 100644 --- a/testing/test_engine.coffee +++ b/testing/test_engine.coffee @@ -30,6 +30,10 @@ oRuleRealTwo = objects.rules.ruleRealTwo oAiOne = objects.ais.aiOne oAiTwo = objects.ais.aiTwo +exports.tearDown = ( cb ) -> + engine.startEngine() + cb() + exports.tearDown = ( cb ) -> db.deleteRule oRuleReal.id db.actionInvokers.deleteModule oAiOne.id @@ -46,6 +50,7 @@ exports.tearDown = ( cb ) -> event: 'del' user: oUser.username rule: oRuleRealTwo + engine.shutDown() setTimeout cb, 100 @@ -161,7 +166,8 @@ exports.engine = db.getLog oUser.username, oRuleReal.id, ( err, data ) -> try logged = data.split( '] ' )[1] - test.strictEqual logged, "{#{ oAiOne.id }} " + evt.payload.property + "\n", 'Did not log the right thing' + logged = logged.split( "\n" )[0] + test.strictEqual logged, "{#{ oAiOne.id }} " + evt.payload.property, 'Did not log the right thing' catch e test.ok false, 'Parsing log failed' test.done() diff --git a/webpages/handlers/coffee/edit_modules.coffee b/webpages/handlers/coffee/edit_modules.coffee new file mode 100644 index 0000000..5e503d6 --- /dev/null +++ b/webpages/handlers/coffee/edit_modules.coffee @@ -0,0 +1,105 @@ + +fOnLoad = () -> + document.title = 'Edit Modules' + $( '#pagetitle' ).text "{{{user.username}}}, edit your Modules!" + + moduleType = $( '#module_type' ).val() + $( '#module_type' ).change () -> + moduleType = $( this ).val() + console.log moduleType + fFetchModules() + + editor = ace.edit "editor" + editor.setTheme "ace/theme/monokai" + editor.setReadOnly true + editor.setShowPrintMargin false + + fErrHandler = ( errMsg ) -> + ( err ) -> + $( '#moduleName' ).html "

 

" + $( '#moduleLanguage' ).html " " + editor.setValue "" + fDelayed = () -> + if err.responseText is '' + msg = 'No Response from Server!' + else + try + oErr = JSON.parse err.responseText + msg = oErr.message + $( '#info' ).text errMsg + msg + $( '#info' ).attr 'class', 'error' + setTimeout fDelayed, 500 + + fFetchModules = () -> + if moduleType is 'Event Poller' + cmd = 'get_event_pollers' + else + cmd = 'get_action_invokers' + $.post( '/usercommand', command: cmd ) + .done fUpdateModuleList + .fail fErrHandler 'Did not retrieve rules! ' + + fUpdateModuleList = ( data ) -> + $( '#tableModules tr' ).remove() + oMods = JSON.parse data.message + for modName of oMods + tr = $ '' + img = $( '' ).attr( 'class', 'del' ).attr 'src', 'red_cross_small.png' + imgTwo = $( '' ).attr( 'class', 'log' ).attr 'src', 'logicon.png' + inp = $( '
' ).text modName + tr.append( $( '' ).append img ) + tr.append( $( '' ).append imgTwo ) + tr.append( $( '' ).append inp ) + $( '#tableModules' ).append tr + + fFetchModules() + + $( '#tableModules' ).on 'click', 'img.del', () -> + modName = $( 'div', $( this ).closest( 'tr' )).text() + if confirm "Do you really want to delete the Module '#{ modName }'? + The module might still be active in some of your rules!" + $( '#moduleName' ).html "

 

" + $( '#moduleLanguage' ).html " " + editor.setValue "" + if moduleType is 'Event Poller' + cmd = 'delete_event_poller' + else + cmd = 'delete_action_invoker' + data = + command: cmd + payload: + id: modName + data.payload = JSON.stringify data.payload + $.post( '/usercommand', data ) + .done fFetchModules + .fail fErrHandler 'Could not delete module! ' + + $( '#tableModules' ).on 'click', 'img.log', () -> + modName = $( 'div', $( this ).closest( 'tr' )).text() + if moduleType is 'Event Poller' + cmd = 'get_full_event_poller' + else + cmd = 'get_full_action_invoker' + data = + command: cmd + payload: + id: modName + data.payload = JSON.stringify data.payload + $.post( '/usercommand', data ) + .done ( data ) -> + try + oMod = JSON.parse data.message + catch err + fErrHandler err.message + if oMod.lang is 'CoffeeScript' + editor.getSession().setMode "ace/mode/coffee" + else + editor.getSession().setMode "ace/mode/javascript" + editor.setValue oMod.data + editor.gotoLine 1, 1 + $( '#moduleName' ).html "

#{ oMod.id }

" + $( '#moduleLanguage' ).html "#{ oMod.lang }" + + .fail fErrHandler 'Could not get module! ' + +window.addEventListener 'load', fOnLoad, true diff --git a/webpages/handlers/coffee/edit_rules.coffee b/webpages/handlers/coffee/edit_rules.coffee index 29fd81b..6fe1505 100644 --- a/webpages/handlers/coffee/edit_rules.coffee +++ b/webpages/handlers/coffee/edit_rules.coffee @@ -5,6 +5,7 @@ fOnLoad = () -> fErrHandler = ( errMsg ) -> ( err ) -> + $( '#log_col' ).text "" fDelayed = () -> if err.responseText is '' msg = 'No Response from Server!' @@ -38,6 +39,7 @@ fOnLoad = () -> $( '#tableRules' ).on 'click', 'img.del', () -> ruleName = $( 'div', $( this ).closest( 'tr' )).text() if confirm "Do you really want to delete the rule '#{ ruleName }'?" + $( '#log_col' ).text "" data = command: 'delete_rule' payload: diff --git a/webpages/handlers/coffee/forge_action_invoker.coffee b/webpages/handlers/coffee/forge_action_invoker.coffee index 9e94f0f..53a6ce4 100644 --- a/webpages/handlers/coffee/forge_action_invoker.coffee +++ b/webpages/handlers/coffee/forge_action_invoker.coffee @@ -8,6 +8,7 @@ fOnLoad = () -> editor.setTheme "ace/theme/monokai" editor.getSession().setMode "ace/mode/coffee" editor.setShowPrintMargin false + editor.session.setUseSoftTabs false $( '#editor_mode' ).change ( el ) -> if $( this ).val() is 'CoffeeScript' diff --git a/webpages/handlers/coffee/forge_rule.coffee b/webpages/handlers/coffee/forge_rule.coffee index a61b6d1..fcabce8 100644 --- a/webpages/handlers/coffee/forge_rule.coffee +++ b/webpages/handlers/coffee/forge_rule.coffee @@ -9,6 +9,12 @@ $.post( '/usercommand', command: 'get_public_key' ) fOnLoad = () -> + editor = ace.edit "editor_conditions" + editor.setTheme "ace/theme/monokai" + editor.getSession().setMode "ace/mode/json" + editor.setShowPrintMargin false + # editor.session.setUseSoftTabs false + document.title = 'Rule Forge!' $( '#pagetitle' ).text '{{{user.username}}}, forge your rule!' @@ -134,16 +140,17 @@ fOnLoad = () -> $( '#select_actions' ).on 'change', () -> opt = $ 'option:selected', this arrAI = opt.val().split ' -> ' + idAI = opt.attr 'id' table = $( '#selected_actions' ) - tr = $( '' ).attr( 'id', 'title_' + opt.attr 'id') + tr = $( '' ).attr( 'id', 'title_' + idAI ) img = $( '' ).attr 'src', 'red_cross_small.png' tr.append $( '' ).css( 'width', '20px' ).append img tr.append $( '' ).attr( 'class', 'title').text( opt.val() ) table.append tr - if $( '#ap_' + arrAI[0] ).length is 0 + if $( '#ap_' + idAI ).length is 0 div = $( '
' ) - .attr( 'id', 'ap_' + arrAI[0] ) + .attr( 'id', 'ap_' + idAI ) div.append $( '
') .attr( 'class', 'underlined') .text arrAI[0] @@ -159,12 +166,7 @@ fOnLoad = () -> $( '#params_' + id ).remove() opt = $( '