diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-21 22:24:41 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-21 22:24:41 +0100 |
commit | a63ec924e5440b17434ecc91e84d1419ae39ef2a (patch) | |
tree | 933459602e0fcfd76a5f0d98d5465dc9f4aabed9 /src/main/java | |
parent | e7b1958ce5e93ead2d7d3c74eabe00a4186a048a (diff) | |
download | esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.tar.gz esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.tar.bz2 esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.tar.xz esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.zip |
o Adding a sequence number to jenkins_build.
o Making the table poller poll based on a sequence number instead.
Diffstat (limited to 'src/main/java')
3 files changed, 83 insertions, 47 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java index 58a3e75..a5e5d80 100644 --- a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java +++ b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java @@ -7,10 +7,9 @@ import org.slf4j.*; import javax.sql.*; import java.sql.*; -import java.util.*; -import java.util.List; -import static fj.data.Option.*; +import static io.trygvis.esper.testing.sql.ResultSetF.*; +import static io.trygvis.esper.testing.sql.SqlOption.*; import static java.lang.System.*; public class TablePoller<A> { @@ -40,35 +39,42 @@ public class TablePoller<A> { long start = currentTimeMillis(); TablePollerDao dao = new TablePollerDao(c); - Option<Timestamp> o = dao.getLastCreatedDateForPoller(); + SqlOption<Integer> o = dao.getLastSequenceForPoller(); if (o.isNone()) { logger.info("First run of poller '{}'", pollerName); } else { - logger.info("Running poller '{}', last run was {}", pollerName, formatter.print(o.some().getTime())); + logger.info("Running poller '{}', last seq was {}", pollerName, o.get()); } - Timestamp lastCreatedDate = o.orSome(new Timestamp(0)); + int seq = o.getOrElse(0); + int count = 0; - Option<Timestamp> o2 = dao.getOldestCreatedDateAfter(lastCreatedDate); + Integer seqO = dao.getMinSeqAfter(seq); - if (o2.isSome()) { - Timestamp oldestCreatedDate = o2.some(); + while (seqO != null) { + seq = seqO; - List<A> rows = dao.getRowsCreatedAt(oldestCreatedDate); + logger.info("Processing seq={}", seq); - logger.info("Processing {} rows created at {}", rows.size(), formatter.print(oldestCreatedDate.getTime())); + A row = dao.getRow(seq); - for (A row : rows) { - callback.process(c, row); - } - } else { - logger.debug("No new rows."); + callback.process(c, row); + + seqO = dao.getMinSeqAfter(seq); + count++; + } - Thread.sleep(1000); + if(count > 0) { + logger.info("Processed {} rows.", count); + } + else { + logger.debug("No new rows."); } - dao.insertOrUpdate(o.isNone(), o2.orSome(new Timestamp(start)), new Timestamp(start), currentTimeMillis() - start, null); + Thread.sleep(10 * 1000); + + dao.insertOrUpdate(o.isNone(), seq, new Timestamp(start), currentTimeMillis() - start, null); c.commit(); } @@ -86,49 +92,35 @@ public class TablePoller<A> { this.c = c; } - public Option<Timestamp> getLastCreatedDateForPoller() throws SQLException { - try (PreparedStatement s = c.prepareStatement("SELECT last_created_date FROM table_poller_status WHERE poller_name=?")) { + public SqlOption<Integer> getLastSequenceForPoller() throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT last_seq FROM table_poller_status WHERE poller_name=?")) { s.setString(1, pollerName); - ResultSet rs = s.executeQuery(); - if (!rs.next()) { - return none(); - } - - return some(rs.getTimestamp(1)); + return fromRs(s.executeQuery()).map(getInt); } } - public Option<Timestamp> getOldestCreatedDateAfter(Timestamp timestamp) throws SQLException { - try (PreparedStatement s = c.prepareStatement("SELECT min(created_date) FROM " + tableName + " WHERE created_date > ? AND " + filter)) { - s.setTimestamp(1, timestamp); - ResultSet rs = s.executeQuery(); - rs.next(); - return fromNull(rs.getTimestamp(1)); + public Integer getMinSeqAfter(int seq) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT min(seq) FROM " + tableName + " WHERE seq>? AND " + filter)) { + s.setInt(1, seq); + return fromRs(s.executeQuery()).map(getInteger).get(); } } - public List<A> getRowsCreatedAt(Timestamp timestamp) throws SQLException { - try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE created_date = ? AND " + filter)) { - s.setTimestamp(1, timestamp); - - ResultSet rs = s.executeQuery(); - - List<A> list = new ArrayList<>(); - while (rs.next()) { - list.add(f.apply(rs)); - } - return list; + public A getRow(int seq) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE seq=?")) { + s.setInt(1, seq); + return fromRs(s.executeQuery()).map(f).get(); } } - public void insertOrUpdate(boolean insert, Timestamp lastCreatedDate, Timestamp now, long duration, String status) throws SQLException { - String insertSql = "INSERT INTO table_poller_status(last_created_date, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)"; + public void insertOrUpdate(boolean insert, int seq, Timestamp now, long duration, String status) throws SQLException { + String insertSql = "INSERT INTO table_poller_status(last_seq, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)"; - String updateSql = "UPDATE table_poller_status SET last_created_date=?, last_run=?, duration=?, status=? WHERE poller_name=?"; + String updateSql = "UPDATE table_poller_status SET last_seq=?, last_run=?, duration=?, status=? WHERE poller_name=?"; try (PreparedStatement s = c.prepareStatement(insert ? insertSql : updateSql)) { int i = 1; - s.setTimestamp(i++, lastCreatedDate); + s.setInt(i++, seq); s.setTimestamp(i++, now); s.setLong(i++, duration); s.setString(i++, status); diff --git a/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java new file mode 100644 index 0000000..e5a9e4e --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java @@ -0,0 +1,18 @@ +package io.trygvis.esper.testing.sql; + +import java.sql.*; + +public class ResultSetF { + public static final SqlF<ResultSet, Integer> getInt = new SqlF<ResultSet, Integer>() { + public Integer apply(ResultSet rs) throws SQLException { + return rs.getInt(1); + } + }; + + public static final SqlF<ResultSet, Integer> getInteger = new SqlF<ResultSet, Integer>() { + public Integer apply(ResultSet rs) throws SQLException { + int i = rs.getInt(1); + return rs.wasNull() ? null : i; + } + }; +} diff --git a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java index 058435e..288735a 100644 --- a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java +++ b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java @@ -43,6 +43,16 @@ public abstract class SqlOption<A> { return !isSome(); } + public abstract A getOrElse(A a); + + public static <A> SqlOption<A> fromNull(A a) { + if (a != null) { + return some(a); + } else { + return none(); + } + } + // ----------------------------------------------------------------------- // // ----------------------------------------------------------------------- @@ -59,6 +69,14 @@ public abstract class SqlOption<A> { public boolean isSome() { return false; } + + public A getOrElse(A a) { + return a; + } + + public String toString() { + return "None"; + } } private static class Some<A> extends SqlOption<A> { @@ -79,5 +97,13 @@ public abstract class SqlOption<A> { public boolean isSome() { return true; } + + public A getOrElse(A a) { + return this.a; + } + + public String toString() { + return "Some(" + a + ")"; + } } } |