From 0266bdd60cb9cccf20a5ded3eba72ea833bee72d Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 20 Oct 2015 23:18:16 +0200 Subject: o Adding a webapp. o Using di.js as dependency injection framework. --- src/Diller.js | 37 ++++-------- src/DillerConfig.js | 67 +++++++++++++++++++++ src/DillerDao.js | 63 ++++++++++++++++---- src/DillerDb.js | 29 +++++++++ src/config.js | 18 ------ src/mqtt/DillerMqtt.js | 38 ++++++++++++ src/web/DillerWeb.js | 158 +++++++++++++++++++++++++++++++++++++++++++++++++ 7 files changed, 355 insertions(+), 55 deletions(-) create mode 100644 src/DillerConfig.js create mode 100644 src/DillerDb.js delete mode 100644 src/config.js create mode 100644 src/mqtt/DillerMqtt.js create mode 100644 src/web/DillerWeb.js (limited to 'src') diff --git a/src/Diller.js b/src/Diller.js index eeaf726..a057054 100644 --- a/src/Diller.js +++ b/src/Diller.js @@ -1,22 +1,7 @@ -var DillerDao = require('./DillerDao'); -var pgpOptions = { - //query: function (e) { - // console.log("Query:", e.query); - // if (e.ctx) { - // // this query is executing inside a task or transaction, - // if (e.ctx.isTX) { - // // this query is inside a transaction; - // } else { - // // this query is inside a task; - // } - // - // } - //} -}; - -var pgp = require('pg-promise')(pgpOptions); - -function Diller(config, log) { +var di = require('di'); + +function Diller(config, db) { + var log = config.log(); function newValue(dao, device, property, value) { log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, { @@ -47,7 +32,7 @@ function Diller(config, log) { function updateAggregates(propertyId, timestamp) { log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp}); - return pgp(config.postgresqlConfig) + return db() .tx(function (pg) { var dao = new DillerDao(pg); @@ -100,7 +85,7 @@ function Diller(config, log) { return; } - return pgp(config.postgresqlConfig) + return db() .tx(function (pg) { var dao = new DillerDao(pg); @@ -136,7 +121,9 @@ function Diller(config, log) { } } -//noinspection JSUnresolvedVariable -module.exports = { - Diller: Diller -}; +var DillerConfig = require('./DillerConfig'); +var DillerDao = require('./DillerDao'); +var DillerDb = require('./DillerDb'); +di.annotate(Diller, new di.Inject(DillerConfig, DillerDb)); + +module.exports = Diller; diff --git a/src/DillerConfig.js b/src/DillerConfig.js new file mode 100644 index 0000000..84afe42 --- /dev/null +++ b/src/DillerConfig.js @@ -0,0 +1,67 @@ +var fs = require('fs'); +var bunyan = require('bunyan'); + +function isProd() { + return process.env.NODE_ENV == 'prod'; +} + +var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io'; + +var postgresqlConfig = { + host: process.env.DB_HOST || '/var/run/postgresql', + database: process.env.DB_DATABASE || 'diller', + user: process.env.DB_USER || 'diller', + password: process.env.DB_PASSWORD || 'diller' +}; + +var log; + +function configureLogging(app) { + if (log) { + return; + } + + var cfg = { + name: app + }; + + if (isProd()) { + cfg.streams = [ + { + level: 'warn', + stream: process.stdout + }, + { + level: 'debug', + path: 'log/diller-' + app + '.log' + } + ]; + + var stat; + try { + stat = fs.lstatSync('log'); + } catch (e) { + // Assume this to to be ENOENT + fs.mkdirSync('log'); + } + if (stat && !stat.isDirectory()) { + throw 'Not a directory: log'; + } + } + + log = bunyan.createLogger(cfg); +} + +function DillerConfig() { + return { + isProd: isProd, + mqttUrl: mqttUrl, + postgresqlConfig: postgresqlConfig, + configureLogging: configureLogging, + log: function () { + return log; + } + }; +} + +module.exports = DillerConfig; diff --git a/src/DillerDao.js b/src/DillerDao.js index e0c69d3..c4d0e67 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -1,40 +1,74 @@ -function DillerDao(client) { +function DillerDao(tx) { var deviceColumns = 'id, key, created_timestamp'; var propertyColumns = 'id, device, key, created_timestamp'; + var valueColumns = 'property, timestamp, value'; + + // ------------------------------------------------------------------------------------------------------------------- + // Device + // ------------------------------------------------------------------------------------------------------------------- + + function devices() { + return tx.many("SELECT " + deviceColumns + " FROM device"); + } + + function deviceById(id) { + return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE id=$1", id); + } function deviceByKey(key) { - return client.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key); + return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key); } function insertDevice(key) { - return client.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key); + return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key); + } + + // ------------------------------------------------------------------------------------------------------------------- + // Device Property + // ------------------------------------------------------------------------------------------------------------------- + + function devicePropertyById(id) { + return tx.one('SELECT ' + propertyColumns + ' FROM device_property WHERE id=$1', [id]); } function devicePropertyByDeviceIdAndKey(deviceId, key) { - return client.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]); + return tx.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]); + } + + function devicePropertiesByDeviceId(deviceId) { + return tx.many('SELECT ' + propertyColumns + ' FROM device_property WHERE device=$1', [deviceId]); } function insertDeviceProperty(deviceId, key) { - return client.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]); + return tx.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]); } function updatePropertyName(id, name) { - return client.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id); + return tx.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id); } function updatePropertyDescription(id, description) { - return client.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id); + return tx.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id); + } + + // ------------------------------------------------------------------------------------------------------------------- + // Value + // ------------------------------------------------------------------------------------------------------------------- + + function valuesByPropertyId(propertyId, limit) { + limit = limit || 10; + return tx.many('SELECT timestamp, value FROM value WHERE property=$1 LIMIT $2', [propertyId, limit]); } function insertValue(propertyId, value) { - return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]); + return tx.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]); } function updateHourAggregatesForProperty(propertyId, timestamp) { - return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) + return tx.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) .then(function() { - return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' + + return tx.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' + 'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' + 'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' + 'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' + @@ -44,9 +78,9 @@ function DillerDao(client) { } function updateMinuteAggregatesForProperty(propertyId, timestamp) { - return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) + return tx.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp]) .then(function() { - return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' + + return tx.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' + 'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' + 'FROM value ' + 'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' + @@ -57,14 +91,19 @@ function DillerDao(client) { } return { + devices: devices, + deviceById: deviceById, deviceByKey: deviceByKey, insertDevice: insertDevice, + devicePropertyById: devicePropertyById, devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey, + devicePropertiesByDeviceId: devicePropertiesByDeviceId, insertDeviceProperty: insertDeviceProperty, updatePropertyName: updatePropertyName, updatePropertyDescription: updatePropertyDescription, + valuesByPropertyId: valuesByPropertyId, insertValue: insertValue, updateHourAggregatesForProperty: updateHourAggregatesForProperty, updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty diff --git a/src/DillerDb.js b/src/DillerDb.js new file mode 100644 index 0000000..c31dde7 --- /dev/null +++ b/src/DillerDb.js @@ -0,0 +1,29 @@ +var di = require('di'); +var DillerConfig = require('./DillerConfig'); + +var pgpOptions = { + //query: function (e) { + // console.log("Query:", e.query); + // if (e.ctx) { + // // this query is executing inside a task or transaction, + // if (e.ctx.isTX) { + // // this query is inside a transaction; + // } else { + // // this query is inside a task; + // } + // + // } + //} +}; + +var pgp = require('pg-promise')(pgpOptions); + +function DillerDb(config) { + + return function () { + return pgp(config.postgresqlConfig) + } +} +di.annotate(DillerDb, new di.Inject(DillerConfig)); + +module.exports = DillerDb; diff --git a/src/config.js b/src/config.js deleted file mode 100644 index a42ec87..0000000 --- a/src/config.js +++ /dev/null @@ -1,18 +0,0 @@ -function isProd() { - return process.env.NODE_ENV == 'prod'; -} - -var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io'; - -var postgresqlConfig = { - host: process.env.DB_HOST || '/var/run/postgresql', - database: process.env.DB_DATABASE || 'diller', - user: process.env.DB_USER || 'diller', - password: process.env.DB_PASSWORD || 'diller' -}; - -module.exports = { - isProd: isProd, - mqttUrl: mqttUrl, - postgresqlConfig: postgresqlConfig -}; diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js new file mode 100644 index 0000000..3fe43dc --- /dev/null +++ b/src/mqtt/DillerMqtt.js @@ -0,0 +1,38 @@ +var di = require('di'); +var mqtt = require('mqtt'); + +function DillerMqtt(config, diller) { + var log = config.log(); + + function run() { + log.info('Connecting to ' + config.mqttUrl); + var mqttClient = mqtt.connect(config.mqttUrl); + + mqttClient.on('offline', function () { + log.info('offline'); + }); + + mqttClient.on('error', function (error) { + log.info('error', {error: error}); + }); + + mqttClient.on('connect', function () { + log.info('Connected'); + mqttClient.subscribe('/diller/#'); + }); + + mqttClient.on('message', function (topic, message, payload) { + diller.onMessage(topic, message, payload); + }); + } + + return { + run: run + }; +} +var Diller = require('../Diller'); +var DillerConfig = require('../DillerConfig'); + +di.annotate(DillerMqtt, new di.Inject(DillerConfig, Diller)); + +module.exports = DillerMqtt; diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js new file mode 100644 index 0000000..40fbc3d --- /dev/null +++ b/src/web/DillerWeb.js @@ -0,0 +1,158 @@ +var express = require('express'); +var bodyParser = require('body-parser'); +var _ = require('lodash'); +var di = require('di'); + +var DillerConfig = require('../DillerConfig'); +var Diller = require('../Diller'); +var DillerDb = require('../DillerDb'); +var DillerDao = require('../DillerDao'); + +function DillerWeb(diller, db, config) { + var log = config.log(); + + var calls = []; + var app; + + function getDevices(req, res) { + db().tx(function (pg) { + var dao = new DillerDao(pg); + return dao.devices(); + }).then(function (devices) { + res.json({devices: devices}); + }, function (err) { + log.warn('fail', err); + res.status(500).json({message: 'fail'}); + }); + } + + function getDevice(req, res) { + db().tx(function (tx) { + var deviceId = req.params.deviceId; + + var dao = new DillerDao(tx); + return tx.batch([ + dao.deviceById(deviceId), + dao.devicePropertiesByDeviceId(deviceId)] + ); + }).then(function (data) { + var device = data[0]; + device.properties = data[1]; + res.json({device: device}); + }, function (err) { + log.warn('fail', err); + res.status(500).json({message: 'fail'}); + }); + } + + function getValues(req, res) { + db().tx(function (tx) { + var propertyId = req.params.propertyId; + + var dao = new DillerDao(tx); + return dao.valuesByPropertyId(propertyId, 10); + }).then(function (values) { + res.json({values: values}); + }, function (err) { + log.warn('fail', err); + res.status(500).json({message: 'fail'}); + }); + } + + function init() { + app = express(); + + app.use(bodyParser.urlencoded({extended: true})); + app.use(bodyParser.json()); + + var router = express.Router(); + + function addRoute(name, method, path, callback) { + router[method](path, callback); + var layer = _.last(router.stack); + + calls.push({ + name: name, + method: method, + path: path, + layer: layer, + keys: _.map(layer.keys, function (key) { + return key.name; + }) + }); + } + + addRoute('getDevices', 'get', '/device', getDevices); + addRoute('getDevice', 'get', '/device/:deviceId', getDevice); + addRoute('getValues', 'get', '/property/:propertyId/values', getValues); + + app.use('/api', router); + app.use(express.static('web')); + } + + function listen() { + var port = process.env.HTTP_PORT || 8080; + app.listen(port); + } + + function generateRpc() { + console.log('function DillerRpc($http) {'); + + var s = _.map(calls, function (call) { + + //console.error(call); + console.error('call.layer', call.layer); + var s = ' function ' + call.name + '(' + call.keys.join(', ') + ') {\n' + + ' var req = {};\n' + + ' req.method = \'' + call.method + '\';\n' + + ' req.url = \'/api' + call.path + '\';\n'; + + s += _.map(call.layer.keys, function (key) { + return ' req.url = req.url.replace(/:' + key.name + '/, ' + key.name + ');\n' + }).join(''); + + s += + ' return $http(req);\n' + + ' }\n'; + + return s; + }); + _.each(s, function (x) { + console.log(x); + }); + + console.log(' return {'); + console.log(_.map(calls, function (call) { + return ' ' + call.name + ': ' + call.name + }).join(',\n')); + console.log(' };'); + console.log('}'); + console.log(''); + + console.log('DillerRpcResolve = {};'); + _.each(calls, function (call) { + var args = ['DillerRpc']; + + if (call.keys.length > 0) { + args.push('$route'); + } + console.log('DillerRpcResolve.' + call.name + ' = function(' + args.join(', ') + ') {'); + + args = _.map(call.keys, function (key) { + return '$route.current.params.' + key; + }); + console.log(' return DillerRpc.' + call.name + '(' + args.join(', ') + ');'); + console.log('};'); + }); + } + + return { + init: init, + listen: listen, + generateRpc: generateRpc + } +} + +di.annotate(DillerWeb, new di.Inject(Diller, DillerDb, DillerConfig)); + +module.exports = DillerWeb; -- cgit v1.2.3