aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/TaskDao.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/TaskDao.java')
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java68
1 files changed, 44 insertions, 24 deletions
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 3aa2ac2..025823b 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -11,27 +11,25 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState;
+import static io.trygvis.queue.Task.TaskState.valueOf;
import static io.trygvis.queue.Task.argumentsToString;
import static io.trygvis.queue.Task.stringToArguments;
public class TaskDao {
- private final Connection connection;
+ private final Connection c;
- public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
- public TaskDao(Connection connection) {
- this.connection = connection;
+ TaskDao(Connection c) {
+ this.c = c;
}
- public long insert(String queue, Date scheduled, List<String> arguments) throws SQLException {
- return insert(null, queue, scheduled, arguments);
- }
-
- public long insert(Long parent, String queue, Date scheduled, List<String> arguments) throws SQLException {
- String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
- "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
- try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
int i = 1;
if (parent == null) {
stmt.setNull(i++, Types.BIGINT);
@@ -39,11 +37,12 @@ public class TaskDao {
stmt.setLong(i++, parent);
}
stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
stmt.setString(i, argumentsToString(arguments));
stmt.executeUpdate();
}
- try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
ResultSet rs = stmt.executeQuery();
rs.next();
return rs.getLong(1);
@@ -51,7 +50,7 @@ public class TaskDao {
}
public Task findById(long id) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
stmt.setLong(1, id);
ResultSet rs = stmt.executeQuery();
return rs.next() ? mapRow(rs) : null;
@@ -59,7 +58,7 @@ public class TaskDao {
}
public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
int i = 1;
stmt.setString(i, name);
ResultSet rs = stmt.executeQuery();
@@ -72,8 +71,9 @@ public class TaskDao {
}
public void update(Task task) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
int i = 1;
+ stmt.setString(i++, task.state.name());
stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
@@ -83,6 +83,19 @@ public class TaskDao {
}
}
+ public void setState(List<Task> tasks, TaskState state) throws SQLException {
+ Long[] ids = new Long[tasks.size()];
+ for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
+ ids[i] = tasks.get(i).id();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setObject(i, c.createArrayOf("bigint", ids));
+ stmt.executeUpdate();
+ }
+ }
+
private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
if (date == null) {
stmt.setNull(parameterIndex, Types.TIMESTAMP);
@@ -92,15 +105,22 @@ public class TaskDao {
}
public Task mapRow(ResultSet rs) throws SQLException {
- String arguments = rs.getString(8);
+ String arguments = rs.getString(9);
+ int i = 1;
return new Task(
- rs.getLong(1),
- rs.getLong(2),
- rs.getString(3),
- rs.getTimestamp(4),
- rs.getTimestamp(5),
- rs.getInt(6),
- rs.getTimestamp(7),
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
}
+
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
}