DROP TABLE IF EXISTS raw_table CASCADE; DROP TABLE IF EXISTS raw_incoming CASCADE; DROP TABLE IF EXISTS raw_processed CASCADE; DROP TABLE IF EXISTS sample_second CASCADE; DROP TABLE IF EXISTS sample_hour; DROP TABLE IF EXISTS sample_hour_invalid; DROP TABLE IF EXISTS log; CREATE TABLE raw_table ( timestamp VARCHAR(1000), now VARCHAR(1000), last_watering_stopped VARCHAR(1000), analog VARCHAR(1000), dry VARCHAR(1000), pumping_water VARCHAR(1000), last_watering_started VARCHAR(1000), millis VARCHAR(1000), "#0" VARCHAR(1000), "#1" VARCHAR(1000) ); CREATE OR REPLACE VIEW raw AS SELECT * FROM raw_table; CREATE TABLE sample_second ( ts TIMESTAMP, last_watering_started TIMESTAMP, last_watering_stopped TIMESTAMP, analog INT2, dry BOOLEAN, pumping_water BOOLEAN ); CREATE TABLE sample_hour ( ts TIMESTAMP PRIMARY KEY, analog_avg INT2 NOT NULL, analog_max INT2 NOT NULL, analog_min INT2 NOT NULL ); CREATE TABLE sample_hour_invalid ( hour TIMESTAMP NOT NULL PRIMARY KEY, created_date TIMESTAMP NOT NULL DEFAULT now(), updated_date TIMESTAMP NOT NULL DEFAULT now() ); CREATE OR REPLACE FUNCTION raw_instead_of_function() RETURNS TRIGGER LANGUAGE plpgsql AS $$ DECLARE hr TIMESTAMP; cnt INT; BEGIN IF TG_OP = 'INSERT' THEN INSERT INTO sample_second ( ts, last_watering_started, last_watering_stopped, analog, dry, pumping_water) VALUES ( to_timestamp(new.timestamp :: DOUBLE PRECISION), to_timestamp(new.timestamp :: DOUBLE PRECISION + new.last_watering_started :: BIGINT - new.now :: BIGINT), to_timestamp(new.timestamp :: DOUBLE PRECISION + new.last_watering_stopped :: BIGINT - new.now :: BIGINT), new.analog :: INT2, new.dry = '1', new.pumping_water = '1') RETURNING date_trunc('hour', ts) INTO hr; UPDATE sample_hour_invalid SET hour = hour, updated_date = now() WHERE hour = hr; GET DIAGNOSTICS cnt = ROW_COUNT; IF cnt = 0 THEN INSERT INTO sample_hour_invalid (hour) VALUES (hr); END IF; RETURN NEW; END IF; RETURN NEW; END; $$; CREATE TRIGGER raw_instead_of_trigger INSTEAD OF INSERT OR UPDATE OR DELETE ON raw FOR EACH ROW EXECUTE PROCEDURE raw_instead_of_function(); CREATE TABLE log ( ts TIMESTAMP DEFAULT now(), message VARCHAR(1000) ); CREATE INDEX ix_log__timestamp ON log (ts); CREATE OR REPLACE FUNCTION info(message VARCHAR) RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN INSERT INTO log (message) VALUES (message); END $$; CREATE OR REPLACE FUNCTION invalidate_all_hours() RETURNS VOID LANGUAGE plpgsql AS $$ BEGIN TRUNCATE sample_hour_invalid; INSERT INTO sample_hour_invalid (hour) SELECT DISTINCT date_trunc('hour', ts) FROM sample_second; END; $$; CREATE OR REPLACE FUNCTION update_hours() RETURNS VOID LANGUAGE plpgsql AS $$ DECLARE hour_c CURSOR FOR SELECT hour FROM sample_hour_invalid ORDER BY hour; second_c CURSOR (hr TIMESTAMP) FOR SELECT ts, analog FROM sample_second WHERE date_trunc('hour', ts) = hr; BEGIN FOR hour_rec IN hour_c LOOP EXECUTE info('Processing hour ' || hour_rec.hour); DELETE FROM sample_hour WHERE date_trunc('hour', ts) = date_trunc('hour', hour_rec.hour); INSERT INTO sample_hour (ts, analog_avg, analog_max, analog_min) SELECT hour_rec.hour, avg(analog) AS analog_avg, max(analog) AS analog_max, min(analog) AS analog_min FROM sample_second WHERE date_trunc('hour', ts) = date_trunc('hour', hour_rec.hour) GROUP BY date_trunc('hour', ts) HAVING avg(analog) IS NOT NULL; DELETE FROM sample_hour_invalid WHERE hour = date_trunc('hour', hour_rec.hour); END LOOP; END; $$; TRUNCATE log; SELECT invalidate_all_hours(); SELECT update_hours(); SELECT * FROM log ORDER BY ts DESC; SELECT * FROM sample_hour ORDER BY ts; SELECT * FROM sample_second; INSERT INTO raw(timestamp, "millis", "#1", "#0") VALUES(1115015587, 101629234, 531, 1021); select count(*) from log; select * from; SELECT DISTINCT date_trunc('hour', ts) FROM sample_second