From 73d272ffe8954b3169901eda74428bad3d2740fe Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl <trygvis@inamo.no>
Date: Mon, 19 Oct 2015 21:53:49 +0200
Subject: o Adding aggregation tables. o Adding migration scripts for the
 schema.

---
 src/Diller.js    | 45 +++++++++++++++++++++++++++++++++++++++++++--
 src/DillerDao.js | 31 +++++++++++++++++++++++++++++--
 src/config.js    |  2 +-
 3 files changed, 73 insertions(+), 5 deletions(-)

(limited to 'src')

diff --git a/src/Diller.js b/src/Diller.js
index 9a1de23..eeaf726 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -1,5 +1,20 @@
 var DillerDao = require('./DillerDao');
-var pgp = require('pg-promise')();
+var pgpOptions = {
+  //query: function (e) {
+  //  console.log("Query:", e.query);
+  //  if (e.ctx) {
+  //    // this query is executing inside a task or transaction,
+  //    if (e.ctx.isTX) {
+  //      // this query is inside a transaction;
+  //    } else {
+  //      // this query is inside a task;
+  //    }
+  //
+  //  }
+  //}
+};
+
+var pgp = require('pg-promise')(pgpOptions);
 
 function Diller(config, log) {
 
@@ -10,7 +25,12 @@ function Diller(config, log) {
       value: value
     });
 
-    return dao.insertValue(property.id, value);
+    return dao.insertValue(property.id, value)
+      .then(function (res) {
+        log.info('typeof', typeof res.timestamp, res.timestamp, res.timestamp.getTime());
+        updateAggregates(property.id, res.timestamp);
+        return res;
+      });
   }
 
   function newName(dao, device, property, name) {
@@ -25,6 +45,27 @@ function Diller(config, log) {
     return dao.updatePropertyDescription(property.id, description);
   }
 
+  function updateAggregates(propertyId, timestamp) {
+    log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp});
+    return pgp(config.postgresqlConfig)
+      .tx(function (pg) {
+        var dao = new DillerDao(pg);
+
+        return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) {
+          return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) {
+            return {
+              minute: minute,
+              hour: hour
+            };
+          });
+        });
+      }).then(function (res) {
+        log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res});
+      }, function (res) {
+        log.warn('updateAggregates: failed', {res: res, propertyId: propertyId});
+      });
+  }
+
   function onMessage(topic, message, payload) {
     var parts = topic.split(/\//);
 
diff --git a/src/DillerDao.js b/src/DillerDao.js
index ed6fcf0..e0c69d3 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -28,7 +28,32 @@ function DillerDao(client) {
   }
 
   function insertValue(propertyId, value) {
-    return client.none('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2)', [propertyId, value]);
+    return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]);
+  }
+
+  function updateHourAggregatesForProperty(propertyId, timestamp) {
+    return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+      .then(function() {
+        return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' +
+          'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
+          'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' +
+          'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' +
+          'ORDER BY 2, 1 ' +
+          'RETURNING *;', [propertyId, timestamp]);
+      });
+  }
+
+  function updateMinuteAggregatesForProperty(propertyId, timestamp) {
+    return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+      .then(function() {
+        return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' +
+          'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
+          'FROM value ' +
+          'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' +
+          'GROUP BY property, DATE_TRUNC(\'minute\', timestamp) ' +
+          'ORDER BY 2, 1 ' +
+          'RETURNING *;', [propertyId, timestamp]);
+      });
   }
 
   return {
@@ -40,7 +65,9 @@ function DillerDao(client) {
     updatePropertyName: updatePropertyName,
     updatePropertyDescription: updatePropertyDescription,
 
-    insertValue: insertValue
+    insertValue: insertValue,
+    updateHourAggregatesForProperty: updateHourAggregatesForProperty,
+    updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty
   }
 }
 
diff --git a/src/config.js b/src/config.js
index a80fceb..a42ec87 100644
--- a/src/config.js
+++ b/src/config.js
@@ -3,7 +3,7 @@ function isProd() {
 }
 
 var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
-//var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller';
+
 var postgresqlConfig = {
   host: process.env.DB_HOST || '/var/run/postgresql',
   database: process.env.DB_DATABASE || 'diller',
-- 
cgit v1.2.3