aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-10-20 23:18:16 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-10-20 23:18:16 +0200
commit0266bdd60cb9cccf20a5ded3eba72ea833bee72d (patch)
treed727bad80aeaef673f48bbbc171fb4e9297b72fc /src
parent73d272ffe8954b3169901eda74428bad3d2740fe (diff)
downloaddiller-server-0266bdd60cb9cccf20a5ded3eba72ea833bee72d.tar.gz
diller-server-0266bdd60cb9cccf20a5ded3eba72ea833bee72d.tar.bz2
diller-server-0266bdd60cb9cccf20a5ded3eba72ea833bee72d.tar.xz
diller-server-0266bdd60cb9cccf20a5ded3eba72ea833bee72d.zip
o Adding a webapp.
o Using di.js as dependency injection framework.
Diffstat (limited to 'src')
-rw-r--r--src/Diller.js37
-rw-r--r--src/DillerConfig.js67
-rw-r--r--src/DillerDao.js63
-rw-r--r--src/DillerDb.js29
-rw-r--r--src/config.js18
-rw-r--r--src/mqtt/DillerMqtt.js38
-rw-r--r--src/web/DillerWeb.js158
7 files changed, 355 insertions, 55 deletions
diff --git a/src/Diller.js b/src/Diller.js
index eeaf726..a057054 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -1,22 +1,7 @@
-var DillerDao = require('./DillerDao');
-var pgpOptions = {
- //query: function (e) {
- // console.log("Query:", e.query);
- // if (e.ctx) {
- // // this query is executing inside a task or transaction,
- // if (e.ctx.isTX) {
- // // this query is inside a transaction;
- // } else {
- // // this query is inside a task;
- // }
- //
- // }
- //}
-};
-
-var pgp = require('pg-promise')(pgpOptions);
-
-function Diller(config, log) {
+var di = require('di');
+
+function Diller(config, db) {
+ var log = config.log();
function newValue(dao, device, property, value) {
log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, {
@@ -47,7 +32,7 @@ function Diller(config, log) {
function updateAggregates(propertyId, timestamp) {
log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp});
- return pgp(config.postgresqlConfig)
+ return db()
.tx(function (pg) {
var dao = new DillerDao(pg);
@@ -100,7 +85,7 @@ function Diller(config, log) {
return;
}
- return pgp(config.postgresqlConfig)
+ return db()
.tx(function (pg) {
var dao = new DillerDao(pg);
@@ -136,7 +121,9 @@ function Diller(config, log) {
}
}
-//noinspection JSUnresolvedVariable
-module.exports = {
- Diller: Diller
-};
+var DillerConfig = require('./DillerConfig');
+var DillerDao = require('./DillerDao');
+var DillerDb = require('./DillerDb');
+di.annotate(Diller, new di.Inject(DillerConfig, DillerDb));
+
+module.exports = Diller;
diff --git a/src/DillerConfig.js b/src/DillerConfig.js
new file mode 100644
index 0000000..84afe42
--- /dev/null
+++ b/src/DillerConfig.js
@@ -0,0 +1,67 @@
+var fs = require('fs');
+var bunyan = require('bunyan');
+
+function isProd() {
+ return process.env.NODE_ENV == 'prod';
+}
+
+var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
+
+var postgresqlConfig = {
+ host: process.env.DB_HOST || '/var/run/postgresql',
+ database: process.env.DB_DATABASE || 'diller',
+ user: process.env.DB_USER || 'diller',
+ password: process.env.DB_PASSWORD || 'diller'
+};
+
+var log;
+
+function configureLogging(app) {
+ if (log) {
+ return;
+ }
+
+ var cfg = {
+ name: app
+ };
+
+ if (isProd()) {
+ cfg.streams = [
+ {
+ level: 'warn',
+ stream: process.stdout
+ },
+ {
+ level: 'debug',
+ path: 'log/diller-' + app + '.log'
+ }
+ ];
+
+ var stat;
+ try {
+ stat = fs.lstatSync('log');
+ } catch (e) {
+ // Assume this to to be ENOENT
+ fs.mkdirSync('log');
+ }
+ if (stat && !stat.isDirectory()) {
+ throw 'Not a directory: log';
+ }
+ }
+
+ log = bunyan.createLogger(cfg);
+}
+
+function DillerConfig() {
+ return {
+ isProd: isProd,
+ mqttUrl: mqttUrl,
+ postgresqlConfig: postgresqlConfig,
+ configureLogging: configureLogging,
+ log: function () {
+ return log;
+ }
+ };
+}
+
+module.exports = DillerConfig;
diff --git a/src/DillerDao.js b/src/DillerDao.js
index e0c69d3..c4d0e67 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -1,40 +1,74 @@
-function DillerDao(client) {
+function DillerDao(tx) {
var deviceColumns = 'id, key, created_timestamp';
var propertyColumns = 'id, device, key, created_timestamp';
+ var valueColumns = 'property, timestamp, value';
+
+ // -------------------------------------------------------------------------------------------------------------------
+ // Device
+ // -------------------------------------------------------------------------------------------------------------------
+
+ function devices() {
+ return tx.many("SELECT " + deviceColumns + " FROM device");
+ }
+
+ function deviceById(id) {
+ return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE id=$1", id);
+ }
function deviceByKey(key) {
- return client.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
+ return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
}
function insertDevice(key) {
- return client.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+ return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+ }
+
+ // -------------------------------------------------------------------------------------------------------------------
+ // Device Property
+ // -------------------------------------------------------------------------------------------------------------------
+
+ function devicePropertyById(id) {
+ return tx.one('SELECT ' + propertyColumns + ' FROM device_property WHERE id=$1', [id]);
}
function devicePropertyByDeviceIdAndKey(deviceId, key) {
- return client.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]);
+ return tx.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]);
+ }
+
+ function devicePropertiesByDeviceId(deviceId) {
+ return tx.many('SELECT ' + propertyColumns + ' FROM device_property WHERE device=$1', [deviceId]);
}
function insertDeviceProperty(deviceId, key) {
- return client.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]);
+ return tx.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]);
}
function updatePropertyName(id, name) {
- return client.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id);
+ return tx.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id);
}
function updatePropertyDescription(id, description) {
- return client.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id);
+ return tx.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id);
+ }
+
+ // -------------------------------------------------------------------------------------------------------------------
+ // Value
+ // -------------------------------------------------------------------------------------------------------------------
+
+ function valuesByPropertyId(propertyId, limit) {
+ limit = limit || 10;
+ return tx.many('SELECT timestamp, value FROM value WHERE property=$1 LIMIT $2', [propertyId, limit]);
}
function insertValue(propertyId, value) {
- return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]);
+ return tx.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]);
}
function updateHourAggregatesForProperty(propertyId, timestamp) {
- return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+ return tx.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
.then(function() {
- return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' +
+ return tx.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' +
'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' +
'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' +
@@ -44,9 +78,9 @@ function DillerDao(client) {
}
function updateMinuteAggregatesForProperty(propertyId, timestamp) {
- return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+ return tx.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
.then(function() {
- return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' +
+ return tx.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' +
'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
'FROM value ' +
'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' +
@@ -57,14 +91,19 @@ function DillerDao(client) {
}
return {
+ devices: devices,
+ deviceById: deviceById,
deviceByKey: deviceByKey,
insertDevice: insertDevice,
+ devicePropertyById: devicePropertyById,
devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey,
+ devicePropertiesByDeviceId: devicePropertiesByDeviceId,
insertDeviceProperty: insertDeviceProperty,
updatePropertyName: updatePropertyName,
updatePropertyDescription: updatePropertyDescription,
+ valuesByPropertyId: valuesByPropertyId,
insertValue: insertValue,
updateHourAggregatesForProperty: updateHourAggregatesForProperty,
updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty
diff --git a/src/DillerDb.js b/src/DillerDb.js
new file mode 100644
index 0000000..c31dde7
--- /dev/null
+++ b/src/DillerDb.js
@@ -0,0 +1,29 @@
+var di = require('di');
+var DillerConfig = require('./DillerConfig');
+
+var pgpOptions = {
+ //query: function (e) {
+ // console.log("Query:", e.query);
+ // if (e.ctx) {
+ // // this query is executing inside a task or transaction,
+ // if (e.ctx.isTX) {
+ // // this query is inside a transaction;
+ // } else {
+ // // this query is inside a task;
+ // }
+ //
+ // }
+ //}
+};
+
+var pgp = require('pg-promise')(pgpOptions);
+
+function DillerDb(config) {
+
+ return function () {
+ return pgp(config.postgresqlConfig)
+ }
+}
+di.annotate(DillerDb, new di.Inject(DillerConfig));
+
+module.exports = DillerDb;
diff --git a/src/config.js b/src/config.js
deleted file mode 100644
index a42ec87..0000000
--- a/src/config.js
+++ /dev/null
@@ -1,18 +0,0 @@
-function isProd() {
- return process.env.NODE_ENV == 'prod';
-}
-
-var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
-
-var postgresqlConfig = {
- host: process.env.DB_HOST || '/var/run/postgresql',
- database: process.env.DB_DATABASE || 'diller',
- user: process.env.DB_USER || 'diller',
- password: process.env.DB_PASSWORD || 'diller'
-};
-
-module.exports = {
- isProd: isProd,
- mqttUrl: mqttUrl,
- postgresqlConfig: postgresqlConfig
-};
diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js
new file mode 100644
index 0000000..3fe43dc
--- /dev/null
+++ b/src/mqtt/DillerMqtt.js
@@ -0,0 +1,38 @@
+var di = require('di');
+var mqtt = require('mqtt');
+
+function DillerMqtt(config, diller) {
+ var log = config.log();
+
+ function run() {
+ log.info('Connecting to ' + config.mqttUrl);
+ var mqttClient = mqtt.connect(config.mqttUrl);
+
+ mqttClient.on('offline', function () {
+ log.info('offline');
+ });
+
+ mqttClient.on('error', function (error) {
+ log.info('error', {error: error});
+ });
+
+ mqttClient.on('connect', function () {
+ log.info('Connected');
+ mqttClient.subscribe('/diller/#');
+ });
+
+ mqttClient.on('message', function (topic, message, payload) {
+ diller.onMessage(topic, message, payload);
+ });
+ }
+
+ return {
+ run: run
+ };
+}
+var Diller = require('../Diller');
+var DillerConfig = require('../DillerConfig');
+
+di.annotate(DillerMqtt, new di.Inject(DillerConfig, Diller));
+
+module.exports = DillerMqtt;
diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js
new file mode 100644
index 0000000..40fbc3d
--- /dev/null
+++ b/src/web/DillerWeb.js
@@ -0,0 +1,158 @@
+var express = require('express');
+var bodyParser = require('body-parser');
+var _ = require('lodash');
+var di = require('di');
+
+var DillerConfig = require('../DillerConfig');
+var Diller = require('../Diller');
+var DillerDb = require('../DillerDb');
+var DillerDao = require('../DillerDao');
+
+function DillerWeb(diller, db, config) {
+ var log = config.log();
+
+ var calls = [];
+ var app;
+
+ function getDevices(req, res) {
+ db().tx(function (pg) {
+ var dao = new DillerDao(pg);
+ return dao.devices();
+ }).then(function (devices) {
+ res.json({devices: devices});
+ }, function (err) {
+ log.warn('fail', err);
+ res.status(500).json({message: 'fail'});
+ });
+ }
+
+ function getDevice(req, res) {
+ db().tx(function (tx) {
+ var deviceId = req.params.deviceId;
+
+ var dao = new DillerDao(tx);
+ return tx.batch([
+ dao.deviceById(deviceId),
+ dao.devicePropertiesByDeviceId(deviceId)]
+ );
+ }).then(function (data) {
+ var device = data[0];
+ device.properties = data[1];
+ res.json({device: device});
+ }, function (err) {
+ log.warn('fail', err);
+ res.status(500).json({message: 'fail'});
+ });
+ }
+
+ function getValues(req, res) {
+ db().tx(function (tx) {
+ var propertyId = req.params.propertyId;
+
+ var dao = new DillerDao(tx);
+ return dao.valuesByPropertyId(propertyId, 10);
+ }).then(function (values) {
+ res.json({values: values});
+ }, function (err) {
+ log.warn('fail', err);
+ res.status(500).json({message: 'fail'});
+ });
+ }
+
+ function init() {
+ app = express();
+
+ app.use(bodyParser.urlencoded({extended: true}));
+ app.use(bodyParser.json());
+
+ var router = express.Router();
+
+ function addRoute(name, method, path, callback) {
+ router[method](path, callback);
+ var layer = _.last(router.stack);
+
+ calls.push({
+ name: name,
+ method: method,
+ path: path,
+ layer: layer,
+ keys: _.map(layer.keys, function (key) {
+ return key.name;
+ })
+ });
+ }
+
+ addRoute('getDevices', 'get', '/device', getDevices);
+ addRoute('getDevice', 'get', '/device/:deviceId', getDevice);
+ addRoute('getValues', 'get', '/property/:propertyId/values', getValues);
+
+ app.use('/api', router);
+ app.use(express.static('web'));
+ }
+
+ function listen() {
+ var port = process.env.HTTP_PORT || 8080;
+ app.listen(port);
+ }
+
+ function generateRpc() {
+ console.log('function DillerRpc($http) {');
+
+ var s = _.map(calls, function (call) {
+
+ //console.error(call);
+ console.error('call.layer', call.layer);
+ var s = ' function ' + call.name + '(' + call.keys.join(', ') + ') {\n' +
+ ' var req = {};\n' +
+ ' req.method = \'' + call.method + '\';\n' +
+ ' req.url = \'/api' + call.path + '\';\n';
+
+ s += _.map(call.layer.keys, function (key) {
+ return ' req.url = req.url.replace(/:' + key.name + '/, ' + key.name + ');\n'
+ }).join('');
+
+ s +=
+ ' return $http(req);\n' +
+ ' }\n';
+
+ return s;
+ });
+ _.each(s, function (x) {
+ console.log(x);
+ });
+
+ console.log(' return {');
+ console.log(_.map(calls, function (call) {
+ return ' ' + call.name + ': ' + call.name
+ }).join(',\n'));
+ console.log(' };');
+ console.log('}');
+ console.log('');
+
+ console.log('DillerRpcResolve = {};');
+ _.each(calls, function (call) {
+ var args = ['DillerRpc'];
+
+ if (call.keys.length > 0) {
+ args.push('$route');
+ }
+ console.log('DillerRpcResolve.' + call.name + ' = function(' + args.join(', ') + ') {');
+
+ args = _.map(call.keys, function (key) {
+ return '$route.current.params.' + key;
+ });
+ console.log(' return DillerRpc.' + call.name + '(' + args.join(', ') + ');');
+ console.log('};');
+ });
+ }
+
+ return {
+ init: init,
+ listen: listen,
+ generateRpc: generateRpc
+ }
+}
+
+di.annotate(DillerWeb, new di.Inject(Diller, DillerDb, DillerConfig));
+
+module.exports = DillerWeb;