webapi-eca/coffee/engine.coffee

287 lines
No EOL
9 KiB
CoffeeScript

###
Engine
==================
> The heart of the WebAPI ECA System. The engine loads action invoker modules
> corresponding to active rules actions and invokes them if an appropriate event
> is retrieved.
###
# **Loads Modules:**
# - [Persistence](persistence.html)
db = require './persistence'
# - [Dynamic Modules](dynamic-modules.html)
dynmod = require './dynamic-modules'
# - External Modules:
# [js-select](https://github.com/harthur/js-select)
jsonQuery = require 'js-select'
###
This is ging to have a structure like:
An object of users with their active rules and the required action modules
"user-1":
"rule-1":
"rule": oRule-1
"actions":
"action-1": oAction-1
"action-2": oAction-2
"rule-2":
"rule": oRule-2
"actions":
"action-1": oAction-1
"user-2":
"rule-3":
"rule": oRule-3
"actions":
"action-3": oAction-3
###
#TODO how often do we allow rules to be processed?
#it would make sense to implement a scheduler approach, which means to store the
#events in the DB and query for them if a rule is evaluated. Through this we would allow
#a CEP approach, rather than just ECA and could combine events/evaluate time constraints
listUserRules = {}
isRunning = false
###
Module call
-----------
Initializes the Engine and starts polling the event queue for new events.
@param {Object} args
###
exports = module.exports = ( args ) =>
if not isRunning
@log = args.logger
db args
dynmod args
setTimeout exports.startEngine, 10 # Very important, this forks a token for the poll task
module.exports
###
This is a helper function for the unit tests so we can verify that action
modules are loaded correctly
@public getListUserRules ()
###
#TODO we should change this to functions returning true or false rather than returning
#the whole list
exports.getListUserRules = () ->
listUserRules
# We need this so we can shut it down after the module unit tests
exports.startEngine = () ->
if not isRunning
isRunning = true
pollQueue()
###
An event associated to rules happened and is captured here. Such events
are basically CRUD on rules.
@public internalEvent ( *evt* )
@param {Object} evt
###
exports.internalEvent = ( evt ) =>
if not listUserRules[evt.user] and evt.event isnt 'del'
listUserRules[evt.user] = {}
oUser = listUserRules[evt.user]
oRule = evt.rule
if evt.event is 'new' or ( evt.event is 'init' and not oUser[oRule.id] )
oUser[oRule.id] =
rule: oRule
actions: {}
updateActionModules oRule.id
if evt.event is 'del' and oUser
delete oUser[evt.ruleId]
# If a user is empty after all the updates above, we remove her from the list
if JSON.stringify( oUser ) is "{}"
delete listUserRules[evt.user]
###
As soon as changes were made to the rule set we need to ensure that the aprropriate action
invoker modules are loaded, updated or deleted.
@private updateActionModules ( *updatedRuleId* )
@param {Object} updatedRuleId
###
updateActionModules = ( updatedRuleId ) =>
# Remove all action invoker modules that are not required anymore
fRemoveNotRequired = ( oUser ) ->
# Check whether the action is still existing in the rule
fRequired = ( actionName ) ->
for action in oUser[updatedRuleId].rule.actions
# Since the event is in the format 'module -> function' we need to split the string
if (action.split ' -> ')[ 0 ] is actionName
return true
false
# Go thorugh all loaded action modules and check whether the action is still required
if oUser[updatedRuleId]
for action of oUser[updatedRuleId].rule.actions
delete oUser[updatedRuleId].actions[action] if not fRequired action
fRemoveNotRequired oUser for name, oUser of listUserRules
# Add action invoker modules that are not yet loaded
fAddRequired = ( userName, oUser ) =>
# Check whether the action is existing in a rule and load if not
fCheckRules = ( oMyRule ) =>
# Load the action invoker module if it was part of the updated rule or if it's new
fAddIfNewOrNotExisting = ( actionName ) =>
moduleName = (actionName.split ' -> ')[ 0 ]
if not oMyRule.actions[moduleName] or oMyRule.rule.id is updatedRuleId
db.actionInvokers.getModule userName, moduleName, ( err, obj ) =>
if obj
# we compile the module and pass:
dynmod.compileString obj.data, # code
userName, # userId
oMyRule.rule, # oRule
moduleName, # moduleId
obj.lang, # script language
"actioninvoker", # module type
db.actionInvokers, # the DB interface
( result ) =>
if result.answ.code is 200
@log.info "EN | Module '#{ moduleName }' successfully loaded for userName
'#{ userName }' in rule '#{ oMyRule.rule.id }'"
else
@log.error "EN | Compilation of code failed! #{ userName },
#{ oMyRule.rule.id }, #{ moduleName }: #{ result.answ.message }"
oMyRule.actions[moduleName] = result
else
@log.warn "EN | #{ moduleName } not found for #{ oMyRule.rule.id }!"
fAddIfNewOrNotExisting action for action in oMyRule.rule.actions
# Go thorugh all rules and check whether the action is still required
fCheckRules oRl for nmRl, oRl of oUser
# load all required modules for all users
fAddRequired userName, oUser for userName, oUser of listUserRules
numExecutingFunctions = 1
pollQueue = () ->
if isRunning
db.popEvent ( err, obj ) ->
if not err and obj
processEvent obj
setTimeout pollQueue, 20 * numExecutingFunctions #FIXME right way to adapt to load?
oOperators =
'<': ( x, y ) -> x < y
'<=': ( x, y ) -> x <= y
'>': ( x, y ) -> x > y
'>=': ( x, y ) -> x >= y
'==': ( x, y ) -> x is y
'instr': ( x, y ) -> x.indexOf( y ) > -1
###
Checks whether all conditions of the rule are met by the event.
@private validConditions ( *evt, rule* )
@param {Object} evt
@param {Object} rule
###
validConditions = ( evt, rule, userId, ruleId ) ->
if rule.conditions.length is 0
return true
for cond in rule.conditions
selectedProperty = jsonQuery( evt, cond.selector ).nodes()
if selectedProperty.length is 0
db.appendLog userId, ruleId, 'Condition', "Node not found in event: #{ cond.selector }"
return false
op = oOperators[ cond.operator ]
if not op
db.appendLog userId, ruleId, 'Condition', "Unknown operator: #{ cond.operator }.
Use one of #{ Object.keys( oOperators ).join ', ' }"
return false
try
if cond.type is 'string'
val = selectedProperty[ 0 ]
else if cond.type is 'value'
val = parseFloat( selectedProperty[ 0 ] ) || 0
if not op val, cond.compare
return false
catch err
db.appendLog userId, ruleId, 'Condition', "Error: Selector '#{ cond.selector }',
Operator #{ cond.operator }, Compare: #{ cond.compare }"
return true
###
Handles retrieved events.
@private processEvent ( *evt* )
@param {Object} evt
###
processEvent = ( evt ) =>
fSearchAndInvokeAction = ( node, arrPath, funcName, evt, depth ) =>
if not node
@log.error "EN | Didn't find property in user rule list: " + arrPath.join( ', ' ) + " at depth " + depth
return
if depth is arrPath.length
try
numExecutingFunctions++
@log.info "EN | #{ funcName } executes..."
arrArgs = []
if node.funcArgs[ funcName ]
for oArg in node.funcArgs[ funcName ]
arrSelectors = oArg.value.match /#\{(.*?)\}/g
argument = oArg.value
if arrSelectors
for sel in arrSelectors
selector = sel.substring 2, sel.length - 1
data = jsonQuery( evt.payload, selector ).nodes()[ 0 ]
argument = argument.replace sel, data
if oArg.value is sel
argument = data # if the user wants to pass an object, we allow him to do so
# if oArg.jsselector
arrArgs.push argument #jsonQuery( evt.payload, oArg.value ).nodes()[ 0 ]
# else
# arrArgs.push oArg.value
else
@log.warn "EN | Weird! arguments not loaded for function '#{ funcName }'!"
node.module[ funcName ].apply null, arrArgs
@log.info "EN | #{ funcName } finished execution"
catch err
@log.info "EN | ERROR IN ACTION INVOKER: " + err.message
node.logger err.message
if numExecutingFunctions-- % 100 is 0
@log.warn "EN | The system is producing too many tokens! Currently: #{ numExecutingFunctions }"
else
fSearchAndInvokeAction node[arrPath[depth]], arrPath, funcName, evt, depth + 1
@log.info 'EN | processing event: ' + evt.event + '(' + evt.eventid + ')'
for userName, oUser of listUserRules
for ruleName, oMyRule of oUser
ruleEvent = oMyRule.rule.event
if oMyRule.rule.timestamp
ruleEvent += '_created:' + oMyRule.rule.timestamp
if evt.event is ruleEvent and validConditions evt, oMyRule.rule, userName, ruleName
@log.info 'EN | EVENT FIRED: ' + evt.event + '(' + evt.eventid + ') for rule ' + ruleName
for action in oMyRule.rule.actions
arr = action.split ' -> '
fSearchAndInvokeAction listUserRules, [ userName, ruleName, 'actions', arr[0]], arr[1], evt, 0
exports.shutDown = () ->
isRunning = false
listUserRules = {}