var pg, log; function init(config) { pg = config.pg; log = config.log; log.info('loaded'); } exports.init = init; function onMessage(topic, message, payload) { pg.connect(function (err, client, done) { if (err) { done(); console.log(err); return; } var body = message.toString(); client.query("INSERT INTO message(timestamp, topic, message) values(current_timestamp, $1, $2)", [topic, body]); // pg.on('row', function(row) { // results.push(row); // }); client.on('end', function () { log.info('message processed'); done(); }); client.on('error', function (err) { log.info('error', err); done(); }); }); } exports.onMessage = onMessage;