From a63ec924e5440b17434ecc91e84d1419ae39ef2a Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Fri, 21 Dec 2012 22:24:41 +0100 Subject: o Adding a sequence number to jenkins_build. o Making the table poller poll based on a sequence number instead. --- .../io/trygvis/esper/testing/core/TablePoller.java | 86 ++++++++++------------ .../io/trygvis/esper/testing/sql/ResultSetF.java | 18 +++++ .../io/trygvis/esper/testing/sql/SqlOption.java | 26 +++++++ src/main/resources/ddl-core.sql | 10 +-- src/main/resources/ddl-jenkins.sql | 7 +- 5 files changed, 94 insertions(+), 53 deletions(-) create mode 100644 src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java (limited to 'src') diff --git a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java index 58a3e75..a5e5d80 100644 --- a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java +++ b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java @@ -7,10 +7,9 @@ import org.slf4j.*; import javax.sql.*; import java.sql.*; -import java.util.*; -import java.util.List; -import static fj.data.Option.*; +import static io.trygvis.esper.testing.sql.ResultSetF.*; +import static io.trygvis.esper.testing.sql.SqlOption.*; import static java.lang.System.*; public class TablePoller { @@ -40,35 +39,42 @@ public class TablePoller { long start = currentTimeMillis(); TablePollerDao dao = new TablePollerDao(c); - Option o = dao.getLastCreatedDateForPoller(); + SqlOption o = dao.getLastSequenceForPoller(); if (o.isNone()) { logger.info("First run of poller '{}'", pollerName); } else { - logger.info("Running poller '{}', last run was {}", pollerName, formatter.print(o.some().getTime())); + logger.info("Running poller '{}', last seq was {}", pollerName, o.get()); } - Timestamp lastCreatedDate = o.orSome(new Timestamp(0)); + int seq = o.getOrElse(0); + int count = 0; - Option o2 = dao.getOldestCreatedDateAfter(lastCreatedDate); + Integer seqO = dao.getMinSeqAfter(seq); - if (o2.isSome()) { - Timestamp oldestCreatedDate = o2.some(); + while (seqO != null) { + seq = seqO; - List rows = dao.getRowsCreatedAt(oldestCreatedDate); + logger.info("Processing seq={}", seq); - logger.info("Processing {} rows created at {}", rows.size(), formatter.print(oldestCreatedDate.getTime())); + A row = dao.getRow(seq); - for (A row : rows) { - callback.process(c, row); - } - } else { - logger.debug("No new rows."); + callback.process(c, row); + + seqO = dao.getMinSeqAfter(seq); + count++; + } - Thread.sleep(1000); + if(count > 0) { + logger.info("Processed {} rows.", count); + } + else { + logger.debug("No new rows."); } - dao.insertOrUpdate(o.isNone(), o2.orSome(new Timestamp(start)), new Timestamp(start), currentTimeMillis() - start, null); + Thread.sleep(10 * 1000); + + dao.insertOrUpdate(o.isNone(), seq, new Timestamp(start), currentTimeMillis() - start, null); c.commit(); } @@ -86,49 +92,35 @@ public class TablePoller { this.c = c; } - public Option getLastCreatedDateForPoller() throws SQLException { - try (PreparedStatement s = c.prepareStatement("SELECT last_created_date FROM table_poller_status WHERE poller_name=?")) { + public SqlOption getLastSequenceForPoller() throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT last_seq FROM table_poller_status WHERE poller_name=?")) { s.setString(1, pollerName); - ResultSet rs = s.executeQuery(); - if (!rs.next()) { - return none(); - } - - return some(rs.getTimestamp(1)); + return fromRs(s.executeQuery()).map(getInt); } } - public Option getOldestCreatedDateAfter(Timestamp timestamp) throws SQLException { - try (PreparedStatement s = c.prepareStatement("SELECT min(created_date) FROM " + tableName + " WHERE created_date > ? AND " + filter)) { - s.setTimestamp(1, timestamp); - ResultSet rs = s.executeQuery(); - rs.next(); - return fromNull(rs.getTimestamp(1)); + public Integer getMinSeqAfter(int seq) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT min(seq) FROM " + tableName + " WHERE seq>? AND " + filter)) { + s.setInt(1, seq); + return fromRs(s.executeQuery()).map(getInteger).get(); } } - public List getRowsCreatedAt(Timestamp timestamp) throws SQLException { - try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE created_date = ? AND " + filter)) { - s.setTimestamp(1, timestamp); - - ResultSet rs = s.executeQuery(); - - List list = new ArrayList<>(); - while (rs.next()) { - list.add(f.apply(rs)); - } - return list; + public A getRow(int seq) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE seq=?")) { + s.setInt(1, seq); + return fromRs(s.executeQuery()).map(f).get(); } } - public void insertOrUpdate(boolean insert, Timestamp lastCreatedDate, Timestamp now, long duration, String status) throws SQLException { - String insertSql = "INSERT INTO table_poller_status(last_created_date, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)"; + public void insertOrUpdate(boolean insert, int seq, Timestamp now, long duration, String status) throws SQLException { + String insertSql = "INSERT INTO table_poller_status(last_seq, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)"; - String updateSql = "UPDATE table_poller_status SET last_created_date=?, last_run=?, duration=?, status=? WHERE poller_name=?"; + String updateSql = "UPDATE table_poller_status SET last_seq=?, last_run=?, duration=?, status=? WHERE poller_name=?"; try (PreparedStatement s = c.prepareStatement(insert ? insertSql : updateSql)) { int i = 1; - s.setTimestamp(i++, lastCreatedDate); + s.setInt(i++, seq); s.setTimestamp(i++, now); s.setLong(i++, duration); s.setString(i++, status); diff --git a/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java new file mode 100644 index 0000000..e5a9e4e --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java @@ -0,0 +1,18 @@ +package io.trygvis.esper.testing.sql; + +import java.sql.*; + +public class ResultSetF { + public static final SqlF getInt = new SqlF() { + public Integer apply(ResultSet rs) throws SQLException { + return rs.getInt(1); + } + }; + + public static final SqlF getInteger = new SqlF() { + public Integer apply(ResultSet rs) throws SQLException { + int i = rs.getInt(1); + return rs.wasNull() ? null : i; + } + }; +} diff --git a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java index 058435e..288735a 100644 --- a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java +++ b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java @@ -43,6 +43,16 @@ public abstract class SqlOption { return !isSome(); } + public abstract A getOrElse(A a); + + public static SqlOption fromNull(A a) { + if (a != null) { + return some(a); + } else { + return none(); + } + } + // ----------------------------------------------------------------------- // // ----------------------------------------------------------------------- @@ -59,6 +69,14 @@ public abstract class SqlOption { public boolean isSome() { return false; } + + public A getOrElse(A a) { + return a; + } + + public String toString() { + return "None"; + } } private static class Some extends SqlOption { @@ -79,5 +97,13 @@ public abstract class SqlOption { public boolean isSome() { return true; } + + public A getOrElse(A a) { + return this.a; + } + + public String toString() { + return "Some(" + a + ")"; + } } } diff --git a/src/main/resources/ddl-core.sql b/src/main/resources/ddl-core.sql index 25f373b..9dcdd09 100644 --- a/src/main/resources/ddl-core.sql +++ b/src/main/resources/ddl-core.sql @@ -9,11 +9,11 @@ DROP TABLE IF EXISTS person; DROP TABLE IF EXISTS table_poller_status; CREATE TABLE table_poller_status ( - poller_name VARCHAR(100) NOT NULL, - last_created_date TIMESTAMP, - last_run TIMESTAMP, - duration INT, - status VARCHAR(1000), + poller_name VARCHAR(100) NOT NULL, + last_seq INT NOT NULL, + last_run TIMESTAMP, + duration INT, + status VARCHAR(1000), CONSTRAINT pk_job_status PRIMARY KEY (poller_name) ); diff --git a/src/main/resources/ddl-jenkins.sql b/src/main/resources/ddl-jenkins.sql index bdda74b..94bfc4e 100644 --- a/src/main/resources/ddl-jenkins.sql +++ b/src/main/resources/ddl-jenkins.sql @@ -7,8 +7,10 @@ DROP TABLE IF EXISTS jenkins_server; CREATE TABLE jenkins_server ( uuid CHAR(36) NOT NULL, created_date TIMESTAMP NOT NULL, + url VARCHAR(1000) NOT NULL, enabled BOOLEAN NOT NULL, + CONSTRAINT pk_jenkins_server PRIMARY KEY (uuid), CONSTRAINT uq_jenkins_server__url UNIQUE (url) ); @@ -31,6 +33,7 @@ CREATE TABLE jenkins_job ( CREATE TABLE jenkins_build ( uuid CHAR(36) NOT NULL, created_date TIMESTAMP NOT NULL, + seq INT NOT NULL DEFAULT nextval('jenkins_build_seq'), job CHAR(36) NOT NULL, @@ -44,7 +47,8 @@ CREATE TABLE jenkins_build ( CONSTRAINT pk_jenkins_build PRIMARY KEY (UUID), CONSTRAINT fk_jenkins_build__job FOREIGN KEY (job) REFERENCES jenkins_job (uuid), - CONSTRAINT uq_jenkins_build__id UNIQUE (entry_id) + CONSTRAINT uq_jenkins_build__id UNIQUE (entry_id), + CONSTRAINT uq_jenkins_build__seq UNIQUE (seq) ); CREATE INDEX ix_jenkins_build__created_date ON jenkins_build (created_date); @@ -52,6 +56,7 @@ CREATE INDEX ix_jenkins_build__created_date ON jenkins_build (created_date); CREATE TABLE jenkins_user ( uuid CHAR(36) NOT NULL, created_date TIMESTAMP NOT NULL, + server CHAR(36) NOT NULL, absolute_url VARCHAR(1000) NOT NULL, CONSTRAINT pk_jenkins_user PRIMARY KEY (uuid), -- cgit v1.2.3