var DillerDao = require('./DillerDao'); var pgpOptions = { //query: function (e) { // console.log("Query:", e.query); // if (e.ctx) { // // this query is executing inside a task or transaction, // if (e.ctx.isTX) { // // this query is inside a transaction; // } else { // // this query is inside a task; // } // // } //} }; var pgp = require('pg-promise')(pgpOptions); function Diller(config, log) { function newValue(dao, device, property, value) { log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, { deviceId: device.id, propertyId: property.id, value: value }); return dao.insertValue(property.id, value) .then(function (res) { log.info('typeof', typeof res.timestamp, res.timestamp, res.timestamp.getTime()); updateAggregates(property.id, res.timestamp); return res; }); } function newName(dao, device, property, name) { log.info('New name for property ', device.key + '/' + property.key + '.name = ' + name); return dao.updatePropertyName(property.id, name); } function newDescription(dao, device, property, description) { log.info('New description for property ', device.key + '/' + property.key + '.description = ' + description); return dao.updatePropertyDescription(property.id, description); } function updateAggregates(propertyId, timestamp) { log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); return pgp(config.postgresqlConfig) .tx(function (pg) { var dao = new DillerDao(pg); return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { return { minute: minute, hour: hour }; }); }); }).then(function (res) { log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); }, function (res) { log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); }); } function onMessage(topic, message, payload) { var parts = topic.split(/\//); log.info('Processing message to ' + topic); // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value if (parts[3] == 'sensors') { parts[3] = 'property'; } if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { log.warn('no match: ', topic, parts); return; } var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; var f; if (msg_type == 'value') { f = newValue } else if (msg_type == 'name') { f = newName } else if (msg_type == 'description') { f = newDescription } if (!f) { log.warn('Unknown message topic:', topic); return; } return pgp(config.postgresqlConfig) .tx(function (pg) { var dao = new DillerDao(pg); return dao.deviceByKey(device_key) .then(function (device) { return device || dao.insertDevice(device_key).then(function (device) { log.info('New device created', {device_key: device_key, id: device.id}); return device; }); }) .then(function (device) { return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { var ret = {device: device, property: property}; return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { log.info('Created new device property', {id: p.id, key: p.key}); ret.property = p; return ret; }); }); }) .then(function (data) { return f(dao, data.device, data.property, message.toString()); }); }).then(function (res) { log.warn('success', res); }, function (res) { log.warn('fail', res); }); } return { onMessage: onMessage } } //noinspection JSUnresolvedVariable module.exports = { Diller: Diller };