fast commit to check event retrieval through webhooj

This commit is contained in:
Dominic Bosch 2014-04-15 14:25:26 +02:00
parent a2e9e970c8
commit 6c5b9d0885
13 changed files with 207 additions and 139 deletions

View file

@ -165,6 +165,7 @@ forgeModule = ( user, oPayload, dbMod, callback ) =>
if answ.code isnt 200
callback answ
else
i = 0
dbMod.getModule oPayload.id, ( err, mod ) =>
if mod
answ.code = 409
@ -296,28 +297,24 @@ commandFunctions =
code: 409
message: 'Rule name already existing!'
else
console.log 'new ruke'
rule =
id: oPayload.id
event: oPayload.event
conditions: oPayload.conditions
actions: oPayload.actions
strRule = JSON.stringify rule
console.log 'stringified'
db.storeRule rule.id, strRule
console.log 'stored'
db.linkRule rule.id, user.username
console.log 'linked'
db.activateRule rule.id, user.username
console.log 'activated'
if oPayload.event_params
epModId = rule.event.split( ' -> ' )[0]
db.eventPollers.storeUserParams epModId, user.username, oPayload.event_params
console.log 'event params loaded'
arrParams = oPayload.action_params
console.log 'arractionparams'
db.actionInvokers.storeUserParams id, user.username, JSON.stringify params for id, params of arrParams
console.log 'action aprams stored'
oParams = oPayload.action_params
db.actionInvokers.storeUserParams id, user.username, JSON.stringify params for id, params of oParams
oParams = oPayload.action_functions
for id, params of oParams
arr = id.split ' -> '
db.actionInvokers.storeUserArguments arr[ 0 ], arr[ 1 ], user.username, JSON.stringify params
db.resetLog user.username, rule.id
db.appendLog user.username, rule.id, "INIT", "Rule '#{ rule.id }' initialized"
eventEmitter.emit 'rule',
@ -327,7 +324,6 @@ commandFunctions =
answ =
code: 200
message: "Rule '#{ rule.id }' stored and activated!"
console.log 'done'
callback answ
delete_rule: ( user, oPayload, callback ) ->

View file

@ -49,33 +49,6 @@ exports = module.exports = ( args ) =>
exports.getPublicKey = () =>
@strPublicKey
# We need to wrap the callbacks in try/catch so the token does not get killed and
# other modules are not called. This will be obsolete as soon as each module
# runs in a child process
# FIXME this seems not to achieve what we expected... token gets still lost
# -> implement child processes per module. (or better per user?)
issueNeedleCall = ( logger ) ->
( method, url, data, options, cb ) ->
try
needle.request method, url, data, options, ( err, resp, body ) =>
try
cb err, resp, body
catch err
logger 'Error during needle request! ' + err.message
catch err
logger 'Error before needle request! ' + err.message
issueRequest = ( logger ) ->
( options, cb ) ->
try
request options, ( err, resp, body ) =>
try
cb err, resp, body
catch err
logger 'Error during request! ' + err.message
catch err
logger 'Error before request! ' + err.message
logFunction = ( uId, rId, mId ) ->
( msg ) ->
db.appendLog uId, rId, mId, msg
@ -114,7 +87,6 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) =>
err.location.first_line
fTryToLoad = ( params ) =>
if params
try
oDecrypted = cryptico.decrypt params, @oPrivateRSAkey
@ -126,12 +98,13 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) =>
else
params = {}
logFunc = logFunction userId, ruleId, modId
sandbox =
id: userId + '.' + modId + '.vm'
params: params
needlereq: issueNeedleCall logFunc
request: issueRequest logFunc
needle: needle
request: request
cryptoJS: cryptoJS
log: logFunc
debug: console.log
@ -154,14 +127,33 @@ exports.compileString = ( src, userId, ruleId, modId, lang, dbMod, cb ) =>
oFuncParams = {}
for fName, func of sandbox.exports
getFunctionParamNames fName, func, oFuncParams
if dbMod
oFuncArgs = {}
console.log 'oFuncParams'
console.log oFuncParams
for func of oFuncParams
console.log 'fetching ' + func
console.log typeof func
dbMod.getUserArguments modId, func, userId, ( err, obj ) =>
console.log err, obj
try
oDecrypted = cryptico.decrypt obj, @oPrivateRSAkey
oFuncArgs[ func ] = JSON.parse oDecrypted.plaintext
catch err
@log.warn "DM | Error during parsing of user defined params for #{ userId }, #{ ruleId }, #{ modId }"
@log.warn err
cb
answ: answ
module: sandbox.exports
funcParams: oFuncParams
funcArgs: oFuncArgs
logger: sandbox.log
if dbMod
dbMod.getUserParams modId, userId, ( err, obj ) ->
fTryToLoad obj
else
fTryToLoad()
fTryToLoad null

View file

@ -39,6 +39,11 @@ An object of users with their active rules and the required action modules
"actions":
"action-3": oAction-3
###
#TODO how often do we allow rules to be processed?
#it would make sense to implement a scheduler approach, which means to store the
#events in the DB and query for them if a rule is evaluated. Through this we would allow
#a CEP approach, rather than just ECA and could combine events/evaluate time constraints
listUserRules = {}
isRunning = false

View file

@ -284,6 +284,7 @@ class IndexedModules
@db.smembers "#{ @setname }:#{ mId }:users", ( err, obj ) =>
@unlinkModule mId, userId for userId in obj
@deleteUserParams mId, userId for userId in obj
@deleteUserArguments mId, userId for userId in obj
###
Stores user params for a module. They are expected to be RSA encrypted with helps of
@ -316,6 +317,34 @@ class IndexedModules
@db.del "#{ @setname }-params:#{ mId }:#{ userId }",
replyHandler "del '#{ @setname }-params:#{ mId }:#{ userId }'"
###
Stores user arguments for a function within a module. They are expected to be RSA encrypted with helps of
the provided cryptico JS library and will only be decrypted right before the module is loaded!
@private storeUserArguments( *mId, userId, encData* )
@param {String} mId
@param {String} userId
@param {object} encData
###
storeUserArguments: ( mId, funcId, userId, encData ) =>
@log.info "DB | (IdxedMods) #{ @setname }.storeUserArguments( #{ mId }, #{ funcId }, #{ userId }, encData )"
@db.sadd "#{ @setname }:#{ mId }:#{ userId }:functions", funcId,
replyHandler "sadd '#{ funcId }' to '#{ @setname }:#{ mId }:#{ userId }:functions'"
@db.set "#{ @setname }:#{ mId }:#{ userId }:function:#{ funcId }", encData,
replyHandler "set user params in '#{ @setname }:#{ mId }:#{ userId }:function:#{ func }'"
getUserArguments: ( mId, funcId, userId, cb ) =>
console.log 'calling ffunct'
@log.info "DB | (IdxedMods) #{ @setname }.getUserArguments( #{ mId }, #{ funcId }, #{ userId } )"
@db.get "#{ @setname }:#{ mId }:#{ userId }:function:#{ funcId }", cb
deleteUserArguments: ( mId, userId ) =>
@log.info "DB | (IdxedMods) #{ @setname }.deleteUserArguments(#{ mId }, #{ userId } )"
@db.smembers "#{ @setname }:#{ mId }:#{ userId }:functions", ( err, obj ) =>
for func in obj
@db.del "#{ @setname }:#{ mId }:#{ userId }:function:#{ func }",
replyHandler "del '#{ @setname }:#{ mId }:#{ userId }:function:#{ func }'"
###
## Rules

View file

@ -68,26 +68,29 @@ exports.handleEvent = ( req, resp ) ->
body = ''
req.on 'data', ( data ) ->
body += data
req.on 'end', ->
if req.session and req.session.user
try
obj = JSON.parse body
catch err
resp.send 400, 'Badly formed event!'
# If required event properties are present we process the event #
if obj and obj.event and not err
timestamp = ( new Date ).toISOString()
rand = ( Math.floor Math.random() * 10e9 ).toString( 16 ).toUpperCase()
obj.eventid = "#{ obj.event }_#{ timestamp }_#{ rand }"
answ =
code: 200
message: "Thank you for the event: #{ obj.eventid }"
resp.send answ.code, answ
db.pushEvent obj
else
resp.send 400, 'Your event was missing important parameters!'
#if req.session and req.session.user
try
obj = JSON.parse body
console.log 'got foreign event!'
console.log obj
catch err
resp.send 400, 'Badly formed event!'
# If required event properties are present we process the event #
if obj and obj.event and not err
timestamp = ( new Date ).toISOString()
rand = ( Math.floor Math.random() * 10e9 ).toString( 16 ).toUpperCase()
obj.eventid = "#{ obj.event }_#{ timestamp }_#{ rand }"
answ =
code: 200
message: "Thank you for the event: #{ obj.eventid }"
resp.send answ.code, answ
db.pushEvent obj
else
resp.send 401, 'Please login!'
resp.send 400, 'Your event was missing important parameters!'
# else
# resp.send 401, 'Please login!'
###

View file

@ -147,6 +147,7 @@ init = =>
else
# > Initialize all required modules with the args object.
@log.info 'RS | Initialzing engine'
#TODO We could in the future make the engine a child process as well
engine args
# Start the event poller. The module manager will emit events for it

View file

@ -223,11 +223,12 @@ Components Manager
forgeModule = (function(_this) {
return function(user, oPayload, dbMod, callback) {
var answ;
var answ, i;
answ = hasRequiredParams(['id', 'params', 'lang', 'data'], oPayload);
if (answ.code !== 200) {
return callback(answ);
} else {
i = 0;
return dbMod.getModule(oPayload.id, function(err, mod) {
var src;
if (mod) {
@ -379,14 +380,13 @@ Components Manager
return callback(answ);
} else {
return db.getRule(oPayload.id, function(err, oExisting) {
var arrParams, epModId, id, params, rule, strRule;
var arr, epModId, id, oParams, params, rule, strRule;
if (oExisting !== null) {
answ = {
code: 409,
message: 'Rule name already existing!'
};
} else {
console.log('new ruke');
rule = {
id: oPayload.id,
event: oPayload.event,
@ -394,25 +394,24 @@ Components Manager
actions: oPayload.actions
};
strRule = JSON.stringify(rule);
console.log('stringified');
db.storeRule(rule.id, strRule);
console.log('stored');
db.linkRule(rule.id, user.username);
console.log('linked');
db.activateRule(rule.id, user.username);
console.log('activated');
if (oPayload.event_params) {
epModId = rule.event.split(' -> ')[0];
db.eventPollers.storeUserParams(epModId, user.username, oPayload.event_params);
}
console.log('event params loaded');
arrParams = oPayload.action_params;
console.log('arractionparams');
for (id in arrParams) {
params = arrParams[id];
oParams = oPayload.action_params;
for (id in oParams) {
params = oParams[id];
db.actionInvokers.storeUserParams(id, user.username, JSON.stringify(params));
}
console.log('action aprams stored');
oParams = oPayload.action_functions;
for (id in oParams) {
params = oParams[id];
arr = id.split(' -> ');
db.actionInvokers.storeUserArguments(arr[0], arr[1], user.username, JSON.stringify(params));
}
db.resetLog(user.username, rule.id);
db.appendLog(user.username, rule.id, "INIT", "Rule '" + rule.id + "' initialized");
eventEmitter.emit('rule', {
@ -424,7 +423,6 @@ Components Manager
code: 200,
message: "Rule '" + rule.id + "' stored and activated!"
};
console.log('done');
}
return callback(answ);
});

View file

@ -9,7 +9,7 @@ Dynamic Modules
*/
(function() {
var cryptico, cryptoJS, cs, db, exports, getFunctionParamNames, issueNeedleCall, issueRequest, logFunction, needle, regexpComments, request, vm;
var cryptico, cryptoJS, cs, db, exports, getFunctionParamNames, logFunction, needle, regexpComments, request, vm;
db = require('./persistence');
@ -56,48 +56,6 @@ Dynamic Modules
};
})(this);
issueNeedleCall = function(logger) {
return function(method, url, data, options, cb) {
var err;
try {
return needle.request(method, url, data, options, (function(_this) {
return function(err, resp, body) {
try {
return cb(err, resp, body);
} catch (_error) {
err = _error;
return logger('Error during needle request! ' + err.message);
}
};
})(this));
} catch (_error) {
err = _error;
return logger('Error before needle request! ' + err.message);
}
};
};
issueRequest = function(logger) {
return function(options, cb) {
var err;
try {
return request(options, (function(_this) {
return function(err, resp, body) {
try {
return cb(err, resp, body);
} catch (_error) {
err = _error;
return logger('Error during request! ' + err.message);
}
};
})(this));
} catch (_error) {
err = _error;
return logger('Error before request! ' + err.message);
}
};
};
logFunction = function(uId, rId, mId) {
return function(msg) {
return db.appendLog(uId, rId, mId, msg);
@ -146,7 +104,7 @@ Dynamic Modules
}
}
fTryToLoad = function(params) {
var fName, func, logFunc, msg, oDecrypted, oFuncParams, sandbox, _ref;
var fName, func, logFunc, msg, oDecrypted, oFuncArgs, oFuncParams, sandbox, _ref;
if (params) {
try {
oDecrypted = cryptico.decrypt(params, _this.oPrivateRSAkey);
@ -164,8 +122,8 @@ Dynamic Modules
sandbox = {
id: userId + '.' + modId + '.vm',
params: params,
needlereq: issueNeedleCall(logFunc),
request: issueRequest(logFunc),
needle: needle,
request: request,
cryptoJS: cryptoJS,
log: logFunc,
debug: console.log,
@ -188,10 +146,31 @@ Dynamic Modules
func = _ref[fName];
getFunctionParamNames(fName, func, oFuncParams);
}
if (dbMod) {
oFuncArgs = {};
console.log('oFuncParams');
console.log(oFuncParams);
for (func in oFuncParams) {
console.log('fetching ' + func);
console.log(typeof func);
dbMod.getUserArguments(modId, func, userId, function(err, obj) {
console.log(err, obj);
try {
oDecrypted = cryptico.decrypt(obj, _this.oPrivateRSAkey);
return oFuncArgs[func] = JSON.parse(oDecrypted.plaintext);
} catch (_error) {
err = _error;
_this.log.warn("DM | Error during parsing of user defined params for " + userId + ", " + ruleId + ", " + modId);
return _this.log.warn(err);
}
});
}
}
return cb({
answ: answ,
module: sandbox.exports,
funcParams: oFuncParams,
funcArgs: oFuncArgs,
logger: sandbox.log
});
};
@ -200,7 +179,7 @@ Dynamic Modules
return fTryToLoad(obj);
});
} else {
return fTryToLoad();
return fTryToLoad(null);
}
};
})(this);

View file

@ -253,6 +253,9 @@ Persistence
function IndexedModules(setname, log) {
this.setname = setname;
this.log = log;
this.deleteUserArguments = __bind(this.deleteUserArguments, this);
this.getUserArguments = __bind(this.getUserArguments, this);
this.storeUserArguments = __bind(this.storeUserArguments, this);
this.deleteUserParams = __bind(this.deleteUserParams, this);
this.getUserParamsIds = __bind(this.getUserParamsIds, this);
this.getUserParams = __bind(this.getUserParams, this);
@ -352,15 +355,19 @@ Persistence
this.unpublish(mId);
return this.db.smembers("" + this.setname + ":" + mId + ":users", (function(_this) {
return function(err, obj) {
var userId, _i, _j, _len, _len1, _results;
var userId, _i, _j, _k, _len, _len1, _len2, _results;
for (_i = 0, _len = obj.length; _i < _len; _i++) {
userId = obj[_i];
_this.unlinkModule(mId, userId);
}
_results = [];
for (_j = 0, _len1 = obj.length; _j < _len1; _j++) {
userId = obj[_j];
_results.push(_this.deleteUserParams(mId, userId));
_this.deleteUserParams(mId, userId);
}
_results = [];
for (_k = 0, _len2 = obj.length; _k < _len2; _k++) {
userId = obj[_k];
_results.push(_this.deleteUserArguments(mId, userId));
}
return _results;
};
@ -400,6 +407,44 @@ Persistence
return this.db.del("" + this.setname + "-params:" + mId + ":" + userId, replyHandler("del '" + this.setname + "-params:" + mId + ":" + userId + "'"));
};
/*
Stores user arguments for a function within a module. They are expected to be RSA encrypted with helps of
the provided cryptico JS library and will only be decrypted right before the module is loaded!
@private storeUserArguments( *mId, userId, encData* )
@param {String} mId
@param {String} userId
@param {object} encData
*/
IndexedModules.prototype.storeUserArguments = function(mId, funcId, userId, encData) {
this.log.info("DB | (IdxedMods) " + this.setname + ".storeUserArguments( " + mId + ", " + funcId + ", " + userId + ", encData )");
this.db.sadd("" + this.setname + ":" + mId + ":" + userId + ":functions", funcId, replyHandler("sadd '" + funcId + "' to '" + this.setname + ":" + mId + ":" + userId + ":functions'"));
return this.db.set("" + this.setname + ":" + mId + ":" + userId + ":function:" + funcId, encData, replyHandler("set user params in '" + this.setname + ":" + mId + ":" + userId + ":function:" + func + "'"));
};
IndexedModules.prototype.getUserArguments = function(mId, funcId, userId, cb) {
console.log('calling ffunct');
this.log.info("DB | (IdxedMods) " + this.setname + ".getUserArguments( " + mId + ", " + funcId + ", " + userId + " )");
return this.db.get("" + this.setname + ":" + mId + ":" + userId + ":function:" + funcId, cb);
};
IndexedModules.prototype.deleteUserArguments = function(mId, userId) {
this.log.info("DB | (IdxedMods) " + this.setname + ".deleteUserArguments(" + mId + ", " + userId + " )");
return this.db.smembers("" + this.setname + ":" + mId + ":" + userId + ":functions", (function(_this) {
return function(err, obj) {
var func, _i, _len, _results;
_results = [];
for (_i = 0, _len = obj.length; _i < _len; _i++) {
func = obj[_i];
_results.push(_this.db.del("" + _this.setname + ":" + mId + ":" + userId + ":function:" + func, replyHandler("del '" + _this.setname + ":" + mId + ":" + userId + ":function:" + func + "'")));
}
return _results;
};
})(this));
};
return IndexedModules;
})();

View file

@ -6,7 +6,8 @@
"data":"\n#\n# EmailYak EVENT POLLER\n#\n# Requires user params:\n# - apikey: The user's EmailYak API key\n#\n\nurl = 'https://api.emailyak.com/v1/' + params.apikey + '/json/get/new/email/'\n\nexports.newMail = ( pushEvent ) ->\n needle.get url, ( err, resp, body ) ->\n if not err and resp.statusCode is 200\n mails = JSON.parse( body ).Emails\n pushEvent mail for mail in mails\n else\n log.error 'Error in EmailYak EM newMail: ' + err.message\n\n",
"public":"false",
"params":"[\"apikey\"]",
"functions":"[\"newMail\"]"
"functions":"[\"newMail\"]",
"functionArgs":"{\"newMail\":[\"pushEvent\"]}"
},
"epTwo": {
"id":"epTwo",
@ -14,7 +15,8 @@
"data":"\nurl = 'https://api.emailyak.com/v1/' + params.firstparam + '/json/get/new/email/'\n\nexports.newEvent = ( pushEvent ) ->\n needle.get url, ( err, resp, body ) ->\n if not err and resp.statusCode is 200\n mails = JSON.parse( body ).Emails\n pushEvent mail for mail in mails\n else\n log.error 'Error in EmailYak EM newMail: ' + err.message\n\nexports.randomNess = ( pushEvent ) ->\n console.log 'test runs: ' + params.secondparam\n",
"public":"true",
"params":"[\"firstparam\",\"secondparam\"]",
"functions":"[\"newEvent\",\"randomNess\"]"
"functions":"[\"newEvent\",\"randomNess\"]",
"functionArgs":"{\"newEvent\":[\"pushEvent\"],\"randomNess\":[\"pushEvent\"]}"
}
},
"ais": {
@ -24,7 +26,8 @@
"data":"exports.printToLog = ( evt ) ->\n\tlog evt.property",
"public":"false",
"params":"[\"apikey\"]",
"functions":"[\"printToLog\"]"
"functions":"[\"printToLog\"]",
"functionArgs":"{\"printToLog\":[\"evt\"]}"
},
"aiTwo": {
"id":"aiTwo",

View file

@ -28,6 +28,7 @@ oRuleTwo = objects.rules.ruleTwo
oRuleThree = objects.rules.ruleThree
oEpOne = objects.eps.epOne
oEpTwo = objects.eps.epTwo
oAiTwo = objects.ais.aiTwo
exports.tearDown = ( cb ) ->
db.deleteRule oRuleOne.id
@ -108,6 +109,7 @@ exports.moduleHandling =
tearDown: ( cb ) ->
db.eventPollers.deleteModule oEpOne.id
db.eventPollers.deleteModule oEpTwo.id
db.actionInvokers.deleteModule oAiTwo.id
setTimeout cb, 100
testGetModules: ( test ) ->
@ -148,14 +150,18 @@ exports.moduleHandling =
test.expect 2
oTmp = {}
oTmp[key] = val for key, val of oEpTwo when key isnt 'functions'
for key, val of oAiTwo
oTmp[key] = val if key isnt 'functions' and key isnt 'functionParameters'
request =
command: 'forge_event_poller'
command: 'forge_action_invoker'
payload: JSON.stringify oTmp
cm.processRequest oUser, request, ( answ ) =>
test.strictEqual 200, answ.code, 'Forging Module did not return 200'
db.eventPollers.getModule oEpTwo.id, ( err, obj ) ->
test.deepEqual obj, oEpTwo, 'Forged Module is not what we expected'
db.actionInvokers.getModule oAiTwo.id, ( err, obj ) ->
console.log obj
console.log oAiTwo
test.deepEqual obj, oAiTwo, 'Forged Module is not what we expected'
test.done()

View file

@ -134,7 +134,7 @@ fOnLoad = () ->
for functionArgument in oParams[ arrName[ 1 ] ]
tr = $( '<tr>' ).appendTo table
td = $( '<td>' ).appendTo tr
td.append $( '<div>' ).text functionArgument
td.append $( '<div>' ).attr( 'class', 'funcarg' ).text functionArgument
tr.append td
td = $( '<td>' ).appendTo tr
td.append $( '<input>' ).attr 'type', 'text'
@ -221,10 +221,14 @@ fOnLoad = () ->
acts = []
actParams = {}
$( '#selected_actions' ).each () ->
acts.push $( '.title', this ).text()
actionName = $( '.title', this ).text()
acts.push actionName
$( '.funcMappings tr' ).each () ->
console.log $( 'input[type=text]', this ).val()
console.log $( 'input[type=checkbox]', this ).is( ':checked' )
tmp =
argument: $( 'div.funcarg', this ).val()
value: $( 'input[type=text]', this ).val()
regexp: $( 'input[type=checkbox]', this ).is( ':checked' )
actParams[ actionName ] = cryptico.encrypt JSON.stringify( tmp ), strPublicKey
try
conds = JSON.parse editor.getValue()

View file

@ -194,7 +194,7 @@
functionArgument = _ref[_i];
tr = $('<tr>').appendTo(table);
td = $('<td>').appendTo(tr);
td.append($('<div>').text(functionArgument));
td.append($('<div>').attr('class', 'funcarg').text(functionArgument));
tr.append(td);
td = $('<td>').appendTo(tr);
td.append($('<input>').attr('type', 'text'));
@ -295,10 +295,17 @@
acts = [];
actParams = {};
$('#selected_actions').each(function() {
acts.push($('.title', this).text());
var actionName;
actionName = $('.title', this).text();
acts.push(actionName);
return $('.funcMappings tr').each(function() {
console.log($('input[type=text]', this).val());
return console.log($('input[type=checkbox]', this).is(':checked'));
var tmp;
tmp = {
argument: $('div.funcarg', this).val(),
value: $('input[type=text]', this).val(),
regexp: $('input[type=checkbox]', this).is(':checked')
};
return actParams[actionName] = cryptico.encrypt(JSON.stringify(tmp), strPublicKey);
});
});
try {