From 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 23 Jun 2013 09:37:57 +0200 Subject: o Initial import of JDBC queue. --- src/main/java/io/trygvis/queue/TaskDao.java | 148 ++++++++++++++++++++++++++++ 1 file changed, 148 insertions(+) create mode 100644 src/main/java/io/trygvis/queue/TaskDao.java (limited to 'src/main/java/io/trygvis/queue/TaskDao.java') diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..365b44b --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,148 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumMap; +import java.util.List; + +import static io.trygvis.queue.Task.*; + +public class TaskDao { + + private final Connection c; + + public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; + + TaskDao(Connection c) { + this.c = c; + } + + public long insert(Long parent, String queue, TaskState state, Date scheduled, List arguments) throws SQLException { + String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + if (parent == null) { + stmt.setNull(i++, Types.BIGINT); + } else { + stmt.setLong(i++, parent); + } + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); + stmt.setString(i, argumentsToString(arguments)); + stmt.executeUpdate(); + } + try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { + ResultSet rs = stmt.executeQuery(); + rs.next(); + return rs.getLong(1); + } + } + + public Task findById(long id) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + stmt.setLong(1, id); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public List findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { + int i = 1; + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); + ResultSet rs = stmt.executeQuery(); + List list = new ArrayList<>(); + while (rs.next()) { + list.add(mapRow(rs)); + } + return list; + } + } + + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + + public int update(Task task) throws SQLException { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + stmt.setString(i++, task.state.name()); + stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); + setTimestamp(stmt, i++, task.lastRun); + stmt.setInt(i++, task.runCount); + setTimestamp(stmt, i++, task.completed); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } + return stmt.executeUpdate(); + } + } + + public void setState(Task task, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { + int i = 1; + stmt.setString(i++, state.name()); + stmt.setLong(i, task.id()); + stmt.executeUpdate(); + } + } + + private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { + if (date == null) { + stmt.setNull(parameterIndex, Types.TIMESTAMP); + } else { + stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); + } + } + + public Task mapRow(ResultSet rs) throws SQLException { + String arguments = rs.getString(9); + int i = 1; + return new Task( + rs.getLong(i++), + rs.getLong(i++), + rs.getString(i++), + TaskState.valueOf(rs.getString(i++)), + rs.getTimestamp(i++), + rs.getTimestamp(i++), + rs.getInt(i++), + rs.getTimestamp(i), + arguments != null ? stringToArguments(arguments) : Collections.emptyList()); + } + + public void rollback() throws SQLException { + c.rollback(); + c.close(); + } +} -- cgit v1.2.3