aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2015-10-18 10:36:01 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2015-10-18 10:36:01 +0200
commitcc0dcb25054e9dff3fdc4b58eaf861d35d0b06de (patch)
treecebd1fe9b2e884ad7c11297f54204752e4d5f8ee
downloaddiller-server-cc0dcb25054e9dff3fdc4b58eaf861d35d0b06de.tar.gz
diller-server-cc0dcb25054e9dff3fdc4b58eaf861d35d0b06de.tar.bz2
diller-server-cc0dcb25054e9dff3fdc4b58eaf861d35d0b06de.tar.xz
diller-server-cc0dcb25054e9dff3fdc4b58eaf861d35d0b06de.zip
wip
-rw-r--r--.gitignore2
-rw-r--r--apps/device.js55
-rw-r--r--apps/logger.js39
-rw-r--r--apps/values.js149
-rw-r--r--diller.js63
-rw-r--r--package.json19
6 files changed, 327 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7a1537b
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.idea
+node_modules
diff --git a/apps/device.js b/apps/device.js
new file mode 100644
index 0000000..1b8e169
--- /dev/null
+++ b/apps/device.js
@@ -0,0 +1,55 @@
+var pg, log;
+
+function init(config) {
+ pg = config.pg;
+ log = config.log;
+}
+exports.init = init;
+
+var deviceR = /^\/diller\/([0-9a-fA-F]{2}(:[0-9a-fA-F]{2}){5})\//;
+
+function onMessage(topic, message, payload) {
+ var matches = deviceR.exec(topic)
+
+ if (!matches) {
+ log.warn('no match: ', topic);
+ return;
+ }
+
+ var mac = matches[1];
+
+ pg.connect(function (err, client, done) {
+ if (err) {
+ done();
+ log.error('Could not connect to postgres', err);
+ return;
+ }
+
+ client.query('select count(*) as count from device where key=$1', [mac], function (err, result) {
+ if (err) {
+ done();
+ log.error('error running query', err);
+ return;
+ }
+
+ if (result.rows[0].count > 0) {
+ done();
+ return;
+ }
+
+ log.info('New device:', mac);
+
+ client.query('insert into device(id, key, created_timestamp) values(default, $1, current_timestamp) returning id', [mac], function (err, result) {
+ if (err) {
+ done();
+ log.error('error inserting new device', err);
+ }
+
+ log.info('New device created', result.rows[0].id);
+ done();
+ });
+ });
+ });
+}
+
+exports.onMessage = onMessage;
diff --git a/apps/logger.js b/apps/logger.js
new file mode 100644
index 0000000..32ceb27
--- /dev/null
+++ b/apps/logger.js
@@ -0,0 +1,39 @@
+var pg, log;
+
+function init(config) {
+ pg = config.pg;
+ log = config.log;
+
+ log.info('loaded');
+}
+exports.init = init;
+
+function onMessage(topic, message, payload) {
+ pg.connect(function (err, client, done) {
+ if (err) {
+ done();
+ console.log(err);
+ return;
+ }
+
+ var body = message.toString();
+
+ client.query("INSERT INTO message(timestamp, topic, message) values(current_timestamp, $1, $2)", [topic, body]);
+
+ // pg.on('row', function(row) {
+ // results.push(row);
+ // });
+
+ client.on('end', function () {
+ log.info('message processed');
+ done();
+ });
+
+ client.on('error', function (err) {
+ log.info('error', err);
+ done();
+ });
+ });
+}
+
+exports.onMessage = onMessage;
diff --git a/apps/values.js b/apps/values.js
new file mode 100644
index 0000000..2975e9f
--- /dev/null
+++ b/apps/values.js
@@ -0,0 +1,149 @@
+var pg, log;
+
+function init(config) {
+ pg = config.pg;
+ log = config.log;
+}
+exports.init = init;
+
+function newValue(client, device_id, device_key, property_id, property_key, value) {
+ log.info('new value for device ', device_key + '/' + device_id, property_key + '/' + property_id, value);
+
+ client.query('INSERT INTO value(property, timestamp, value) VALUES($1, current_timestamp, $2)', [property_id, value], function (err, result) {
+ log.info('new value stored');
+ });
+}
+
+function newName(client, device_id, device_key, property_id, property_key, name) {
+ log.info('new name for device ', device_key + '/' + device_id, property_key + '/' + property_id, name);
+}
+
+function newDescription(client, device_id, device_key, property_id, property_key, description) {
+ log.info('new description for device ', device_key + '/' + device_id, property_key + '/' + property_id, description);
+}
+
+function onMessage(topic, message, payload) {
+ var parts = topic.split(/\//);
+
+ // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value
+
+ if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'sensors') {
+ log.warn('no match: ', topic, parts);
+ return;
+ }
+
+ var device_key = parts[2];
+ var property_key = parts[4];
+ var msg_type = parts[5];
+
+ var f;
+ if (msg_type == 'value') {
+ f = newValue
+ } else if (msg_type == 'name') {
+ f = newName
+ } else if (msg_type == 'description') {
+ f = newDescription
+ }
+
+ if (!f) {
+ log.warn('Unknown message topic:', topic);
+ return;
+ }
+
+ pg.connect(function (err, client, done) {
+ if (err) {
+ done();
+ log.error('Could not connect to postgres', err);
+ return;
+ }
+
+ client.query('SELECT id FROM device WHERE key=$1', [device_key], function (err, result) {
+ if (err) {
+ done();
+ log.error('error looking for existing device', err);
+ return;
+ }
+
+ // TODO: insert device
+ if (result.rows.length == 0) {
+ log.warn('No device registered with key =', device_key);
+ done();
+ return;
+ }
+
+ var device_id = result.rows[0].id;
+ log.info('device ', device_id);
+
+ client.query('SELECT id FROM device_property WHERE device=$1 AND key=$2', [device_id, property_key], function (err, result) {
+ if (err) {
+ done();
+ log.error('error looking for existing property', err);
+ return;
+ }
+
+ var property_id;
+ if (result.rows.length == 0) {
+ log.info('New property: ', property_key);
+ client.query('INSERT INTO device_property(device, key, created_timestamp) VALUES($1, $2, current_timestamp) RETURNING id', [device_id, property_key], function (err, result) {
+ if (err) {
+ done();
+ log.error('Error inserting device_property', {device_id: device_id, property_key: property_key}, err);
+ return;
+ }
+
+ property_id = result.rows[0].id;
+
+ log.info('new property: key=', property_key, ', id =', property_id);
+
+ f(client, device_id, device_key, property_id, property_key, message.toString());
+ });
+ done();
+ return;
+ }
+
+ property_id = result.rows[0].id;
+
+ log.info('property: key =', property_key, ', id =', property_id);
+ f(client, device_id, device_key, property_id, property_key, message.toString());
+ });
+ });
+ });
+
+ /*
+ pg.connect(function(err, client, done) {
+ if (err) {
+ done();
+ log.error('Could not connect to postgres', err);
+ return;
+ }
+
+ client.query('select count(*) as count from device where key=$1', [device_key], function(err, result) {
+ if (err) {
+ done();
+ log.error('error running query', err);
+ return;
+ }
+
+ if (result.rows[0].count > 0) {
+ done();
+ return;
+ }
+
+ log.info('New device:', device_key);
+
+ client.query('insert into device(key, created_timestamp) values($1, current_timestamp)', [device_key], function(err, result) {
+ if (err) {
+ done();
+ log.error('error inserting new device', err);
+ }
+
+ log.info('New device created');
+ done();
+ });
+ });
+ });
+ */
+}
+
+exports.onMessage = onMessage;
+
diff --git a/diller.js b/diller.js
new file mode 100644
index 0000000..7627d31
--- /dev/null
+++ b/diller.js
@@ -0,0 +1,63 @@
+var mqtt = require('mqtt');
+var _ = require('underscore');
+var winston = require('winston');
+var bunyan = require('bunyan');
+var pg = require('pg');
+var connectionString = process.env.DATABASE_URL || 'postgres://diller:diller@localhost:5432/diller';
+var mqttUrl = process.env.MQTT_URL || 'mqtt://trygvis.io';
+
+var pgClient = new pg.Client(connectionString);
+pgClient.connect();
+pgClient.on('connect', function () {
+ log.info('connected to pg');
+});
+
+var log = bunyan.createLogger({name: 'main'});
+
+var apps = [
+ './apps/logger',
+ './apps/device',
+ './apps/values'
+];
+
+appConfig = {
+ pg: {
+ connect: function (cb) {
+ pg.connect(connectionString, cb);
+ }
+ }
+};
+
+apps = _.map(apps, function (name) {
+ log.info('Loading app: ' + name);
+ var app = require(name);
+ var cfg = _.clone(appConfig);
+
+ cfg.log = bunyan.createLogger({name: name});
+ app.init(cfg);
+ return {name: name, instance: app};
+});
+
+log.info('Connecting to ' + mqttUrl);
+var mqttClient = mqtt.connect(mqttUrl);
+
+mqttClient.on('offline', function () {
+ log.info('offline');
+});
+
+mqttClient.on('error', function (error) {
+ log.info('error', {error: error});
+});
+
+mqttClient.on('connect', function () {
+ log.info('Connected');
+ mqttClient.subscribe('/diller/#');
+});
+
+mqttClient.on('message', function (topic, message, payload) {
+ log.info('got message', {message: message.toString()});
+
+ _.each(apps, function (app) {
+ app.instance.onMessage(topic, message, payload);
+ });
+});
diff --git a/package.json b/package.json
new file mode 100644
index 0000000..d50ad50
--- /dev/null
+++ b/package.json
@@ -0,0 +1,19 @@
+{
+ "name": "diller-server",
+ "version": "0.0.0",
+ "description": "",
+ "main": "diller.js",
+ "scripts": {
+ "test": "echo \"Error: no test specified\" && exit 1"
+ },
+ "author": "",
+ "license": "MIT",
+ "dependencies": {
+ "bunyan": "^1.5.1",
+ "db-migrate": "^0.9.23",
+ "mqtt": "^1.4.3",
+ "pg": "^4.4.2",
+ "underscore": "^1.8.3",
+ "winston": "^1.1.0"
+ }
+}