aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-11-16 18:46:56 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2015-11-16 18:46:56 +0100
commitdda9ef2ae7971bceaa792e328c8489cb0695b77e (patch)
treef4ce29c170dbd4af136418949a857eb9e89154ca
parentf17922bcafe1f2f369c9be0f236570771c8ab214 (diff)
downloaddiller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.tar.gz
diller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.tar.bz2
diller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.tar.xz
diller-server-dda9ef2ae7971bceaa792e328c8489cb0695b77e.zip
core:
o Adding device_status table containing the latest device status. mqtt: o Listening on $SYS/broker/log, parse out messages about clients connecting and disconnecting and store in database.
-rw-r--r--README.md11
-rw-r--r--diller-mqtt.js2
-rw-r--r--migrations/20151115192327-device-online.js30
-rw-r--r--migrations/sqls/20151115192327-device-online-down.sql1
-rw-r--r--migrations/sqls/20151115192327-device-online-up.sql10
-rw-r--r--src/Diller.js92
-rw-r--r--src/DillerDao.js86
-rw-r--r--src/mqtt/DillerMqttClient.js5
-rw-r--r--src/web/DillerWeb.js2
-rw-r--r--web/static/app/templates/front-page.html8
10 files changed, 227 insertions, 20 deletions
diff --git a/README.md b/README.md
index 59413bb..454df09 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ Start daemons
sudo systemctl start diller-mqtt
sudo systemctl start diller-web
-
+
sudo systemctl status diller-mqtt
sudo systemctl status diller-web
@@ -55,13 +55,14 @@ Start daemons
* Device
* Update device name
* Property
- * Update property name and description
- * Push to device
+ * DONE: Update property name and description
+ * DONE: Push to device
## MQTT
* Update a property's last value and last timestamp
* Update aggregates in the background
* Worker process with process.spawn()
-* Listen to /$SYS, register when the clients send a PING message
- * Keep the data in memory.
+* DONE: Listen to /$SYS, register when the clients send a PING message
+ * -Keep the data in memory.-
+ * DONE: Store in database.
diff --git a/diller-mqtt.js b/diller-mqtt.js
index e528bb4..eb3c7f2 100644
--- a/diller-mqtt.js
+++ b/diller-mqtt.js
@@ -13,7 +13,7 @@ var tx = injector.get(require('./src/DillerTx'));
* @type DillerMqttClient
*/
var dillerMqttClient = injector.get(require('./src/mqtt/DillerMqttClient'));
-dillerMqttClient.run('mqtt');
+dillerMqttClient.run('mqtt', true);
dillerMqttClient.on('message', function (topic, message, payload) {
tx(function (pg, dao, diller) {
diff --git a/migrations/20151115192327-device-online.js b/migrations/20151115192327-device-online.js
new file mode 100644
index 0000000..ee28cc3
--- /dev/null
+++ b/migrations/20151115192327-device-online.js
@@ -0,0 +1,30 @@
+var dbm = global.dbm || require('db-migrate');
+var type = dbm.dataType;
+var fs = require('fs');
+var path = require('path');
+
+exports.up = function(db, callback) {
+ var filePath = path.join(__dirname + '/sqls/20151115192327-device-online-up.sql');
+ fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
+ if (err) return callback(err);
+ console.log('received data: ' + data);
+
+ db.runSql(data, function(err) {
+ if (err) return callback(err);
+ callback();
+ });
+ });
+};
+
+exports.down = function(db, callback) {
+ var filePath = path.join(__dirname + '/sqls/20151115192327-device-online-down.sql');
+ fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
+ if (err) return callback(err);
+ console.log('received data: ' + data);
+
+ db.runSql(data, function(err) {
+ if (err) return callback(err);
+ callback();
+ });
+ });
+};
diff --git a/migrations/sqls/20151115192327-device-online-down.sql b/migrations/sqls/20151115192327-device-online-down.sql
new file mode 100644
index 0000000..44f074e
--- /dev/null
+++ b/migrations/sqls/20151115192327-device-online-down.sql
@@ -0,0 +1 @@
+/* Replace with your SQL commands */ \ No newline at end of file
diff --git a/migrations/sqls/20151115192327-device-online-up.sql b/migrations/sqls/20151115192327-device-online-up.sql
new file mode 100644
index 0000000..bd80a63
--- /dev/null
+++ b/migrations/sqls/20151115192327-device-online-up.sql
@@ -0,0 +1,10 @@
+DROP TABLE IF EXISTS device_status;
+CREATE UNLOGGED TABLE device_status(
+ device BIGINT PRIMARY KEY REFERENCES device,
+ online BOOLEAN NOT NULL,
+ timestamp TIMESTAMPTZ NOT NULL,
+ host VARCHAR(1000)
+);
+
+INSERT INTO device_status(device, online, timestamp)
+SELECT id, false, created_timestamp FROM device;
diff --git a/src/Diller.js b/src/Diller.js
index 230af7a..e4c6b9f 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -86,11 +86,84 @@ function Diller(config, pg, dao) {
parts[3] = 'property';
}
- if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') {
- log.warn('no match: ', topic, parts);
- return;
+ try {
+ if ((parts.length == 4 || parts.length == 5) && parts[0] == '$SYS' && parts[1] == 'broker' && parts[2] == 'log') {
+ onLogMessage(timestamp, parts, topic, message);
+ } else if (parts.length == 6 && parts[1] == 'diller' && parts[3] == 'property') {
+ onDillerMessage(timestamp, parts, topic, message);
+ } else {
+ log.warn('no match', {topic: topic, parts: parts, 'l': parts.length});
+ }
+ } catch (e) {
+ log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload});
+ }
+ }
+
+ function onLogMessage(timestamp, parts, topic, message) {
+ message = message.toString();
+ //log.info('broker message: ' + parts[3] + '/' + parts[4] + ': ' + message);
+
+ var p;
+ var msg = message.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/);
+ if (msg) {
+ p = newClientConnected(msg);
}
+ msg = message.match(/^([0-9]+): Client ([^ ]+) disconnected/);
+ if (msg) {
+ p = clientDisconnected(msg);
+ }
+
+ msg = message.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting./);
+ if (msg) {
+ p = clientDisconnected(msg);
+ }
+
+ p && p.then(function (res) {
+ if (res && res.id) {
+ log.info('Status updated');
+ }
+ }, function (e) {
+ log.info('Status update failed', {exception: e});
+ });
+ }
+
+ function newClientConnected(msg) {
+ var ts = new Date(parseInt(msg[1]) * 1000);
+ var host = msg[2];
+ var key = msg[3].replace(/^esp8266-/, '');
+
+ var c = parseInt(msg[4]);
+ var k = parseInt(msg[5]);
+
+ return dao.deviceByKey(key).then(function (device) {
+ log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k});
+
+ if (device && device.id) {
+ return dao.updateDeviceStatus(device.id, ts, true, host)
+ .then(_.constant(device));
+ } else {
+ return Promise.resolve('Unknown device: ' + key);
+ }
+ });
+ }
+
+ function clientDisconnected(msg) {
+ log.debug('clientDisconnected', {msg: msg});
+ var ts = new Date(parseInt(msg[1]) * 1000);
+ var key = msg[2].replace(/^esp8266-/, '');
+
+ return dao.deviceByKey(key).then(function (device) {
+ if (device && device.id) {
+ return dao.updateDeviceStatus(device.id, ts, false)
+ .then(_.constant(device));
+ } else {
+ return Promise.resolve('Unknown device: ' + key);
+ }
+ });
+ }
+
+ function onDillerMessage(timestamp, parts, topic, message) {
var device_key = parts[2];
var property_key = parts[4];
var msg_type = parts[5];
@@ -111,12 +184,17 @@ function Diller(config, pg, dao) {
return dao.deviceByKey(device_key)
.then(function (device) {
- return device || dao.insertDevice(device_key).then(function (device) {
- log.info('New device created', {device_key: device_key, id: device.id});
- return device;
- });
+ return device || dao.insertDevice(device_key)
+ .then(function (device) {
+ return dao.insertDeviceStatus(device.id, true, timestamp, null).then(_.constant(device));
+ })
+ .then(function (device) {
+ log.info('New device created', {device_key: device_key, id: device.id});
+ return device;
+ });
})
.then(function (device) {
+ log.info('devicePropertyByDeviceIdAndKey', {device: device});
return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) {
var ret = {device: device, property: property};
return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) {
diff --git a/src/DillerDao.js b/src/DillerDao.js
index 34f4205..cf4c75b 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -6,9 +6,70 @@ var _ = require('lodash');
*/
function DillerDao(tx) {
- var deviceColumns = 'id, created_timestamp, key, name, description';
+ var deviceColumns = ['id', 'created_timestamp', 'key', 'name', 'description'];
+ var deviceStatusColumns = ['device', 'online', 'timestamp', 'host'];
var propertyColumns = 'id, created_timestamp, device, key, name, description, last_value, last_timestamp';
+ function cols(columns, select_prefix, as_prefix) {
+ var s;
+ if (select_prefix) {
+ as_prefix = as_prefix || select_prefix;
+ for (var i = 0; i < columns.length; i++) {
+ var c = columns[i];
+ if (!s) {
+ s = '';
+ } else {
+ s += ', ';
+ }
+
+ s += select_prefix + '.' + c + ' AS ' + as_prefix + '_' + c;
+ }
+ return s;
+ }
+
+ return columns.join(', ');
+ }
+
+ function unjoin(main) {
+ var children = Array.prototype.slice.call(arguments).slice(1);
+
+ function convertRow(row) {
+ if (!row) {
+ return row;
+ }
+ var r = {};
+ for (var field in row) {
+ var f = '' + field;
+ if (f.startsWith(main + '_')) {
+ r[f.substring(main.length + 1)] = row[f];
+ } else {
+ for (var i = 0; i < children.length; i++) {
+ var child = children[i];
+ var prefix = child + '_';
+
+ if (f.startsWith(prefix)) {
+ var o = r[child];
+ if (!o) {
+ o = r[child] = {};
+ }
+
+ o[f.substr(prefix.length)] = row[f];
+ }
+ }
+ }
+ }
+ return r;
+ }
+
+ return function (res) {
+ if (Array.isArray(res)) {
+ return res.map(convertRow);
+ } else {
+ return convertRow(res);
+ }
+ }
+ }
+
// -------------------------------------------------------------------------------------------------------------------
// Device
// -------------------------------------------------------------------------------------------------------------------
@@ -17,19 +78,26 @@ function DillerDao(tx) {
* @returns {Promise}
*/
function devices() {
- return tx.manyOrNone("SELECT " + deviceColumns + " FROM device");
+ var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device)";
+ return tx.manyOrNone(sql).then(unjoin('d', 'status'));
}
function deviceById(id) {
- return tx.one("SELECT " + deviceColumns + " FROM device WHERE id=$1", id);
+ var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.id=$1";
+ return tx.one(sql, [id]).then(unjoin('d', 'status'));
}
function deviceByKey(key) {
- return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
+ var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.key=$1";
+ return tx.oneOrNone(sql, [key]).then(unjoin('d', 'status'));
}
function insertDevice(key) {
- return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+ return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + cols(deviceColumns), key);
+ }
+
+ function insertDeviceStatus(device, online, timestamp, host) {
+ return tx.none("INSERT INTO device_status(device, online, timestamp, host) VALUES($1, $2, $3, $4)", [device, online, timestamp, host]);
}
function updateDevice(id, attributes) {
@@ -55,6 +123,11 @@ function DillerDao(tx) {
return tx.none(sql, values);
}
+ function updateDeviceStatus(device, timestamp, online, host) {
+ var sql = 'UPDATE device_status SET timestamp=$2, online=$3, host=$4 WHERE device=$1';
+ return tx.none(sql, [device, timestamp, online, host]);
+ }
+
// -------------------------------------------------------------------------------------------------------------------
// Device Property
// -------------------------------------------------------------------------------------------------------------------
@@ -151,6 +224,9 @@ function DillerDao(tx) {
insertDevice: insertDevice,
updateDevice: updateDevice,
+ insertDeviceStatus: insertDeviceStatus,
+ updateDeviceStatus: updateDeviceStatus,
+
devicePropertyById: devicePropertyById,
devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey,
devicePropertiesByDeviceId: devicePropertiesByDeviceId,
diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js
index 62f5aef..b9cf01b 100644
--- a/src/mqtt/DillerMqttClient.js
+++ b/src/mqtt/DillerMqttClient.js
@@ -12,7 +12,7 @@ function DillerMqttClient(config, tx) {
var mqttClient;
var hostname = os.hostname();
- function run(clientType) {
+ function run(clientType, subscribeToLog) {
if (mqttClient) {
throw 'Already connected';
}
@@ -36,6 +36,9 @@ function DillerMqttClient(config, tx) {
mqttClient.on('connect', function () {
log.info('Connected');
mqttClient.subscribe('/diller/#');
+ if (subscribeToLog) {
+ mqttClient.subscribe('$SYS/broker/log/#');
+ }
});
}
diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js
index 4d032ed..396c2f8 100644
--- a/src/web/DillerWeb.js
+++ b/src/web/DillerWeb.js
@@ -15,7 +15,7 @@ function DillerWeb(config, mqttClient, tx) {
var calls = [];
var app;
- mqttClient.run('web');
+ mqttClient.run('web', false);
/**
* @param {HttpRes} res
diff --git a/web/static/app/templates/front-page.html b/web/static/app/templates/front-page.html
index e2ee38b..a49bc61 100644
--- a/web/static/app/templates/front-page.html
+++ b/web/static/app/templates/front-page.html
@@ -5,6 +5,7 @@
<table class="table">
<thead>
<tr>
+ <th>Status</th>
<th>Name</th>
<th>Registered</th>
</tr>
@@ -12,6 +13,13 @@
<tbody>
<tr ng-repeat="d in ctrl.devices | orderBy:['name', 'key']">
<td>
+ <span ng-if="d.status.online" class="label label-success">ONLINE</span>
+ <span ng-if="!d.status.online" class="label label-danger">OFFLINE</span>
+ <small class="text-muted">
+ since {{d.status.timestamp | date:'medium'}}
+ </small>
+ </td>
+ <td>
<a href="#/device/{{d.id}}">
{{(d.name || d.key)}}
</a>