diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/Diller.js | 92 | ||||
-rw-r--r-- | src/DillerDao.js | 86 | ||||
-rw-r--r-- | src/mqtt/DillerMqttClient.js | 5 | ||||
-rw-r--r-- | src/web/DillerWeb.js | 2 |
4 files changed, 171 insertions, 14 deletions
diff --git a/src/Diller.js b/src/Diller.js index 230af7a..e4c6b9f 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -86,11 +86,84 @@ function Diller(config, pg, dao) { parts[3] = 'property'; } - if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { - log.warn('no match: ', topic, parts); - return; + try { + if ((parts.length == 4 || parts.length == 5) && parts[0] == '$SYS' && parts[1] == 'broker' && parts[2] == 'log') { + onLogMessage(timestamp, parts, topic, message); + } else if (parts.length == 6 && parts[1] == 'diller' && parts[3] == 'property') { + onDillerMessage(timestamp, parts, topic, message); + } else { + log.warn('no match', {topic: topic, parts: parts, 'l': parts.length}); + } + } catch (e) { + log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload}); + } + } + + function onLogMessage(timestamp, parts, topic, message) { + message = message.toString(); + //log.info('broker message: ' + parts[3] + '/' + parts[4] + ': ' + message); + + var p; + var msg = message.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/); + if (msg) { + p = newClientConnected(msg); } + msg = message.match(/^([0-9]+): Client ([^ ]+) disconnected/); + if (msg) { + p = clientDisconnected(msg); + } + + msg = message.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting./); + if (msg) { + p = clientDisconnected(msg); + } + + p && p.then(function (res) { + if (res && res.id) { + log.info('Status updated'); + } + }, function (e) { + log.info('Status update failed', {exception: e}); + }); + } + + function newClientConnected(msg) { + var ts = new Date(parseInt(msg[1]) * 1000); + var host = msg[2]; + var key = msg[3].replace(/^esp8266-/, ''); + + var c = parseInt(msg[4]); + var k = parseInt(msg[5]); + + return dao.deviceByKey(key).then(function (device) { + log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k}); + + if (device && device.id) { + return dao.updateDeviceStatus(device.id, ts, true, host) + .then(_.constant(device)); + } else { + return Promise.resolve('Unknown device: ' + key); + } + }); + } + + function clientDisconnected(msg) { + log.debug('clientDisconnected', {msg: msg}); + var ts = new Date(parseInt(msg[1]) * 1000); + var key = msg[2].replace(/^esp8266-/, ''); + + return dao.deviceByKey(key).then(function (device) { + if (device && device.id) { + return dao.updateDeviceStatus(device.id, ts, false) + .then(_.constant(device)); + } else { + return Promise.resolve('Unknown device: ' + key); + } + }); + } + + function onDillerMessage(timestamp, parts, topic, message) { var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; @@ -111,12 +184,17 @@ function Diller(config, pg, dao) { return dao.deviceByKey(device_key) .then(function (device) { - return device || dao.insertDevice(device_key).then(function (device) { - log.info('New device created', {device_key: device_key, id: device.id}); - return device; - }); + return device || dao.insertDevice(device_key) + .then(function (device) { + return dao.insertDeviceStatus(device.id, true, timestamp, null).then(_.constant(device)); + }) + .then(function (device) { + log.info('New device created', {device_key: device_key, id: device.id}); + return device; + }); }) .then(function (device) { + log.info('devicePropertyByDeviceIdAndKey', {device: device}); return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { var ret = {device: device, property: property}; return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { diff --git a/src/DillerDao.js b/src/DillerDao.js index 34f4205..cf4c75b 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -6,9 +6,70 @@ var _ = require('lodash'); */ function DillerDao(tx) { - var deviceColumns = 'id, created_timestamp, key, name, description'; + var deviceColumns = ['id', 'created_timestamp', 'key', 'name', 'description']; + var deviceStatusColumns = ['device', 'online', 'timestamp', 'host']; var propertyColumns = 'id, created_timestamp, device, key, name, description, last_value, last_timestamp'; + function cols(columns, select_prefix, as_prefix) { + var s; + if (select_prefix) { + as_prefix = as_prefix || select_prefix; + for (var i = 0; i < columns.length; i++) { + var c = columns[i]; + if (!s) { + s = ''; + } else { + s += ', '; + } + + s += select_prefix + '.' + c + ' AS ' + as_prefix + '_' + c; + } + return s; + } + + return columns.join(', '); + } + + function unjoin(main) { + var children = Array.prototype.slice.call(arguments).slice(1); + + function convertRow(row) { + if (!row) { + return row; + } + var r = {}; + for (var field in row) { + var f = '' + field; + if (f.startsWith(main + '_')) { + r[f.substring(main.length + 1)] = row[f]; + } else { + for (var i = 0; i < children.length; i++) { + var child = children[i]; + var prefix = child + '_'; + + if (f.startsWith(prefix)) { + var o = r[child]; + if (!o) { + o = r[child] = {}; + } + + o[f.substr(prefix.length)] = row[f]; + } + } + } + } + return r; + } + + return function (res) { + if (Array.isArray(res)) { + return res.map(convertRow); + } else { + return convertRow(res); + } + } + } + // ------------------------------------------------------------------------------------------------------------------- // Device // ------------------------------------------------------------------------------------------------------------------- @@ -17,19 +78,26 @@ function DillerDao(tx) { * @returns {Promise} */ function devices() { - return tx.manyOrNone("SELECT " + deviceColumns + " FROM device"); + var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device)"; + return tx.manyOrNone(sql).then(unjoin('d', 'status')); } function deviceById(id) { - return tx.one("SELECT " + deviceColumns + " FROM device WHERE id=$1", id); + var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.id=$1"; + return tx.one(sql, [id]).then(unjoin('d', 'status')); } function deviceByKey(key) { - return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key); + var sql = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') + " FROM device d LEFT JOIN device_status ds on (d.id = ds.device) WHERE d.key=$1"; + return tx.oneOrNone(sql, [key]).then(unjoin('d', 'status')); } function insertDevice(key) { - return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key); + return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + cols(deviceColumns), key); + } + + function insertDeviceStatus(device, online, timestamp, host) { + return tx.none("INSERT INTO device_status(device, online, timestamp, host) VALUES($1, $2, $3, $4)", [device, online, timestamp, host]); } function updateDevice(id, attributes) { @@ -55,6 +123,11 @@ function DillerDao(tx) { return tx.none(sql, values); } + function updateDeviceStatus(device, timestamp, online, host) { + var sql = 'UPDATE device_status SET timestamp=$2, online=$3, host=$4 WHERE device=$1'; + return tx.none(sql, [device, timestamp, online, host]); + } + // ------------------------------------------------------------------------------------------------------------------- // Device Property // ------------------------------------------------------------------------------------------------------------------- @@ -151,6 +224,9 @@ function DillerDao(tx) { insertDevice: insertDevice, updateDevice: updateDevice, + insertDeviceStatus: insertDeviceStatus, + updateDeviceStatus: updateDeviceStatus, + devicePropertyById: devicePropertyById, devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey, devicePropertiesByDeviceId: devicePropertiesByDeviceId, diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js index 62f5aef..b9cf01b 100644 --- a/src/mqtt/DillerMqttClient.js +++ b/src/mqtt/DillerMqttClient.js @@ -12,7 +12,7 @@ function DillerMqttClient(config, tx) { var mqttClient; var hostname = os.hostname(); - function run(clientType) { + function run(clientType, subscribeToLog) { if (mqttClient) { throw 'Already connected'; } @@ -36,6 +36,9 @@ function DillerMqttClient(config, tx) { mqttClient.on('connect', function () { log.info('Connected'); mqttClient.subscribe('/diller/#'); + if (subscribeToLog) { + mqttClient.subscribe('$SYS/broker/log/#'); + } }); } diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js index 4d032ed..396c2f8 100644 --- a/src/web/DillerWeb.js +++ b/src/web/DillerWeb.js @@ -15,7 +15,7 @@ function DillerWeb(config, mqttClient, tx) { var calls = []; var app; - mqttClient.run('web'); + mqttClient.run('web', false); /** * @param {HttpRes} res |