var pg, log; function init(config) { pg = config.pg; log = config.log; } exports.init = init; function newValue(client, device_id, device_key, property_id, property_key, value) { log.info('new value for device ', device_key + '/' + device_id, property_key + '/' + property_id, value); client.query('INSERT INTO value(property, timestamp, value) VALUES($1, current_timestamp, $2)', [property_id, value], function (err, result) { log.info('new value stored'); }); } function newName(client, device_id, device_key, property_id, property_key, name) { log.info('new name for device ', device_key + '/' + device_id, property_key + '/' + property_id, name); } function newDescription(client, device_id, device_key, property_id, property_key, description) { log.info('new description for device ', device_key + '/' + device_id, property_key + '/' + property_id, description); } function onMessage(topic, message, payload) { var parts = topic.split(/\//); // /diller/5c:cf:7f:06:59:a5/sensors/temp-0/value if (parts.length != 6 || parts[1] != 'diller' || parts[3] != 'sensors') { log.warn('no match: ', topic, parts); return; } var device_key = parts[2]; var property_key = parts[4]; var msg_type = parts[5]; var f; if (msg_type == 'value') { f = newValue } else if (msg_type == 'name') { f = newName } else if (msg_type == 'description') { f = newDescription } if (!f) { log.warn('Unknown message topic:', topic); return; } pg.connect(function (err, client, done) { if (err) { done(); log.error('Could not connect to postgres', err); return; } client.query('SELECT id FROM device WHERE key=$1', [device_key], function (err, result) { if (err) { done(); log.error('error looking for existing device', err); return; } // TODO: insert device if (result.rows.length == 0) { log.warn('No device registered with key =', device_key); done(); return; } var device_id = result.rows[0].id; log.info('device ', device_id); client.query('SELECT id FROM device_property WHERE device=$1 AND key=$2', [device_id, property_key], function (err, result) { if (err) { done(); log.error('error looking for existing property', err); return; } var property_id; if (result.rows.length == 0) { log.info('New property: ', property_key); client.query('INSERT INTO device_property(device, key, created_timestamp) VALUES($1, $2, current_timestamp) RETURNING id', [device_id, property_key], function (err, result) { if (err) { done(); log.error('Error inserting device_property', {device_id: device_id, property_key: property_key}, err); return; } property_id = result.rows[0].id; log.info('new property: key=', property_key, ', id =', property_id); f(client, device_id, device_key, property_id, property_key, message.toString()); }); done(); return; } property_id = result.rows[0].id; log.info('property: key =', property_key, ', id =', property_id); f(client, device_id, device_key, property_id, property_key, message.toString()); }); }); }); /* pg.connect(function(err, client, done) { if (err) { done(); log.error('Could not connect to postgres', err); return; } client.query('select count(*) as count from device where key=$1', [device_key], function(err, result) { if (err) { done(); log.error('error running query', err); return; } if (result.rows[0].count > 0) { done(); return; } log.info('New device:', device_key); client.query('insert into device(key, created_timestamp) values($1, current_timestamp)', [device_key], function(err, result) { if (err) { done(); log.error('error inserting new device', err); } log.info('New device created'); done(); }); }); }); */ } exports.onMessage = onMessage;