From 73d272ffe8954b3169901eda74428bad3d2740fe Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 19 Oct 2015 21:53:49 +0200 Subject: o Adding aggregation tables. o Adding migration scripts for the schema. --- README.md | 16 +++++++- database.json | 10 +++++ migrations/20151019162208-base.js | 30 +++++++++++++++ migrations/20151019162254-by-hour-and-minute.js | 30 +++++++++++++++ migrations/sqls/20151019162208-base-down.sql | 1 + migrations/sqls/20151019162208-base-up.sql | 45 ++++++++++++++++++++++ .../20151019162254-by-hour-and-minute-down.sql | 1 + .../sqls/20151019162254-by-hour-and-minute-up.sql | 19 +++++++++ src/Diller.js | 45 +++++++++++++++++++++- src/DillerDao.js | 31 ++++++++++++++- src/config.js | 2 +- 11 files changed, 223 insertions(+), 7 deletions(-) create mode 100644 database.json create mode 100644 migrations/20151019162208-base.js create mode 100644 migrations/20151019162254-by-hour-and-minute.js create mode 100644 migrations/sqls/20151019162208-base-down.sql create mode 100644 migrations/sqls/20151019162208-base-up.sql create mode 100644 migrations/sqls/20151019162254-by-hour-and-minute-down.sql create mode 100644 migrations/sqls/20151019162254-by-hour-and-minute-up.sql diff --git a/README.md b/README.md index a9a7f6a..092fba3 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,16 @@ -# MQTT Topics +# Getting started + + npm install + +Create a database user and database called 'diller' with the password 'diller' and run: + + ./node_modules/.bin/db-migrate up + +Run Diller: + + node diller.js | node_modules/.bin/bunyan + +# MQTT topics /diller / @@ -12,4 +24,4 @@ mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/value -m 12.3 mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/name -m Bathroom - mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/description -m "Second Floot" + mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/description -m "Second Floor" diff --git a/database.json b/database.json new file mode 100644 index 0000000..b37dd20 --- /dev/null +++ b/database.json @@ -0,0 +1,10 @@ +{ + "dev": { + "driver": "pg", + "host": "localhost", + "user": "diller", + "password": "diller", + "database": "diller" + }, + "sql-file": true +} \ No newline at end of file diff --git a/migrations/20151019162208-base.js b/migrations/20151019162208-base.js new file mode 100644 index 0000000..3a672b3 --- /dev/null +++ b/migrations/20151019162208-base.js @@ -0,0 +1,30 @@ +var dbm = global.dbm || require('db-migrate'); +var type = dbm.dataType; +var fs = require('fs'); +var path = require('path'); + +exports.up = function(db, callback) { + var filePath = path.join(__dirname + '/sqls/20151019162208-base-up.sql'); + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return callback(err); + console.log('received data: ' + data); + + db.runSql(data, function(err) { + if (err) return callback(err); + callback(); + }); + }); +}; + +exports.down = function(db, callback) { + var filePath = path.join(__dirname + '/sqls/20151019162208-base-down.sql'); + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return callback(err); + console.log('received data: ' + data); + + db.runSql(data, function(err) { + if (err) return callback(err); + callback(); + }); + }); +}; diff --git a/migrations/20151019162254-by-hour-and-minute.js b/migrations/20151019162254-by-hour-and-minute.js new file mode 100644 index 0000000..a9272a6 --- /dev/null +++ b/migrations/20151019162254-by-hour-and-minute.js @@ -0,0 +1,30 @@ +var dbm = global.dbm || require('db-migrate'); +var type = dbm.dataType; +var fs = require('fs'); +var path = require('path'); + +exports.up = function(db, callback) { + var filePath = path.join(__dirname + '/sqls/20151019162254-by-hour-and-minute-up.sql'); + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return callback(err); + console.log('received data: ' + data); + + db.runSql(data, function(err) { + if (err) return callback(err); + callback(); + }); + }); +}; + +exports.down = function(db, callback) { + var filePath = path.join(__dirname + '/sqls/20151019162254-by-hour-and-minute-down.sql'); + fs.readFile(filePath, {encoding: 'utf-8'}, function(err,data){ + if (err) return callback(err); + console.log('received data: ' + data); + + db.runSql(data, function(err) { + if (err) return callback(err); + callback(); + }); + }); +}; diff --git a/migrations/sqls/20151019162208-base-down.sql b/migrations/sqls/20151019162208-base-down.sql new file mode 100644 index 0000000..44f074e --- /dev/null +++ b/migrations/sqls/20151019162208-base-down.sql @@ -0,0 +1 @@ +/* Replace with your SQL commands */ \ No newline at end of file diff --git a/migrations/sqls/20151019162208-base-up.sql b/migrations/sqls/20151019162208-base-up.sql new file mode 100644 index 0000000..97b17a6 --- /dev/null +++ b/migrations/sqls/20151019162208-base-up.sql @@ -0,0 +1,45 @@ +DROP TABLE IF EXISTS value; +DROP TABLE IF EXISTS device_property; +DROP TABLE IF EXISTS device; +DROP TABLE IF EXISTS message; + +DROP SEQUENCE IF EXISTS id_seq; + +CREATE SEQUENCE id_seq; + +CREATE TABLE message ( + timestamp TIMESTAMPTZ NOT NULL, + topic VARCHAR(1000) NOT NULL, + message BYTEA NOT NULL +); + +CREATE TABLE device ( + id BIGINT NOT NULL DEFAULT nextval('id_seq'), + key VARCHAR(1000) NOT NULL, + created_timestamp TIMESTAMPTZ NOT NULL, + name VARCHAR(1000), + description VARCHAR(1000), + + PRIMARY KEY (id), + CONSTRAINT uq_device__key UNIQUE (key) +); + +CREATE TABLE device_property ( + id BIGINT NOT NULL DEFAULT nextval('id_seq'), + device BIGINT NOT NULL REFERENCES device, + key VARCHAR(1000) NOT NULL, + created_timestamp TIMESTAMPTZ NOT NULL, + name VARCHAR(1000), + description VARCHAR(1000), + last_value VARCHAR(1000), + + PRIMARY KEY (id), + CONSTRAINT uq_device_property__key_name UNIQUE (id, name) +); + +-- no constraints! +CREATE TABLE value ( + property BIGINT NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + value VARCHAR(1000) +); diff --git a/migrations/sqls/20151019162254-by-hour-and-minute-down.sql b/migrations/sqls/20151019162254-by-hour-and-minute-down.sql new file mode 100644 index 0000000..44f074e --- /dev/null +++ b/migrations/sqls/20151019162254-by-hour-and-minute-down.sql @@ -0,0 +1 @@ +/* Replace with your SQL commands */ \ No newline at end of file diff --git a/migrations/sqls/20151019162254-by-hour-and-minute-up.sql b/migrations/sqls/20151019162254-by-hour-and-minute-up.sql new file mode 100644 index 0000000..8e212f4 --- /dev/null +++ b/migrations/sqls/20151019162254-by-hour-and-minute-up.sql @@ -0,0 +1,19 @@ +DROP TABLE IF EXISTS value_by_hour; +CREATE TABLE value_by_hour ( + property BIGINT NOT NULL, + timestamp TIMESTAMP NOT NULL, + count NUMERIC NOT NULL, + max NUMERIC NOT NULL, + min NUMERIC NOT NULL, + avg NUMERIC NOT NULL +); + +DROP TABLE IF EXISTS value_by_minute; +CREATE TABLE value_by_minute ( + property BIGINT NOT NULL, + timestamp TIMESTAMP NOT NULL, + count NUMERIC NOT NULL, + max NUMERIC NOT NULL, + min NUMERIC NOT NULL, + avg NUMERIC NOT NULL +); diff --git a/src/Diller.js b/src/Diller.js index 9a1de23..eeaf726 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -1,5 +1,20 @@ var DillerDao = require('./DillerDao'); -var pgp = require('pg-promise')(); +var pgpOptions = { + //query: function (e) { + // console.log("Query:", e.query); + // if (e.ctx) { + // // this query is executing inside a task or transaction, + // if (e.ctx.isTX) { + // // this query is inside a transaction; + // } else { + // // this query is inside a task; + // } + // + // } + //} +}; + +var pgp = require('pg-promise')(pgpOptions); function Diller(config, log) { @@ -10,7 +25,12 @@ function Diller(config, log) { value: value }); - return dao.insertValue(property.id, value); + return dao.insertValue(property.id, value) + .then(function (res) { + log.info('typeof', typeof res.timestamp, res.timestamp, res.timestamp.getTime()); + updateAggregates(property.id, res.timestamp); + return res; + }); } function newName(dao, device, property, name) { @@ -25,6 +45,27 @@ function Diller(config, log) { return dao.updatePropertyDescription(property.id, description); } + function updateAggregates(propertyId, timestamp) { + log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); + return pgp(config.postgresqlConfig) + .tx(function (pg) { + var dao = new DillerDao(pg); + + return dao.updateMinuteAggregatesForProperty(propertyId, timestamp).then(function (minute) { + return dao.updateHourAggregatesForProperty(propertyId, timestamp).then(function (hour) { + return { + minute: minute, + hour: hour + }; + }); + }); + }).then(function (res) { + log.info('updateAggregates: ok', {propertyId: propertyId, aggregate: res}); + }, function (res) { + log.warn('updateAggregates: failed', {res: res, propertyId: propertyId}); + }); + } + function onMessage(topic, message, payload) { var parts = topic.split(/\//); diff --git a/src/DillerDao.js b/src/DillerDao.js index ed6fcf0..e0c69d3 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -28,7 +28,32 @@ function DillerDao(client) { } function insertValue(propertyId, value) { - return client.none('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2)', [propertyId, value]); + return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]); + } + + function updateHourAggregatesForProperty(propertyId, timestamp) { + return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) + .then(function() { + return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' + + 'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' + + 'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' + + 'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' + + 'ORDER BY 2, 1 ' + + 'RETURNING *;', [propertyId, timestamp]); + }); + } + + function updateMinuteAggregatesForProperty(propertyId, timestamp) { + return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) + .then(function() { + return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' + + 'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' + + 'FROM value ' + + 'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' + + 'GROUP BY property, DATE_TRUNC(\'minute\', timestamp) ' + + 'ORDER BY 2, 1 ' + + 'RETURNING *;', [propertyId, timestamp]); + }); } return { @@ -40,7 +65,9 @@ function DillerDao(client) { updatePropertyName: updatePropertyName, updatePropertyDescription: updatePropertyDescription, - insertValue: insertValue + insertValue: insertValue, + updateHourAggregatesForProperty: updateHourAggregatesForProperty, + updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty } } diff --git a/src/config.js b/src/config.js index a80fceb..a42ec87 100644 --- a/src/config.js +++ b/src/config.js @@ -3,7 +3,7 @@ function isProd() { } var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io'; -//var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller'; + var postgresqlConfig = { host: process.env.DB_HOST || '/var/run/postgresql', database: process.env.DB_DATABASE || 'diller', -- cgit v1.2.3