aboutsummaryrefslogtreecommitdiff
path: root/src/mqtt
diff options
context:
space:
mode:
Diffstat (limited to 'src/mqtt')
-rw-r--r--src/mqtt/DillerMqtt.js40
-rw-r--r--src/mqtt/DillerMqttClient.js60
2 files changed, 60 insertions, 40 deletions
diff --git a/src/mqtt/DillerMqtt.js b/src/mqtt/DillerMqtt.js
deleted file mode 100644
index 0bd8ed9..0000000
--- a/src/mqtt/DillerMqtt.js
+++ /dev/null
@@ -1,40 +0,0 @@
-var di = require('di');
-var mqtt = require('mqtt');
-
-function DillerMqtt(config, tx) {
- var log = config.log();
-
- function run() {
- log.info('Connecting to ' + config.mqttUrl);
- var mqttClient = mqtt.connect(config.mqttUrl);
-
- mqttClient.on('offline', function () {
- log.info('offline');
- });
-
- mqttClient.on('error', function (error) {
- log.info('error', {error: error});
- });
-
- mqttClient.on('connect', function () {
- log.info('Connected');
- mqttClient.subscribe('/diller/#');
- });
-
- mqttClient.on('message', function (topic, message, payload) {
- tx(function (pg, dao, diller) {
- return diller.onMessage(topic, message, payload);
- });
- });
- }
-
- return {
- run: run
- };
-}
-var _DillerTx = require('../DillerTx');
-var _DillerConfig = require('../DillerConfig');
-
-di.annotate(DillerMqtt, new di.Inject(_DillerConfig, _DillerTx));
-
-module.exports = DillerMqtt;
diff --git a/src/mqtt/DillerMqttClient.js b/src/mqtt/DillerMqttClient.js
new file mode 100644
index 0000000..62f5aef
--- /dev/null
+++ b/src/mqtt/DillerMqttClient.js
@@ -0,0 +1,60 @@
+var di = require('di');
+var mqtt = require('mqtt');
+
+var os = require("os");
+
+/**
+ * @constructor
+ */
+function DillerMqttClient(config, tx) {
+ var log = config.log();
+
+ var mqttClient;
+ var hostname = os.hostname();
+
+ function run(clientType) {
+ if (mqttClient) {
+ throw 'Already connected';
+ }
+
+ var clientId = 'diller_' + clientType + '_' + hostname;
+ log.info('Connecting to ' + config.mqttUrl + ' as ' + clientId);
+ var cfg = {
+ clientId: clientId
+ };
+
+ mqttClient = mqtt.connect(config.mqttUrl, cfg);
+
+ mqttClient.on('offline', function () {
+ log.info('offline');
+ });
+
+ mqttClient.on('error', function (error) {
+ log.info('error', {error: error});
+ });
+
+ mqttClient.on('connect', function () {
+ log.info('Connected');
+ mqttClient.subscribe('/diller/#');
+ });
+ }
+
+ /**
+ * @lends DillerMqttClient.prototype
+ */
+ return {
+ run: run,
+ on: function () {
+ return mqttClient.on.apply(mqttClient, arguments);
+ },
+ publish: function () {
+ return mqttClient.publish.apply(mqttClient, arguments);
+ }
+ };
+}
+var _DillerTx = require('../DillerTx');
+var _DillerConfig = require('../DillerConfig');
+
+di.annotate(DillerMqttClient, new di.Inject(_DillerConfig, _DillerTx));
+
+module.exports = DillerMqttClient;