From dda9ef2ae7971bceaa792e328c8489cb0695b77e Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 16 Nov 2015 18:46:56 +0100 Subject: core: o Adding device_status table containing the latest device status. mqtt: o Listening on $SYS/broker/log, parse out messages about clients connecting and disconnecting and store in database. --- README.md | 11 +-- diller-mqtt.js | 2 +- migrations/20151115192327-device-online.js | 30 +++++++ .../sqls/20151115192327-device-online-down.sql | 1 + .../sqls/20151115192327-device-online-up.sql | 10 +++ src/Diller.js | 92 ++++++++++++++++++++-- src/DillerDao.js | 86 ++++++++++++++++++-- src/mqtt/DillerMqttClient.js | 5 +- src/web/DillerWeb.js | 2 +- web/static/app/templates/front-page.html | 8 ++ 10 files changed, 227 insertions(+), 20 deletions(-) create mode 100644 migrations/20151115192327-device-online.js create mode 100644 migrations/sqls/20151115192327-device-online-down.sql create mode 100644 migrations/sqls/20151115192327-device-online-up.sql diff --git a/README.md b/README.md index 59413bb..454df09 100644 --- a/README.md +++ b/README.md @@ -28,7 +28,7 @@ Start daemons sudo systemctl start diller-mqtt sudo systemctl start diller-web - + sudo systemctl status diller-mqtt sudo systemctl status diller-web @@ -55,13 +55,14 @@ Start daemons * Device * Update device name * Property - * Update property name and description - * Push to device + * DONE: Update property name and description + * DONE: Push to device ## MQTT * Update a property's last value and last timestamp * Update aggregates in the background * Worker process with process.spawn() -* Listen to /$SYS, register when the clients send a PING message - * Keep the data in memory. +* DONE: Listen to /$SYS, register when the clients send a PING message + * -Keep the data in memory.- + * DONE: Store in database. diff --git a/diller-mqtt.js b/diller-mqtt.js index e528bb4..eb3c7f2 100644 --- a/diller-mqtt.js +++ b/diller-mqtt.js @@ -13,7 +13,7 @@ var tx = injector.get(require('./src/DillerTx')); * @type DillerMqttClient */ var dillerMqttClient = injector.get(require('./src/mqtt/DillerMqttClient')); -dillerMqttClient.run('mqtt'); +dillerMqttClient.run('mqtt', true); dillerMqttClient.on('message', function (topic, message, payload) { tx(function (pg, dao, diller) { diff --git a/migrations/20151115192327-device-online.js b/migrations/20151115192327-device-online.js new file mode 100644 index 0000000..ee28cc3 --- /dev/null +++ b/migrations/20151115192327-device-online.js @@ -0,0 +1,30 @@ +var dbm = global.dbm || require('db-migrate'); +var type = dbm.dataType; +var fs = require('fs'); +var path = require('path'); + +exports.up = function(db, callback) { + var filePath = path.join(__dirname + '/sqls/20151115192327-device-online-up.sql'); + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return callback(err); + console.log('received data: ' + data); + + db.runSql(data, function(err) { + if (err) return callback(err); + callback(); + }); + }); +}; + +exports.down = function(db, callback) { + var filePath = path.join(__dirname + '/sqls/20151115192327-device-online-down.sql'); + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return callback(err); + console.log('received data: ' + data); + + db.runSql(data, function(err) { + if (err) return callback(err); + callback(); + }); + }); +}; diff --git a/migrations/sqls/20151115192327-device-online-down.sql b/migrations/sqls/20151115192327-device-online-down.sql new file mode 100644 index 0000000..44f074e --- /dev/null +++ b/migrations/sqls/20151115192327-device-online-down.sql @@ -0,0 +1 @@ +/* Replace with your SQL commands */ \ No newline at end of file diff --git a/migrations/sqls/20151115192327-device-online-up.sql b/migrations/sqls/20151115192327-device-online-up.sql new file mode 100644 index 0000000..bd80a63 --- /dev/null +++ b/migrations/sqls/20151115192327-device-online-up.sql @@ -0,0 +1,10 @@ +DROP TABLE IF EXISTS device_status; +CREATE UNLOGGED TABLE device_status( + device BIGINT PRIMARY KEY REFERENCES device, + online BOOLEAN NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + host VARCHAR(1000) +); + +INSERT INTO device_status(device, online, timestamp) +SELECT id, false, created_timestamp FROM device; diff --git a/src/Diller.js b/src/Diller.js index 230af7a..e4c6b9f 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -86,11 +86,84 @@ function Diller(config, pg, dao) { parts[3] = 'property'; } - if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { - log.warn('no match: ', topic, parts); - return; + try { + if ((parts.length == 4 || parts.length == 5) && parts[0] == '$SYS' && parts[1] == 'broker' && parts[2] == 'log') { + onLogMessage(timestamp, parts, topic, message); + } else if (parts.length == 6 && parts[1] == 'diller' && parts[3] == 'property') { + onDillerMessage(timestamp, parts, topic, message); + } else { + log.warn('no match', {topic: topic, parts: parts, 'l': parts.length}); + } + } catch (e) { + log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload}); + } + } + + function onLogMessage(timestamp, parts, topic, message) { + message = message.toString(); + //log.info('broker message: ' + parts[3] + '/' + parts[4] + ': ' + message); + + var p; + var msg = message.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/); + if (msg) { + p = newClientConnected(msg); } + msg = message.match(/^([0-9]+): Client ([^ ]+) disconnected/); + if (msg) { + p = clientDisconnected(msg); + } + + msg = message.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting./); + if (msg) { + p = clientDisconnected(msg); + } + + p && p.then(function (res) { + if (res && res.id) { + log.info('Status updated'); + } + }, function (e) { + log.info('Status update failed', {exception: e}); + }); + } + + function newClientConnected(msg) { + var ts = new Date(parseInt(msg[1]) * 1000); + var host = msg[2]; + var key = msg[3].replace(/^esp8266-/, ''); + + var c = parseInt(msg[4]); + var k = parseInt(msg[5]); + + return dao.deviceByKey(key).then(function (device) { + log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k}); + + if (device && device.id) { + return dao.updateDeviceStatus(device.id, ts, true, host) + .then(_.constant(device)); + } else { + return Promise.resolve('Unknown device: ' + key); + } + }); + } + + function clientDisconnected(msg) { + log.debug('clientDisconnected', {msg: msg}); + var ts = new Date(parseInt(msg[1]) * 1000); + var key = msg[2].replace(/^esp8266-/, ''); + + return dao.deviceByKey(key).then(function (device) { + if (device && device.id) { + return dao.updateDeviceStatus(device.id, ts, false) + .then(_.constant(device)); + } else { + return Promise.resolve('Unknown device: ' + key); + } + }); + } + + function onDillerMessage(timestamp, parts, topic, message) { var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; @@ -111,12 +184,17 @@ function Diller(config, pg, dao) { return dao.deviceByKey(device_key) .then(function (device) { - return device || dao.insertDevice(device_key).then(function (device) { - log.info('New device created', {device_key: device_key, id: device.id}); - return device; - }); + return device || dao.insertDevice(device_key) + .then(function (device) { + return dao.insertDeviceStatus(device.id, true, timestamp, null).then(_.constant(device)); + }) + .then(function (device) { + log.info('New device created', {device_key: device_key, id: device.id}); + return device; + }); }) .then(function (device) { + log.info('devicePropertyByDeviceIdAndKey', {device: device}); return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { var ret = {device: device, property: property}; return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { diff --git a/src/DillerDao.js b/src/DillerDao.js index 34f4205..cf4c75b 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -6,9 +6,70 @@ var _ = require('lodash'); */ function DillerDao(tx) { - var deviceColumns = 'id, created_timestamp, key, name, description'; + var deviceColumns = ['id', 'created_timestamp', 'key', 'name', 'description']; + var deviceStatusColumns = ['device', 'online', 'timestamp', 'host']; var propertyColumns = 'id, created_timestamp, device, key, name, description, last_value, last_timestamp'; + function cols(columns, select_prefix, as_prefix) { + var s; + if (select_prefix) { + as_prefix = as_prefix || select_prefix; + for (var i = 0; i < columns.length; i++) { + var c = columns[i]; + if (!s) { + s = ''; + } else { + s += ', '; + } + + s += select_prefix + '.' + c + ' AS ' + as_prefix + '_' + c; + } + return s; + } + + return columns.join(', '); + } + + function unjoin(main) { + var children = Array.prototype.slice.call(arguments).slice(1); + + function convertRow(row) { + if (!row) { + return row; + } + var r = {}; + for (var field in row) { + var f = '' + field; + if (f.startsWith(main + '_')) { + r[f.substring(main.length + 1)] = row[f]; + } else { + for (var i = 0; i < children.length; i++) { + var child = children[i]; + var prefix = child + '_'; + + if (f.startsWith(prefix)) { + var o = r[child]; + if (!o) { + o = r[child] = {}; + } + + o[f.substr(prefix.length)] = row[f]; + } + } + } + } + return r; + } + + return function (res) { + if (Array.isArray(res)) { + return res.map(convertRow); + } else { + return convertRow(res); + } + } + } + // ------------------------------------------------------------------------------------------------------------------- // Device // ------------------------------------------------------------------------------------------------------------------- @@ -17,19 +78,26 @@ function DillerDao(tx) { * @returns {Promise} */ function devices() { - return tx.manyOrNone("SELECT " + deviceColumns + " FROM device"); + var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device)"; + return tx.manyOrNone(sql).then(unjoin('d', 'status')); } function deviceById(id) { - return tx.one("SELECT " + deviceColumns + " FROM device WHERE id=$1", id); + var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.id=$1"; + return tx.one(sql, [id]).then(unjoin('d', 'status')); } function deviceByKey(key) { - return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key); + var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.key=$1"; + return tx.oneOrNone(sql, [key]).then(unjoin('d', 'status')); } function insertDevice(key) { - return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key); + return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + cols(deviceColumns), key); + } + + function insertDeviceStatus(device, online, timestamp, host) { + return tx.none("INSERT INTO device_status(device, online, timestamp, host) VALUES($1, $2, $3, $4)", [device, online, timestamp, host]); } function updateDevice(id, attributes) { @@ -55,6 +123,11 @@ function DillerDao(tx) { return tx.none(sql, values); } + function updateDeviceStatus(device, timestamp, online, host) { + var sql = 'UPDATE device_status SET timestamp=$2, online=$3, host=$4 WHERE device=$1'; + return tx.none(sql, [device, timestamp, online, host]); + } + // ------------------------------------------------------------------------------------------------------------------- // Device Property // ------------------------------------------------------------------------------------------------------------------- @@ -151,6 +224,9 @@ function DillerDao(tx) { insertDevice: insertDevice, updateDevice: updateDevice, + insertDeviceStatus: insertDeviceStatus, + updateDeviceStatus: updateDeviceStatus, + devicePropertyById: devicePropertyById, devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey, devicePropertiesByDeviceId: devicePropertiesByDeviceId, diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js index 62f5aef..b9cf01b 100644 --- a/src/mqtt/DillerMqttClient.js +++ b/src/mqtt/DillerMqttClient.js @@ -12,7 +12,7 @@ function DillerMqttClient(config, tx) { var mqttClient; var hostname = os.hostname(); - function run(clientType) { + function run(clientType, subscribeToLog) { if (mqttClient) { throw 'Already connected'; } @@ -36,6 +36,9 @@ function DillerMqttClient(config, tx) { mqttClient.on('connect', function () { log.info('Connected'); mqttClient.subscribe('/diller/#'); + if (subscribeToLog) { + mqttClient.subscribe('$SYS/broker/log/#'); + } }); } diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js index 4d032ed..396c2f8 100644 --- a/src/web/DillerWeb.js +++ b/src/web/DillerWeb.js @@ -15,7 +15,7 @@ function DillerWeb(config, mqttClient, tx) { var calls = []; var app; - mqttClient.run('web'); + mqttClient.run('web', false); /** * @param {HttpRes} res diff --git a/web/static/app/templates/front-page.html b/web/static/app/templates/front-page.html index e2ee38b..a49bc61 100644 --- a/web/static/app/templates/front-page.html +++ b/web/static/app/templates/front-page.html @@ -5,12 +5,20 @@ + +
Status Name Registered
+ ONLINE + OFFLINE + + since {{d.status.timestamp | date:'medium'}} + + {{(d.name || d.key)}} -- cgit v1.2.3