diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-16 21:26:17 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-16 21:26:17 +0200 |
commit | e57f313713ee0fc52d7dd1247c51914d1462dfc2 (patch) | |
tree | f39a49b967f6f18d0637e61f2dd4d8081615283b /src/main/java/io/trygvis/queue | |
parent | 1ec4fae12c5e5363591013e5a759590d913d6782 (diff) | |
download | quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.tar.gz quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.tar.bz2 quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.tar.xz quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.zip |
wip
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueExecutor.java | 27 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 4 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 17 |
3 files changed, 28 insertions, 20 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java index a1eb3b7..3739532 100644 --- a/src/main/java/io/trygvis/queue/QueueExecutor.java +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -10,11 +10,8 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; import static io.trygvis.queue.Task.TaskState.NEW; public class QueueExecutor { @@ -51,6 +48,10 @@ public class QueueExecutor { } } + public QueueStats getStats() { + return stats.toStats(); + } + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { log.info("Consuming tasks: request={}", req); @@ -69,17 +70,7 @@ public class QueueExecutor { } while (tasks.size() > 0); } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks, - ScheduledThreadPoolExecutor executor) { - executor.execute(new Runnable() { - @Override - public void run() { - applyTasks(req, taskEffect, tasks); - } - }); - } - - private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { for (Task task : tasks) { TaskExecutionResult result = applyTask(effect, task); @@ -94,17 +85,17 @@ public class QueueExecutor { * <p/> * If the task fails, the status is set to error in a separate transaction. */ - private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { try { Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { @Override public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); } }); if (count == 0) { - log.trace("Missed task {}", task.id()); + log.warn("Missed task {}", task.id()); return MISSED; } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index eee14ed..f4ce536 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -15,6 +15,10 @@ public interface QueueService { // TODO: saveExceptions public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunkSize has to be bigger than zero."); + } + this.chunkSize = chunkSize; this.stopOnError = stopOnError; } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 5d77a41..365b44b 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -84,14 +84,27 @@ public class TaskDao { } public int update(Task task) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { int i = 1; stmt.setString(i++, task.state.name()); stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); - stmt.setLong(i, task.id()); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } return stmt.executeUpdate(); } } |