aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-12-21 22:24:41 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2012-12-21 22:24:41 +0100
commita63ec924e5440b17434ecc91e84d1419ae39ef2a (patch)
tree933459602e0fcfd76a5f0d98d5465dc9f4aabed9 /src/main/java
parente7b1958ce5e93ead2d7d3c74eabe00a4186a048a (diff)
downloadesper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.tar.gz
esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.tar.bz2
esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.tar.xz
esper-testing-a63ec924e5440b17434ecc91e84d1419ae39ef2a.zip
o Adding a sequence number to jenkins_build.
o Making the table poller poll based on a sequence number instead.
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/io/trygvis/esper/testing/core/TablePoller.java86
-rw-r--r--src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java18
-rw-r--r--src/main/java/io/trygvis/esper/testing/sql/SqlOption.java26
3 files changed, 83 insertions, 47 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java
index 58a3e75..a5e5d80 100644
--- a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java
+++ b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java
@@ -7,10 +7,9 @@ import org.slf4j.*;
import javax.sql.*;
import java.sql.*;
-import java.util.*;
-import java.util.List;
-import static fj.data.Option.*;
+import static io.trygvis.esper.testing.sql.ResultSetF.*;
+import static io.trygvis.esper.testing.sql.SqlOption.*;
import static java.lang.System.*;
public class TablePoller<A> {
@@ -40,35 +39,42 @@ public class TablePoller<A> {
long start = currentTimeMillis();
TablePollerDao dao = new TablePollerDao(c);
- Option<Timestamp> o = dao.getLastCreatedDateForPoller();
+ SqlOption<Integer> o = dao.getLastSequenceForPoller();
if (o.isNone()) {
logger.info("First run of poller '{}'", pollerName);
} else {
- logger.info("Running poller '{}', last run was {}", pollerName, formatter.print(o.some().getTime()));
+ logger.info("Running poller '{}', last seq was {}", pollerName, o.get());
}
- Timestamp lastCreatedDate = o.orSome(new Timestamp(0));
+ int seq = o.getOrElse(0);
+ int count = 0;
- Option<Timestamp> o2 = dao.getOldestCreatedDateAfter(lastCreatedDate);
+ Integer seqO = dao.getMinSeqAfter(seq);
- if (o2.isSome()) {
- Timestamp oldestCreatedDate = o2.some();
+ while (seqO != null) {
+ seq = seqO;
- List<A> rows = dao.getRowsCreatedAt(oldestCreatedDate);
+ logger.info("Processing seq={}", seq);
- logger.info("Processing {} rows created at {}", rows.size(), formatter.print(oldestCreatedDate.getTime()));
+ A row = dao.getRow(seq);
- for (A row : rows) {
- callback.process(c, row);
- }
- } else {
- logger.debug("No new rows.");
+ callback.process(c, row);
+
+ seqO = dao.getMinSeqAfter(seq);
+ count++;
+ }
- Thread.sleep(1000);
+ if(count > 0) {
+ logger.info("Processed {} rows.", count);
+ }
+ else {
+ logger.debug("No new rows.");
}
- dao.insertOrUpdate(o.isNone(), o2.orSome(new Timestamp(start)), new Timestamp(start), currentTimeMillis() - start, null);
+ Thread.sleep(10 * 1000);
+
+ dao.insertOrUpdate(o.isNone(), seq, new Timestamp(start), currentTimeMillis() - start, null);
c.commit();
}
@@ -86,49 +92,35 @@ public class TablePoller<A> {
this.c = c;
}
- public Option<Timestamp> getLastCreatedDateForPoller() throws SQLException {
- try (PreparedStatement s = c.prepareStatement("SELECT last_created_date FROM table_poller_status WHERE poller_name=?")) {
+ public SqlOption<Integer> getLastSequenceForPoller() throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT last_seq FROM table_poller_status WHERE poller_name=?")) {
s.setString(1, pollerName);
- ResultSet rs = s.executeQuery();
- if (!rs.next()) {
- return none();
- }
-
- return some(rs.getTimestamp(1));
+ return fromRs(s.executeQuery()).map(getInt);
}
}
- public Option<Timestamp> getOldestCreatedDateAfter(Timestamp timestamp) throws SQLException {
- try (PreparedStatement s = c.prepareStatement("SELECT min(created_date) FROM " + tableName + " WHERE created_date > ? AND " + filter)) {
- s.setTimestamp(1, timestamp);
- ResultSet rs = s.executeQuery();
- rs.next();
- return fromNull(rs.getTimestamp(1));
+ public Integer getMinSeqAfter(int seq) throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT min(seq) FROM " + tableName + " WHERE seq>? AND " + filter)) {
+ s.setInt(1, seq);
+ return fromRs(s.executeQuery()).map(getInteger).get();
}
}
- public List<A> getRowsCreatedAt(Timestamp timestamp) throws SQLException {
- try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE created_date = ? AND " + filter)) {
- s.setTimestamp(1, timestamp);
-
- ResultSet rs = s.executeQuery();
-
- List<A> list = new ArrayList<>();
- while (rs.next()) {
- list.add(f.apply(rs));
- }
- return list;
+ public A getRow(int seq) throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE seq=?")) {
+ s.setInt(1, seq);
+ return fromRs(s.executeQuery()).map(f).get();
}
}
- public void insertOrUpdate(boolean insert, Timestamp lastCreatedDate, Timestamp now, long duration, String status) throws SQLException {
- String insertSql = "INSERT INTO table_poller_status(last_created_date, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)";
+ public void insertOrUpdate(boolean insert, int seq, Timestamp now, long duration, String status) throws SQLException {
+ String insertSql = "INSERT INTO table_poller_status(last_seq, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)";
- String updateSql = "UPDATE table_poller_status SET last_created_date=?, last_run=?, duration=?, status=? WHERE poller_name=?";
+ String updateSql = "UPDATE table_poller_status SET last_seq=?, last_run=?, duration=?, status=? WHERE poller_name=?";
try (PreparedStatement s = c.prepareStatement(insert ? insertSql : updateSql)) {
int i = 1;
- s.setTimestamp(i++, lastCreatedDate);
+ s.setInt(i++, seq);
s.setTimestamp(i++, now);
s.setLong(i++, duration);
s.setString(i++, status);
diff --git a/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java
new file mode 100644
index 0000000..e5a9e4e
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java
@@ -0,0 +1,18 @@
+package io.trygvis.esper.testing.sql;
+
+import java.sql.*;
+
+public class ResultSetF {
+ public static final SqlF<ResultSet, Integer> getInt = new SqlF<ResultSet, Integer>() {
+ public Integer apply(ResultSet rs) throws SQLException {
+ return rs.getInt(1);
+ }
+ };
+
+ public static final SqlF<ResultSet, Integer> getInteger = new SqlF<ResultSet, Integer>() {
+ public Integer apply(ResultSet rs) throws SQLException {
+ int i = rs.getInt(1);
+ return rs.wasNull() ? null : i;
+ }
+ };
+}
diff --git a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java
index 058435e..288735a 100644
--- a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java
+++ b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java
@@ -43,6 +43,16 @@ public abstract class SqlOption<A> {
return !isSome();
}
+ public abstract A getOrElse(A a);
+
+ public static <A> SqlOption<A> fromNull(A a) {
+ if (a != null) {
+ return some(a);
+ } else {
+ return none();
+ }
+ }
+
// -----------------------------------------------------------------------
//
// -----------------------------------------------------------------------
@@ -59,6 +69,14 @@ public abstract class SqlOption<A> {
public boolean isSome() {
return false;
}
+
+ public A getOrElse(A a) {
+ return a;
+ }
+
+ public String toString() {
+ return "None";
+ }
}
private static class Some<A> extends SqlOption<A> {
@@ -79,5 +97,13 @@ public abstract class SqlOption<A> {
public boolean isSome() {
return true;
}
+
+ public A getOrElse(A a) {
+ return this.a;
+ }
+
+ public String toString() {
+ return "Some(" + a + ")";
+ }
}
}