From 0266bdd60cb9cccf20a5ded3eba72ea833bee72d Mon Sep 17 00:00:00 2001
From: Trygve Laugstøl <trygvis@inamo.no>
Date: Tue, 20 Oct 2015 23:18:16 +0200
Subject: o Adding a webapp. o Using di.js as dependency injection framework.

---
 src/Diller.js          |  37 ++++--------
 src/DillerConfig.js    |  67 +++++++++++++++++++++
 src/DillerDao.js       |  63 ++++++++++++++++----
 src/DillerDb.js        |  29 +++++++++
 src/config.js          |  18 ------
 src/mqtt/DillerMqtt.js |  38 ++++++++++++
 src/web/DillerWeb.js   | 158 +++++++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 355 insertions(+), 55 deletions(-)
 create mode 100644 src/DillerConfig.js
 create mode 100644 src/DillerDb.js
 delete mode 100644 src/config.js
 create mode 100644 src/mqtt/DillerMqtt.js
 create mode 100644 src/web/DillerWeb.js

(limited to 'src')

diff --git a/src/Diller.js b/src/Diller.js
index eeaf726..a057054 100644
--- a/src/Diller.js
+++ b/src/Diller.js
@@ -1,22 +1,7 @@
-var DillerDao = require('./DillerDao');
-var pgpOptions = {
-  //query: function (e) {
-  //  console.log("Query:", e.query);
-  //  if (e.ctx) {
-  //    // this query is executing inside a task or transaction,
-  //    if (e.ctx.isTX) {
-  //      // this query is inside a transaction;
-  //    } else {
-  //      // this query is inside a task;
-  //    }
-  //
-  //  }
-  //}
-};
-
-var pgp = require('pg-promise')(pgpOptions);
-
-function Diller(config, log) {
+var di = require('di');
+
+function Diller(config, db) {
+  var log = config.log();
 
   function newValue(dao, device, property, value) {
     log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, {
@@ -47,7 +32,7 @@ function Diller(config, log) {
 
   function updateAggregates(propertyId, timestamp) {
     log.info('Updating aggregates', {propertyId: propertyId, timestamp: timestamp});
-    return pgp(config.postgresqlConfig)
+    return db()
       .tx(function (pg) {
         var dao = new DillerDao(pg);
 
@@ -100,7 +85,7 @@ function Diller(config, log) {
       return;
     }
 
-    return pgp(config.postgresqlConfig)
+    return db()
       .tx(function (pg) {
         var dao = new DillerDao(pg);
 
@@ -136,7 +121,9 @@ function Diller(config, log) {
   }
 }
 
-//noinspection JSUnresolvedVariable
-module.exports = {
-  Diller: Diller
-};
+var DillerConfig = require('./DillerConfig');
+var DillerDao = require('./DillerDao');
+var DillerDb = require('./DillerDb');
+di.annotate(Diller, new di.Inject(DillerConfig, DillerDb));
+
+module.exports = Diller;
diff --git a/src/DillerConfig.js b/src/DillerConfig.js
new file mode 100644
index 0000000..84afe42
--- /dev/null
+++ b/src/DillerConfig.js
@@ -0,0 +1,67 @@
+var fs = require('fs');
+var bunyan = require('bunyan');
+
+function isProd() {
+  return process.env.NODE_ENV == 'prod';
+}
+
+var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
+
+var postgresqlConfig = {
+  host: process.env.DB_HOST || '/var/run/postgresql',
+  database: process.env.DB_DATABASE || 'diller',
+  user: process.env.DB_USER || 'diller',
+  password: process.env.DB_PASSWORD || 'diller'
+};
+
+var log;
+
+function configureLogging(app) {
+  if (log) {
+    return;
+  }
+
+  var cfg = {
+    name: app
+  };
+
+  if (isProd()) {
+    cfg.streams = [
+      {
+        level: 'warn',
+        stream: process.stdout
+      },
+      {
+        level: 'debug',
+        path: 'log/diller-' + app + '.log'
+      }
+    ];
+
+    var stat;
+    try {
+      stat = fs.lstatSync('log');
+    } catch (e) {
+      // Assume this to to be ENOENT
+      fs.mkdirSync('log');
+    }
+    if (stat && !stat.isDirectory()) {
+      throw 'Not a directory: log';
+    }
+  }
+
+  log = bunyan.createLogger(cfg);
+}
+
+function DillerConfig() {
+  return {
+    isProd: isProd,
+    mqttUrl: mqttUrl,
+    postgresqlConfig: postgresqlConfig,
+    configureLogging: configureLogging,
+    log: function () {
+      return log;
+    }
+  };
+}
+
+module.exports = DillerConfig;
diff --git a/src/DillerDao.js b/src/DillerDao.js
index e0c69d3..c4d0e67 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -1,40 +1,74 @@
-function DillerDao(client) {
+function DillerDao(tx) {
 
   var deviceColumns = 'id, key, created_timestamp';
   var propertyColumns = 'id, device, key, created_timestamp';
+  var valueColumns = 'property, timestamp, value';
+
+  // -------------------------------------------------------------------------------------------------------------------
+  // Device
+  // -------------------------------------------------------------------------------------------------------------------
+
+  function devices() {
+    return tx.many("SELECT " + deviceColumns + " FROM device");
+  }
+
+  function deviceById(id) {
+    return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE id=$1", id);
+  }
 
   function deviceByKey(key) {
-    return client.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
+    return tx.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
   }
 
   function insertDevice(key) {
-    return client.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+    return tx.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+  }
+
+  // -------------------------------------------------------------------------------------------------------------------
+  // Device Property
+  // -------------------------------------------------------------------------------------------------------------------
+
+  function devicePropertyById(id) {
+    return tx.one('SELECT ' + propertyColumns + ' FROM device_property WHERE id=$1', [id]);
   }
 
   function devicePropertyByDeviceIdAndKey(deviceId, key) {
-    return client.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]);
+    return tx.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]);
+  }
+
+  function devicePropertiesByDeviceId(deviceId) {
+    return tx.many('SELECT ' + propertyColumns + ' FROM device_property WHERE device=$1', [deviceId]);
   }
 
   function insertDeviceProperty(deviceId, key) {
-    return client.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]);
+    return tx.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]);
   }
 
   function updatePropertyName(id, name) {
-    return client.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id);
+    return tx.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id);
   }
 
   function updatePropertyDescription(id, description) {
-    return client.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id);
+    return tx.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id);
+  }
+
+  // -------------------------------------------------------------------------------------------------------------------
+  // Value
+  // -------------------------------------------------------------------------------------------------------------------
+
+  function valuesByPropertyId(propertyId, limit) {
+    limit = limit || 10;
+    return tx.many('SELECT timestamp, value FROM value WHERE property=$1 LIMIT $2', [propertyId, limit]);
   }
 
   function insertValue(propertyId, value) {
-    return client.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]);
+    return tx.one('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2) RETURNING timestamp', [propertyId, value]);
   }
 
   function updateHourAggregatesForProperty(propertyId, timestamp) {
-    return client.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+    return tx.none('DELETE FROM value_by_hour WHERE property=$1 AND timestamp=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
       .then(function() {
-        return client.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' +
+        return tx.one('INSERT INTO value_by_hour(property, timestamp, count, max, min, avg) ' +
           'SELECT property, DATE_TRUNC(\'hour\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
           'FROM value WHERE property=$1 AND DATE_TRUNC(\'hour\', timestamp)=DATE_TRUNC(\'hour\', $2::TIMESTAMPTZ) ' +
           'GROUP BY property, DATE_TRUNC(\'hour\', timestamp) ' +
@@ -44,9 +78,9 @@ function DillerDao(client) {
   }
 
   function updateMinuteAggregatesForProperty(propertyId, timestamp) {
-    return client.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
+    return tx.none('DELETE FROM value_by_minute WHERE property=$1 AND timestamp=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ)', [propertyId, timestamp])
       .then(function() {
-        return client.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' +
+        return tx.one('INSERT INTO value_by_minute(property, timestamp, count, max, min, avg) ' +
           'SELECT property, DATE_TRUNC(\'minute\', timestamp) AS timestamp, COUNT(value) AS count, MAX(value::NUMERIC) AS max, MIN(value::NUMERIC) AS min, AVG(value::NUMERIC) AS avg ' +
           'FROM value ' +
           'WHERE property=$1 AND DATE_TRUNC(\'minute\', timestamp)=DATE_TRUNC(\'minute\', $2::TIMESTAMPTZ) ' +
@@ -57,14 +91,19 @@ function DillerDao(client) {
   }
 
   return {
+    devices: devices,
+    deviceById: deviceById,
     deviceByKey: deviceByKey,
     insertDevice: insertDevice,
 
+    devicePropertyById: devicePropertyById,
     devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey,
+    devicePropertiesByDeviceId: devicePropertiesByDeviceId,
     insertDeviceProperty: insertDeviceProperty,
     updatePropertyName: updatePropertyName,
     updatePropertyDescription: updatePropertyDescription,
 
+    valuesByPropertyId: valuesByPropertyId,
     insertValue: insertValue,
     updateHourAggregatesForProperty: updateHourAggregatesForProperty,
     updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty
diff --git a/src/DillerDb.js b/src/DillerDb.js
new file mode 100644
index 0000000..c31dde7
--- /dev/null
+++ b/src/DillerDb.js
@@ -0,0 +1,29 @@
+var di = require('di');
+var DillerConfig = require('./DillerConfig');
+
+var pgpOptions = {
+  //query: function (e) {
+  //  console.log("Query:", e.query);
+  //  if (e.ctx) {
+  //    // this query is executing inside a task or transaction,
+  //    if (e.ctx.isTX) {
+  //      // this query is inside a transaction;
+  //    } else {
+  //      // this query is inside a task;
+  //    }
+  //
+  //  }
+  //}
+};
+
+var pgp = require('pg-promise')(pgpOptions);
+
+function DillerDb(config) {
+
+  return function () {
+    return pgp(config.postgresqlConfig)
+  }
+}
+di.annotate(DillerDb, new di.Inject(DillerConfig));
+
+module.exports = DillerDb;
diff --git a/src/config.js b/src/config.js
deleted file mode 100644
index a42ec87..0000000
--- a/src/config.js
+++ /dev/null
@@ -1,18 +0,0 @@
-function isProd() {
-  return process.env.NODE_ENV == 'prod';
-}
-
-var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
-
-var postgresqlConfig = {
-  host: process.env.DB_HOST || '/var/run/postgresql',
-  database: process.env.DB_DATABASE || 'diller',
-  user: process.env.DB_USER || 'diller',
-  password: process.env.DB_PASSWORD || 'diller'
-};
-
-module.exports = {
-  isProd: isProd,
-  mqttUrl: mqttUrl,
-  postgresqlConfig: postgresqlConfig
-};
diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js
new file mode 100644
index 0000000..3fe43dc
--- /dev/null
+++ b/src/mqtt/DillerMqtt.js
@@ -0,0 +1,38 @@
+var di = require('di');
+var mqtt = require('mqtt');
+
+function DillerMqtt(config, diller) {
+  var log = config.log();
+
+  function run() {
+    log.info('Connecting to ' + config.mqttUrl);
+    var mqttClient = mqtt.connect(config.mqttUrl);
+
+    mqttClient.on('offline', function () {
+      log.info('offline');
+    });
+
+    mqttClient.on('error', function (error) {
+      log.info('error', {error: error});
+    });
+
+    mqttClient.on('connect', function () {
+      log.info('Connected');
+      mqttClient.subscribe('/diller/#');
+    });
+
+    mqttClient.on('message', function (topic, message, payload) {
+      diller.onMessage(topic, message, payload);
+    });
+  }
+
+  return {
+    run: run
+  };
+}
+var Diller = require('../Diller');
+var DillerConfig = require('../DillerConfig');
+
+di.annotate(DillerMqtt, new di.Inject(DillerConfig, Diller));
+
+module.exports = DillerMqtt;
diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js
new file mode 100644
index 0000000..40fbc3d
--- /dev/null
+++ b/src/web/DillerWeb.js
@@ -0,0 +1,158 @@
+var express = require('express');
+var bodyParser = require('body-parser');
+var _ = require('lodash');
+var di = require('di');
+
+var DillerConfig = require('../DillerConfig');
+var Diller = require('../Diller');
+var DillerDb = require('../DillerDb');
+var DillerDao = require('../DillerDao');
+
+function DillerWeb(diller, db, config) {
+  var log = config.log();
+
+  var calls = [];
+  var app;
+
+  function getDevices(req, res) {
+    db().tx(function (pg) {
+      var dao = new DillerDao(pg);
+      return dao.devices();
+    }).then(function (devices) {
+      res.json({devices: devices});
+    }, function (err) {
+      log.warn('fail', err);
+      res.status(500).json({message: 'fail'});
+    });
+  }
+
+  function getDevice(req, res) {
+    db().tx(function (tx) {
+      var deviceId = req.params.deviceId;
+
+      var dao = new DillerDao(tx);
+      return tx.batch([
+        dao.deviceById(deviceId),
+        dao.devicePropertiesByDeviceId(deviceId)]
+      );
+    }).then(function (data) {
+      var device = data[0];
+      device.properties = data[1];
+      res.json({device: device});
+    }, function (err) {
+      log.warn('fail', err);
+      res.status(500).json({message: 'fail'});
+    });
+  }
+
+  function getValues(req, res) {
+    db().tx(function (tx) {
+      var propertyId = req.params.propertyId;
+
+      var dao = new DillerDao(tx);
+      return dao.valuesByPropertyId(propertyId, 10);
+    }).then(function (values) {
+      res.json({values: values});
+    }, function (err) {
+      log.warn('fail', err);
+      res.status(500).json({message: 'fail'});
+    });
+  }
+
+  function init() {
+    app = express();
+
+    app.use(bodyParser.urlencoded({extended: true}));
+    app.use(bodyParser.json());
+
+    var router = express.Router();
+
+    function addRoute(name, method, path, callback) {
+      router[method](path, callback);
+      var layer = _.last(router.stack);
+
+      calls.push({
+        name: name,
+        method: method,
+        path: path,
+        layer: layer,
+        keys: _.map(layer.keys, function (key) {
+          return key.name;
+        })
+      });
+    }
+
+    addRoute('getDevices', 'get', '/device', getDevices);
+    addRoute('getDevice', 'get', '/device/:deviceId', getDevice);
+    addRoute('getValues', 'get', '/property/:propertyId/values', getValues);
+
+    app.use('/api', router);
+    app.use(express.static('web'));
+  }
+
+  function listen() {
+    var port = process.env.HTTP_PORT || 8080;
+    app.listen(port);
+  }
+
+  function generateRpc() {
+    console.log('function DillerRpc($http) {');
+
+    var s = _.map(calls, function (call) {
+
+      //console.error(call);
+      console.error('call.layer', call.layer);
+      var s = '  function ' + call.name + '(' + call.keys.join(', ') + ') {\n' +
+        '    var req = {};\n' +
+        '    req.method = \'' + call.method + '\';\n' +
+        '    req.url = \'/api' + call.path + '\';\n';
+
+      s += _.map(call.layer.keys, function (key) {
+        return '    req.url = req.url.replace(/:' + key.name + '/, ' + key.name + ');\n'
+      }).join('');
+
+      s +=
+        '    return $http(req);\n' +
+        '  }\n';
+
+      return s;
+    });
+    _.each(s, function (x) {
+      console.log(x);
+    });
+
+    console.log('  return {');
+    console.log(_.map(calls, function (call) {
+      return '    ' + call.name + ': ' + call.name
+    }).join(',\n'));
+    console.log('  };');
+    console.log('}');
+    console.log('');
+
+    console.log('DillerRpcResolve = {};');
+    _.each(calls, function (call) {
+      var args = ['DillerRpc'];
+
+      if (call.keys.length > 0) {
+        args.push('$route');
+      }
+      console.log('DillerRpcResolve.' + call.name + ' = function(' + args.join(', ') + ') {');
+
+      args = _.map(call.keys, function (key) {
+        return '$route.current.params.' + key;
+      });
+      console.log('  return DillerRpc.' + call.name + '(' + args.join(', ') + ');');
+      console.log('};');
+    });
+  }
+
+  return {
+    init: init,
+    listen: listen,
+    generateRpc: generateRpc
+  }
+}
+
+di.annotate(DillerWeb, new di.Inject(Diller, DillerDb, DillerConfig));
+
+module.exports = DillerWeb;
-- 
cgit v1.2.3