aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-11-16 18:46:56 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2015-11-16 18:46:56 +0100
commitdda9ef2ae7971bceaa792e328c8489cb0695b77e (patch)
treef4ce29c170dbd4af136418949a857eb9e89154ca /src
parentf17922bcafe1f2f369c9be0f236570771c8ab214 (diff)
downloaddiller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.tar.gz
diller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.tar.bz2
diller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.tar.xz
diller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.zip
core:
o Adding device_status table containing the latest device status. mqtt: o Listening on $SYS/broker/log, parse out messages about clients connecting and disconnecting and store in database.
Diffstat (limited to 'src')
-rw-r--r--src/Diller.js92
-rw-r--r--src/DillerDao.js86
-rw-r--r--src/mqtt/DillerMqttClient.js5
-rw-r--r--src/web/DillerWeb.js2
4 files changed, 171 insertions, 14 deletions
diff --git a/src/Diller.js b/src/Diller.js
index 230af7a..e4c6b9f 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -86,11 +86,84 @@ function Diller(config, pg, dao) {
parts[3] = 'property';
}
- if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') {
- log.warn('no match: ', topic, parts);
- return;
+ try {
+ if ((parts.length == 4 || parts.length == 5) && parts[0] == '$SYS' && parts[1] == 'broker' && parts[2] == 'log') {
+ onLogMessage(timestamp, parts, topic, message);
+ } else if (parts.length == 6 && parts[1] == 'diller' && parts[3] == 'property') {
+ onDillerMessage(timestamp, parts, topic, message);
+ } else {
+ log.warn('no match', {topic: topic, parts: parts, 'l': parts.length});
+ }
+ } catch (e) {
+ log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload});
+ }
+ }
+
+ function onLogMessage(timestamp, parts, topic, message) {
+ message = message.toString();
+ //log.info('broker message: ' + parts[3] + '/' + parts[4] + ': ' + message);
+
+ var p;
+ var msg = message.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/);
+ if (msg) {
+ p = newClientConnected(msg);
}
+ msg = message.match(/^([0-9]+): Client ([^ ]+) disconnected/);
+ if (msg) {
+ p = clientDisconnected(msg);
+ }
+
+ msg = message.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting./);
+ if (msg) {
+ p = clientDisconnected(msg);
+ }
+
+ p && p.then(function (res) {
+ if (res && res.id) {
+ log.info('Status updated');
+ }
+ }, function (e) {
+ log.info('Status update failed', {exception: e});
+ });
+ }
+
+ function newClientConnected(msg) {
+ var ts = new Date(parseInt(msg[1]) * 1000);
+ var host = msg[2];
+ var key = msg[3].replace(/^esp8266-/, '');
+
+ var c = parseInt(msg[4]);
+ var k = parseInt(msg[5]);
+
+ return dao.deviceByKey(key).then(function (device) {
+ log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k});
+
+ if (device && device.id) {
+ return dao.updateDeviceStatus(device.id, ts, true, host)
+ .then(_.constant(device));
+ } else {
+ return Promise.resolve('Unknown device: ' + key);
+ }
+ });
+ }
+
+ function clientDisconnected(msg) {
+ log.debug('clientDisconnected', {msg: msg});
+ var ts = new Date(parseInt(msg[1]) * 1000);
+ var key = msg[2].replace(/^esp8266-/, '');
+
+ return dao.deviceByKey(key).then(function (device) {
+ if (device && device.id) {
+ return dao.updateDeviceStatus(device.id, ts, false)
+ .then(_.constant(device));
+ } else {
+ return Promise.resolve('Unknown device: ' + key);
+ }
+ });
+ }
+
+ function onDillerMessage(timestamp, parts, topic, message) {
var device_key = parts[2];
var property_key = parts[4];
var msg_type = parts[5];
@@ -111,12 +184,17 @@ function Diller(config, pg, dao) {
return dao.deviceByKey(device_key)
.then(function (device) {
- return device || dao.insertDevice(device_key).then(function (device) {
- log.info('New device created', {device_key: device_key, id: device.id});
- return device;
- });
+ return device || dao.insertDevice(device_key)
+ .then(function (device) {
+ return dao.insertDeviceStatus(device.id, true, timestamp, null).then(_.constant(device));
+ })
+ .then(function (device) {
+ log.info('New device created', {device_key: device_key, id: device.id});
+ return device;
+ });
})
.then(function (device) {
+ log.info('devicePropertyByDeviceIdAndKey', {device: device});
return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) {
var ret = {device: device, property: property};
return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) {
diff --git a/src/DillerDao.js b/src/DillerDao.js
index 34f4205..cf4c75b 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -6,9 +6,70 @@ var _ = require('lodash');
*/
function DillerDao(tx) {
- var deviceColumns = 'id, created_timestamp, key, name, description';
+ var deviceColumns = ['id', 'created_timestamp', 'key', 'name', 'description'];
+ var deviceStatusColumns = ['device', 'online', 'timestamp', 'host'];
var propertyColumns = 'id, created_timestamp, device, key, name, description, last_value, last_timestamp';
+ function cols(columns, select_prefix, as_prefix) {
+ var s;
+ if (select_prefix) {
+ as_prefix = as_prefix || select_prefix;
+ for (var i = 0; i < columns.length; i++) {
+ var c = columns[i];
+ if (!s) {
+ s = '';
+ } else {
+ s += ', ';
+ }
+
+ s += select_prefix + '.' + c + ' AS ' + as_prefix + '_' + c;
+ }
+ return s;
+ }
+
+ return columns.join(', ');
+ }
+
+ function unjoin(main) {
+ var children = Array.prototype.slice.call(arguments).slice(1);
+
+ function convertRow(row) {
+ if (!row) {
+ return row;
+ }
+ var r = {};
+ for (var field in row) {
+ var f = '' + field;
+ if (f.startsWith(main + '_')) {
+ r[f.substring(main.length + 1)] = row[f];
+ } else {
+ for (var i = 0; i < children.length; i++) {
+ var child = children[i];
+ var prefix = child + '_';
+
+ if (f.startsWith(prefix)) {
+ var o = r[child];
+ if (!o) {
+ o = r[child] = {};
+ }
+
+ o[f.substr(prefix.length)] = row[f];
+ }
+ }
+ }
+ }
+ return r;
+ }
+
+ return function (res) {
+ if (Array.isArray(res)) {
+ return res.map(convertRow);
+ } else {
+ return convertRow(res);
+ }
+ }
+ }
+
// -------------------------------------------------------------------------------------------------------------------
// Device
// -------------------------------------------------------------------------------------------------------------------
@@ -17,19 +78,26 @@ function DillerDao(tx) {
* @returns {Promise}
*/
function devices() {
- return tx.manyOrNone("SELECT " + deviceColumns + " FROM device");
+ var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device)";
+ return tx.manyOrNone(sql).then(unjoin('d', 'status'));
}
function deviceById(id) {
- return tx.one("SELECT " + deviceColumns + " FROM device WHERE id=$1", id);
+ var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.id=$1";
+ return tx.one(sql, [id]).then(unjoin('d', 'status'));
}
function deviceByKey(key) {
- return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
+ var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.key=$1";
+ return tx.oneOrNone(sql, [key]).then(unjoin('d', 'status'));
}
function insertDevice(key) {
- return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+ return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + cols(deviceColumns), key);
+ }
+
+ function insertDeviceStatus(device, online, timestamp, host) {
+ return tx.none("INSERT INTO device_status(device, online, timestamp, host) VALUES($1, $2, $3, $4)", [device, online, timestamp, host]);
}
function updateDevice(id, attributes) {
@@ -55,6 +123,11 @@ function DillerDao(tx) {
return tx.none(sql, values);
}
+ function updateDeviceStatus(device, timestamp, online, host) {
+ var sql = 'UPDATE device_status SET timestamp=$2, online=$3, host=$4 WHERE device=$1';
+ return tx.none(sql, [device, timestamp, online, host]);
+ }
+
// -------------------------------------------------------------------------------------------------------------------
// Device Property
// -------------------------------------------------------------------------------------------------------------------
@@ -151,6 +224,9 @@ function DillerDao(tx) {
insertDevice: insertDevice,
updateDevice: updateDevice,
+ insertDeviceStatus: insertDeviceStatus,
+ updateDeviceStatus: updateDeviceStatus,
+
devicePropertyById: devicePropertyById,
devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey,
devicePropertiesByDeviceId: devicePropertiesByDeviceId,
diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js
index 62f5aef..b9cf01b 100644
--- a/src/mqtt/DillerMqttClient.js
+++ b/src/mqtt/DillerMqttClient.js
@@ -12,7 +12,7 @@ function DillerMqttClient(config, tx) {
var mqttClient;
var hostname = os.hostname();
- function run(clientType) {
+ function run(clientType, subscribeToLog) {
if (mqttClient) {
throw 'Already connected';
}
@@ -36,6 +36,9 @@ function DillerMqttClient(config, tx) {
mqttClient.on('connect', function () {
log.info('Connected');
mqttClient.subscribe('/diller/#');
+ if (subscribeToLog) {
+ mqttClient.subscribe('$SYS/broker/log/#');
+ }
});
}
diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js
index 4d032ed..396c2f8 100644
--- a/src/web/DillerWeb.js
+++ b/src/web/DillerWeb.js
@@ -15,7 +15,7 @@ function DillerWeb(config, mqttClient, tx) {
var calls = [];
var app;
- mqttClient.run('web');
+ mqttClient.run('web', false);
/**
* @param {HttpRes} res