diff --git a/coffee/components-manager.coffee b/coffee/components-manager.coffee index ca08d01..20e1f7b 100644 --- a/coffee/components-manager.coffee +++ b/coffee/components-manager.coffee @@ -165,6 +165,7 @@ forgeModule = ( user, oPayload, dbMod, callback ) => if answ.code isnt 200 callback answ else + i = 0 dbMod.getModule oPayload.id, ( err, mod ) => if mod answ.code = 409 @@ -296,28 +297,24 @@ commandFunctions = code: 409 message: 'Rule name already existing!' else - console.log 'new ruke' rule = id: oPayload.id event: oPayload.event conditions: oPayload.conditions actions: oPayload.actions strRule = JSON.stringify rule - console.log 'stringified' db.storeRule rule.id, strRule - console.log 'stored' db.linkRule rule.id, user.username - console.log 'linked' db.activateRule rule.id, user.username - console.log 'activated' if oPayload.event_params epModId = rule.event.split( ' -> ' )[0] db.eventPollers.storeUserParams epModId, user.username, oPayload.event_params - console.log 'event params loaded' - arrParams = oPayload.action_params - console.log 'arractionparams' - db.actionInvokers.storeUserParams id, user.username, JSON.stringify params for id, params of arrParams - console.log 'action aprams stored' + oParams = oPayload.action_params + db.actionInvokers.storeUserParams id, user.username, JSON.stringify params for id, params of oParams + oParams = oPayload.action_functions + for id, params of oParams + arr = id.split ' -> ' + db.actionInvokers.storeUserArguments arr[ 0 ], arr[ 1 ], user.username, JSON.stringify params db.resetLog user.username, rule.id db.appendLog user.username, rule.id, "INIT", "Rule '#{ rule.id }' initialized" eventEmitter.emit 'rule', @@ -327,7 +324,6 @@ commandFunctions = answ = code: 200 message: "Rule '#{ rule.id }' stored and activated!" - console.log 'done' callback answ delete_rule: ( user, oPayload, callback ) -> diff --git a/coffee/dynamic-modules.coffee b/coffee/dynamic-modules.coffee index 72f7022..09c4222 100644 --- a/coffee/dynamic-modules.coffee +++ b/coffee/dynamic-modules.coffee @@ -49,33 +49,6 @@ exports = module.exports = ( args ) => exports.getPublicKey = () => @strPublicKey -# We need to wrap the callbacks in try/catch so the token does not get killed and -# other modules are not called. This will be obsolete as soon as each module -# runs in a child process -# FIXME this seems not to achieve what we expected... token gets still lost -# -> implement child processes per module. (or better per user?) -issueNeedleCall = ( logger ) -> - ( method, url, data, options, cb ) -> - try - needle.request method, url, data, options, ( err, resp, body ) => - try - cb err, resp, body - catch err - logger 'Error during needle request! ' + err.message - catch err - logger 'Error before needle request! ' + err.message - -issueRequest = ( logger ) -> - ( options, cb ) -> - try - request options, ( err, resp, body ) => - try - cb err, resp, body - catch err - logger 'Error during request! ' + err.message - catch err - logger 'Error before request! ' + err.message - logFunction = ( uId, rId, mId ) -> ( msg ) -> db.appendLog uId, rId, mId, msg @@ -114,7 +87,6 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => err.location.first_line fTryToLoad = ( params ) => - if params try oDecrypted = cryptico.decrypt params, @oPrivateRSAkey @@ -126,12 +98,13 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => else params = {} + logFunc = logFunction userId, ruleId, modId sandbox = id: userId + '.' + modId + '.vm' params: params - needlereq: issueNeedleCall logFunc - request: issueRequest logFunc + needle: needle + request: request cryptoJS: cryptoJS log: logFunc debug: console.log @@ -154,14 +127,33 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) => oFuncParams = {} for fName, func of sandbox.exports getFunctionParamNames fName, func, oFuncParams + + if dbMod + oFuncArgs = {} + console.log 'oFuncParams' + console.log oFuncParams + + for func of oFuncParams + console.log 'fetching ' + func + console.log typeof func + dbMod.getUserArguments modId, func, userId, ( err, obj ) => + console.log err, obj + try + oDecrypted = cryptico.decrypt obj, @oPrivateRSAkey + oFuncArgs[ func ] = JSON.parse oDecrypted.plaintext + catch err + @log.warn "DM | Error during parsing of user defined params for #{ userId }, #{ ruleId }, #{ modId }" + @log.warn err cb answ: answ module: sandbox.exports funcParams: oFuncParams + funcArgs: oFuncArgs logger: sandbox.log if dbMod dbMod.getUserParams modId, userId, ( err, obj ) -> fTryToLoad obj else - fTryToLoad() + fTryToLoad null + diff --git a/coffee/engine.coffee b/coffee/engine.coffee index 48de951..48514bc 100644 --- a/coffee/engine.coffee +++ b/coffee/engine.coffee @@ -39,6 +39,11 @@ An object of users with their active rules and the required action modules "actions": "action-3": oAction-3 ### + +#TODO how often do we allow rules to be processed? +#it would make sense to implement a scheduler approach, which means to store the +#events in the DB and query for them if a rule is evaluated. Through this we would allow +#a CEP approach, rather than just ECA and could combine events/evaluate time constraints listUserRules = {} isRunning = false diff --git a/coffee/persistence.coffee b/coffee/persistence.coffee index ec197dd..2efef24 100644 --- a/coffee/persistence.coffee +++ b/coffee/persistence.coffee @@ -284,6 +284,7 @@ class IndexedModules @db.smembers "#{ @setname }:#{ mId }:users", ( err, obj ) => @unlinkModule mId, userId for userId in obj @deleteUserParams mId, userId for userId in obj + @deleteUserArguments mId, userId for userId in obj ### Stores user params for a module. They are expected to be RSA encrypted with helps of @@ -316,6 +317,34 @@ class IndexedModules @db.del "#{ @setname }-params:#{ mId }:#{ userId }", replyHandler "del '#{ @setname }-params:#{ mId }:#{ userId }'" + ### + Stores user arguments for a function within a module. They are expected to be RSA encrypted with helps of + the provided cryptico JS library and will only be decrypted right before the module is loaded! + + @private storeUserArguments( *mId, userId, encData* ) + @param {String} mId + @param {String} userId + @param {object} encData + ### + storeUserArguments: ( mId, funcId, userId, encData ) => + @log.info "DB | (IdxedMods) #{ @setname }.storeUserArguments( #{ mId }, #{ funcId }, #{ userId }, encData )" + @db.sadd "#{ @setname }:#{ mId }:#{ userId }:functions", funcId, + replyHandler "sadd '#{ funcId }' to '#{ @setname }:#{ mId }:#{ userId }:functions'" + @db.set "#{ @setname }:#{ mId }:#{ userId }:function:#{ funcId }", encData, + replyHandler "set user params in '#{ @setname }:#{ mId }:#{ userId }:function:#{ func }'" + + getUserArguments: ( mId, funcId, userId, cb ) => + console.log 'calling ffunct' + @log.info "DB | (IdxedMods) #{ @setname }.getUserArguments( #{ mId }, #{ funcId }, #{ userId } )" + @db.get "#{ @setname }:#{ mId }:#{ userId }:function:#{ funcId }", cb + + deleteUserArguments: ( mId, userId ) => + @log.info "DB | (IdxedMods) #{ @setname }.deleteUserArguments(#{ mId }, #{ userId } )" + @db.smembers "#{ @setname }:#{ mId }:#{ userId }:functions", ( err, obj ) => + for func in obj + @db.del "#{ @setname }:#{ mId }:#{ userId }:function:#{ func }", + replyHandler "del '#{ @setname }:#{ mId }:#{ userId }:function:#{ func }'" + ### ## Rules diff --git a/coffee/request-handler.coffee b/coffee/request-handler.coffee index 48e6986..6a2232e 100644 --- a/coffee/request-handler.coffee +++ b/coffee/request-handler.coffee @@ -68,26 +68,29 @@ exports.handleEvent = ( req, resp ) -> body = '' req.on 'data', ( data ) -> body += data + req.on 'end', -> - if req.session and req.session.user - try - obj = JSON.parse body - catch err - resp.send 400, 'Badly formed event!' - # If required event properties are present we process the event # - if obj and obj.event and not err - timestamp = ( new Date ).toISOString() - rand = ( Math.floor Math.random() * 10e9 ).toString( 16 ).toUpperCase() - obj.eventid = "#{ obj.event }_#{ timestamp }_#{ rand }" - answ = - code: 200 - message: "Thank you for the event: #{ obj.eventid }" - resp.send answ.code, answ - db.pushEvent obj - else - resp.send 400, 'Your event was missing important parameters!' + #if req.session and req.session.user + try + obj = JSON.parse body + console.log 'got foreign event!' + console.log obj + catch err + resp.send 400, 'Badly formed event!' + # If required event properties are present we process the event # + if obj and obj.event and not err + timestamp = ( new Date ).toISOString() + rand = ( Math.floor Math.random() * 10e9 ).toString( 16 ).toUpperCase() + obj.eventid = "#{ obj.event }_#{ timestamp }_#{ rand }" + answ = + code: 200 + message: "Thank you for the event: #{ obj.eventid }" + resp.send answ.code, answ + db.pushEvent obj else - resp.send 401, 'Please login!' + resp.send 400, 'Your event was missing important parameters!' + # else + # resp.send 401, 'Please login!' ### diff --git a/coffee/webapi-eca.coffee b/coffee/webapi-eca.coffee index 74b7fba..e9fbccd 100644 --- a/coffee/webapi-eca.coffee +++ b/coffee/webapi-eca.coffee @@ -147,6 +147,7 @@ init = => else # > Initialize all required modules with the args object. @log.info 'RS | Initialzing engine' + #TODO We could in the future make the engine a child process as well engine args # Start the event poller. The module manager will emit events for it diff --git a/js/components-manager.js b/js/components-manager.js index 88bfa77..667a3a8 100644 --- a/js/components-manager.js +++ b/js/components-manager.js @@ -223,11 +223,12 @@ Components Manager forgeModule = (function(_this) { return function(user, oPayload, dbMod, callback) { - var answ; + var answ, i; answ = hasRequiredParams(['id', 'params', 'lang', 'data'], oPayload); if (answ.code !== 200) { return callback(answ); } else { + i = 0; return dbMod.getModule(oPayload.id, function(err, mod) { var src; if (mod) { @@ -379,14 +380,13 @@ Components Manager return callback(answ); } else { return db.getRule(oPayload.id, function(err, oExisting) { - var arrParams, epModId, id, params, rule, strRule; + var arr, epModId, id, oParams, params, rule, strRule; if (oExisting !== null) { answ = { code: 409, message: 'Rule name already existing!' }; } else { - console.log('new ruke'); rule = { id: oPayload.id, event: oPayload.event, @@ -394,25 +394,24 @@ Components Manager actions: oPayload.actions }; strRule = JSON.stringify(rule); - console.log('stringified'); db.storeRule(rule.id, strRule); - console.log('stored'); db.linkRule(rule.id, user.username); - console.log('linked'); db.activateRule(rule.id, user.username); - console.log('activated'); if (oPayload.event_params) { epModId = rule.event.split(' -> ')[0]; db.eventPollers.storeUserParams(epModId, user.username, oPayload.event_params); } - console.log('event params loaded'); - arrParams = oPayload.action_params; - console.log('arractionparams'); - for (id in arrParams) { - params = arrParams[id]; + oParams = oPayload.action_params; + for (id in oParams) { + params = oParams[id]; db.actionInvokers.storeUserParams(id, user.username, JSON.stringify(params)); } - console.log('action aprams stored'); + oParams = oPayload.action_functions; + for (id in oParams) { + params = oParams[id]; + arr = id.split(' -> '); + db.actionInvokers.storeUserArguments(arr[0], arr[1], user.username, JSON.stringify(params)); + } db.resetLog(user.username, rule.id); db.appendLog(user.username, rule.id, "INIT", "Rule '" + rule.id + "' initialized"); eventEmitter.emit('rule', { @@ -424,7 +423,6 @@ Components Manager code: 200, message: "Rule '" + rule.id + "' stored and activated!" }; - console.log('done'); } return callback(answ); }); diff --git a/js/dynamic-modules.js b/js/dynamic-modules.js index 974430b..49833b6 100644 --- a/js/dynamic-modules.js +++ b/js/dynamic-modules.js @@ -9,7 +9,7 @@ Dynamic Modules */ (function() { - var cryptico, cryptoJS, cs, db, exports, getFunctionParamNames, issueNeedleCall, issueRequest, logFunction, needle, regexpComments, request, vm; + var cryptico, cryptoJS, cs, db, exports, getFunctionParamNames, logFunction, needle, regexpComments, request, vm; db = require('./persistence'); @@ -56,48 +56,6 @@ Dynamic Modules }; })(this); - issueNeedleCall = function(logger) { - return function(method, url, data, options, cb) { - var err; - try { - return needle.request(method, url, data, options, (function(_this) { - return function(err, resp, body) { - try { - return cb(err, resp, body); - } catch (_error) { - err = _error; - return logger('Error during needle request! ' + err.message); - } - }; - })(this)); - } catch (_error) { - err = _error; - return logger('Error before needle request! ' + err.message); - } - }; - }; - - issueRequest = function(logger) { - return function(options, cb) { - var err; - try { - return request(options, (function(_this) { - return function(err, resp, body) { - try { - return cb(err, resp, body); - } catch (_error) { - err = _error; - return logger('Error during request! ' + err.message); - } - }; - })(this)); - } catch (_error) { - err = _error; - return logger('Error before request! ' + err.message); - } - }; - }; - logFunction = function(uId, rId, mId) { return function(msg) { return db.appendLog(uId, rId, mId, msg); @@ -146,7 +104,7 @@ Dynamic Modules } } fTryToLoad = function(params) { - var fName, func, logFunc, msg, oDecrypted, oFuncParams, sandbox, _ref; + var fName, func, logFunc, msg, oDecrypted, oFuncArgs, oFuncParams, sandbox, _ref; if (params) { try { oDecrypted = cryptico.decrypt(params, _this.oPrivateRSAkey); @@ -164,8 +122,8 @@ Dynamic Modules sandbox = { id: userId + '.' + modId + '.vm', params: params, - needlereq: issueNeedleCall(logFunc), - request: issueRequest(logFunc), + needle: needle, + request: request, cryptoJS: cryptoJS, log: logFunc, debug: console.log, @@ -188,10 +146,31 @@ Dynamic Modules func = _ref[fName]; getFunctionParamNames(fName, func, oFuncParams); } + if (dbMod) { + oFuncArgs = {}; + console.log('oFuncParams'); + console.log(oFuncParams); + for (func in oFuncParams) { + console.log('fetching ' + func); + console.log(typeof func); + dbMod.getUserArguments(modId, func, userId, function(err, obj) { + console.log(err, obj); + try { + oDecrypted = cryptico.decrypt(obj, _this.oPrivateRSAkey); + return oFuncArgs[func] = JSON.parse(oDecrypted.plaintext); + } catch (_error) { + err = _error; + _this.log.warn("DM | Error during parsing of user defined params for " + userId + ", " + ruleId + ", " + modId); + return _this.log.warn(err); + } + }); + } + } return cb({ answ: answ, module: sandbox.exports, funcParams: oFuncParams, + funcArgs: oFuncArgs, logger: sandbox.log }); }; @@ -200,7 +179,7 @@ Dynamic Modules return fTryToLoad(obj); }); } else { - return fTryToLoad(); + return fTryToLoad(null); } }; })(this); diff --git a/js/persistence.js b/js/persistence.js index 60a1974..fab9334 100644 --- a/js/persistence.js +++ b/js/persistence.js @@ -253,6 +253,9 @@ Persistence function IndexedModules(setname, log) { this.setname = setname; this.log = log; + this.deleteUserArguments = __bind(this.deleteUserArguments, this); + this.getUserArguments = __bind(this.getUserArguments, this); + this.storeUserArguments = __bind(this.storeUserArguments, this); this.deleteUserParams = __bind(this.deleteUserParams, this); this.getUserParamsIds = __bind(this.getUserParamsIds, this); this.getUserParams = __bind(this.getUserParams, this); @@ -352,15 +355,19 @@ Persistence this.unpublish(mId); return this.db.smembers("" + this.setname + ":" + mId + ":users", (function(_this) { return function(err, obj) { - var userId, _i, _j, _len, _len1, _results; + var userId, _i, _j, _k, _len, _len1, _len2, _results; for (_i = 0, _len = obj.length; _i < _len; _i++) { userId = obj[_i]; _this.unlinkModule(mId, userId); } - _results = []; for (_j = 0, _len1 = obj.length; _j < _len1; _j++) { userId = obj[_j]; - _results.push(_this.deleteUserParams(mId, userId)); + _this.deleteUserParams(mId, userId); + } + _results = []; + for (_k = 0, _len2 = obj.length; _k < _len2; _k++) { + userId = obj[_k]; + _results.push(_this.deleteUserArguments(mId, userId)); } return _results; }; @@ -400,6 +407,44 @@ Persistence return this.db.del("" + this.setname + "-params:" + mId + ":" + userId, replyHandler("del '" + this.setname + "-params:" + mId + ":" + userId + "'")); }; + + /* + Stores user arguments for a function within a module. They are expected to be RSA encrypted with helps of + the provided cryptico JS library and will only be decrypted right before the module is loaded! + + @private storeUserArguments( *mId, userId, encData* ) + @param {String} mId + @param {String} userId + @param {object} encData + */ + + IndexedModules.prototype.storeUserArguments = function(mId, funcId, userId, encData) { + this.log.info("DB | (IdxedMods) " + this.setname + ".storeUserArguments( " + mId + ", " + funcId + ", " + userId + ", encData )"); + this.db.sadd("" + this.setname + ":" + mId + ":" + userId + ":functions", funcId, replyHandler("sadd '" + funcId + "' to '" + this.setname + ":" + mId + ":" + userId + ":functions'")); + return this.db.set("" + this.setname + ":" + mId + ":" + userId + ":function:" + funcId, encData, replyHandler("set user params in '" + this.setname + ":" + mId + ":" + userId + ":function:" + func + "'")); + }; + + IndexedModules.prototype.getUserArguments = function(mId, funcId, userId, cb) { + console.log('calling ffunct'); + this.log.info("DB | (IdxedMods) " + this.setname + ".getUserArguments( " + mId + ", " + funcId + ", " + userId + " )"); + return this.db.get("" + this.setname + ":" + mId + ":" + userId + ":function:" + funcId, cb); + }; + + IndexedModules.prototype.deleteUserArguments = function(mId, userId) { + this.log.info("DB | (IdxedMods) " + this.setname + ".deleteUserArguments(" + mId + ", " + userId + " )"); + return this.db.smembers("" + this.setname + ":" + mId + ":" + userId + ":functions", (function(_this) { + return function(err, obj) { + var func, _i, _len, _results; + _results = []; + for (_i = 0, _len = obj.length; _i < _len; _i++) { + func = obj[_i]; + _results.push(_this.db.del("" + _this.setname + ":" + mId + ":" + userId + ":function:" + func, replyHandler("del '" + _this.setname + ":" + mId + ":" + userId + ":function:" + func + "'"))); + } + return _results; + }; + })(this)); + }; + return IndexedModules; })(); diff --git a/testing/files/testObjects.json b/testing/files/testObjects.json index 48a0939..22be358 100644 --- a/testing/files/testObjects.json +++ b/testing/files/testObjects.json @@ -6,7 +6,8 @@ "data":"\n#\n# EmailYak EVENT POLLER\n#\n# Requires user params:\n# - apikey: The user's EmailYak API key\n#\n\nurl = 'https://api.emailyak.com/v1/' + params.apikey + '/json/get/new/email/'\n\nexports.newMail = ( pushEvent ) ->\n needle.get url, ( err, resp, body ) ->\n if not err and resp.statusCode is 200\n mails = JSON.parse( body ).Emails\n pushEvent mail for mail in mails\n else\n log.error 'Error in EmailYak EM newMail: ' + err.message\n\n", "public":"false", "params":"[\"apikey\"]", - "functions":"[\"newMail\"]" + "functions":"[\"newMail\"]", + "functionArgs":"{\"newMail\":[\"pushEvent\"]}" }, "epTwo": { "id":"epTwo", @@ -14,7 +15,8 @@ "data":"\nurl = 'https://api.emailyak.com/v1/' + params.firstparam + '/json/get/new/email/'\n\nexports.newEvent = ( pushEvent ) ->\n needle.get url, ( err, resp, body ) ->\n if not err and resp.statusCode is 200\n mails = JSON.parse( body ).Emails\n pushEvent mail for mail in mails\n else\n log.error 'Error in EmailYak EM newMail: ' + err.message\n\nexports.randomNess = ( pushEvent ) ->\n console.log 'test runs: ' + params.secondparam\n", "public":"true", "params":"[\"firstparam\",\"secondparam\"]", - "functions":"[\"newEvent\",\"randomNess\"]" + "functions":"[\"newEvent\",\"randomNess\"]", + "functionArgs":"{\"newEvent\":[\"pushEvent\"],\"randomNess\":[\"pushEvent\"]}" } }, "ais": { @@ -24,7 +26,8 @@ "data":"exports.printToLog = ( evt ) ->\n\tlog evt.property", "public":"false", "params":"[\"apikey\"]", - "functions":"[\"printToLog\"]" + "functions":"[\"printToLog\"]", + "functionArgs":"{\"printToLog\":[\"evt\"]}" }, "aiTwo": { "id":"aiTwo", diff --git a/testing/test_components-manager.coffee b/testing/test_components-manager.coffee index 5ec4f7f..3769888 100644 --- a/testing/test_components-manager.coffee +++ b/testing/test_components-manager.coffee @@ -28,6 +28,7 @@ oRuleTwo = objects.rules.ruleTwo oRuleThree = objects.rules.ruleThree oEpOne = objects.eps.epOne oEpTwo = objects.eps.epTwo +oAiTwo = objects.ais.aiTwo exports.tearDown = ( cb ) -> db.deleteRule oRuleOne.id @@ -108,6 +109,7 @@ exports.moduleHandling = tearDown: ( cb ) -> db.eventPollers.deleteModule oEpOne.id db.eventPollers.deleteModule oEpTwo.id + db.actionInvokers.deleteModule oAiTwo.id setTimeout cb, 100 testGetModules: ( test ) -> @@ -148,14 +150,18 @@ exports.moduleHandling = test.expect 2 oTmp = {} - oTmp[key] = val for key, val of oEpTwo when key isnt 'functions' + for key, val of oAiTwo + oTmp[key] = val if key isnt 'functions' and key isnt 'functionParameters' + request = - command: 'forge_event_poller' + command: 'forge_action_invoker' payload: JSON.stringify oTmp cm.processRequest oUser, request, ( answ ) => test.strictEqual 200, answ.code, 'Forging Module did not return 200' - db.eventPollers.getModule oEpTwo.id, ( err, obj ) -> - test.deepEqual obj, oEpTwo, 'Forged Module is not what we expected' + db.actionInvokers.getModule oAiTwo.id, ( err, obj ) -> + console.log obj + console.log oAiTwo + test.deepEqual obj, oAiTwo, 'Forged Module is not what we expected' test.done() diff --git a/webpages/handlers/coffee/forge_rule.coffee b/webpages/handlers/coffee/forge_rule.coffee index 19b3e6b..d3a555e 100644 --- a/webpages/handlers/coffee/forge_rule.coffee +++ b/webpages/handlers/coffee/forge_rule.coffee @@ -134,7 +134,7 @@ fOnLoad = () -> for functionArgument in oParams[ arrName[ 1 ] ] tr = $( '