diff options
Diffstat (limited to 'src/Diller.js')
-rw-r--r-- | src/Diller.js | 92 |
1 files changed, 85 insertions, 7 deletions
diff --git a/src/Diller.js b/src/Diller.js index 230af7a..e4c6b9f 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -86,11 +86,84 @@ function Diller(config, pg, dao) { parts[3] = 'property'; } - if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { - log.warn('no match: ', topic, parts); - return; + try { + if ((parts.length == 4 || parts.length == 5) && parts[0] == '$SYS' && parts[1] == 'broker' && parts[2] == 'log') { + onLogMessage(timestamp, parts, topic, message); + } else if (parts.length == 6 && parts[1] == 'diller' && parts[3] == 'property') { + onDillerMessage(timestamp, parts, topic, message); + } else { + log.warn('no match', {topic: topic, parts: parts, 'l': parts.length}); + } + } catch (e) { + log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload}); + } + } + + function onLogMessage(timestamp, parts, topic, message) { + message = message.toString(); + //log.info('broker message: ' + parts[3] + '/' + parts[4] + ': ' + message); + + var p; + var msg = message.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/); + if (msg) { + p = newClientConnected(msg); } + msg = message.match(/^([0-9]+): Client ([^ ]+) disconnected/); + if (msg) { + p = clientDisconnected(msg); + } + + msg = message.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting./); + if (msg) { + p = clientDisconnected(msg); + } + + p && p.then(function (res) { + if (res && res.id) { + log.info('Status updated'); + } + }, function (e) { + log.info('Status update failed', {exception: e}); + }); + } + + function newClientConnected(msg) { + var ts = new Date(parseInt(msg[1]) * 1000); + var host = msg[2]; + var key = msg[3].replace(/^esp8266-/, ''); + + var c = parseInt(msg[4]); + var k = parseInt(msg[5]); + + return dao.deviceByKey(key).then(function (device) { + log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k}); + + if (device && device.id) { + return dao.updateDeviceStatus(device.id, ts, true, host) + .then(_.constant(device)); + } else { + return Promise.resolve('Unknown device: ' + key); + } + }); + } + + function clientDisconnected(msg) { + log.debug('clientDisconnected', {msg: msg}); + var ts = new Date(parseInt(msg[1]) * 1000); + var key = msg[2].replace(/^esp8266-/, ''); + + return dao.deviceByKey(key).then(function (device) { + if (device && device.id) { + return dao.updateDeviceStatus(device.id, ts, false) + .then(_.constant(device)); + } else { + return Promise.resolve('Unknown device: ' + key); + } + }); + } + + function onDillerMessage(timestamp, parts, topic, message) { var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; @@ -111,12 +184,17 @@ function Diller(config, pg, dao) { return dao.deviceByKey(device_key) .then(function (device) { - return device || dao.insertDevice(device_key).then(function (device) { - log.info('New device created', {device_key: device_key, id: device.id}); - return device; - }); + return device || dao.insertDevice(device_key) + .then(function (device) { + return dao.insertDeviceStatus(device.id, true, timestamp, null).then(_.constant(device)); + }) + .then(function (device) { + log.info('New device created', {device_key: device_key, id: device.id}); + return device; + }); }) .then(function (device) { + log.info('devicePropertyByDeviceIdAndKey', {device: device}); return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { var ret = {device: device, property: property}; return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { |