From 73d272ffe8954b3169901eda74428bad3d2740fe Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 19 Oct 2015 21:53:49 +0200 Subject: o Adding aggregation tables. o Adding migration scripts for the schema. --- src/Diller.js | 45 +++++++++++++++++++++++++++++++++++++++++++-- src/DillerDao.js | 31 +++++++++++++++++++++++++++++-- src/config.js | 2 +- 3 files changed, 73 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/Diller.js b/src/Diller.js index 9a1de23..eeaf726 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -1,5 +1,20 @@ var DillerDao = require('./DillerDao'); -var pgp = require('pg-promise')(); +var pgpOptions = { + //query: function (e) { + // console.log("Query:", e.query); + // if (e.ctx) { + // // this query is executing inside a task or transaction, + // if (e.ctx.isTX) { + // // this query is inside a transaction; + // } else { + // // this query is inside a task; + // } + // + // } + //} +}; + +var pgp = require('pg-promise')(pgpOptions); function Diller(config, log) { @@ -10,7 +25,12 @@ function Diller(config, log) { value: value }); - return dao.insertValue(property.id, value); + return dao.insertValue(property.id, value) + .then(function (res) { + log.info('typeof', typeof res.timestamp, res.timestamp, res.timestamp.getTime()); + updateAggregates(property.id, res.timestamp); + return res; + }); } function newName(dao, device, property, name) { @@ -25,6 +45,27 @@ function Diller(config, log) { return dao.updatePropertyDescription(property.id, description); } + function updateAggregates(propertyId, timestamp) { + log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); + return pgp(config.postgresqlConfig) + .tx(function (pg) { + var dao = new DillerDao(pg); + + return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { + return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { + return { + minute: minute, + hour: hour + }; + }); + }); + }).then(function (res) { + log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); + }, function (res) { + log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); + }); + } + function onMessage(topic, message, payload) { var parts = topic.split(/\//); diff --git a/src/DillerDao.js b/src/DillerDao.js index ed6fcf0..e0c69d3 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -28,7 +28,32 @@ function DillerDao(client) { } function insertValue(propertyId, value) { - return client.none('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2)', [propertyId, value]); + return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]); + } + + function updateHourAggregatesForProperty(propertyId, timestamp) { + return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) + .then(function() { + return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' + + 'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' + + 'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' + + 'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' + + 'ORDER BY 2, 1 ' + + 'RETURNING *;', [propertyId, timestamp]); + }); + } + + function updateMinuteAggregatesForProperty(propertyId, timestamp) { + return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) + .then(function() { + return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' + + 'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' + + 'FROM value ' + + 'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' + + 'GROUP BY property, DATE_TRUNC(\'minute\', timestamp) ' + + 'ORDER BY 2, 1 ' + + 'RETURNING *;', [propertyId, timestamp]); + }); } return { @@ -40,7 +65,9 @@ function DillerDao(client) { updatePropertyName: updatePropertyName, updatePropertyDescription: updatePropertyDescription, - insertValue: insertValue + insertValue: insertValue, + updateHourAggregatesForProperty: updateHourAggregatesForProperty, + updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty } } diff --git a/src/config.js b/src/config.js index a80fceb..a42ec87 100644 --- a/src/config.js +++ b/src/config.js @@ -3,7 +3,7 @@ function isProd() { } var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io'; -//var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller'; + var postgresqlConfig = { host: process.env.DB_HOST || '/var/run/postgresql', database: process.env.DB_DATABASE || 'diller', -- cgit v1.2.3