package io.trygvis.queue; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Timestamp; import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; import static java.util.Arrays.asList; public class TaskDao { private final Connection connection; public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments"; public TaskDao(Connection connection) { this.connection = connection; } public long insert(String queue, Date scheduled, String arguments) throws SQLException { return insert(null, queue, scheduled, arguments); } public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException { String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { int i = 1; if (parent == null) { stmt.setNull(i++, Types.BIGINT); } else { stmt.setLong(i++, parent); } stmt.setString(i++, queue); stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); stmt.setString(i, arguments); stmt.executeUpdate(); } try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) { ResultSet rs = stmt.executeQuery(); rs.next(); return rs.getLong(1); } } public Task findById(long id) throws SQLException { try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { ResultSet rs = stmt.executeQuery(); return rs.next() ? mapRow(rs) : null; } } public List findByNameAndCompletedIsNull(String name) throws SQLException { try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { int i = 1; stmt.setString(i, name); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); while (rs.next()) { list.add(mapRow(rs)); } return list; } } public void update(Task task) throws SQLException { try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); stmt.setLong(i, task.id); stmt.executeUpdate(); } } private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { if (date == null) { stmt.setNull(parameterIndex, Types.TIMESTAMP); } else { stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); } } public Task mapRow(ResultSet rs) throws SQLException { String arguments = rs.getString(8); return new Task( rs.getLong(1), rs.getLong(2), rs.getString(3), rs.getTimestamp(4), rs.getTimestamp(5), rs.getInt(6), rs.getTimestamp(7), arguments != null ? asList(arguments.split(" ")) : Collections.emptyList()); } }