diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueExecutor.java | 27 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 4 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 17 |
3 files changed, 28 insertions, 20 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java index a1eb3b7..3739532 100644 --- a/src/main/java/io/trygvis/queue/QueueExecutor.java +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -10,11 +10,8 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; import static io.trygvis.queue.Task.TaskState.NEW; public class QueueExecutor { @@ -51,6 +48,10 @@ public class QueueExecutor { } } + public QueueStats getStats() { + return stats.toStats(); + } + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { log.info("Consuming tasks: request={}", req); @@ -69,17 +70,7 @@ public class QueueExecutor { } while (tasks.size() > 0); } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks, - ScheduledThreadPoolExecutor executor) { - executor.execute(new Runnable() { - @Override - public void run() { - applyTasks(req, taskEffect, tasks); - } - }); - } - - private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { for (Task task : tasks) { TaskExecutionResult result = applyTask(effect, task); @@ -94,17 +85,17 @@ public class QueueExecutor { * <p/> * If the task fails, the status is set to error in a separate transaction. */ - private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { try { Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { @Override public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); } }); if (count == 0) { - log.trace("Missed task {}", task.id()); + log.warn("Missed task {}", task.id()); return MISSED; } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index eee14ed..f4ce536 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -15,6 +15,10 @@ public interface QueueService { // TODO: saveExceptions public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunkSize has to be bigger than zero."); + } + this.chunkSize = chunkSize; this.stopOnError = stopOnError; } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 5d77a41..365b44b 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -84,14 +84,27 @@ public class TaskDao { } public int update(Task task) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { int i = 1; stmt.setString(i++, task.state.name()); stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); - stmt.setLong(i, task.id()); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } return stmt.executeUpdate(); } } |