var _ = require('lodash');

/**
 * Builds the message-processing core: routes incoming topic/message pairs to
 * either the broker-log handler (device online/offline status) or the device
 * property handler (values, names, descriptions), and exposes helpers for
 * updating device/property attributes.
 *
 * NOTE(review): the `$SYS/broker/log/...` topics and the log line formats
 * parsed below look like Mosquitto's broker log topics - confirm against the
 * broker actually deployed.
 *
 * @param {DillerConfig} config - must provide `log()` returning a logger with
 *   `debug`, `info` and `warn` methods.
 * @param {PgTx} pg - currently unused by this function.
 * @param {DillerDao} dao - data access object used for all persistence.
 * @constructor
 */
function Diller(config, pg, dao) {
  var log = config.log();

  /**
   * Stores a new value for a property and kicks off an aggregate refresh.
   *
   * @param dao DAO used for the insert (shadows the outer `dao`)
   * @param device object with at least `id` and `key`
   * @param property object with at least `id` and `key`
   * @param timestamp time the message was received
   * @param value the raw value (a string when called from onDillerMessage)
   * @returns promise resolving to the result of `dao.insertValue`
   */
  function newValue(dao, device, property, timestamp, value) {
    log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, {
      deviceId: device.id,
      propertyId: property.id,
      value: value
    });

    return dao.insertValue(property.id, timestamp, value)
      .then(function (res) {
        // Fire and forget: the aggregate update is deliberately not awaited.
        // updateAggregates() handles (logs) its own failures, so nothing
        // is left unhandled here.
        updateAggregates(property.id, res.timestamp);
        return res;
      });
  }

  /**
   * Updates the name of a property.
   *
   * @returns promise from `dao.updateProperty`
   */
  function newName(dao, device, property, timestamp, name) {
    log.info('New name for property ', device.key + '/' + property.key + '.name = ' + name);

    return dao.updateProperty(property.id, {name: name});
  }

  /**
   * Updates the description of a property.
   *
   * @returns promise from `dao.updateProperty`
   */
  function newDescription(dao, device, property, timestamp, description) {
    log.info('New description for property ', device.key + '/' + property.key + '.description = ' + description);

    return dao.updateProperty(property.id, {description: description});
  }

  /**
   * Refreshes the minute aggregates and then the hour aggregates for a
   * property. Both outcomes are logged; the returned promise never rejects
   * because the failure handler swallows the error after logging it.
   *
   * @param propertyId id of the property
   * @param timestamp timestamp passed through to both DAO aggregate calls
   * @returns promise resolving to undefined
   */
  function updateAggregates(propertyId, timestamp) {
    log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp});

    return dao.updateMinuteAggregatesForProperty(propertyId, timestamp)
      .then(function (minute) {
        return dao.updateHourAggregatesForProperty(propertyId, timestamp)
          .then(function (hour) {
            return {minute: minute, hour: hour};
          });
      })
      .then(function (res) {
        log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res});
      }, function (res) {
        log.warn('updateAggregates: failed', {res: res, propertyId: propertyId});
      });
  }

  /**
   * @param deviceId int
   * @param attributes object
   * @returns promise from `dao.updateDevice`
   */
  function updateDeviceAttributes(deviceId, attributes) {
    // Log a copy so the caller's attributes object is not modified.
    var logData = _.clone(attributes);
    logData.deviceId = deviceId;
    log.info('Updating device attributes', logData);

    return dao.updateDevice(deviceId, attributes);
  }

  /**
   * @param propertyId int
   * @param attributes object
   * @returns promise from `dao.updateProperty`
   */
  function updatePropertyAttributes(propertyId, attributes) {
    // Log a copy so the caller's attributes object is not modified.
    var logData = _.clone(attributes);
    logData.propertyId = propertyId;
    log.info('Updating property attributes', logData);

    return dao.updateProperty(propertyId, attributes);
  }

  /**
   * Entry point for every incoming message. Routes by topic:
   *  - `$SYS/broker/log/...` (4 or 5 segments) goes to onLogMessage
   *  - `/diller/<device>/property/<property>/<type>` goes to onDillerMessage
   * Anything else is logged as 'no match'. Synchronous exceptions thrown by
   * the handlers are caught and logged.
   *
   * @param topic string topic the message was published to
   * @param message message body; handlers call `toString()` on it
   * @param payload only used for logging when an exception is caught
   */
  //noinspection JSUnusedLocalSymbols
  function onMessage(topic, message, payload) {
    var timestamp = new Date();
    var parts = topic.split('/');

    log.info('Processing message to ' + topic);

    // Example topic: /diller/5c:cf:7f:06:59:a5/property/temp-0/value
    // 'sensor' and 'sensors' are accepted as aliases for 'property'.
    if (parts[3] === 'sensors' || parts[3] === 'sensor') {
      parts[3] = 'property';
    }

    try {
      var isBrokerLog = (parts.length === 4 || parts.length === 5) &&
        parts[0] === '$SYS' && parts[1] === 'broker' && parts[2] === 'log';
      // parts[0] is the empty string for these topics because of the leading '/'.
      var isDillerProperty = parts.length === 6 &&
        parts[1] === 'diller' && parts[3] === 'property';

      if (isBrokerLog) {
        onLogMessage(timestamp, parts, topic, message);
      } else if (isDillerProperty) {
        onDillerMessage(timestamp, parts, topic, message);
      } else {
        log.warn('no match', {topic: topic, parts: parts, l: parts.length});
      }
    } catch (e) {
      log.warn('Exception while processing message', {topic: topic, message: message, exception: e, payload: payload});
    }
  }

  /**
   * Parses a broker log line and updates the device status when the line
   * reports a client connecting, disconnecting or hitting a socket error.
   * Lines that match none of the patterns are ignored.
   */
  function onLogMessage(timestamp, parts, topic, message) {
    var text = message.toString();
    var statusUpdate;
    var match;

    // The three patterns are mutually exclusive: each is anchored at the start
    // of the line and differs in the literal text right after the timestamp.
    if ((match = text.match(/^([0-9]+): New client connected from ([^ ]+) as ([^ ]+) \(c([0-9]), k([0-9]+)\)/))) {
      statusUpdate = newClientConnected(match);
    } else if ((match = text.match(/^([0-9]+): Client ([^ ]+) disconnected/))) {
      statusUpdate = clientDisconnected(match);
    } else if ((match = text.match(/^([0-9]+): Socket error on client ([^ ]+), disconnecting\./))) {
      statusUpdate = clientDisconnected(match);
    }

    if (!statusUpdate) {
      return;
    }

    statusUpdate.then(function (res) {
      // For unknown devices the promise resolves to a string (no `id`),
      // in which case nothing was updated.
      if (res && res.id) {
        log.info('Status updated');
      }
    }, function (e) {
      log.warn('Status update failed', {exception: e});
    });
  }

  /**
   * Converts the leading numeric field of a broker log line into a Date.
   * The field is multiplied by 1000, i.e. it is treated as seconds.
   */
  function brokerTimestamp(seconds) {
    return new Date(parseInt(seconds, 10) * 1000);
  }

  /** Strips the 'esp8266-' prefix from a broker client id to get the device key. */
  function deviceKeyFromClientId(clientId) {
    return clientId.replace(/^esp8266-/, '');
  }

  /**
   * Marks the device behind a newly connected client as online.
   *
   * @param msg regex match: [1] timestamp, [2] host, [3] client id, [4] c, [5] k
   * @returns promise resolving to the device, or to an 'Unknown device: ...'
   *   string when no device with that key exists
   */
  function newClientConnected(msg) {
    var ts = brokerTimestamp(msg[1]);
    var host = msg[2];
    var key = deviceKeyFromClientId(msg[3]);
    var c = parseInt(msg[4], 10);
    var k = parseInt(msg[5], 10);

    return dao.deviceByKey(key).then(function (device) {
      log.debug('New client connected', {key: key, device: device && device.id, host: host, c: c, k: k});

      if (device && device.id) {
        return dao.updateDeviceStatus(device.id, ts, true, host)
          .then(_.constant(device));
      }

      return 'Unknown device: ' + key;
    });
  }

  /**
   * Marks the device behind a disconnected client as offline.
   *
   * @param msg regex match: [1] timestamp, [2] client id
   * @returns promise resolving to the device, or to an 'Unknown device: ...'
   *   string when no device with that key exists
   */
  function clientDisconnected(msg) {
    log.debug('clientDisconnected', {msg: msg});

    var ts = brokerTimestamp(msg[1]);
    var key = deviceKeyFromClientId(msg[2]);

    return dao.deviceByKey(key).then(function (device) {
      if (device && device.id) {
        return dao.updateDeviceStatus(device.id, ts, false)
          .then(_.constant(device));
      }

      return 'Unknown device: ' + key;
    });
  }

  /**
   * Handles `/diller/<device>/property/<property>/<type>` messages where
   * <type> is 'value', 'name' or 'description'. The device and the property
   * are created on first sight, then the matching handler is invoked with the
   * message body as a string.
   *
   * @returns undefined for an unknown message type, otherwise a promise that
   *   resolves to undefined once the outcome has been logged
   */
  function onDillerMessage(timestamp, parts, topic, message) {
    var deviceKey = parts[2];
    var propertyKey = parts[4];
    var msgType = parts[5];

    var handlers = {
      value: newValue,
      name: newName,
      description: newDescription
    };

    // Own-property check: msgType comes from the topic, so names inherited
    // from Object.prototype (e.g. 'constructor') must not be treated as handlers.
    if (!Object.prototype.hasOwnProperty.call(handlers, msgType)) {
      log.warn('Unknown topic:', topic);
      return;
    }
    var handler = handlers[msgType];

    return dao.deviceByKey(deviceKey)
      .then(function (device) {
        if (device) {
          return device;
        }

        return dao.insertDevice(deviceKey)
          .then(function (created) {
            return dao.insertDeviceStatus(created.id, true, timestamp, null)
              .then(_.constant(created));
          })
          .then(function (created) {
            log.info('New device created', {device_key: deviceKey, id: created.id});
            return created;
          });
      })
      .then(function (device) {
        log.info('devicePropertyByDeviceIdAndKey', {device: device});

        return dao.devicePropertyByDeviceIdAndKey(device.id, propertyKey)
          .then(function (property) {
            var ret = {device: device, property: property};
            if (property) {
              return ret;
            }

            return dao.insertDeviceProperty(device.id, propertyKey)
              .then(function (p) {
                log.info('Created new device property', {id: p.id, key: p.key});
                ret.property = p;
                return ret;
              });
          });
      })
      .then(function (data) {
        return handler(dao, data.device, data.property, timestamp, message.toString());
      })
      .then(function (res) {
        log.info('success', res);
      }, function (res) {
        log.warn('fail', res);
      });
  }

  /** @lends Diller.prototype */
  return {
    onMessage: onMessage,
    updateDeviceAttributes: updateDeviceAttributes,
    updatePropertyAttributes: updatePropertyAttributes
  };
}

module.exports = Diller;