aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/TaskDao.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/TaskDao.java')
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java148
1 files changed, 148 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..365b44b
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,148 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+import static io.trygvis.queue.Task.*;
+
+public class TaskDao {
+
+ private final Connection c;
+
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
+
+ TaskDao(Connection c) {
+ this.c = c;
+ }
+
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ if (parent == null) {
+ stmt.setNull(i++, Types.BIGINT);
+ } else {
+ stmt.setLong(i++, parent);
+ }
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
+ stmt.setString(i, argumentsToString(arguments));
+ stmt.executeUpdate();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
+ ResultSet rs = stmt.executeQuery();
+ rs.next();
+ return rs.getLong(1);
+ }
+ }
+
+ public Task findById(long id) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ stmt.setLong(1, id);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+
+ public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) {
+ int i = 1;
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, limit);
+ ResultSet rs = stmt.executeQuery();
+ List<Task> list = new ArrayList<>();
+ while (rs.next()) {
+ list.add(mapRow(rs));
+ }
+ return list;
+ }
+ }
+
+ public QueueStats findQueueStatsByName(String queue) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) {
+ int i = 1;
+ stmt.setString(i, queue);
+ ResultSet rs = stmt.executeQuery();
+ EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class);
+ while (rs.next()) {
+ states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2));
+ }
+ return new QueueStats(queue, states);
+ }
+ }
+
+ public int update(Task task) throws SQLException {
+ return update(task, null);
+ }
+
+ public int update(Task task, TaskState state) throws SQLException {
+ String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?";
+
+ if (state != null) {
+ sql += " AND state=?";
+ }
+
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ stmt.setString(i++, task.state.name());
+ stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
+ setTimestamp(stmt, i++, task.lastRun);
+ stmt.setInt(i++, task.runCount);
+ setTimestamp(stmt, i++, task.completed);
+ stmt.setLong(i++, task.id());
+ if (state != null) {
+ stmt.setString(i, state.name());
+ }
+ return stmt.executeUpdate();
+ }
+ }
+
+ public void setState(Task task, TaskState state) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, task.id());
+ stmt.executeUpdate();
+ }
+ }
+
+ private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
+ if (date == null) {
+ stmt.setNull(parameterIndex, Types.TIMESTAMP);
+ } else {
+ stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));
+ }
+ }
+
+ public Task mapRow(ResultSet rs) throws SQLException {
+ String arguments = rs.getString(9);
+ int i = 1;
+ return new Task(
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ TaskState.valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
+ arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
+ }
+
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
+}