From cc0dcb25054e9dff3fdc4b58eaf861d35d0b06de Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 18 Oct 2015 10:36:01 +0200 Subject: wip --- .gitignore | 2 + apps/device.js | 55 +++++++++++++++++++++ apps/logger.js | 39 +++++++++++++++ apps/values.js | 149 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ diller.js | 63 ++++++++++++++++++++++++ package.json | 19 ++++++++ 6 files changed, 327 insertions(+) create mode 100644 .gitignore create mode 100644 apps/device.js create mode 100644 apps/logger.js create mode 100644 apps/values.js create mode 100644 diller.js create mode 100644 package.json diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7a1537b --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.idea +node_modules diff --git a/apps/device.js b/apps/device.js new file mode 100644 index 0000000..1b8e169 --- /dev/null +++ b/apps/device.js @@ -0,0 +1,55 @@ +var pg, log; + +function init(config) { + pg = config.pg; + log = config.log; +} +exports.init = init; + +var deviceR = /^\/diller\/([0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){5})\//; + +function onMessage(topic, message, payload) { + var matches = deviceR.exec(topic) + + if (!matches) { + log.warn('no match: ', topic); + return; + } + + var mac = matches[1]; + + pg.connect(function (err, client, done) { + if (err) { + done(); + log.error('Could not connect to postgres', err); + return; + } + + client.query('select count(*) as count from device where key=$1', [mac], function (err, result) { + if (err) { + done(); + log.error('error running query', err); + return; + } + + if (result.rows[0].count > 0) { + done(); + return; + } + + log.info('New device:', mac); + + client.query('insert into device(id, key, created_timestamp) values(default, $1, current_timestamp) returning id', [mac], function (err, result) { + if (err) { + done(); + log.error('error inserting new device', err); + } + + log.info('New device created', result.rows[0].id); + done(); + }); + }); + }); +} + +exports.onMessage = onMessage; diff --git a/apps/logger.js b/apps/logger.js new file mode 100644 index 0000000..32ceb27 --- /dev/null +++ b/apps/logger.js @@ -0,0 +1,39 @@ +var pg, log; + +function init(config) { + pg = config.pg; + log = config.log; + + log.info('loaded'); +} +exports.init = init; + +function onMessage(topic, message, payload) { + pg.connect(function (err, client, done) { + if (err) { + done(); + console.log(err); + return; + } + + var body = message.toString(); + + client.query("INSERT INTO message(timestamp, topic, message) values(current_timestamp, $1, $2)", [topic, body]); + + // pg.on('row', function(row) { + // results.push(row); + // }); + + client.on('end', function () { + log.info('message processed'); + done(); + }); + + client.on('error', function (err) { + log.info('error', err); + done(); + }); + }); +} + +exports.onMessage = onMessage; diff --git a/apps/values.js b/apps/values.js new file mode 100644 index 0000000..2975e9f --- /dev/null +++ b/apps/values.js @@ -0,0 +1,149 @@ +var pg, log; + +function init(config) { + pg = config.pg; + log = config.log; +} +exports.init = init; + +function newValue(client, device_id, device_key, property_id, property_key, value) { + log.info('new value for device ', device_key + '/' + device_id, property_key + '/' + property_id, value); + + client.query('INSERT INTO value(property, timestamp, value) VALUES($1, current_timestamp, $2)', [property_id, value], function (err, result) { + log.info('new value stored'); + }); +} + +function newName(client, device_id, device_key, property_id, property_key, name) { + log.info('new name for device ', device_key + '/' + device_id, property_key + '/' + property_id, name); +} + +function newDescription(client, device_id, device_key, property_id, property_key, description) { + log.info('new description for device ', device_key + '/' + device_id, property_key + '/' + property_id, description); +} + +function onMessage(topic, message, payload) { + var parts = topic.split(/\//); + + // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value + + if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'sensors') { + log.warn('no match: ', topic, parts); + return; + } + + var device_key = parts[2]; + var property_key = parts[4]; + var msg_type = parts[5]; + + var f; + if (msg_type == 'value') { + f = newValue + } else if (msg_type == 'name') { + f = newName + } else if (msg_type == 'description') { + f = newDescription + } + + if (!f) { + log.warn('Unknown message topic:', topic); + return; + } + + pg.connect(function (err, client, done) { + if (err) { + done(); + log.error('Could not connect to postgres', err); + return; + } + + client.query('SELECT id FROM device WHERE key=$1', [device_key], function (err, result) { + if (err) { + done(); + log.error('error looking for existing device', err); + return; + } + + // TODO: insert device + if (result.rows.length == 0) { + log.warn('No device registered with key =', device_key); + done(); + return; + } + + var device_id = result.rows[0].id; + log.info('device ', device_id); + + client.query('SELECT id FROM device_property WHERE device=$1 AND key=$2', [device_id, property_key], function (err, result) { + if (err) { + done(); + log.error('error looking for existing property', err); + return; + } + + var property_id; + if (result.rows.length == 0) { + log.info('New property: ', property_key); + client.query('INSERT INTO device_property(device, key, created_timestamp) VALUES($1, $2, current_timestamp) RETURNING id', [device_id, property_key], function (err, result) { + if (err) { + done(); + log.error('Error inserting device_property', {device_id: device_id, property_key: property_key}, err); + return; + } + + property_id = result.rows[0].id; + + log.info('new property: key=', property_key, ', id =', property_id); + + f(client, device_id, device_key, property_id, property_key, message.toString()); + }); + done(); + return; + } + + property_id = result.rows[0].id; + + log.info('property: key =', property_key, ', id =', property_id); + f(client, device_id, device_key, property_id, property_key, message.toString()); + }); + }); + }); + + /* + pg.connect(function(err, client, done) { + if (err) { + done(); + log.error('Could not connect to postgres', err); + return; + } + + client.query('select count(*) as count from device where key=$1', [device_key], function(err, result) { + if (err) { + done(); + log.error('error running query', err); + return; + } + + if (result.rows[0].count > 0) { + done(); + return; + } + + log.info('New device:', device_key); + + client.query('insert into device(key, created_timestamp) values($1, current_timestamp)', [device_key], function(err, result) { + if (err) { + done(); + log.error('error inserting new device', err); + } + + log.info('New device created'); + done(); + }); + }); + }); + */ +} + +exports.onMessage = onMessage; + diff --git a/diller.js b/diller.js new file mode 100644 index 0000000..7627d31 --- /dev/null +++ b/diller.js @@ -0,0 +1,63 @@ +var mqtt = require('mqtt'); +var _ = require('underscore'); +var winston = require('winston'); +var bunyan = require('bunyan'); +var pg = require('pg'); +var connectionString = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller'; +var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io'; + +var pgClient = new pg.Client(connectionString); +pgClient.connect(); +pgClient.on('connect', function () { + log.info('connected to pg'); +}); + +var log = bunyan.createLogger({name: 'main'}); + +var apps = [ + './apps/logger', + './apps/device', + './apps/values' +]; + +appConfig = { + pg: { + connect: function (cb) { + pg.connect(connectionString, cb); + } + } +}; + +apps = _.map(apps, function (name) { + log.info('Loading app: ' + name); + var app = require(name); + var cfg = _.clone(appConfig); + + cfg.log = bunyan.createLogger({name: name}); + app.init(cfg); + return {name: name, instance: app}; +}); + +log.info('Connecting to ' + mqttUrl); +var mqttClient = mqtt.connect(mqttUrl); + +mqttClient.on('offline', function () { + log.info('offline'); +}); + +mqttClient.on('error', function (error) { + log.info('error', {error: error}); +}); + +mqttClient.on('connect', function () { + log.info('Connected'); + mqttClient.subscribe('/diller/#'); +}); + +mqttClient.on('message', function (topic, message, payload) { + log.info('got message', {message: message.toString()}); + + _.each(apps, function (app) { + app.instance.onMessage(topic, message, payload); + }); +}); diff --git a/package.json b/package.json new file mode 100644 index 0000000..d50ad50 --- /dev/null +++ b/package.json @@ -0,0 +1,19 @@ +{ + "name": "diller-server", + "version": "0.0.0", + "description": "", + "main": "diller.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "", + "license": "MIT", + "dependencies": { + "bunyan": "^1.5.1", + "db-migrate": "^0.9.23", + "mqtt": "^1.4.3", + "pg": "^4.4.2", + "underscore": "^1.8.3", + "winston": "^1.1.0" + } +} -- cgit v1.2.3