var _ = require('lodash');

/**
 * Data-access object for the `device`, `device_status`, `device_property`,
 * `value`, `value_by_hour` and `value_by_minute` tables.
 *
 * Every query is issued through the supplied `tx` handle; the DAO itself
 * keeps no state besides the column lists used to build its SQL.
 *
 * @param tx {PgTx} query handle exposing `none`, `one`, `oneOrNone` and
 *   `manyOrNone` (presumably a pg-promise task/transaction - confirm with caller).
 * @param as formatting helper; only `as.date(value)` is used, to format the
 *   range bounds in `aggregateValuesByPropertyId`.
 * @class
 */
function DillerDao(tx, as) {
  var deviceColumns = ['id', 'created_timestamp', 'key', 'name', 'description'];
  var deviceStatusColumns = ['device', 'online', 'timestamp', 'host'];
  var propertyColumns = 'id, created_timestamp, device, key, name, description, last_value, last_timestamp';

  // Shared by devices(), deviceById() and deviceByKey(). Device columns come
  // back as `d_<column>` and status columns as `status_<column>`, which
  // unjoin('d', 'status') turns into a device object with a nested `status`.
  var deviceSelect = 'SELECT ' + cols(deviceColumns, 'd') + ', ' + cols(deviceStatusColumns, 'ds', 'status') +
    ' FROM device d LEFT JOIN device_status ds on (d.id = ds.device)';

  /**
   * Builds a comma separated column list.
   *
   * Without `select_prefix` the column names are joined as-is. With it, every
   * column becomes `<select_prefix>.<col> AS <as_prefix>_<col>`, where
   * `as_prefix` falls back to `select_prefix`.
   *
   * @param {string[]} columns
   * @param {string} [select_prefix] table alias used in the SELECT list
   * @param {string} [as_prefix] prefix for the result column alias
   * @returns {string}
   */
  function cols(columns, select_prefix, as_prefix) {
    if (!select_prefix) {
      return columns.join(', ');
    }

    var aliasPrefix = as_prefix || select_prefix;

    return columns.map(function (column) {
      return select_prefix + '.' + column + ' AS ' + aliasPrefix + '_' + column;
    }).join(', ');
  }

  /**
   * Creates a converter that turns flat, prefixed result rows into nested
   * objects.
   *
   * Fields named `<main>_<x>` become `x` on the result. Fields named
   * `<child>_<x>`, for any of the additional arguments, become `x` on a nested
   * object stored under `<child>`. Fields matching no prefix are dropped.
   *
   * @param {string} main prefix of the fields that belong to the top-level object
   * @param {...string} children prefixes that are collected into nested objects
   * @returns {function} converter accepting a single row or an array of rows;
   *   falsy rows are passed through untouched
   */
  function unjoin(main) {
    var children = Array.prototype.slice.call(arguments, 1);
    var mainPrefix = main + '_';

    function convertRow(row) {
      if (!row) {
        return row;
      }

      var converted = {};

      Object.keys(row).forEach(function (field) {
        if (field.startsWith(mainPrefix)) {
          converted[field.substring(mainPrefix.length)] = row[field];
          return;
        }

        children.forEach(function (child) {
          var childPrefix = child + '_';

          if (field.startsWith(childPrefix)) {
            var nested = converted[child] || (converted[child] = {});
            nested[field.substring(childPrefix.length)] = row[field];
          }
        });
      });

      return converted;
    }

    return function (res) {
      return Array.isArray(res) ? res.map(convertRow) : convertRow(res);
    };
  }

  /**
   * Turns an attribute map into the SET clause and parameter list of an
   * UPDATE statement whose `$1` is the row id.
   *
   * Only `name` and `description` are considered; all other attributes are
   * ignored so they can never produce an empty entry (and thus a dangling
   * comma) in the SET clause. Values are trimmed, and empty or falsy values
   * are written as NULL.
   *
   * @param id value bound to `$1`
   * @param {Object} attributes attribute name to new value
   * @returns {{fields: string, values: Array}} `fields` is empty when there is
   *   nothing to update
   */
  function buildAttributeUpdate(id, attributes) {
    var values = [id];
    var assignments = [];

    _.forEach(attributes, function (raw, attribute) {
      if (attribute !== 'name' && attribute !== 'description') {
        return;
      }

      // String() keeps truthy non-string input (e.g. numbers) from throwing on trim().
      var value = raw ? String(raw).trim() : '';

      values.push(value.length > 0 ? value : null);
      // The placeholder index equals the position the value was just pushed to.
      assignments.push(attribute + ' = $' + values.length);
    });

    return {fields: assignments.join(', '), values: values};
  }

  // -------------------------------------------------------------------------------------------------------------------
  // Device
  // -------------------------------------------------------------------------------------------------------------------

  /**
   * Loads all devices, each with its joined status nested under `status`.
   *
   * @returns {Promise}
   */
  function devices() {
    return tx.manyOrNone(deviceSelect).then(unjoin('d', 'status'));
  }

  /**
   * Loads the device with the given id via `tx.one`.
   *
   * @param id device id
   * @returns {Promise}
   */
  function deviceById(id) {
    return tx.one(deviceSelect + ' WHERE d.id=$1', [id]).then(unjoin('d', 'status'));
  }

  /**
   * Loads the device with the given key via `tx.oneOrNone`.
   *
   * @param key device key
   * @returns {Promise}
   */
  function deviceByKey(key) {
    return tx.oneOrNone(deviceSelect + ' WHERE d.key=$1', [key]).then(unjoin('d', 'status'));
  }

  /**
   * Inserts a device with the given key and returns the new row's device
   * columns (unprefixed, without status).
   *
   * @param key device key
   * @returns {Promise}
   */
  function insertDevice(key) {
    return tx.one('INSERT INTO device(id, key, created_timestamp) VALUES(DEFAULT, $1, CURRENT_TIMESTAMP) RETURNING ' +
      cols(deviceColumns), [key]);
  }

  /**
   * Inserts a status row for a device.
   *
   * @returns {Promise}
   */
  function insertDeviceStatus(device, online, timestamp, host) {
    return tx.none('INSERT INTO device_status(device, online, timestamp, host) VALUES($1, $2, $3, $4)',
      [device, online, timestamp, host]);
  }

  /**
   * Updates `name` and/or `description` of a device.
   *
   * @param id device id
   * @param {Object} attributes attributes to change; keys other than `name`
   *   and `description` are ignored
   * @returns {Promise} resolves with `{}` without touching the database when
   *   there is nothing to update
   */
  function updateDevice(id, attributes) {
    var update = buildAttributeUpdate(id, attributes);

    if (update.fields.length === 0) {
      return Promise.resolve({});
    }

    return tx.none('UPDATE device SET ' + update.fields + ' WHERE id = $1', update.values);
  }

  /**
   * Overwrites the status row of a device. Note that the parameter order
   * (timestamp before online) differs from `insertDeviceStatus`.
   *
   * @returns {Promise}
   */
  function updateDeviceStatus(device, timestamp, online, host) {
    var sql = 'UPDATE device_status SET timestamp=$2, online=$3, host=$4 WHERE device=$1';

    return tx.none(sql, [device, timestamp, online, host]);
  }

  // -------------------------------------------------------------------------------------------------------------------
  // Device Property
  // -------------------------------------------------------------------------------------------------------------------

  /**
   * Loads the property with the given id via `tx.one`.
   *
   * @returns {Promise}
   */
  function devicePropertyById(id) {
    return tx.one('SELECT ' + propertyColumns + ' FROM device_property WHERE id=$1', [id]);
  }

  /**
   * Loads the property of a device identified by its key via `tx.oneOrNone`.
   *
   * @returns {Promise}
   */
  function devicePropertyByDeviceIdAndKey(deviceId, key) {
    return tx.oneOrNone('SELECT ' + propertyColumns + ' FROM device_property WHERE device=$1 AND key=$2',
      [deviceId, key]);
  }

  /**
   * Loads all properties of a device.
   *
   * @returns {Promise}
   */
  function devicePropertiesByDeviceId(deviceId) {
    return tx.manyOrNone('SELECT ' + propertyColumns + ' FROM device_property WHERE device=$1', [deviceId]);
  }

  /**
   * Inserts a property for a device and returns the new row.
   *
   * @returns {Promise}
   */
  function insertDeviceProperty(deviceId, key) {
    return tx.one('INSERT INTO device_property(id, device, key, created_timestamp) ' +
      'VALUES(DEFAULT, $1, $2, CURRENT_TIMESTAMP) RETURNING ' + propertyColumns, [deviceId, key]);
  }

  /**
   * Updates `name` and/or `description` of a property and returns the
   * updated row.
   *
   * @param id property id
   * @param {Object} attributes attributes to change; keys other than `name`
   *   and `description` are ignored
   * @returns {Promise} resolves with `{}` without touching the database when
   *   there is nothing to update
   */
  function updateProperty(id, attributes) {
    var update = buildAttributeUpdate(id, attributes);

    if (update.fields.length === 0) {
      return Promise.resolve({});
    }

    return tx.one('UPDATE device_property SET ' + update.fields + ' WHERE id = $1 RETURNING *', update.values);
  }

  // -------------------------------------------------------------------------------------------------------------------
  // Value
  // -------------------------------------------------------------------------------------------------------------------

  /**
   * Loads the raw values of a property, newest first. Numeric values are
   * returned as text, text values as-is.
   *
   * @param propertyId property id
   * @param [limit] maximum number of rows; no LIMIT is applied when falsy
   * @returns {Promise}
   */
  function valuesByPropertyId(propertyId, limit) {
    var args = [propertyId];
    var sql = 'SELECT timestamp, coalesce(value_numeric::text, value_text) AS value FROM value WHERE property=$1' +
      ' ORDER BY timestamp DESC';

    if (limit) {
      args.push(limit);
      sql += ' LIMIT $' + args.length;
    }

    return tx.manyOrNone(sql, args);
  }

  /**
   * Aggregates the numeric values of a property into buckets of the given
   * level between `from` (inclusive) and `to` (exclusive).
   *
   * The query generates one row per bucket and left-joins the aggregates onto
   * it, so buckets without values are returned with null count/min/max/avg.
   *
   * Throws synchronously (a string, not an Error - kept for backwards
   * compatibility with existing callers) when `level` is unsupported.
   *
   * @param propertyId property id
   * @param {string} level 'minute', 'hour' or 'day'
   * @param from start of the range, formatted with `as.date`
   * @param to end of the range, formatted with `as.date`
   * @returns {Promise}
   */
  function aggregateValuesByPropertyId(propertyId, level, from, to) {
    var args = [level, as.date(from), as.date(to), propertyId];

    // TODO: use the pre-aggregated tables instead of querying the raw table
    if (['hour', 'minute', 'day'].indexOf(level) === -1) {
      throw 'Unsupported level: ' + level;
    }

    var sql = 'with g as (select date_trunc($1, ts) as ts from generate_series($2::timestamp, $3::timestamp, (\'1 \' || $1)::interval) as times(ts)),\n' +
      'v as (select\n' +
      ' date_trunc($1, timestamp) as ts,\n' +
      ' count(timestamp)::real as count,\n' +
      ' min(value_numeric)::real as min,\n' +
      ' max(value_numeric)::real as max,\n' +
      ' avg(value_numeric)::real as avg\n' +
      'FROM value\n' +
      'WHERE timestamp >= $2::timestamp\n' +
      ' AND timestamp < $3::timestamp\n' +
      ' AND property=$4\n' +
      ' AND value_numeric is not null\n' +
      ' GROUP BY date_trunc($1, timestamp)\n' +
      ')\n' +
      'select g.ts as timestamp, v.count, v.min, v.max, v.avg\n' +
      'from g left outer join v on g.ts = v.ts\n' +
      'order by 1';

    return tx.manyOrNone(sql, args);
  }

  /**
   * Records a value for a property: first updates the property's
   * `last_value`/`last_timestamp`, then inserts a row into `value`.
   *
   * The value is parsed with `parseFloat`. If that yields a number it is
   * stored in `value_numeric` (and `value_text` is NULL); otherwise the raw
   * value is stored in `value_text`. Zero counts as a number: a plain
   * truthiness test would misfile `0` as text and hide it from all
   * aggregates, which only look at `value_numeric`.
   *
   * @param propertyId property id
   * @param timestamp timestamp of the value
   * @param value raw value
   * @returns {Promise} resolves with the inserted row's `timestamp`
   */
  function insertValue(propertyId, timestamp, value) {
    var parsed = parseFloat(value);
    var isNumeric = !Number.isNaN(parsed);
    var value_numeric = isNumeric ? parsed : undefined;
    var value_text = isNumeric ? null : value;

    return tx.none('UPDATE device_property SET last_value = $2, last_timestamp = $3 WHERE id = $1',
      [propertyId, value, timestamp])
      .then(function () {
        return tx.one('INSERT INTO value(property, timestamp, value_text, value_numeric) VALUES($1, $2, $3, $4::NUMERIC) RETURNING timestamp',
          [propertyId, timestamp, value_text, value_numeric]);
      });
  }

  /**
   * Recomputes one aggregate bucket: deletes the row of `table` for the bucket
   * containing `timestamp`, then re-inserts it from the numeric rows of
   * `value` that fall into the same bucket. Nothing is inserted when the
   * bucket holds no numeric values.
   *
   * `table` and `unit` are concatenated into the SQL and must therefore only
   * ever be the fixed literals passed by the two public wrappers below.
   *
   * @param {string} table aggregate table name
   * @param {string} unit DATE_TRUNC unit matching the table
   * @param propertyId property id
   * @param timestamp any timestamp inside the bucket to refresh
   * @returns {Promise} resolves with the inserted aggregate row, if any
   */
  function updateAggregatesForProperty(table, unit, propertyId, timestamp) {
    var rowBucket = 'DATE_TRUNC(\'' + unit + '\', timestamp)';
    var targetBucket = 'DATE_TRUNC(\'' + unit + '\', $2::TIMESTAMPTZ)';

    return tx.none('DELETE FROM ' + table + ' WHERE property=$1 AND timestamp=' + targetBucket,
      [propertyId, timestamp])
      .then(function () {
        return tx.oneOrNone('INSERT INTO ' + table + '(property, timestamp, count, max, min, avg) ' +
          'SELECT property, ' + rowBucket + ' AS timestamp, COUNT(value_numeric) AS count, MAX(value_numeric) AS max, MIN(value_numeric) AS min, AVG(value_numeric) AS avg ' +
          'FROM value ' +
          'WHERE property=$1 AND ' + rowBucket + '=' + targetBucket + ' AND value_numeric IS NOT NULL ' +
          'GROUP BY property, ' + rowBucket + ' ' +
          'RETURNING *;', [propertyId, timestamp]);
      });
  }

  /**
   * Refreshes the `value_by_hour` row covering `timestamp` for a property.
   *
   * @returns {Promise}
   */
  function updateHourAggregatesForProperty(propertyId, timestamp) {
    return updateAggregatesForProperty('value_by_hour', 'hour', propertyId, timestamp);
  }

  /**
   * Refreshes the `value_by_minute` row covering `timestamp` for a property.
   *
   * @returns {Promise}
   */
  function updateMinuteAggregatesForProperty(propertyId, timestamp) {
    return updateAggregatesForProperty('value_by_minute', 'minute', propertyId, timestamp);
  }

  /** @lends DillerDao.prototype */
  return {
    devices: devices,
    deviceById: deviceById,
    deviceByKey: deviceByKey,
    insertDevice: insertDevice,
    updateDevice: updateDevice,
    insertDeviceStatus: insertDeviceStatus,
    updateDeviceStatus: updateDeviceStatus,

    devicePropertyById: devicePropertyById,
    devicePropertyByDeviceIdAndKey: devicePropertyByDeviceIdAndKey,
    devicePropertiesByDeviceId: devicePropertiesByDeviceId,
    insertDeviceProperty: insertDeviceProperty,
    updateProperty: updateProperty,

    valuesByPropertyId: valuesByPropertyId,
    aggregateValuesByPropertyId: aggregateValuesByPropertyId,
    insertValue: insertValue,
    updateHourAggregatesForProperty: updateHourAggregatesForProperty,
    updateMinuteAggregatesForProperty: updateMinuteAggregatesForProperty
  };
}

module.exports = DillerDao;