aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-10-18 18:10:20 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-10-18 18:10:20 +0200
commit52eb8072664a61ea61dbdbef7485d6c81dbbcfe9 (patch)
treef4218bc276a0188da26da472d1233c264ff31dfe
parent9339e6269d6f5a5d0421c23c5e9ba3c8500fb535 (diff)
downloaddiller-server-52eb8072664a61ea61dbdbef7485d6c81dbbcfe9.tar.gz
diller-server-52eb8072664a61ea61dbdbef7485d6c81dbbcfe9.tar.bz2
diller-server-52eb8072664a61ea61dbdbef7485d6c81dbbcfe9.tar.xz
diller-server-52eb8072664a61ea61dbdbef7485d6c81dbbcfe9.zip
o Dropping the app concept.
o Switching to pg-promise to not get overloaded by callbacks.
-rw-r--r--README.md15
-rw-r--r--apps/device.js55
-rw-r--r--apps/logger.js39
-rw-r--r--apps/values.js149
-rw-r--r--diller.js41
-rw-r--r--package.json4
-rw-r--r--src/Diller.js101
-rw-r--r--src/DillerDao.js47
-rw-r--r--src/config.js12
9 files changed, 178 insertions, 285 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..a9a7f6a
--- /dev/null
+++ b/README.md
@@ -0,0 +1,15 @@
+# MQTT Topics
+
+ /diller
+ /<device id>
+ /property
+ /<property id>
+ /value
+ /name
+ /description
+
+## Sending a new property value
+
+ mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/value -m 12.3
+ mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/name -m Bathroom
+ mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/description -m "Second Floot"
diff --git a/apps/device.js b/apps/device.js
deleted file mode 100644
index 1b8e169..0000000
--- a/apps/device.js
+++ /dev/null
@@ -1,55 +0,0 @@
-var pg, log;
-
-function init(config) {
- pg = config.pg;
- log = config.log;
-}
-exports.init = init;
-
-var deviceR = /^\/diller\/([0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){5})\//;
-
-function onMessage(topic, message, payload) {
- var matches = deviceR.exec(topic)
-
- if (!matches) {
- log.warn('no match: ', topic);
- return;
- }
-
- var mac = matches[1];
-
- pg.connect(function (err, client, done) {
- if (err) {
- done();
- log.error('Could not connect to postgres', err);
- return;
- }
-
- client.query('select count(*) as count from device where key=$1', [mac], function (err, result) {
- if (err) {
- done();
- log.error('error running query', err);
- return;
- }
-
- if (result.rows[0].count > 0) {
- done();
- return;
- }
-
- log.info('New device:', mac);
-
- client.query('insert into device(id, key, created_timestamp) values(default, $1, current_timestamp) returning id', [mac], function (err, result) {
- if (err) {
- done();
- log.error('error inserting new device', err);
- }
-
- log.info('New device created', result.rows[0].id);
- done();
- });
- });
- });
-}
-
-exports.onMessage = onMessage;
diff --git a/apps/logger.js b/apps/logger.js
deleted file mode 100644
index 32ceb27..0000000
--- a/apps/logger.js
+++ /dev/null
@@ -1,39 +0,0 @@
-var pg, log;
-
-function init(config) {
- pg = config.pg;
- log = config.log;
-
- log.info('loaded');
-}
-exports.init = init;
-
-function onMessage(topic, message, payload) {
- pg.connect(function (err, client, done) {
- if (err) {
- done();
- console.log(err);
- return;
- }
-
- var body = message.toString();
-
- client.query("INSERT INTO message(timestamp, topic, message) values(current_timestamp, $1, $2)", [topic, body]);
-
- // pg.on('row', function(row) {
- // results.push(row);
- // });
-
- client.on('end', function () {
- log.info('message processed');
- done();
- });
-
- client.on('error', function (err) {
- log.info('error', err);
- done();
- });
- });
-}
-
-exports.onMessage = onMessage;
diff --git a/apps/values.js b/apps/values.js
deleted file mode 100644
index 2975e9f..0000000
--- a/apps/values.js
+++ /dev/null
@@ -1,149 +0,0 @@
-var pg, log;
-
-function init(config) {
- pg = config.pg;
- log = config.log;
-}
-exports.init = init;
-
-function newValue(client, device_id, device_key, property_id, property_key, value) {
- log.info('new value for device ', device_key + '/' + device_id, property_key + '/' + property_id, value);
-
- client.query('INSERT INTO value(property, timestamp, value) VALUES($1, current_timestamp, $2)', [property_id, value], function (err, result) {
- log.info('new value stored');
- });
-}
-
-function newName(client, device_id, device_key, property_id, property_key, name) {
- log.info('new name for device ', device_key + '/' + device_id, property_key + '/' + property_id, name);
-}
-
-function newDescription(client, device_id, device_key, property_id, property_key, description) {
- log.info('new description for device ', device_key + '/' + device_id, property_key + '/' + property_id, description);
-}
-
-function onMessage(topic, message, payload) {
- var parts = topic.split(/\//);
-
- // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value
-
- if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'sensors') {
- log.warn('no match: ', topic, parts);
- return;
- }
-
- var device_key = parts[2];
- var property_key = parts[4];
- var msg_type = parts[5];
-
- var f;
- if (msg_type == 'value') {
- f = newValue
- } else if (msg_type == 'name') {
- f = newName
- } else if (msg_type == 'description') {
- f = newDescription
- }
-
- if (!f) {
- log.warn('Unknown message topic:', topic);
- return;
- }
-
- pg.connect(function (err, client, done) {
- if (err) {
- done();
- log.error('Could not connect to postgres', err);
- return;
- }
-
- client.query('SELECT id FROM device WHERE key=$1', [device_key], function (err, result) {
- if (err) {
- done();
- log.error('error looking for existing device', err);
- return;
- }
-
- // TODO: insert device
- if (result.rows.length == 0) {
- log.warn('No device registered with key =', device_key);
- done();
- return;
- }
-
- var device_id = result.rows[0].id;
- log.info('device ', device_id);
-
- client.query('SELECT id FROM device_property WHERE device=$1 AND key=$2', [device_id, property_key], function (err, result) {
- if (err) {
- done();
- log.error('error looking for existing property', err);
- return;
- }
-
- var property_id;
- if (result.rows.length == 0) {
- log.info('New property: ', property_key);
- client.query('INSERT INTO device_property(device, key, created_timestamp) VALUES($1, $2, current_timestamp) RETURNING id', [device_id, property_key], function (err, result) {
- if (err) {
- done();
- log.error('Error inserting device_property', {device_id: device_id, property_key: property_key}, err);
- return;
- }
-
- property_id = result.rows[0].id;
-
- log.info('new property: key=', property_key, ', id =', property_id);
-
- f(client, device_id, device_key, property_id, property_key, message.toString());
- });
- done();
- return;
- }
-
- property_id = result.rows[0].id;
-
- log.info('property: key =', property_key, ', id =', property_id);
- f(client, device_id, device_key, property_id, property_key, message.toString());
- });
- });
- });
-
- /*
- pg.connect(function(err, client, done) {
- if (err) {
- done();
- log.error('Could not connect to postgres', err);
- return;
- }
-
- client.query('select count(*) as count from device where key=$1', [device_key], function(err, result) {
- if (err) {
- done();
- log.error('error running query', err);
- return;
- }
-
- if (result.rows[0].count > 0) {
- done();
- return;
- }
-
- log.info('New device:', device_key);
-
- client.query('insert into device(key, created_timestamp) values($1, current_timestamp)', [device_key], function(err, result) {
- if (err) {
- done();
- log.error('error inserting new device', err);
- }
-
- log.info('New device created');
- done();
- });
- });
- });
- */
-}
-
-exports.onMessage = onMessage;
-
diff --git a/diller.js b/diller.js
index aaad6c1..cfaa6a7 100644
--- a/diller.js
+++ b/diller.js
@@ -1,9 +1,6 @@
var mqtt = require('mqtt');
var fs = require('fs');
-var _ = require('underscore');
-var winston = require('winston');
var bunyan = require('bunyan');
-var pg = require('pg');
var config = require('./src/config');
@@ -27,7 +24,7 @@ function configureLogging(config) {
var stat;
try {
stat = fs.lstatSync('log');
- } catch(e) {
+ } catch (e) {
// Assume this to to be ENOENT
fs.mkdirSync('log');
}
@@ -41,35 +38,9 @@ function configureLogging(config) {
var log = configureLogging(config);
-var pgClient = new pg.Client(config.postgresqlUrl);
-pgClient.connect();
-pgClient.on('connect', function () {
- log.info('connected to pg');
-});
-
-var apps = [
- './apps/logger',
- './apps/device',
- './apps/values'
-];
-
-appConfig = {
- pg: {
- connect: function (cb) {
- pg.connect(config.postgresqlUrl, cb);
- }
- }
-};
-
-apps = _.map(apps, function (name) {
- log.info('Loading app: ' + name);
- var app = require(name);
- var cfg = _.clone(appConfig);
+var Diller = require('./src/Diller').Diller;
- cfg.log = log.child({app: name});
- app.init(cfg);
- return {name: name, instance: app};
-});
+var diller = new Diller(config, log);
log.info('Connecting to ' + config.mqttUrl);
var mqttClient = mqtt.connect(config.mqttUrl);
@@ -88,9 +59,5 @@ mqttClient.on('connect', function () {
});
mqttClient.on('message', function (topic, message, payload) {
- log.info('got message', {message: message.toString()});
-
- _.each(apps, function (app) {
- app.instance.onMessage(topic, message, payload);
- });
+ diller.onMessage(topic, message, payload);
});
diff --git a/package.json b/package.json
index d50ad50..388c2f0 100644
--- a/package.json
+++ b/package.json
@@ -13,7 +13,7 @@
"db-migrate": "^0.9.23",
"mqtt": "^1.4.3",
"pg": "^4.4.2",
- "underscore": "^1.8.3",
- "winston": "^1.1.0"
+ "pg-promise": "^2.0.12",
+ "underscore": "^1.8.3"
}
}
diff --git a/src/Diller.js b/src/Diller.js
new file mode 100644
index 0000000..9a1de23
--- /dev/null
+++ b/src/Diller.js
@@ -0,0 +1,101 @@
+var DillerDao = require('./DillerDao');
+var pgp = require('pg-promise')();
+
+function Diller(config, log) {
+
+ function newValue(dao, device, property, value) {
+ log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, {
+ deviceId: device.id,
+ propertyId: property.id,
+ value: value
+ });
+
+ return dao.insertValue(property.id, value);
+ }
+
+ function newName(dao, device, property, name) {
+ log.info('New name for property ', device.key + '/' + property.key + '.name = ' + name);
+
+ return dao.updatePropertyName(property.id, name);
+ }
+
+ function newDescription(dao, device, property, description) {
+ log.info('New description for property ', device.key + '/' + property.key + '.description = ' + description);
+
+ return dao.updatePropertyDescription(property.id, description);
+ }
+
+ function onMessage(topic, message, payload) {
+ var parts = topic.split(/\//);
+
+ log.info('Processing message to ' + topic);
+
+ // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value
+
+ if (parts[3] == 'sensors') {
+ parts[3] = 'property';
+ }
+
+ if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') {
+ log.warn('no match: ', topic, parts);
+ return;
+ }
+
+ var device_key = parts[2];
+ var property_key = parts[4];
+ var msg_type = parts[5];
+
+ var f;
+ if (msg_type == 'value') {
+ f = newValue
+ } else if (msg_type == 'name') {
+ f = newName
+ } else if (msg_type == 'description') {
+ f = newDescription
+ }
+
+ if (!f) {
+ log.warn('Unknown message topic:', topic);
+ return;
+ }
+
+ return pgp(config.postgresqlConfig)
+ .tx(function (pg) {
+ var dao = new DillerDao(pg);
+
+ return dao.deviceByKey(device_key)
+ .then(function (device) {
+ return device || dao.insertDevice(device_key).then(function (device) {
+ log.info('New device created', {device_key: device_key, id: device.id});
+ return device;
+ });
+ })
+ .then(function (device) {
+ return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) {
+ var ret = {device: device, property: property};
+ return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) {
+ log.info('Created new device property', {id: p.id, key: p.key});
+ ret.property = p;
+ return ret;
+ });
+ });
+ })
+ .then(function (data) {
+ return f(dao, data.device, data.property, message.toString());
+ });
+ }).then(function (res) {
+ log.warn('success', res);
+ }, function (res) {
+ log.warn('fail', res);
+ });
+ }
+
+ return {
+ onMessage: onMessage
+ }
+}
+
+//noinspection JSUnresolvedVariable
+module.exports = {
+ Diller: Diller
+};
diff --git a/src/DillerDao.js b/src/DillerDao.js
new file mode 100644
index 0000000..ed6fcf0
--- /dev/null
+++ b/src/DillerDao.js
@@ -0,0 +1,47 @@
+function DillerDao(client) {
+
+ var deviceColumns = 'id, key, created_timestamp';
+ var propertyColumns = 'id, device, key, created_timestamp';
+
+ function deviceByKey(key) {
+ return client.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key);
+ }
+
+ function insertDevice(key) {
+ return client.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key);
+ }
+
+ function devicePropertyByDeviceIdAndKey(deviceId, key) {
+ return client.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]);
+ }
+
+ function insertDeviceProperty(deviceId, key) {
+ return client.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]);
+ }
+
+ function updatePropertyName(id, name) {
+ return client.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id);
+ }
+
+ function updatePropertyDescription(id, description) {
+ return client.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id);
+ }
+
+ function insertValue(propertyId, value) {
+ return client.none('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2)', [propertyId, value]);
+ }
+
+ return {
+ deviceByKey: deviceByKey,
+ insertDevice: insertDevice,
+
+ devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey,
+ insertDeviceProperty: insertDeviceProperty,
+ updatePropertyName: updatePropertyName,
+ updatePropertyDescription: updatePropertyDescription,
+
+ insertValue: insertValue
+ }
+}
+
+module.exports = DillerDao;
diff --git a/src/config.js b/src/config.js
index 949c284..a80fceb 100644
--- a/src/config.js
+++ b/src/config.js
@@ -3,10 +3,16 @@ function isProd() {
}
var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
-var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller';
+//var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller';
+var postgresqlConfig = {
+ host: process.env.DB_HOST || '/var/run/postgresql',
+ database: process.env.DB_DATABASE || 'diller',
+ user: process.env.DB_USER || 'diller',
+ password: process.env.DB_PASSWORD || 'diller'
+};
module.exports = {
isProd: isProd,
mqttUrl: mqttUrl,
- postgresqlUrl: postgresqlUrl
-}
+ postgresqlConfig: postgresqlConfig
+};