diff options
Diffstat (limited to 'src/mqtt')
-rw-r--r-- | src/mqtt/DillerMqtt.js | 40 | ||||
-rw-r--r-- | src/mqtt/DillerMqttClient.js | 60 |
2 files changed, 60 insertions, 40 deletions
diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js deleted file mode 100644 index 0bd8ed9..0000000 --- a/src/mqtt/DillerMqtt.js +++ /dev/null @@ -1,40 +0,0 @@ -var di = require('di'); -var mqtt = require('mqtt'); - -function DillerMqtt(config, tx) { - var log = config.log(); - - function run() { - log.info('Connecting to ' + config.mqttUrl); - var mqttClient = mqtt.connect(config.mqttUrl); - - mqttClient.on('offline', function () { - log.info('offline'); - }); - - mqttClient.on('error', function (error) { - log.info('error', {error: error}); - }); - - mqttClient.on('connect', function () { - log.info('Connected'); - mqttClient.subscribe('/diller/#'); - }); - - mqttClient.on('message', function (topic, message, payload) { - tx(function (pg, dao, diller) { - return diller.onMessage(topic, message, payload); - }); - }); - } - - return { - run: run - }; -} -var _DillerTx = require('../DillerTx'); -var _DillerConfig = require('../DillerConfig'); - -di.annotate(DillerMqtt, new di.Inject(_DillerConfig, _DillerTx)); - -module.exports = DillerMqtt; diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js new file mode 100644 index 0000000..62f5aef --- /dev/null +++ b/src/mqtt/DillerMqttClient.js @@ -0,0 +1,60 @@ +var di = require('di'); +var mqtt = require('mqtt'); + +var os = require("os"); + +/** + * @constructor + */ +function DillerMqttClient(config, tx) { + var log = config.log(); + + var mqttClient; + var hostname = os.hostname(); + + function run(clientType) { + if (mqttClient) { + throw 'Already connected'; + } + + var clientId = 'diller_' + clientType + '_' + hostname; + log.info('Connecting to ' + config.mqttUrl + ' as ' + clientId); + var cfg = { + clientId: clientId + }; + + mqttClient = mqtt.connect(config.mqttUrl, cfg); + + mqttClient.on('offline', function () { + log.info('offline'); + }); + + mqttClient.on('error', function (error) { + log.info('error', {error: error}); + }); + + mqttClient.on('connect', function () { + log.info('Connected'); + mqttClient.subscribe('/diller/#'); + }); + } + + /** + * @lends DillerMqttClient.prototype + */ + return { + run: run, + on: function () { + return mqttClient.on.apply(mqttClient, arguments); + }, + publish: function () { + return mqttClient.publish.apply(mqttClient, arguments); + } + }; +} +var _DillerTx = require('../DillerTx'); +var _DillerConfig = require('../DillerConfig'); + +di.annotate(DillerMqttClient, new di.Inject(_DillerConfig, _DillerTx)); + +module.exports = DillerMqttClient; |