var _ = require('lodash'); /** * @param {DillerConfig} config * @param {PgTx} pg * @param {DillerDao} dao * @constructor */ function Diller(config, pg, dao) { var log = config.log(); function newValue(dao, device, property, timestamp, value) { log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, { deviceId: device.id, propertyId: property.id, value: value }); return dao.insertValue(property.id, timestamp, value) .then(function (res) { // ignore the result updateAggregates(property.id, res.timestamp); return res; }); } function newName(dao, device, property, timestamp, name) { log.info('New name for property ', device.key + '/' + property.key + '.name = ' + name); return dao.updateProperty(property.id, {name: name}); } function newDescription(dao, device, property, timestamp, description) { log.info('New description for property ', device.key + '/' + property.key + '.description = ' + description); return dao.updateProperty(property.id, {description: description}); } function updateAggregates(propertyId, timestamp) { log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { return { minute: minute, hour: hour }; }); }).then(function (res) { log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); }, function (res) { log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); }); } /** * @param deviceId int * @param attributes object */ function updateDeviceAttributes(deviceId, attributes) { var x = _.clone(attributes); x.deviceId = deviceId; log.info('Updating device attributes', x); return dao.updateDevice(deviceId, attributes); } function updatePropertyAttributes(propertyId, attributes) { var x = _.clone(attributes); x.propertyId = propertyId; log.info('Updating property attributes', x); return dao.updateProperty(propertyId, attributes); } //noinspection JSUnusedLocalSymbols function onMessage(topic, message, payload) { var timestamp = new Date(); var parts = topic.split(/\//); log.info('Processing message to ' + topic); // /diller/5c:cf:7f:06:59:a5/property/temp-0/value if (parts[3] == 'sensors' || parts[3] == 'sensor') { parts[3] = 'property'; } if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { log.warn('no match: ', topic, parts); return; } var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; var f; if (msg_type == 'value') { f = newValue } else if (msg_type == 'name') { f = newName } else if (msg_type == 'description') { f = newDescription } if (!f) { log.warn('Unknown topic:', topic); return; } return dao.deviceByKey(device_key) .then(function (device) { return device || dao.insertDevice(device_key).then(function (device) { log.info('New device created', {device_key: device_key, id: device.id}); return device; }); }) .then(function (device) { return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { var ret = {device: device, property: property}; return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { log.info('Created new device property', {id: p.id, key: p.key}); ret.property = p; return ret; }); }); }) .then(function (data) { // log.info('data.device', data.device, 'data.property', data.property); return f(dao, data.device, data.property, timestamp, message.toString()); }) .then(function (res) { log.warn('success', res); }, function (res) { log.warn('fail', res); }); } /** @lends Diller.prototype */ return { onMessage: onMessage, updateDeviceAttributes: updateDeviceAttributes, updatePropertyAttributes: updatePropertyAttributes }; } module.exports = Diller;