package io.trygvis.queue; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.EnumMap; import java.util.List; import static io.trygvis.queue.Task.*; public class TaskDao { private final Connection c; public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; TaskDao(Connection c) { this.c = c; } public long insert(Long parent, String queue, TaskState state, Date scheduled, List arguments) throws SQLException { String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; try (PreparedStatement stmt = c.prepareStatement(sql)) { int i = 1; if (parent == null) { stmt.setNull(i++, Types.BIGINT); } else { stmt.setLong(i++, parent); } stmt.setString(i++, queue); stmt.setString(i++, state.name()); stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); stmt.setString(i, argumentsToString(arguments)); stmt.executeUpdate(); } try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { ResultSet rs = stmt.executeQuery(); rs.next(); return rs.getLong(1); } } public Task findById(long id) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { stmt.setLong(1, id); ResultSet rs = stmt.executeQuery(); return rs.next() ? mapRow(rs) : null; } } public List findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { int i = 1; stmt.setString(i++, queue); stmt.setString(i++, state.name()); stmt.setLong(i, limit); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); while (rs.next()) { list.add(mapRow(rs)); } return list; } } public QueueStats findQueueStatsByName(String queue) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { int i = 1; stmt.setString(i, queue); ResultSet rs = stmt.executeQuery(); EnumMap states = new EnumMap<>(TaskState.class); while (rs.next()) { states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); } return new QueueStats(queue, states); } } public int update(Task task) throws SQLException { return update(task, null); } public int update(Task task, TaskState state) throws SQLException { String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; if (state != null) { sql += " AND state=?"; } try (PreparedStatement stmt = c.prepareStatement(sql)) { int i = 1; stmt.setString(i++, task.state.name()); stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); stmt.setLong(i++, task.id()); if (state != null) { stmt.setString(i, state.name()); } return stmt.executeUpdate(); } } public void setState(Task task, TaskState state) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { int i = 1; stmt.setString(i++, state.name()); stmt.setLong(i, task.id()); stmt.executeUpdate(); } } private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { if (date == null) { stmt.setNull(parameterIndex, Types.TIMESTAMP); } else { stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); } } public Task mapRow(ResultSet rs) throws SQLException { String arguments = rs.getString(9); int i = 1; return new Task( rs.getLong(i++), rs.getLong(i++), rs.getString(i++), TaskState.valueOf(rs.getString(i++)), rs.getTimestamp(i++), rs.getTimestamp(i++), rs.getInt(i++), rs.getTimestamp(i), arguments != null ? stringToArguments(arguments) : Collections.emptyList()); } public void rollback() throws SQLException { c.rollback(); c.close(); } }