From 478a69d834904f62cc4fb5ff52d5ed345e6859af Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 1 Nov 2015 16:13:45 +0100 Subject: web: o Publishing messages when the name/description is changed from the web. --- diller-mqtt.js | 18 +++++++++-- gulpfile.js | 1 - src/DillerDao.js | 7 ----- src/mqtt/DillerMqtt.js | 40 ------------------------ src/mqtt/DillerMqttClient.js | 60 ++++++++++++++++++++++++++++++++++++ src/web/DillerWeb.js | 73 ++++++++++++++++++++++++++++---------------- 6 files changed, 123 insertions(+), 76 deletions(-) delete mode 100644 src/mqtt/DillerMqtt.js create mode 100644 src/mqtt/DillerMqttClient.js diff --git a/diller-mqtt.js b/diller-mqtt.js index db6fc2b..e528bb4 100644 --- a/diller-mqtt.js +++ b/diller-mqtt.js @@ -4,5 +4,19 @@ var injector = new di.Injector(); var config = injector.get(require('./src/DillerConfig')); config.configureLogging('mqtt'); -var dillerMqtt = injector.get(require('./src/mqtt/DillerMqtt')); -dillerMqtt.run(); +/** + * @type {function(function(PgTx, DillerDao, Diller))} tx + */ +var tx = injector.get(require('./src/DillerTx')); + +/** + * @type DillerMqttClient + */ +var dillerMqttClient = injector.get(require('./src/mqtt/DillerMqttClient')); +dillerMqttClient.run('mqtt'); + +dillerMqttClient.on('message', function (topic, message, payload) { + tx(function (pg, dao, diller) { + return diller.onMessage(topic, message, payload); + }); +}); diff --git a/gulpfile.js b/gulpfile.js index 1e98a86..9777173 100644 --- a/gulpfile.js +++ b/gulpfile.js @@ -66,7 +66,6 @@ gulp.task('diller-web', ['bower'], function () { 'src' ], ignore: [ - 'src/mqtt/' ], env: {NODE_ENV: 'development'}, tasks: ['diller-web-reload'], diff --git a/src/DillerDao.js b/src/DillerDao.js index d81b418..34f4205 100644 --- a/src/DillerDao.js +++ b/src/DillerDao.js @@ -164,11 +164,4 @@ function DillerDao(tx) { }; } -/** - * @type DillerDao - */ -var x; - -x.deviceById(123) - module.exports = DillerDao; diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js deleted file mode 100644 index 0bd8ed9..0000000 --- a/src/mqtt/DillerMqtt.js +++ /dev/null @@ -1,40 +0,0 @@ -var di = require('di'); -var mqtt = require('mqtt'); - -function DillerMqtt(config, tx) { - var log = config.log(); - - function run() { - log.info('Connecting to ' + config.mqttUrl); - var mqttClient = mqtt.connect(config.mqttUrl); - - mqttClient.on('offline', function () { - log.info('offline'); - }); - - mqttClient.on('error', function (error) { - log.info('error', {error: error}); - }); - - mqttClient.on('connect', function () { - log.info('Connected'); - mqttClient.subscribe('/diller/#'); - }); - - mqttClient.on('message', function (topic, message, payload) { - tx(function (pg, dao, diller) { - return diller.onMessage(topic, message, payload); - }); - }); - } - - return { - run: run - }; -} -var _DillerTx = require('../DillerTx'); -var _DillerConfig = require('../DillerConfig'); - -di.annotate(DillerMqtt, new di.Inject(_DillerConfig, _DillerTx)); - -module.exports = DillerMqtt; diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js new file mode 100644 index 0000000..62f5aef --- /dev/null +++ b/src/mqtt/DillerMqttClient.js @@ -0,0 +1,60 @@ +var di = require('di'); +var mqtt = require('mqtt'); + +var os = require("os"); + +/** + * @constructor + */ +function DillerMqttClient(config, tx) { + var log = config.log(); + + var mqttClient; + var hostname = os.hostname(); + + function run(clientType) { + if (mqttClient) { + throw 'Already connected'; + } + + var clientId = 'diller_' + clientType + '_' + hostname; + log.info('Connecting to ' + config.mqttUrl + ' as ' + clientId); + var cfg = { + clientId: clientId + }; + + mqttClient = mqtt.connect(config.mqttUrl, cfg); + + mqttClient.on('offline', function () { + log.info('offline'); + }); + + mqttClient.on('error', function (error) { + log.info('error', {error: error}); + }); + + mqttClient.on('connect', function () { + log.info('Connected'); + mqttClient.subscribe('/diller/#'); + }); + } + + /** + * @lends DillerMqttClient.prototype + */ + return { + run: run, + on: function () { + return mqttClient.on.apply(mqttClient, arguments); + }, + publish: function () { + return mqttClient.publish.apply(mqttClient, arguments); + } + }; +} +var _DillerTx = require('../DillerTx'); +var _DillerConfig = require('../DillerConfig'); + +di.annotate(DillerMqttClient, new di.Inject(_DillerConfig, _DillerTx)); + +module.exports = DillerMqttClient; diff --git a/src/web/DillerWeb.js b/src/web/DillerWeb.js index 42aa7bf..5f52bde 100644 --- a/src/web/DillerWeb.js +++ b/src/web/DillerWeb.js @@ -3,20 +3,20 @@ var bodyParser = require('body-parser'); var _ = require('lodash'); var di = require('di'); -var _DillerConfig = require('../DillerConfig'); -var _DillerTx = require('../DillerTx'); - /** - * @param {function(function(PgTx, DillerDao, Diller))} tx * @param {DillerConfig} config + * @param {DillerMqttClient} mqttClient + * @param {function(function(PgTx, DillerDao, Diller))} tx * @constructor */ -function DillerWeb(tx, config) { +function DillerWeb(config, mqttClient, tx) { var log = config.log(); var calls = []; var app; + mqttClient.run('web'); + /** * @param {HttpRes} res */ @@ -124,26 +124,44 @@ function DillerWeb(tx, config) { tx(function (pg, dao, diller) { var propertyId = req.params.propertyId; - var body = req.body; - - var p; - if (!body.attribute) { - res.status(400).json({message: 'Required keys: "attribute" and "value".'}); - } else if (body.attribute == 'name' || body.attribute == 'description') { - var attributes = {}; - attributes[body.attribute] = body.value; - - p = diller.updatePropertyAttributes(propertyId, attributes); - } else { - p = Promise.reject('Unsupported attribute: ' + body.attribute); - } - - return p.then(function (property) { - return pg.batch([ - dao.deviceById(property.device), - dao.devicePropertiesByDeviceId(property.device)] - ); - }); + return dao.devicePropertyById(propertyId) + .then(function (property) { + return dao.deviceById(property.device) + .then(function (device) { + return {property: property, device: device}; + }); + }) + .then(function (data) { + var property = data.property; + var device = data.device; + var body = req.body; + + var p; + if (!body.attribute) { + res.status(400).json({message: 'Required keys: "attribute" and "value".'}); + } else if (body.attribute == 'name' || body.attribute == 'description') { + var attributes = {}; + attributes[body.attribute] = body.value; + + p = diller.updatePropertyAttributes(propertyId, attributes); + + var topic = '/diller/' + device.key + '/sensors/' + property.key + '/' + body.attribute; + var opts = { + retain: true + }; + mqttClient.publish(topic, body.value, opts); + + } else { + p = Promise.reject('Unsupported attribute: ' + body.attribute); + } + + return p.then(function () { + return pg.batch([ + dao.deviceById(property.device), + dao.devicePropertiesByDeviceId(property.device)] + ); + }); + }); }).then(function (data) { var device = data[0]; device.properties = data[1]; @@ -293,6 +311,9 @@ function DillerWeb(tx, config) { } } -di.annotate(DillerWeb, new di.Inject(_DillerTx, _DillerConfig)); +di.annotate(DillerWeb, new di.Inject( + require('../DillerConfig'), + require('../mqtt/DillerMqttClient'), + require('../DillerTx'))); module.exports = DillerWeb; -- cgit v1.2.3