Events can now be pushed into the engine via webpage.

The events are now queued through the DB.
This commit is contained in:
Dominic Bosch 2013-11-28 19:14:05 +01:00
parent 32f5553471
commit 1342d0b2de
12 changed files with 162 additions and 41 deletions

View file

@ -3,11 +3,12 @@ README: webapi-eca
# TODO Remake
>A Modular ECA Engine Server which acts as a middleware between WebAPI's.
>This folder continues examples of an ECA engine and how certain use cases could be implemented together with a rules language.
>
>
>The server is started through the [server.js](server.html) module by calling `node rule_server.js`.
> A Modular ECA Engine Server which acts as a middleware between WebAPI's.
> This folder continues examples of an ECA engine and how certain use cases
> could be implemented together with a rules language.
>
> The server is started through the [server.js](server.html) module by calling
> `node rule_server.js`.
Getting started
@ -16,7 +17,8 @@ Getting started
**Prerequisites:**
- node.js (find it [here](http://nodejs.org/))
- *(optional) [CoffeeScript](http://coffeescript.org/), if you want to compile from coffee sources:*
- *(optional) [CoffeeScript](http://coffeescript.org/), if you want to develop
and compile from coffee sources:*
sudo npm -g install coffee-script

View file

@ -73,6 +73,27 @@ exports.isConnected = ( cb ) =>
setTimeout fCheckConnection, 500
###
Push an event into the event queue.
@public pushEvent( *event* )
@param {Object} event
###
exports.pushEvent = ( event ) =>
log.print 'DB', 'Event pushed into the queue: ' + event.eventid
@db.rpush 'event_queue', JSON.stringify(event)
###
Pop an event from the event queue and pass it to the callback(err, obj) function.
@public popEvent( *cb* )
@param {function} cb
###
exports.popEvent = ( cb )=>
@db.lpop 'event_queue', cb
###
Hashes a string based on SHA-3-512.

View file

@ -52,8 +52,8 @@ exports = module.exports = ( args ) ->
sess_sec = config.getSessionSecret() || sess_sec
module.exports
exports.addHandlers = ( fEvtHandler, fShutDown ) ->
requestHandler.addHandlers fEvtHandler, fShutDown
exports.addHandlers = ( fShutDown ) ->
requestHandler.addHandlers fShutDown
# Add cookie support for session handling.
app.use express.cookieParser()
app.use express.session { secret: sess_sec }

View file

@ -53,16 +53,16 @@ This allows the parent to add handlers. The event handler will receive
the events that were received. The shutdown function will be called if the
admin command shutdown is issued.
@public addHandlers( *fEvtHandler, fShutdown* )
@param {function} fEvtHandler
@public addHandlers( *fShutdown* )
@param {function} fShutdown
###
exports.addHandlers = ( fEvtHandler, fShutdown ) =>
@eventHanlder = fEvtHandler
exports.addHandlers = ( fShutdown ) =>
objAdminCmds.shutdown = fShutdown
###
Handles possible events that were posted to this server and pushes them into the
event queue.
*Requires
the [request](http://nodejs.org/api/http.html#http_class_http_clientrequest)
@ -80,7 +80,7 @@ exports.handleEvent = ( req, resp ) =>
# If required event properties are present we process the event #
if obj and obj.event and obj.eventid
resp.send 'Thank you for the event (' + obj.event + '[' + obj.eventid + '])!'
@eventHandler obj
db.pushEvent obj
else
resp.writeHead 400, { "Content-Type": "text/plain" }
resp.send 'Your event was missing important parameters!'
@ -132,14 +132,26 @@ exports.handleLogout = ( req, resp ) ->
resp.send 'Bye!'
getHandlerPath = (name) ->
###
Resolves the path to a handler webpage.
@private getHandlerPath( *name* )
@param {String} name
###
getHandlerPath = ( name ) ->
path.resolve __dirname, '..', 'webpages', 'handlers', name + '.html'
getHandlerFileAsString = (name) ->
fs.readFileSync getHandlerPath( name ), 'utf8'
###
Resolves the path to a handler webpage and returns it as a string.
@private getHandlerFileAsString( *name* )
@param {String} name
###
getHandlerFileAsString = ( name ) ->
fs.readFileSync getHandlerPath( name ), 'utf8'
###
*Requires
the [request](http://nodejs.org/api/http.html#http_class_http_clientrequest)
and [response](http://nodejs.org/api/http.html#http_class_http_serverresponse)

View file

@ -99,7 +99,7 @@ init = ->
engine.addDBLinkAndLoadActionsAndRules db
log.print 'RS', 'Passing handlers to http listener'
#TODO engine pushEvent needs to go into redis queue
http_listener.addHandlers db, engine.pushEvent, shutDown
http_listener.addHandlers shutDown
#log.print 'RS', 'Passing handlers to module manager'
#TODO loadAction and addRule will be removed
#mm.addHandlers db, engine.loadActionModule, engine.addRule

View file

@ -88,6 +88,31 @@ DB Interface
}
};
/*
Push an event into the event queue.
@public pushEvent( *event* )
@param {Object} event
*/
exports.pushEvent = function(event) {
log.print('DB', 'Event pushed into the queue: ' + event.eventid);
return _this.db.rpush('event_queue', JSON.stringify(event));
};
/*
Pop an event from the event queue and pass it to the callback(err, obj) function.
@public popEvent( *cb* )
@param {function} cb
*/
exports.popEvent = function(cb) {
return _this.db.lpop('event_queue', cb);
};
/*
Hashes a string based on SHA-3-512.

View file

@ -18,8 +18,6 @@ exports = module.exports = function(args) {
poller.on('message', function(evt) {
exports.pushEvent(evt);
});
//start to poll the event queue
pollQueue();
return module.exports;
};
@ -31,6 +29,8 @@ exports = module.exports = function(args) {
*/
exports.addDBLinkAndLoadActionsAndRules = function(db_link) {
db = db_link;
if(!db) log.error('EN', 'No DB!');
console.log(db);
if(ml && db) db.getActionModules(function(err, obj) {
if(err) log.error('EN', 'retrieving Action Modules from DB!');
else {
@ -68,6 +68,8 @@ function loadRulesFromDB() {
}
});
//start to poll the event queue
pollQueue();
}
/**
@ -101,22 +103,19 @@ exports.addRule = function(objRule) {
function pollQueue() {
if(isRunning) {
var evt = qEvents.dequeue();
if(evt) {
processEvent(evt);
}
setTimeout(pollQueue, 50); //TODO adapt to load
db.popEvent(function (err, text) {
if(!err && text) {
processEvent(JSON.parse(text));
}
setTimeout(pollQueue, 50); //TODO adapt to load
});
// var evt = qEvents.dequeue();
// if(evt) {
// processEvent(evt);
// }
}
}
/**
* Stores correctly posted events in the queue
* @param {Object} evt The event object
*/
exports.pushEvent = function(evt) {
qEvents.enqueue(evt);
};
/**
* Handles correctly posted events
* @param {Object} evt The event object

View file

@ -48,9 +48,9 @@ HTTP Listener
return module.exports;
};
exports.addHandlers = function(fEvtHandler, fShutDown) {
exports.addHandlers = function(fShutDown) {
var e, http_port;
requestHandler.addHandlers(fEvtHandler, fShutDown);
requestHandler.addHandlers(fShutDown);
app.use(express.cookieParser());
app.use(express.session({
secret: sess_sec

View file

@ -55,18 +55,18 @@ Request Handler
the events that were received. The shutdown function will be called if the
admin command shutdown is issued.
@public addHandlers( *fEvtHandler, fShutdown* )
@param {function} fEvtHandler
@public addHandlers( *fShutdown* )
@param {function} fShutdown
*/
exports.addHandlers = function(fEvtHandler, fShutdown) {
_this.eventHanlder = fEvtHandler;
exports.addHandlers = function(fShutdown) {
return objAdminCmds.shutdown = fShutdown;
};
/*
Handles possible events that were posted to this server and pushes them into the
event queue.
*Requires
the [request](http://nodejs.org/api/http.html#http_class_http_clientrequest)
@ -88,7 +88,7 @@ Request Handler
obj = qs.parse(body);
if (obj && obj.event && obj.eventid) {
resp.send('Thank you for the event (' + obj.event + '[' + obj.eventid + '])!');
return _this.eventHandler(obj);
return db.pushEvent(obj);
} else {
resp.writeHead(400, {
"Content-Type": "text/plain"
@ -157,16 +157,31 @@ Request Handler
}
};
/*
Resolves the path to a handler webpage.
@private getHandlerPath( *name* )
@param {String} name
*/
getHandlerPath = function(name) {
return path.resolve(__dirname, '..', 'webpages', 'handlers', name + '.html');
};
/*
Resolves the path to a handler webpage and returns it as a string.
@private getHandlerFileAsString( *name* )
@param {String} name
*/
getHandlerFileAsString = function(name) {
return fs.readFileSync(getHandlerPath(name), 'utf8');
};
/*
*Requires
the [request](http://nodejs.org/api/http.html#http_class_http_clientrequest)
and [response](http://nodejs.org/api/http.html#http_class_http_serverresponse)

View file

@ -104,7 +104,7 @@ Rules Server
log.print('RS', 'Passing handlers to engine');
engine.addDBLinkAndLoadActionsAndRules(db);
log.print('RS', 'Passing handlers to http listener');
return http_listener.addHandlers(db, engine.pushEvent, shutDown);
return http_listener.addHandlers(shutDown);
}
});
};

View file

@ -0,0 +1,41 @@
<!DOCTYPE HTML>
<html>
<head>
<title>Invoke an event</title>
<link rel="stylesheet" type="text/css" href="style.css">
<script src='//ajax.googleapis.com/ajax/libs/jquery/1/jquery.min.js' type='text/javascript'></script>
</head>
<body>
<div id="mainbody">
<div id="pagetitle">Invoke an Event</div>
<p>
<textarea id="textarea_event" rows="20" cols="50">
{
"event": "mail",
"eventid": "mail_0",
"payload": {
"subject": "hello"
}
}
</textarea>
</p>
<p>
<button id="but_submit">invoke</button>
</p>
</div>
<script>
//{ \"event\": \"mail\", \"eventid\": \"0\",... }"
$('#but_submit').click(function() {
$.post('../event', JSON.parse($('#textarea_event').val()))
.done(function(data) {
alert(data);
})
.fail(function(err) {
console.log(err);
alert('Posting of event failed: ' + err);
});
});
</script>
</body>
</html>

View file

@ -5,6 +5,12 @@ body {
margin: 0px;
}
textarea {
-moz-tab-size: 2;
-o-tab-size:2;
tab-size:2;
}
#menubar {
font-size: 0.75em;
width: 100%;