From 803148d5a23afe207fb5de9ac73c986a324feb9c Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 31 Oct 2015 14:09:39 +0100 Subject: core: o Improved transaction handling. web: o Supporting changing a device's name and description. --- src/Diller.js | 109 +++++++++++++++++++------------------------------ src/DillerConfig.js | 1 + src/DillerDao.js | 30 +++++++------- src/DillerDb.js | 29 ------------- src/DillerTx.js | 63 ++++++++++++++++++++++++++++ src/mqtt/DillerMqtt.js | 10 +++-- src/web/DillerWeb.js | 56 +++++++++++++------------ 7 files changed, 156 insertions(+), 142 deletions(-) delete mode 100644 src/DillerDb.js create mode 100644 src/DillerTx.js (limited to 'src') diff --git a/src/Diller.js b/src/Diller.js index d52e3f1..edb501d 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -1,12 +1,6 @@ -var di = require('di'); - -/** - * @param config DillerConfig - * @param db DillerDb - * @returns {{onMessage: onMessage, updateDeviceName: updateDeviceName}} - * @constructor - */ -function Diller(config, db) { +var _ = require('lodash'); + +function Diller(config, pg, dao) { var log = config.log(); function newValue(dao, device, property, value) { @@ -38,39 +32,29 @@ function Diller(config, db) { function updateAggregates(propertyId, timestamp) { log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); - return db() - .tx(function (pg) { - var dao = new DillerDao(pg); - - return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { - return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { - return { - minute: minute, - hour: hour - }; - }); - }); - }).then(function (res) { - log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); - }, function (res) { - log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); + return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { + return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { + return { + minute: minute, + hour: hour + }; }); + }).then(function (res) { + log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); + }, function (res) { + log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); + }); } - function updateDeviceName(deviceId, name) { - log.info('Updating device name', {deviceId: deviceId, name: name}); - return db() - .tx(function (tx) { - var dao = new DillerDao(tx); + function updateDeviceAttributes(deviceId, attributes) { + var x = _.clone(attributes); + x.deviceId = deviceId; + log.info('Updating device attributes', x); - return dao.updateDevice(deviceId, {name: name}) - .then(function (res) { - log.info('Device name updated', {deviceId: deviceId, name: name}); - return res; - }); - }) + return dao.updateDevice(deviceId, attributes); } + //noinspection JSUnusedLocalSymbols function onMessage(topic, message, payload) { var parts = topic.split(/\//); @@ -105,32 +89,28 @@ function Diller(config, db) { return; } - return db() - .tx(function (pg) { - var dao = new DillerDao(pg); - - return dao.deviceByKey(device_key) - .then(function (device) { - return device || dao.insertDevice(device_key).then(function (device) { - log.info('New device created', {device_key: device_key, id: device.id}); - return device; - }); - }) - .then(function (device) { - return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { - var ret = {device: device, property: property}; - return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { - log.info('Created new device property', {id: p.id, key: p.key}); - ret.property = p; - return ret; - }); - }); - }) - .then(function (data) { - // log.info('data.device', data.device, 'data.property', data.property); - return f(dao, data.device, data.property, message.toString()); + return dao.deviceByKey(device_key) + .then(function (device) { + return device || dao.insertDevice(device_key).then(function (device) { + log.info('New device created', {device_key: device_key, id: device.id}); + return device; }); - }).then(function (res) { + }) + .then(function (device) { + return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { + var ret = {device: device, property: property}; + return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { + log.info('Created new device property', {id: p.id, key: p.key}); + ret.property = p; + return ret; + }); + }); + }) + .then(function (data) { + // log.info('data.device', data.device, 'data.property', data.property); + return f(dao, data.device, data.property, message.toString()); + }) + .then(function (res) { log.warn('success', res); }, function (res) { log.warn('fail', res); @@ -139,13 +119,8 @@ function Diller(config, db) { return { onMessage: onMessage, - updateDeviceName: updateDeviceName + updateDeviceAttributes: updateDeviceAttributes } } -var DillerConfig = require('./DillerConfig'); -var DillerDao = require('./DillerDao'); -var DillerDb = require('./DillerDb'); -di.annotate(Diller, new di.Inject(DillerConfig, DillerDb)); - module.exports = Diller; diff --git a/src/DillerConfig.js b/src/DillerConfig.js index 81ff046..440aadb 100644 --- a/src/DillerConfig.js +++ b/src/DillerConfig.js @@ -62,6 +62,7 @@ function DillerConfig() { postgresqlConfig: postgresqlConfig, httpPort: httpPort, configureLogging: configureLogging, + logQueries: false, log: function () { return log; } diff --git a/src/DillerDao.js b/src/DillerDao.js index 2133dfc..a47f181 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -2,7 +2,7 @@ var _ = require('lodash'); function DillerDao(tx) { - var deviceColumns = 'id, key, created_timestamp'; + var deviceColumns = 'id, created_timestamp, key, name, description'; var propertyColumns = 'id, device, key, created_timestamp'; var valueColumns = 'property, timestamp, value_text, value_numeric'; @@ -27,27 +27,25 @@ function DillerDao(tx) { } function updateDevice(id, attributes) { - var values = [id]; var i = 2; - var fields = _.map(attributes, function (value, name) { - console.log('name', name, 'value', value, 'i', i); - if (name == 'name') { - values.push(value); - return 'name = $' + i++; - } - }); + var fields = _(attributes).chain() + .map(function (value, attribute) { + if (attribute == 'name' || attribute == 'description') { + values.push(value); + return attribute + ' = $' + i++; + } + }) + .collect() + .join(', ') + .value(); if (fields.length == 0) { - return; // TODO: return an empty promise; + return Promise.resolve({}); } - fields = _.collect(fields); - - var x = 'UPDATE device SET ' + fields.join(', ') + ' WHERE id = $1'; - console.log('x', x); - console.log('values', values); - return tx.none(x, values); + var sql = 'UPDATE device SET ' + fields + ' WHERE id = $1'; + return tx.none(sql, values); } // ------------------------------------------------------------------------------------------------------------------- diff --git a/src/DillerDb.js b/src/DillerDb.js deleted file mode 100644 index c31dde7..0000000 --- a/src/DillerDb.js +++ /dev/null @@ -1,29 +0,0 @@ -var di = require('di'); -var DillerConfig = require('./DillerConfig'); - -var pgpOptions = { - //query: function (e) { - // console.log("Query:", e.query); - // if (e.ctx) { - // // this query is executing inside a task or transaction, - // if (e.ctx.isTX) { - // // this query is inside a transaction; - // } else { - // // this query is inside a task; - // } - // - // } - //} -}; - -var pgp = require('pg-promise')(pgpOptions); - -function DillerDb(config) { - - return function () { - return pgp(config.postgresqlConfig) - } -} -di.annotate(DillerDb, new di.Inject(DillerConfig)); - -module.exports = DillerDb; diff --git a/src/DillerTx.js b/src/DillerTx.js new file mode 100644 index 0000000..85daee5 --- /dev/null +++ b/src/DillerTx.js @@ -0,0 +1,63 @@ +var di = require('di'); +var _ = require('lodash'); +var DillerConfig = require('./DillerConfig'); +var DillerDao = require('./DillerDao'); +var Diller = require('./Diller'); + +var pgpConstructor = require('pg-promise'); + +var pgp; + +/** + * @param config DillerConfig + * @returns {Function} + * @constructor + */ +function DillerTx(config) { + var log = config.log(); + + function queryLogger(e) { + log.info("Query:", e.query); + if (e.ctx) { + // this query is executing inside a task or transaction, + if (e.ctx.isTX) { + // this query is inside a transaction; + } else { + // this query is inside a task; + } + + } + } + + function connectLogger(client) { + log.info('PGP: connect'); + } + + function disconnectLogger(client) { + log.info('PGP: disconnect'); + } + + if (!pgp) { + var pgpOptions = {}; + + pgpOptions.query = (config.logQueries && queryLogger) || undefined; + //pgpOptions.connect = (config.logQueries && connectLogger) || undefined; + //pgpOptions.disconnect = (config.logQueries && disconnectLogger) || undefined; + + pgp = pgpConstructor(pgpOptions); + } + + return function (action) { + var con = pgp(config.postgresqlConfig); + + return con.tx(function (pg) { + var dao = new DillerDao(con); + var diller = new Diller(config, con, dao); + return action(pg, dao, diller) + }); + }; +} + +di.annotate(DillerTx, new di.Inject(DillerConfig)); + +module.exports = DillerTx; diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js index 3fe43dc..e991b40 100644 --- a/src/mqtt/DillerMqtt.js +++ b/src/mqtt/DillerMqtt.js @@ -1,7 +1,7 @@ var di = require('di'); var mqtt = require('mqtt'); -function DillerMqtt(config, diller) { +function DillerMqtt(config, tx) { var log = config.log(); function run() { @@ -22,7 +22,9 @@ function DillerMqtt(config, diller) { }); mqttClient.on('message', function (topic, message, payload) { - diller.onMessage(topic, message, payload); + tx(function (pg, dao, diller) { + return diller.onMessage(topic, message, payload); + }); }); } @@ -30,9 +32,9 @@ function DillerMqtt(config, diller) { run: run }; } -var Diller = require('../Diller'); +var DillerTx = require('../DillerTx'); var DillerConfig = require('../DillerConfig'); -di.annotate(DillerMqtt, new di.Inject(DillerConfig, Diller)); +di.annotate(DillerMqtt, new di.Inject(DillerConfig, DillerTx)); module.exports = DillerMqtt; diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js index 95bcbb6..bcb15d4 100644 --- a/src/web/DillerWeb.js +++ b/src/web/DillerWeb.js @@ -4,11 +4,15 @@ var _ = require('lodash'); var di = require('di'); var DillerConfig = require('../DillerConfig'); -var Diller = require('../Diller'); -var DillerDb = require('../DillerDb'); -var DillerDao = require('../DillerDao'); - -function DillerWeb(diller, db, config) { +var DillerTx = require('../DillerTx'); + +/** + * @param tx DillerTx + * @param config DillerConfig + * @returns {{init: init, listen: listen, generateRpc: generateRpc}} + * @constructor + */ +function DillerWeb(tx, config) { var log = config.log(); var calls = []; @@ -21,27 +25,25 @@ function DillerWeb(diller, db, config) { } } + function deviceResponse(data) { + var device = data[0]; + device.properties = data[1]; + return {device: device}; + } + function getDevices(req, res) { - db().tx(function (pg) { - var dao = new DillerDao(pg); + tx(function (pg, dao) { return dao.devices(); }).then(function (devices) { res.json({devices: devices}); }, genericErrorHandler(res)); } - function deviceResponse(data) { - var device = data[0]; - device.properties = data[1]; - return {device: device}; - } - function getDevice(req, res) { - db().tx(function (tx) { + tx(function (pg, dao) { var deviceId = req.params.deviceId; - var dao = new DillerDao(tx); - return tx.batch([ + return pg.batch([ dao.deviceById(deviceId), dao.devicePropertiesByDeviceId(deviceId)] ); @@ -51,16 +53,20 @@ function DillerWeb(diller, db, config) { } function patchDevice(req, res) { - db().tx(function (tx) { + tx(function (pg, dao, diller) { var deviceId = req.params.deviceId; var body = req.body; - if (body.attribute == 'name') { - diller.updateDeviceName(deviceId, body.value) + if (!body.attribute) { + res.status(400).json({message: 'Required keys: "attribute" and "value".'}); + } else if (body.attribute == 'name' || body.attribute == 'description') { + var attributes = {}; + attributes[body.attribute] = body.value; + + return diller.updateDeviceAttributes(deviceId, attributes) .then(function () { - var dao = new DillerDao(tx); - return tx.batch([ + return pg.batch([ dao.deviceById(deviceId), dao.devicePropertiesByDeviceId(deviceId)] ); @@ -69,7 +75,7 @@ function DillerWeb(diller, db, config) { res.json(deviceResponse(data)); }, genericErrorHandler(res)); } else { - res.status(400).json({message: 'Required keys: "attribute" and "value".'}); + res.status(400).json({message: 'Unsupported attribute: ' + body.attribute}); } }).then(function (data) { var device = data[0]; @@ -79,10 +85,8 @@ function DillerWeb(diller, db, config) { } function getValues(req, res) { - db().tx(function (tx) { + tx(function (tx, dao) { var propertyId = req.params.propertyId; - - var dao = new DillerDao(tx); return dao.valuesByPropertyId(propertyId, 10); }).then(function (values) { res.json({values: values}); @@ -221,6 +225,6 @@ function DillerWeb(diller, db, config) { } } -di.annotate(DillerWeb, new di.Inject(Diller, DillerDb, DillerConfig)); +di.annotate(DillerWeb, new di.Inject(DillerTx, DillerConfig)); module.exports = DillerWeb; -- cgit v1.2.3