aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-11-01 16:13:45 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2015-11-01 16:13:45 +0100
commit478a69d834904f62cc4fb5ff52d5ed345e6859af (patch)
treeefa4839e878f1debf3db6a11f0636405538af761
parent85474565b0a0d92a09fecb14e8bc0afaad7fbc64 (diff)
downloaddiller-server-478a69d834904f62cc4fb5ff52d5ed345e6859af.tar.gz
diller-server-478a69d834904f62cc4fb5ff52d5ed345e6859af.tar.bz2
diller-server-478a69d834904f62cc4fb5ff52d5ed345e6859af.tar.xz
diller-server-478a69d834904f62cc4fb5ff52d5ed345e6859af.zip
web:
o Publishing messages when the name/description is changed from the web.
-rw-r--r--diller-mqtt.js18
-rw-r--r--gulpfile.js1
-rw-r--r--src/DillerDao.js7
-rw-r--r--src/mqtt/DillerMqtt.js40
-rw-r--r--src/mqtt/DillerMqttClient.js60
-rw-r--r--src/web/DillerWeb.js73
6 files changed, 123 insertions, 76 deletions
diff --git a/diller-mqtt.js b/diller-mqtt.js
index db6fc2b..e528bb4 100644
--- a/diller-mqtt.js
+++ b/diller-mqtt.js
@@ -4,5 +4,19 @@ var injector = new di.Injector();
var config = injector.get(require('./src/DillerConfig'));
config.configureLogging('mqtt');
-var dillerMqtt = injector.get(require('./src/mqtt/DillerMqtt'));
-dillerMqtt.run();
+/**
+ * @type {function(function(PgTx, DillerDao, Diller))} tx
+ */
+var tx = injector.get(require('./src/DillerTx'));
+
+/**
+ * @type DillerMqttClient
+ */
+var dillerMqttClient = injector.get(require('./src/mqtt/DillerMqttClient'));
+dillerMqttClient.run('mqtt');
+
+dillerMqttClient.on('message', function (topic, message, payload) {
+ tx(function (pg, dao, diller) {
+ return diller.onMessage(topic, message, payload);
+ });
+});
diff --git a/gulpfile.js b/gulpfile.js
index 1e98a86..9777173 100644
--- a/gulpfile.js
+++ b/gulpfile.js
@@ -66,7 +66,6 @@ gulp.task('diller-web', ['bower'], function () {
'src'
],
ignore: [
- 'src/mqtt/'
],
env: {NODE_ENV: 'development'},
tasks: ['diller-web-reload'],
diff --git a/src/DillerDao.js b/src/DillerDao.js
index d81b418..34f4205 100644
--- a/src/DillerDao.js
+++ b/src/DillerDao.js
@@ -164,11 +164,4 @@ function DillerDao(tx) {
};
}
-/**
- * @type DillerDao
- */
-var x;
-
-x.deviceById(123)
-
module.exports = DillerDao;
diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js
deleted file mode 100644
index 0bd8ed9..0000000
--- a/src/mqtt/DillerMqtt.js
+++ /dev/null
@@ -1,40 +0,0 @@
-var di = require('di');
-var mqtt = require('mqtt');
-
-function DillerMqtt(config, tx) {
- var log = config.log();
-
- function run() {
- log.info('Connecting to ' + config.mqttUrl);
- var mqttClient = mqtt.connect(config.mqttUrl);
-
- mqttClient.on('offline', function () {
- log.info('offline');
- });
-
- mqttClient.on('error', function (error) {
- log.info('error', {error: error});
- });
-
- mqttClient.on('connect', function () {
- log.info('Connected');
- mqttClient.subscribe('/diller/#');
- });
-
- mqttClient.on('message', function (topic, message, payload) {
- tx(function (pg, dao, diller) {
- return diller.onMessage(topic, message, payload);
- });
- });
- }
-
- return {
- run: run
- };
-}
-var _DillerTx = require('../DillerTx');
-var _DillerConfig = require('../DillerConfig');
-
-di.annotate(DillerMqtt, new di.Inject(_DillerConfig, _DillerTx));
-
-module.exports = DillerMqtt;
diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js
new file mode 100644
index 0000000..62f5aef
--- /dev/null
+++ b/src/mqtt/DillerMqttClient.js
@@ -0,0 +1,60 @@
+var di = require('di');
+var mqtt = require('mqtt');
+
+var os = require("os");
+
+/**
+ * @constructor
+ */
+function DillerMqttClient(config, tx) {
+ var log = config.log();
+
+ var mqttClient;
+ var hostname = os.hostname();
+
+ function run(clientType) {
+ if (mqttClient) {
+ throw 'Already connected';
+ }
+
+ var clientId = 'diller_' + clientType + '_' + hostname;
+ log.info('Connecting to ' + config.mqttUrl + ' as ' + clientId);
+ var cfg = {
+ clientId: clientId
+ };
+
+ mqttClient = mqtt.connect(config.mqttUrl, cfg);
+
+ mqttClient.on('offline', function () {
+ log.info('offline');
+ });
+
+ mqttClient.on('error', function (error) {
+ log.info('error', {error: error});
+ });
+
+ mqttClient.on('connect', function () {
+ log.info('Connected');
+ mqttClient.subscribe('/diller/#');
+ });
+ }
+
+ /**
+ * @lends DillerMqttClient.prototype
+ */
+ return {
+ run: run,
+ on: function () {
+ return mqttClient.on.apply(mqttClient, arguments);
+ },
+ publish: function () {
+ return mqttClient.publish.apply(mqttClient, arguments);
+ }
+ };
+}
+var _DillerTx = require('../DillerTx');
+var _DillerConfig = require('../DillerConfig');
+
+di.annotate(DillerMqttClient, new di.Inject(_DillerConfig, _DillerTx));
+
+module.exports = DillerMqttClient;
diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js
index 42aa7bf..5f52bde 100644
--- a/src/web/DillerWeb.js
+++ b/src/web/DillerWeb.js
@@ -3,20 +3,20 @@ var bodyParser = require('body-parser');
var _ = require('lodash');
var di = require('di');
-var _DillerConfig = require('../DillerConfig');
-var _DillerTx = require('../DillerTx');
-
/**
- * @param {function(function(PgTx, DillerDao, Diller))} tx
* @param {DillerConfig} config
+ * @param {DillerMqttClient} mqttClient
+ * @param {function(function(PgTx, DillerDao, Diller))} tx
* @constructor
*/
-function DillerWeb(tx, config) {
+function DillerWeb(config, mqttClient, tx) {
var log = config.log();
var calls = [];
var app;
+ mqttClient.run('web');
+
/**
* @param {HttpRes} res
*/
@@ -124,26 +124,44 @@ function DillerWeb(tx, config) {
tx(function (pg, dao, diller) {
var propertyId = req.params.propertyId;
- var body = req.body;
-
- var p;
- if (!body.attribute) {
- res.status(400).json({message: 'Required keys: "attribute" and "value".'});
- } else if (body.attribute == 'name' || body.attribute == 'description') {
- var attributes = {};
- attributes[body.attribute] = body.value;
-
- p = diller.updatePropertyAttributes(propertyId, attributes);
- } else {
- p = Promise.reject('Unsupported attribute: ' + body.attribute);
- }
-
- return p.then(function (property) {
- return pg.batch([
- dao.deviceById(property.device),
- dao.devicePropertiesByDeviceId(property.device)]
- );
- });
+ return dao.devicePropertyById(propertyId)
+ .then(function (property) {
+ return dao.deviceById(property.device)
+ .then(function (device) {
+ return {property: property, device: device};
+ });
+ })
+ .then(function (data) {
+ var property = data.property;
+ var device = data.device;
+ var body = req.body;
+
+ var p;
+ if (!body.attribute) {
+ res.status(400).json({message: 'Required keys: "attribute" and "value".'});
+ } else if (body.attribute == 'name' || body.attribute == 'description') {
+ var attributes = {};
+ attributes[body.attribute] = body.value;
+
+ p = diller.updatePropertyAttributes(propertyId, attributes);
+
+ var topic = '/diller/' + device.key + '/sensors/' + property.key + '/' + body.attribute;
+ var opts = {
+ retain: true
+ };
+ mqttClient.publish(topic, body.value, opts);
+
+ } else {
+ p = Promise.reject('Unsupported attribute: ' + body.attribute);
+ }
+
+ return p.then(function () {
+ return pg.batch([
+ dao.deviceById(property.device),
+ dao.devicePropertiesByDeviceId(property.device)]
+ );
+ });
+ });
}).then(function (data) {
var device = data[0];
device.properties = data[1];
@@ -293,6 +311,9 @@ function DillerWeb(tx, config) {
}
}
-di.annotate(DillerWeb, new di.Inject(_DillerTx, _DillerConfig));
+di.annotate(DillerWeb, new di.Inject(
+ require('../DillerConfig'),
+ require('../mqtt/DillerMqttClient'),
+ require('../DillerTx')));
module.exports = DillerWeb;