var di = require('di'); function Diller(config, db) { var log = config.log(); function newValue(dao, device, property, value) { log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, { deviceId: device.id, propertyId: property.id, value: value }); return dao.insertValue(property.id, value) .then(function (res) { // ignore the result updateAggregates(property.id, res.timestamp); return res; }); } function newName(dao, device, property, name) { log.info('New name for property ', device.key + '/' + property.key + '.name = ' + name); return dao.updatePropertyName(property.id, name); } function newDescription(dao, device, property, description) { log.info('New description for property ', device.key + '/' + property.key + '.description = ' + description); return dao.updatePropertyDescription(property.id, description); } function updateAggregates(propertyId, timestamp) { log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); return db() .tx(function (pg) { var dao = new DillerDao(pg); return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { return { minute: minute, hour: hour }; }); }); }).then(function (res) { log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); }, function (res) { log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); }); } function onMessage(topic, message, payload) { var parts = topic.split(/\//); log.info('Processing message to ' + topic); // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value if (parts[3] == 'sensors') { parts[3] = 'property'; } if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { log.warn('no match: ', topic, parts); return; } var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; var f; if (msg_type == 'value') { f = newValue } else if (msg_type == 'name') { f = newName } else if (msg_type == 'description') { f = newDescription } if (!f) { log.warn('Unknown topic:', topic); return; } return db() .tx(function (pg) { var dao = new DillerDao(pg); return dao.deviceByKey(device_key) .then(function (device) { return device || dao.insertDevice(device_key).then(function (device) { log.info('New device created', {device_key: device_key, id: device.id}); return device; }); }) .then(function (device) { return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { var ret = {device: device, property: property}; return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { log.info('Created new device property', {id: p.id, key: p.key}); ret.property = p; return ret; }); }); }) .then(function (data) { // log.info('data.device', data.device, 'data.property', data.property); return f(dao, data.device, data.property, message.toString()); }); }).then(function (res) { log.warn('success', res); }, function (res) { log.warn('fail', res); }); } return { onMessage: onMessage } } var DillerConfig = require('./DillerConfig'); var DillerDao = require('./DillerDao'); var DillerDb = require('./DillerDb'); di.annotate(Diller, new di.Inject(DillerConfig, DillerDb)); module.exports = Diller;