aboutsummaryrefslogtreecommitdiff
path: root/src/Diller.js
diff options
context:
space:
mode:
Diffstat (limited to 'src/Diller.js')
-rw-r--r--src/Diller.js92
1 files changed, 85 insertions, 7 deletions
diff --git a/src/Diller.js b/src/Diller.js
index 230af7a..e4c6b9f 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -86,11 +86,84 @@ function Diller(config, pg, dao) {
parts[3] = 'property';
}
- if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') {
- log.warn('no match: ', topic, parts);
- return;
+ try {
+ if ((parts.length == 4 || parts.length == 5) && parts[0] == '$SYS' && parts[1] == 'broker' && parts[2] == 'log') {
+ onLogMessage(timestamp, parts, topic, message);
+ } else if (parts.length == 6 && parts[1] == 'diller' && parts[3] == 'property') {
+ onDillerMessage(timestamp, parts, topic, message);
+ } else {
+ log.warn('no match', {topic: topic, parts: parts, 'l': parts.length});
+ }
+ } catch (e) {
+ log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload});
+ }
+ }
+
+ function onLogMessage(timestamp, parts, topic, message) {
+ message = message.toString();
+ //log.info('broker message: ' + parts[3] + '/' + parts[4] + ': ' + message);
+
+ var p;
+ var msg = message.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/);
+ if (msg) {
+ p = newClientConnected(msg);
}
+ msg = message.match(/^([0-9]+): Client ([^ ]+) disconnected/);
+ if (msg) {
+ p = clientDisconnected(msg);
+ }
+
+ msg = message.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting./);
+ if (msg) {
+ p = clientDisconnected(msg);
+ }
+
+ p && p.then(function (res) {
+ if (res && res.id) {
+ log.info('Status updated');
+ }
+ }, function (e) {
+ log.info('Status update failed', {exception: e});
+ });
+ }
+
+ function newClientConnected(msg) {
+ var ts = new Date(parseInt(msg[1]) * 1000);
+ var host = msg[2];
+ var key = msg[3].replace(/^esp8266-/, '');
+
+ var c = parseInt(msg[4]);
+ var k = parseInt(msg[5]);
+
+ return dao.deviceByKey(key).then(function (device) {
+ log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k});
+
+ if (device && device.id) {
+ return dao.updateDeviceStatus(device.id, ts, true, host)
+ .then(_.constant(device));
+ } else {
+ return Promise.resolve('Unknown device: ' + key);
+ }
+ });
+ }
+
+ function clientDisconnected(msg) {
+ log.debug('clientDisconnected', {msg: msg});
+ var ts = new Date(parseInt(msg[1]) * 1000);
+ var key = msg[2].replace(/^esp8266-/, '');
+
+ return dao.deviceByKey(key).then(function (device) {
+ if (device && device.id) {
+ return dao.updateDeviceStatus(device.id, ts, false)
+ .then(_.constant(device));
+ } else {
+ return Promise.resolve('Unknown device: ' + key);
+ }
+ });
+ }
+
+ function onDillerMessage(timestamp, parts, topic, message) {
var device_key = parts[2];
var property_key = parts[4];
var msg_type = parts[5];
@@ -111,12 +184,17 @@ function Diller(config, pg, dao) {
return dao.deviceByKey(device_key)
.then(function (device) {
- return device || dao.insertDevice(device_key).then(function (device) {
- log.info('New device created', {device_key: device_key, id: device.id});
- return device;
- });
+ return device || dao.insertDevice(device_key)
+ .then(function (device) {
+ return dao.insertDeviceStatus(device.id, true, timestamp, null).then(_.constant(device));
+ })
+ .then(function (device) {
+ log.info('New device created', {device_key: device_key, id: device.id});
+ return device;
+ });
})
.then(function (device) {
+ log.info('devicePropertyByDeviceIdAndKey', {device: device});
return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) {
var ret = {device: device, property: property};
return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) {