aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/Diller.js109
-rw-r--r--src/DillerConfig.js1
-rw-r--r--src/DillerDao.js30
-rw-r--r--src/DillerDb.js29
-rw-r--r--src/DillerTx.js63
-rw-r--r--src/mqtt/DillerMqtt.js10
-rw-r--r--src/web/DillerWeb.js56
7 files changed, 156 insertions, 142 deletions
diff --git a/src/Diller.js b/src/Diller.js
index d52e3f1..edb501d 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -1,12 +1,6 @@
-var di = require('di');
-
-/**
- * @param config DillerConfig
- * @param db DillerDb
- * @returns {{onMessage: onMessage, updateDeviceName: updateDeviceName}}
- * @constructor
- */
-function Diller(config, db) {
+var _ = require('lodash');
+
+function Diller(config, pg, dao) {
var log = config.log();
function newValue(dao, device, property, value) {
@@ -38,39 +32,29 @@ function Diller(config, db) {
function updateAggregates(propertyId, timestamp) {
log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp});
- return db()
- .tx(function (pg) {
- var dao = new DillerDao(pg);
-
- return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) {
- return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) {
- return {
- minute: minute,
- hour: hour
- };
- });
- });
- }).then(function (res) {
- log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res});
- }, function (res) {
- log.warn('updateAggregates: failed', {res: res, propertyId: propertyId});
+ return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) {
+ return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) {
+ return {
+ minute: minute,
+ hour: hour
+ };
});
+ }).then(function (res) {
+ log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res});
+ }, function (res) {
+ log.warn('updateAggregates: failed', {res: res, propertyId: propertyId});
+ });
}
- function updateDeviceName(deviceId, name) {
- log.info('Updating device name', {deviceId: deviceId, name: name});
- return db()
- .tx(function (tx) {
- var dao = new DillerDao(tx);
+ function updateDeviceAttributes(deviceId, attributes) {
+ var x = _.clone(attributes);
+ x.deviceId = deviceId;
+ log.info('Updating device attributes', x);
- return dao.updateDevice(deviceId, {name: name})
- .then(function (res) {
- log.info('Device name updated', {deviceId: deviceId, name: name});
- return res;
- });
- })
+ return dao.updateDevice(deviceId, attributes);
}
+ //noinspection JSUnusedLocalSymbols
function onMessage(topic, message, payload) {
var parts = topic.split(/\//);
@@ -105,32 +89,28 @@ function Diller(config, db) {
return;
}
- return db()
- .tx(function (pg) {
- var dao = new DillerDao(pg);
-
- return dao.deviceByKey(device_key)
- .then(function (device) {
- return device || dao.insertDevice(device_key).then(function (device) {
- log.info('New device created', {device_key: device_key, id: device.id});
- return device;
- });
- })
- .then(function (device) {
- return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) {
- var ret = {device: device, property: property};
- return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) {
- log.info('Created new device property', {id: p.id, key: p.key});
- ret.property = p;
- return ret;
- });
- });
- })
- .then(function (data) {
- // log.info('data.device', data.device, 'data.property', data.property);
- return f(dao, data.device, data.property, message.toString());
+ return dao.deviceByKey(device_key)
+ .then(function (device) {
+ return device || dao.insertDevice(device_key).then(function (device) {
+ log.info('New device created', {device_key: device_key, id: device.id});
+ return device;
});
- }).then(function (res) {
+ })
+ .then(function (device) {
+ return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) {
+ var ret = {device: device, property: property};
+ return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) {
+ log.info('Created new device property', {id: p.id, key: p.key});
+ ret.property = p;
+ return ret;
+ });
+ });
+ })
+ .then(function (data) {
+ // log.info('data.device', data.device, 'data.property', data.property);
+ return f(dao, data.device, data.property, message.toString());
+ })
+ .then(function (res) {
log.warn('success', res);
}, function (res) {
log.warn('fail', res);
@@ -139,13 +119,8 @@ function Diller(config, db) {
return {
onMessage: onMessage,
- updateDeviceName: updateDeviceName
+ updateDeviceAttributes: updateDeviceAttributes
}
}
-var DillerConfig = require('./DillerConfig');
-var DillerDao = require('./DillerDao');
-var DillerDb = require('./DillerDb');
-di.annotate(Diller, new di.Inject(DillerConfig, DillerDb));
-
module.exports = Diller;
diff --git a/src/DillerConfig.js b/src/DillerConfig.js
index 81ff046..440aadb 100644
--- a/src/DillerConfig.js
+++ b/src/DillerConfig.js
@@ -62,6 +62,7 @@ function DillerConfig() {
postgresqlConfig: postgresqlConfig,
httpPort: httpPort,
configureLogging: configureLogging,
+ logQueries: false,
log: function () {
return log;
}
diff --git a/src/DillerDao.js b/src/DillerDao.js
index 2133dfc..a47f181 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -2,7 +2,7 @@ var _ = require('lodash');
function DillerDao(tx) {
- var deviceColumns = 'id, key, created_timestamp';
+ var deviceColumns = 'id, created_timestamp, key, name, description';
var propertyColumns = 'id, device, key, created_timestamp';
var valueColumns = 'property, timestamp, value_text, value_numeric';
@@ -27,27 +27,25 @@ function DillerDao(tx) {
}
function updateDevice(id, attributes) {
-
var values = [id];
var i = 2;
- var fields = _.map(attributes, function (value, name) {
- console.log('name', name, 'value', value, 'i', i);
- if (name == 'name') {
- values.push(value);
- return 'name = $' + i++;
- }
- });
+ var fields = _(attributes).chain()
+ .map(function (value, attribute) {
+ if (attribute == 'name' || attribute == 'description') {
+ values.push(value);
+ return attribute + ' = $' + i++;
+ }
+ })
+ .collect()
+ .join(', ')
+ .value();
if (fields.length == 0) {
- return; // TODO: return an empty promise;
+ return Promise.resolve({});
}
- fields = _.collect(fields);
-
- var x = 'UPDATE device SET ' + fields.join(', ') + ' WHERE id = $1';
- console.log('x', x);
- console.log('values', values);
- return tx.none(x, values);
+ var sql = 'UPDATE device SET ' + fields + ' WHERE id = $1';
+ return tx.none(sql, values);
}
// -------------------------------------------------------------------------------------------------------------------
diff --git a/src/DillerDb.js b/src/DillerDb.js
deleted file mode 100644
index c31dde7..0000000
--- a/src/DillerDb.js
+++ /dev/null
@@ -1,29 +0,0 @@
-var di = require('di');
-var DillerConfig = require('./DillerConfig');
-
-var pgpOptions = {
- //query: function (e) {
- // console.log("Query:", e.query);
- // if (e.ctx) {
- // // this query is executing inside a task or transaction,
- // if (e.ctx.isTX) {
- // // this query is inside a transaction;
- // } else {
- // // this query is inside a task;
- // }
- //
- // }
- //}
-};
-
-var pgp = require('pg-promise')(pgpOptions);
-
-function DillerDb(config) {
-
- return function () {
- return pgp(config.postgresqlConfig)
- }
-}
-di.annotate(DillerDb, new di.Inject(DillerConfig));
-
-module.exports = DillerDb;
diff --git a/src/DillerTx.js b/src/DillerTx.js
new file mode 100644
index 0000000..85daee5
--- /dev/null
+++ b/src/DillerTx.js
@@ -0,0 +1,63 @@
+var di = require('di');
+var _ = require('lodash');
+var DillerConfig = require('./DillerConfig');
+var DillerDao = require('./DillerDao');
+var Diller = require('./Diller');
+
+var pgpConstructor = require('pg-promise');
+
+var pgp;
+
+/**
+ * @param config DillerConfig
+ * @returns {Function}
+ * @constructor
+ */
+function DillerTx(config) {
+ var log = config.log();
+
+ function queryLogger(e) {
+ log.info("Query:", e.query);
+ if (e.ctx) {
+ // this query is executing inside a task or transaction,
+ if (e.ctx.isTX) {
+ // this query is inside a transaction;
+ } else {
+ // this query is inside a task;
+ }
+
+ }
+ }
+
+ function connectLogger(client) {
+ log.info('PGP: connect');
+ }
+
+ function disconnectLogger(client) {
+ log.info('PGP: disconnect');
+ }
+
+ if (!pgp) {
+ var pgpOptions = {};
+
+ pgpOptions.query = (config.logQueries && queryLogger) || undefined;
+ //pgpOptions.connect = (config.logQueries && connectLogger) || undefined;
+ //pgpOptions.disconnect = (config.logQueries && disconnectLogger) || undefined;
+
+ pgp = pgpConstructor(pgpOptions);
+ }
+
+ return function (action) {
+ var con = pgp(config.postgresqlConfig);
+
+ return con.tx(function (pg) {
+ var dao = new DillerDao(con);
+ var diller = new Diller(config, con, dao);
+ return action(pg, dao, diller)
+ });
+ };
+}
+
+di.annotate(DillerTx, new di.Inject(DillerConfig));
+
+module.exports = DillerTx;
diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js
index 3fe43dc..e991b40 100644
--- a/src/mqtt/DillerMqtt.js
+++ b/src/mqtt/DillerMqtt.js
@@ -1,7 +1,7 @@
var di = require('di');
var mqtt = require('mqtt');
-function DillerMqtt(config, diller) {
+function DillerMqtt(config, tx) {
var log = config.log();
function run() {
@@ -22,7 +22,9 @@ function DillerMqtt(config, diller) {
});
mqttClient.on('message', function (topic, message, payload) {
- diller.onMessage(topic, message, payload);
+ tx(function (pg, dao, diller) {
+ return diller.onMessage(topic, message, payload);
+ });
});
}
@@ -30,9 +32,9 @@ function DillerMqtt(config, diller) {
run: run
};
}
-var Diller = require('../Diller');
+var DillerTx = require('../DillerTx');
var DillerConfig = require('../DillerConfig');
-di.annotate(DillerMqtt, new di.Inject(DillerConfig, Diller));
+di.annotate(DillerMqtt, new di.Inject(DillerConfig, DillerTx));
module.exports = DillerMqtt;
diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js
index 95bcbb6..bcb15d4 100644
--- a/src/web/DillerWeb.js
+++ b/src/web/DillerWeb.js
@@ -4,11 +4,15 @@ var _ = require('lodash');
var di = require('di');
var DillerConfig = require('../DillerConfig');
-var Diller = require('../Diller');
-var DillerDb = require('../DillerDb');
-var DillerDao = require('../DillerDao');
-
-function DillerWeb(diller, db, config) {
+var DillerTx = require('../DillerTx');
+
+/**
+ * @param tx DillerTx
+ * @param config DillerConfig
+ * @returns {{init: init, listen: listen, generateRpc: generateRpc}}
+ * @constructor
+ */
+function DillerWeb(tx, config) {
var log = config.log();
var calls = [];
@@ -21,27 +25,25 @@ function DillerWeb(diller, db, config) {
}
}
+ function deviceResponse(data) {
+ var device = data[0];
+ device.properties = data[1];
+ return {device: device};
+ }
+
function getDevices(req, res) {
- db().tx(function (pg) {
- var dao = new DillerDao(pg);
+ tx(function (pg, dao) {
return dao.devices();
}).then(function (devices) {
res.json({devices: devices});
}, genericErrorHandler(res));
}
- function deviceResponse(data) {
- var device = data[0];
- device.properties = data[1];
- return {device: device};
- }
-
function getDevice(req, res) {
- db().tx(function (tx) {
+ tx(function (pg, dao) {
var deviceId = req.params.deviceId;
- var dao = new DillerDao(tx);
- return tx.batch([
+ return pg.batch([
dao.deviceById(deviceId),
dao.devicePropertiesByDeviceId(deviceId)]
);
@@ -51,16 +53,20 @@ function DillerWeb(diller, db, config) {
}
function patchDevice(req, res) {
- db().tx(function (tx) {
+ tx(function (pg, dao, diller) {
var deviceId = req.params.deviceId;
var body = req.body;
- if (body.attribute == 'name') {
- diller.updateDeviceName(deviceId, body.value)
+ if (!body.attribute) {
+ res.status(400).json({message: 'Required keys: "attribute" and "value".'});
+ } else if (body.attribute == 'name' || body.attribute == 'description') {
+ var attributes = {};
+ attributes[body.attribute] = body.value;
+
+ return diller.updateDeviceAttributes(deviceId, attributes)
.then(function () {
- var dao = new DillerDao(tx);
- return tx.batch([
+ return pg.batch([
dao.deviceById(deviceId),
dao.devicePropertiesByDeviceId(deviceId)]
);
@@ -69,7 +75,7 @@ function DillerWeb(diller, db, config) {
res.json(deviceResponse(data));
}, genericErrorHandler(res));
} else {
- res.status(400).json({message: 'Required keys: "attribute" and "value".'});
+ res.status(400).json({message: 'Unsupported attribute: ' + body.attribute});
}
}).then(function (data) {
var device = data[0];
@@ -79,10 +85,8 @@ function DillerWeb(diller, db, config) {
}
function getValues(req, res) {
- db().tx(function (tx) {
+ tx(function (tx, dao) {
var propertyId = req.params.propertyId;
-
- var dao = new DillerDao(tx);
return dao.valuesByPropertyId(propertyId, 10);
}).then(function (values) {
res.json({values: values});
@@ -221,6 +225,6 @@ function DillerWeb(diller, db, config) {
}
}
-di.annotate(DillerWeb, new di.Inject(Diller, DillerDb, DillerConfig));
+di.annotate(DillerWeb, new di.Inject(DillerTx, DillerConfig));
module.exports = DillerWeb;