From 52eb8072664a61ea61dbdbef7485d6c81dbbcfe9 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 18 Oct 2015 18:10:20 +0200 Subject: o Dropping the app concept. o Switching to pg-promise to not get overloaded by callbacks. --- README.md | 15 ++++++ apps/device.js | 55 -------------------- apps/logger.js | 39 --------------- apps/values.js | 149 ------------------------------------------------------- diller.js | 41 ++------------- package.json | 4 +- src/Diller.js | 101 +++++++++++++++++++++++++++++++++++++ src/DillerDao.js | 47 ++++++++++++++++++ src/config.js | 12 +++-- 9 files changed, 178 insertions(+), 285 deletions(-) create mode 100644 README.md delete mode 100644 apps/device.js delete mode 100644 apps/logger.js delete mode 100644 apps/values.js create mode 100644 src/Diller.js create mode 100644 src/DillerDao.js diff --git a/README.md b/README.md new file mode 100644 index 0000000..a9a7f6a --- /dev/null +++ b/README.md @@ -0,0 +1,15 @@ +# MQTT Topics + + /diller + / + /property + / + /value + /name + /description + +## Sending a new property value + + mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/value -m 12.3 + mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/name -m Bathroom + mosquitto_pub -h trygvis.io -t /diller/aa:bb:cc:dd:ee:ff/property/temp-0/description -m "Second Floot" diff --git a/apps/device.js b/apps/device.js deleted file mode 100644 index 1b8e169..0000000 --- a/apps/device.js +++ /dev/null @@ -1,55 +0,0 @@ -var pg, log; - -function init(config) { - pg = config.pg; - log = config.log; -} -exports.init = init; - -var deviceR = /^\/diller\/([0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){5})\//; - -function onMessage(topic, message, payload) { - var matches = deviceR.exec(topic) - - if (!matches) { - log.warn('no match: ', topic); - return; - } - - var mac = matches[1]; - - pg.connect(function (err, client, done) { - if (err) { - done(); - log.error('Could not connect to postgres', err); - return; - } - - client.query('select count(*) as count from device where key=$1', [mac], function (err, result) { - if (err) { - done(); - log.error('error running query', err); - return; - } - - if (result.rows[0].count > 0) { - done(); - return; - } - - log.info('New device:', mac); - - client.query('insert into device(id, key, created_timestamp) values(default, $1, current_timestamp) returning id', [mac], function (err, result) { - if (err) { - done(); - log.error('error inserting new device', err); - } - - log.info('New device created', result.rows[0].id); - done(); - }); - }); - }); -} - -exports.onMessage = onMessage; diff --git a/apps/logger.js b/apps/logger.js deleted file mode 100644 index 32ceb27..0000000 --- a/apps/logger.js +++ /dev/null @@ -1,39 +0,0 @@ -var pg, log; - -function init(config) { - pg = config.pg; - log = config.log; - - log.info('loaded'); -} -exports.init = init; - -function onMessage(topic, message, payload) { - pg.connect(function (err, client, done) { - if (err) { - done(); - console.log(err); - return; - } - - var body = message.toString(); - - client.query("INSERT INTO message(timestamp, topic, message) values(current_timestamp, $1, $2)", [topic, body]); - - // pg.on('row', function(row) { - // results.push(row); - // }); - - client.on('end', function () { - log.info('message processed'); - done(); - }); - - client.on('error', function (err) { - log.info('error', err); - done(); - }); - }); -} - -exports.onMessage = onMessage; diff --git a/apps/values.js b/apps/values.js deleted file mode 100644 index 2975e9f..0000000 --- a/apps/values.js +++ /dev/null @@ -1,149 +0,0 @@ -var pg, log; - -function init(config) { - pg = config.pg; - log = config.log; -} -exports.init = init; - -function newValue(client, device_id, device_key, property_id, property_key, value) { - log.info('new value for device ', device_key + '/' + device_id, property_key + '/' + property_id, value); - - client.query('INSERT INTO value(property, timestamp, value) VALUES($1, current_timestamp, $2)', [property_id, value], function (err, result) { - log.info('new value stored'); - }); -} - -function newName(client, device_id, device_key, property_id, property_key, name) { - log.info('new name for device ', device_key + '/' + device_id, property_key + '/' + property_id, name); -} - -function newDescription(client, device_id, device_key, property_id, property_key, description) { - log.info('new description for device ', device_key + '/' + device_id, property_key + '/' + property_id, description); -} - -function onMessage(topic, message, payload) { - var parts = topic.split(/\//); - - // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value - - if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'sensors') { - log.warn('no match: ', topic, parts); - return; - } - - var device_key = parts[2]; - var property_key = parts[4]; - var msg_type = parts[5]; - - var f; - if (msg_type == 'value') { - f = newValue - } else if (msg_type == 'name') { - f = newName - } else if (msg_type == 'description') { - f = newDescription - } - - if (!f) { - log.warn('Unknown message topic:', topic); - return; - } - - pg.connect(function (err, client, done) { - if (err) { - done(); - log.error('Could not connect to postgres', err); - return; - } - - client.query('SELECT id FROM device WHERE key=$1', [device_key], function (err, result) { - if (err) { - done(); - log.error('error looking for existing device', err); - return; - } - - // TODO: insert device - if (result.rows.length == 0) { - log.warn('No device registered with key =', device_key); - done(); - return; - } - - var device_id = result.rows[0].id; - log.info('device ', device_id); - - client.query('SELECT id FROM device_property WHERE device=$1 AND key=$2', [device_id, property_key], function (err, result) { - if (err) { - done(); - log.error('error looking for existing property', err); - return; - } - - var property_id; - if (result.rows.length == 0) { - log.info('New property: ', property_key); - client.query('INSERT INTO device_property(device, key, created_timestamp) VALUES($1, $2, current_timestamp) RETURNING id', [device_id, property_key], function (err, result) { - if (err) { - done(); - log.error('Error inserting device_property', {device_id: device_id, property_key: property_key}, err); - return; - } - - property_id = result.rows[0].id; - - log.info('new property: key=', property_key, ', id =', property_id); - - f(client, device_id, device_key, property_id, property_key, message.toString()); - }); - done(); - return; - } - - property_id = result.rows[0].id; - - log.info('property: key =', property_key, ', id =', property_id); - f(client, device_id, device_key, property_id, property_key, message.toString()); - }); - }); - }); - - /* - pg.connect(function(err, client, done) { - if (err) { - done(); - log.error('Could not connect to postgres', err); - return; - } - - client.query('select count(*) as count from device where key=$1', [device_key], function(err, result) { - if (err) { - done(); - log.error('error running query', err); - return; - } - - if (result.rows[0].count > 0) { - done(); - return; - } - - log.info('New device:', device_key); - - client.query('insert into device(key, created_timestamp) values($1, current_timestamp)', [device_key], function(err, result) { - if (err) { - done(); - log.error('error inserting new device', err); - } - - log.info('New device created'); - done(); - }); - }); - }); - */ -} - -exports.onMessage = onMessage; - diff --git a/diller.js b/diller.js index aaad6c1..cfaa6a7 100644 --- a/diller.js +++ b/diller.js @@ -1,9 +1,6 @@ var mqtt = require('mqtt'); var fs = require('fs'); -var _ = require('underscore'); -var winston = require('winston'); var bunyan = require('bunyan'); -var pg = require('pg'); var config = require('./src/config'); @@ -27,7 +24,7 @@ function configureLogging(config) { var stat; try { stat = fs.lstatSync('log'); - } catch(e) { + } catch (e) { // Assume this to to be ENOENT fs.mkdirSync('log'); } @@ -41,35 +38,9 @@ function configureLogging(config) { var log = configureLogging(config); -var pgClient = new pg.Client(config.postgresqlUrl); -pgClient.connect(); -pgClient.on('connect', function () { - log.info('connected to pg'); -}); - -var apps = [ - './apps/logger', - './apps/device', - './apps/values' -]; - -appConfig = { - pg: { - connect: function (cb) { - pg.connect(config.postgresqlUrl, cb); - } - } -}; - -apps = _.map(apps, function (name) { - log.info('Loading app: ' + name); - var app = require(name); - var cfg = _.clone(appConfig); +var Diller = require('./src/Diller').Diller; - cfg.log = log.child({app: name}); - app.init(cfg); - return {name: name, instance: app}; -}); +var diller = new Diller(config, log); log.info('Connecting to ' + config.mqttUrl); var mqttClient = mqtt.connect(config.mqttUrl); @@ -88,9 +59,5 @@ mqttClient.on('connect', function () { }); mqttClient.on('message', function (topic, message, payload) { - log.info('got message', {message: message.toString()}); - - _.each(apps, function (app) { - app.instance.onMessage(topic, message, payload); - }); + diller.onMessage(topic, message, payload); }); diff --git a/package.json b/package.json index d50ad50..388c2f0 100644 --- a/package.json +++ b/package.json @@ -13,7 +13,7 @@ "db-migrate": "^0.9.23", "mqtt": "^1.4.3", "pg": "^4.4.2", - "underscore": "^1.8.3", - "winston": "^1.1.0" + "pg-promise": "^2.0.12", + "underscore": "^1.8.3" } } diff --git a/src/Diller.js b/src/Diller.js new file mode 100644 index 0000000..9a1de23 --- /dev/null +++ b/src/Diller.js @@ -0,0 +1,101 @@ +var DillerDao = require('./DillerDao'); +var pgp = require('pg-promise')(); + +function Diller(config, log) { + + function newValue(dao, device, property, value) { + log.info('new value for device ' + device.key + '/' + property.key + ' = ' + value, { + deviceId: device.id, + propertyId: property.id, + value: value + }); + + return dao.insertValue(property.id, value); + } + + function newName(dao, device, property, name) { + log.info('New name for property ', device.key + '/' + property.key + '.name = ' + name); + + return dao.updatePropertyName(property.id, name); + } + + function newDescription(dao, device, property, description) { + log.info('New description for property ', device.key + '/' + property.key + '.description = ' + description); + + return dao.updatePropertyDescription(property.id, description); + } + + function onMessage(topic, message, payload) { + var parts = topic.split(/\//); + + log.info('Processing message to ' + topic); + + // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value + + if (parts[3] == 'sensors') { + parts[3] = 'property'; + } + + if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'property') { + log.warn('no match: ', topic, parts); + return; + } + + var device_key = parts[2]; + var property_key = parts[4]; + var msg_type = parts[5]; + + var f; + if (msg_type == 'value') { + f = newValue + } else if (msg_type == 'name') { + f = newName + } else if (msg_type == 'description') { + f = newDescription + } + + if (!f) { + log.warn('Unknown message topic:', topic); + return; + } + + return pgp(config.postgresqlConfig) + .tx(function (pg) { + var dao = new DillerDao(pg); + + return dao.deviceByKey(device_key) + .then(function (device) { + return device || dao.insertDevice(device_key).then(function (device) { + log.info('New device created', {device_key: device_key, id: device.id}); + return device; + }); + }) + .then(function (device) { + return dao.devicePropertyByDeviceIdAndKey(device.id, property_key).then(function (property) { + var ret = {device: device, property: property}; + return (property && ret) || dao.insertDeviceProperty(device.id, property_key).then(function (p) { + log.info('Created new device property', {id: p.id, key: p.key}); + ret.property = p; + return ret; + }); + }); + }) + .then(function (data) { + return f(dao, data.device, data.property, message.toString()); + }); + }).then(function (res) { + log.warn('success', res); + }, function (res) { + log.warn('fail', res); + }); + } + + return { + onMessage: onMessage + } +} + +//noinspection JSUnresolvedVariable +module.exports = { + Diller: Diller +}; diff --git a/src/DillerDao.js b/src/DillerDao.js new file mode 100644 index 0000000..ed6fcf0 --- /dev/null +++ b/src/DillerDao.js @@ -0,0 +1,47 @@ +function DillerDao(client) { + + var deviceColumns = 'id, key, created_timestamp'; + var propertyColumns = 'id, device, key, created_timestamp'; + + function deviceByKey(key) { + return client.oneOrNone("SELECT " + deviceColumns + " FROM device WHERE key=$1", key); + } + + function insertDevice(key) { + return client.one("INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING " + deviceColumns, key); + } + + function devicePropertyByDeviceIdAndKey(deviceId, key) { + return client.oneOrNone('SELECT id FROM device_property WHERE device=$1 AND key=$2', [deviceId, key]); + } + + function insertDeviceProperty(deviceId, key) { + return client.oneOrNone('INSERT INTO device_property(id, device, key, created_timestamp) VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]); + } + + function updatePropertyName(id, name) { + return client.none('UPDATE device_property SET name=$1 WHERE id=$2', name, id); + } + + function updatePropertyDescription(id, description) { + return client.none('UPDATE device_property SET description=$1 WHERE id=$2', description, id); + } + + function insertValue(propertyId, value) { + return client.none('INSERT INTO value(property, timestamp, value) VALUES($1, CURRENT_TIMESTAMP, $2)', [propertyId, value]); + } + + return { + deviceByKey: deviceByKey, + insertDevice: insertDevice, + + devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey, + insertDeviceProperty: insertDeviceProperty, + updatePropertyName: updatePropertyName, + updatePropertyDescription: updatePropertyDescription, + + insertValue: insertValue + } +} + +module.exports = DillerDao; diff --git a/src/config.js b/src/config.js index 949c284..a80fceb 100644 --- a/src/config.js +++ b/src/config.js @@ -3,10 +3,16 @@ function isProd() { } var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io'; -var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller'; +//var postgresqlUrl = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller'; +var postgresqlConfig = { + host: process.env.DB_HOST || '/var/run/postgresql', + database: process.env.DB_DATABASE || 'diller', + user: process.env.DB_USER || 'diller', + password: process.env.DB_PASSWORD || 'diller' +}; module.exports = { isProd: isProd, mqttUrl: mqttUrl, - postgresqlUrl: postgresqlUrl -} + postgresqlConfig: postgresqlConfig +}; -- cgit v1.2.3