aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main/java/io/trygvis/esper/testing/core/TablePoller.java86
-rw-r--r--src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java18
-rw-r--r--src/main/java/io/trygvis/esper/testing/sql/SqlOption.java26
-rw-r--r--src/main/resources/ddl-core.sql10
-rw-r--r--src/main/resources/ddl-jenkins.sql7
5 files changed, 94 insertions, 53 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java
index 58a3e75..a5e5d80 100644
--- a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java
+++ b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java
@@ -7,10 +7,9 @@ import org.slf4j.*;
import javax.sql.*;
import java.sql.*;
-import java.util.*;
-import java.util.List;
-import static fj.data.Option.*;
+import static io.trygvis.esper.testing.sql.ResultSetF.*;
+import static io.trygvis.esper.testing.sql.SqlOption.*;
import static java.lang.System.*;
public class TablePoller<A> {
@@ -40,35 +39,42 @@ public class TablePoller<A> {
long start = currentTimeMillis();
TablePollerDao dao = new TablePollerDao(c);
- Option<Timestamp> o = dao.getLastCreatedDateForPoller();
+ SqlOption<Integer> o = dao.getLastSequenceForPoller();
if (o.isNone()) {
logger.info("First run of poller '{}'", pollerName);
} else {
- logger.info("Running poller '{}', last run was {}", pollerName, formatter.print(o.some().getTime()));
+ logger.info("Running poller '{}', last seq was {}", pollerName, o.get());
}
- Timestamp lastCreatedDate = o.orSome(new Timestamp(0));
+ int seq = o.getOrElse(0);
+ int count = 0;
- Option<Timestamp> o2 = dao.getOldestCreatedDateAfter(lastCreatedDate);
+ Integer seqO = dao.getMinSeqAfter(seq);
- if (o2.isSome()) {
- Timestamp oldestCreatedDate = o2.some();
+ while (seqO != null) {
+ seq = seqO;
- List<A> rows = dao.getRowsCreatedAt(oldestCreatedDate);
+ logger.info("Processing seq={}", seq);
- logger.info("Processing {} rows created at {}", rows.size(), formatter.print(oldestCreatedDate.getTime()));
+ A row = dao.getRow(seq);
- for (A row : rows) {
- callback.process(c, row);
- }
- } else {
- logger.debug("No new rows.");
+ callback.process(c, row);
+
+ seqO = dao.getMinSeqAfter(seq);
+ count++;
+ }
- Thread.sleep(1000);
+ if(count > 0) {
+ logger.info("Processed {} rows.", count);
+ }
+ else {
+ logger.debug("No new rows.");
}
- dao.insertOrUpdate(o.isNone(), o2.orSome(new Timestamp(start)), new Timestamp(start), currentTimeMillis() - start, null);
+ Thread.sleep(10 * 1000);
+
+ dao.insertOrUpdate(o.isNone(), seq, new Timestamp(start), currentTimeMillis() - start, null);
c.commit();
}
@@ -86,49 +92,35 @@ public class TablePoller<A> {
this.c = c;
}
- public Option<Timestamp> getLastCreatedDateForPoller() throws SQLException {
- try (PreparedStatement s = c.prepareStatement("SELECT last_created_date FROM table_poller_status WHERE poller_name=?")) {
+ public SqlOption<Integer> getLastSequenceForPoller() throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT last_seq FROM table_poller_status WHERE poller_name=?")) {
s.setString(1, pollerName);
- ResultSet rs = s.executeQuery();
- if (!rs.next()) {
- return none();
- }
-
- return some(rs.getTimestamp(1));
+ return fromRs(s.executeQuery()).map(getInt);
}
}
- public Option<Timestamp> getOldestCreatedDateAfter(Timestamp timestamp) throws SQLException {
- try (PreparedStatement s = c.prepareStatement("SELECT min(created_date) FROM " + tableName + " WHERE created_date > ? AND " + filter)) {
- s.setTimestamp(1, timestamp);
- ResultSet rs = s.executeQuery();
- rs.next();
- return fromNull(rs.getTimestamp(1));
+ public Integer getMinSeqAfter(int seq) throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT min(seq) FROM " + tableName + " WHERE seq>? AND " + filter)) {
+ s.setInt(1, seq);
+ return fromRs(s.executeQuery()).map(getInteger).get();
}
}
- public List<A> getRowsCreatedAt(Timestamp timestamp) throws SQLException {
- try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE created_date = ? AND " + filter)) {
- s.setTimestamp(1, timestamp);
-
- ResultSet rs = s.executeQuery();
-
- List<A> list = new ArrayList<>();
- while (rs.next()) {
- list.add(f.apply(rs));
- }
- return list;
+ public A getRow(int seq) throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE seq=?")) {
+ s.setInt(1, seq);
+ return fromRs(s.executeQuery()).map(f).get();
}
}
- public void insertOrUpdate(boolean insert, Timestamp lastCreatedDate, Timestamp now, long duration, String status) throws SQLException {
- String insertSql = "INSERT INTO table_poller_status(last_created_date, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)";
+ public void insertOrUpdate(boolean insert, int seq, Timestamp now, long duration, String status) throws SQLException {
+ String insertSql = "INSERT INTO table_poller_status(last_seq, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)";
- String updateSql = "UPDATE table_poller_status SET last_created_date=?, last_run=?, duration=?, status=? WHERE poller_name=?";
+ String updateSql = "UPDATE table_poller_status SET last_seq=?, last_run=?, duration=?, status=? WHERE poller_name=?";
try (PreparedStatement s = c.prepareStatement(insert ? insertSql : updateSql)) {
int i = 1;
- s.setTimestamp(i++, lastCreatedDate);
+ s.setInt(i++, seq);
s.setTimestamp(i++, now);
s.setLong(i++, duration);
s.setString(i++, status);
diff --git a/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java
new file mode 100644
index 0000000..e5a9e4e
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/sql/ResultSetF.java
@@ -0,0 +1,18 @@
+package io.trygvis.esper.testing.sql;
+
+import java.sql.*;
+
+public class ResultSetF {
+ public static final SqlF<ResultSet, Integer> getInt = new SqlF<ResultSet, Integer>() {
+ public Integer apply(ResultSet rs) throws SQLException {
+ return rs.getInt(1);
+ }
+ };
+
+ public static final SqlF<ResultSet, Integer> getInteger = new SqlF<ResultSet, Integer>() {
+ public Integer apply(ResultSet rs) throws SQLException {
+ int i = rs.getInt(1);
+ return rs.wasNull() ? null : i;
+ }
+ };
+}
diff --git a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java
index 058435e..288735a 100644
--- a/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java
+++ b/src/main/java/io/trygvis/esper/testing/sql/SqlOption.java
@@ -43,6 +43,16 @@ public abstract class SqlOption<A> {
return !isSome();
}
+ public abstract A getOrElse(A a);
+
+ public static <A> SqlOption<A> fromNull(A a) {
+ if (a != null) {
+ return some(a);
+ } else {
+ return none();
+ }
+ }
+
// -----------------------------------------------------------------------
//
// -----------------------------------------------------------------------
@@ -59,6 +69,14 @@ public abstract class SqlOption<A> {
public boolean isSome() {
return false;
}
+
+ public A getOrElse(A a) {
+ return a;
+ }
+
+ public String toString() {
+ return "None";
+ }
}
private static class Some<A> extends SqlOption<A> {
@@ -79,5 +97,13 @@ public abstract class SqlOption<A> {
public boolean isSome() {
return true;
}
+
+ public A getOrElse(A a) {
+ return this.a;
+ }
+
+ public String toString() {
+ return "Some(" + a + ")";
+ }
}
}
diff --git a/src/main/resources/ddl-core.sql b/src/main/resources/ddl-core.sql
index 25f373b..9dcdd09 100644
--- a/src/main/resources/ddl-core.sql
+++ b/src/main/resources/ddl-core.sql
@@ -9,11 +9,11 @@ DROP TABLE IF EXISTS person;
DROP TABLE IF EXISTS table_poller_status;
CREATE TABLE table_poller_status (
- poller_name VARCHAR(100) NOT NULL,
- last_created_date TIMESTAMP,
- last_run TIMESTAMP,
- duration INT,
- status VARCHAR(1000),
+ poller_name VARCHAR(100) NOT NULL,
+ last_seq INT NOT NULL,
+ last_run TIMESTAMP,
+ duration INT,
+ status VARCHAR(1000),
CONSTRAINT pk_job_status PRIMARY KEY (poller_name)
);
diff --git a/src/main/resources/ddl-jenkins.sql b/src/main/resources/ddl-jenkins.sql
index bdda74b..94bfc4e 100644
--- a/src/main/resources/ddl-jenkins.sql
+++ b/src/main/resources/ddl-jenkins.sql
@@ -7,8 +7,10 @@ DROP TABLE IF EXISTS jenkins_server;
CREATE TABLE jenkins_server (
uuid CHAR(36) NOT NULL,
created_date TIMESTAMP NOT NULL,
+
url VARCHAR(1000) NOT NULL,
enabled BOOLEAN NOT NULL,
+
CONSTRAINT pk_jenkins_server PRIMARY KEY (uuid),
CONSTRAINT uq_jenkins_server__url UNIQUE (url)
);
@@ -31,6 +33,7 @@ CREATE TABLE jenkins_job (
CREATE TABLE jenkins_build (
uuid CHAR(36) NOT NULL,
created_date TIMESTAMP NOT NULL,
+ seq INT NOT NULL DEFAULT nextval('jenkins_build_seq'),
job CHAR(36) NOT NULL,
@@ -44,7 +47,8 @@ CREATE TABLE jenkins_build (
CONSTRAINT pk_jenkins_build PRIMARY KEY (UUID),
CONSTRAINT fk_jenkins_build__job FOREIGN KEY (job) REFERENCES jenkins_job (uuid),
- CONSTRAINT uq_jenkins_build__id UNIQUE (entry_id)
+ CONSTRAINT uq_jenkins_build__id UNIQUE (entry_id),
+ CONSTRAINT uq_jenkins_build__seq UNIQUE (seq)
);
CREATE INDEX ix_jenkins_build__created_date ON jenkins_build (created_date);
@@ -52,6 +56,7 @@ CREATE INDEX ix_jenkins_build__created_date ON jenkins_build (created_date);
CREATE TABLE jenkins_user (
uuid CHAR(36) NOT NULL,
created_date TIMESTAMP NOT NULL,
+
server CHAR(36) NOT NULL,
absolute_url VARCHAR(1000) NOT NULL,
CONSTRAINT pk_jenkins_user PRIMARY KEY (uuid),