aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-10-19 21:53:49 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-10-19 21:53:49 +0200
commit73d272ffe8954b3169901eda74428bad3d2740fe (patch)
tree1f1cca7f57809e05d0c1fea7363555a5521f8bcb
parent52eb8072664a61ea61dbdbef7485d6c81dbbcfe9 (diff)
downloaddiller-server-73d272ffe8954b3169901eda74428bad3d2740fe.tar.gz
diller-server-73d272ffe8954b3169901eda74428bad3d2740fe.tar.bz2
diller-server-73d272ffe8954b3169901eda74428bad3d2740fe.tar.xz
diller-server-73d272ffe8954b3169901eda74428bad3d2740fe.zip
o Adding aggregation tables.
o Adding migration scripts for the schema.
-rw-r--r--README.md16
-rw-r--r--database.json10
-rw-r--r--migrations/20151019162208-base.js30
-rw-r--r--migrations/20151019162254-by-hour-and-minute.js30
-rw-r--r--migrations/sqls/20151019162208-base-down.sql1
-rw-r--r--migrations/sqls/20151019162208-base-up.sql45
-rw-r--r--migrations/sqls/20151019162254-by-hour-and-minute-down.sql1
-rw-r--r--migrations/sqls/20151019162254-by-hour-and-minute-up.sql19
-rw-r--r--src/Diller.js45
-rw-r--r--src/DillerDao.js31
-rw-r--r--src/config.js2
11 files changed, 223 insertions, 7 deletions
diff --git a/README.md b/README.md
index a9a7f6a..092fba3 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,16 @@
-# MQTT Topics
+# Getting started
+
+ npm install
+
+Create a database user and database called 'diller' with the password 'diller' and run:
+
+ ./node_modules/.bin/db-migrate up
+
+Run Diller:
+
+ node diller.js | node_modules/.bin/bunyan
+
+# MQTT topics
/diller
/<device id>
@@ -12,4 +24,4 @@
mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/value -m 12.3
mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/name -m Bathroom
- mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/description -m "Second Floot"
+ mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/description -m "Second Floor"
diff --git a/database.json b/database.json
new file mode 100644
index 0000000..b37dd20
--- /dev/null
+++ b/database.json
@@ -0,0 +1,10 @@
+{
+ "dev": {
+ "driver": "pg",
+ "host": "localhost",
+ "user": "diller",
+ "password": "diller",
+ "database": "diller"
+ },
+ "sql-file": true
+} \ No newline at end of file
diff --git a/migrations/20151019162208-base.js b/migrations/20151019162208-base.js
new file mode 100644
index 0000000..3a672b3
--- /dev/null
+++ b/migrations/20151019162208-base.js
@@ -0,0 +1,30 @@
+var dbm = global.dbm || require('db-migrate');
+var type = dbm.dataType;
+var fs = require('fs');
+var path = require('path');
+
+exports.up = function(db, callback) {
+ var filePath = path.join(__dirname + '/sqls/20151019162208-base-up.sql');
+ fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
+ if (err) return callback(err);
+ console.log('received data: ' + data);
+
+ db.runSql(data, function(err) {
+ if (err) return callback(err);
+ callback();
+ });
+ });
+};
+
+exports.down = function(db, callback) {
+ var filePath = path.join(__dirname + '/sqls/20151019162208-base-down.sql');
+ fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
+ if (err) return callback(err);
+ console.log('received data: ' + data);
+
+ db.runSql(data, function(err) {
+ if (err) return callback(err);
+ callback();
+ });
+ });
+};
diff --git a/migrations/20151019162254-by-hour-and-minute.js b/migrations/20151019162254-by-hour-and-minute.js
new file mode 100644
index 0000000..a9272a6
--- /dev/null
+++ b/migrations/20151019162254-by-hour-and-minute.js
@@ -0,0 +1,30 @@
+var dbm = global.dbm || require('db-migrate');
+var type = dbm.dataType;
+var fs = require('fs');
+var path = require('path');
+
+exports.up = function(db, callback) {
+ var filePath = path.join(__dirname + '/sqls/20151019162254-by-hour-and-minute-up.sql');
+ fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
+ if (err) return callback(err);
+ console.log('received data: ' + data);
+
+ db.runSql(data, function(err) {
+ if (err) return callback(err);
+ callback();
+ });
+ });
+};
+
+exports.down = function(db, callback) {
+ var filePath = path.join(__dirname + '/sqls/20151019162254-by-hour-and-minute-down.sql');
+ fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){
+ if (err) return callback(err);
+ console.log('received data: ' + data);
+
+ db.runSql(data, function(err) {
+ if (err) return callback(err);
+ callback();
+ });
+ });
+};
diff --git a/migrations/sqls/20151019162208-base-down.sql b/migrations/sqls/20151019162208-base-down.sql
new file mode 100644
index 0000000..44f074e
--- /dev/null
+++ b/migrations/sqls/20151019162208-base-down.sql
@@ -0,0 +1 @@
+/* Replace with your SQL commands */ \ No newline at end of file
diff --git a/migrations/sqls/20151019162208-base-up.sql b/migrations/sqls/20151019162208-base-up.sql
new file mode 100644
index 0000000..97b17a6
--- /dev/null
+++ b/migrations/sqls/20151019162208-base-up.sql
@@ -0,0 +1,45 @@
+DROP TABLE IF EXISTS value;
+DROP TABLE IF EXISTS device_property;
+DROP TABLE IF EXISTS device;
+DROP TABLE IF EXISTS message;
+
+DROP SEQUENCE IF EXISTS id_seq;
+
+CREATE SEQUENCE id_seq;
+
+CREATE TABLE message (
+ timestamp TIMESTAMPTZ NOT NULL,
+ topic VARCHAR(1000) NOT NULL,
+ message BYTEA NOT NULL
+);
+
+CREATE TABLE device (
+ id BIGINT NOT NULL DEFAULT nextval('id_seq'),
+ key VARCHAR(1000) NOT NULL,
+ created_timestamp TIMESTAMPTZ NOT NULL,
+ name VARCHAR(1000),
+ description VARCHAR(1000),
+
+ PRIMARY KEY (id),
+ CONSTRAINT uq_device__key UNIQUE (key)
+);
+
+CREATE TABLE device_property (
+ id BIGINT NOT NULL DEFAULT nextval('id_seq'),
+ device BIGINT NOT NULL REFERENCES device,
+ key VARCHAR(1000) NOT NULL,
+ created_timestamp TIMESTAMPTZ NOT NULL,
+ name VARCHAR(1000),
+ description VARCHAR(1000),
+ last_value VARCHAR(1000),
+
+ PRIMARY KEY (id),
+ CONSTRAINT uq_device_property__key_name UNIQUE (id, name)
+);
+
+-- no constraints!
+CREATE TABLE value (
+ property BIGINT NOT NULL,
+ timestamp TIMESTAMPTZ NOT NULL,
+ value VARCHAR(1000)
+);
diff --git a/migrations/sqls/20151019162254-by-hour-and-minute-down.sql b/migrations/sqls/20151019162254-by-hour-and-minute-down.sql
new file mode 100644
index 0000000..44f074e
--- /dev/null
+++ b/migrations/sqls/20151019162254-by-hour-and-minute-down.sql
@@ -0,0 +1 @@
+/* Replace with your SQL commands */ \ No newline at end of file
diff --git a/migrations/sqls/20151019162254-by-hour-and-minute-up.sql b/migrations/sqls/20151019162254-by-hour-and-minute-up.sql
new file mode 100644
index 0000000..8e212f4
--- /dev/null
+++ b/migrations/sqls/20151019162254-by-hour-and-minute-up.sql
@@ -0,0 +1,19 @@
+DROP TABLE IF EXISTS value_by_hour;
+CREATE TABLE value_by_hour (
+ property BIGINT NOT NULL,
+ timestamp TIMESTAMP NOT NULL,
+ count NUMERIC NOT NULL,
+ max NUMERIC NOT NULL,
+ min NUMERIC NOT NULL,
+ avg NUMERIC NOT NULL
+);
+
+DROP TABLE IF EXISTS value_by_minute;
+CREATE TABLE value_by_minute (
+ property BIGINT NOT NULL,
+ timestamp TIMESTAMP NOT NULL,
+ count NUMERIC NOT NULL,
+ max NUMERIC NOT NULL,
+ min NUMERIC NOT NULL,
+ avg NUMERIC NOT NULL
+);
diff --git a/src/Diller.js b/src/Diller.js
index 9a1de23..eeaf726 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -1,5 +1,20 @@
var DillerDao = require('./DillerDao');
-var pgp = require('pg-promise')();
+var pgpOptions = {
+ //query: function (e) {
+ // console.log("Query:", e.query);
+ // if (e.ctx) {
+ // // this query is executing inside a task or transaction,
+ // if (e.ctx.isTX) {
+ // // this query is inside a transaction;
+ // } else {
+ // // this query is inside a task;
+ // }
+ //
+ // }
+ //}
+};
+
+var pgp = require('pg-promise')(pgpOptions);
function Diller(config, log) {
@@ -10,7 +25,12 @@ function Diller(config, log) {
value: value
});
- return dao.insertValue(property.id, value);
+ return dao.insertValue(property.id, value)
+ .then(function (res) {
+ log.info('typeof', typeof res.timestamp, res.timestamp, res.timestamp.getTime());
+ updateAggregates(property.id, res.timestamp);
+ return res;
+ });
}
function newName(dao, device, property, name) {
@@ -25,6 +45,27 @@ function Diller(config, log) {
return dao.updatePropertyDescription(property.id, description);
}
+ function updateAggregates(propertyId, timestamp) {
+ log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp});
+ return pgp(config.postgresqlConfig)
+ .tx(function (pg) {
+ var dao = new DillerDao(pg);
+
+ return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) {
+ return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) {
+ return {
+ minute: minute,
+ hour: hour
+ };
+ });
+ });
+ }).then(function (res) {
+ log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res});
+ }, function (res) {
+ log.warn('updateAggregates: failed', {res: res, propertyId: propertyId});
+ });
+ }
+
function onMessage(topic, message, payload) {
var parts = topic.split(/\//);
diff --git a/src/DillerDao.js b/src/DillerDao.js
index ed6fcf0..e0c69d3 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -28,7 +28,32 @@ function DillerDao(client) {
}
function insertValue(propertyId, value) {
- return client.none('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2)', [propertyId, value]);
+ return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]);
+ }
+
+ function updateHourAggregatesForProperty(propertyId, timestamp) {
+ return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+ .then(function() {
+ return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' +
+ 'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
+ 'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' +
+ 'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' +
+ 'ORDER BY 2, 1 ' +
+ 'RETURNING *;', [propertyId, timestamp]);
+ });
+ }
+
+ function updateMinuteAggregatesForProperty(propertyId, timestamp) {
+ return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+ .then(function() {
+ return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' +
+ 'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
+ 'FROM value ' +
+ 'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' +
+ 'GROUP BY property, DATE_TRUNC(\'minute\', timestamp) ' +
+ 'ORDER BY 2, 1 ' +
+ 'RETURNING *;', [propertyId, timestamp]);
+ });
}
return {
@@ -40,7 +65,9 @@ function DillerDao(client) {
updatePropertyName: updatePropertyName,
updatePropertyDescription: updatePropertyDescription,
- insertValue: insertValue
+ insertValue: insertValue,
+ updateHourAggregatesForProperty: updateHourAggregatesForProperty,
+ updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty
}
}
diff --git a/src/config.js b/src/config.js
index a80fceb..a42ec87 100644
--- a/src/config.js
+++ b/src/config.js
@@ -3,7 +3,7 @@ function isProd() {
}
var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
-//var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller';
+
var postgresqlConfig = {
host: process.env.DB_HOST || '/var/run/postgresql',
database: process.env.DB_DATABASE || 'diller',