From e57f313713ee0fc52d7dd1247c51914d1462dfc2 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 21:26:17 +0200 Subject: wip --- src/main/java/io/trygvis/queue/QueueExecutor.java | 27 ++++++++--------------- 1 file changed, 9 insertions(+), 18 deletions(-) (limited to 'src/main/java/io/trygvis/queue/QueueExecutor.java') diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java index a1eb3b7..3739532 100644 --- a/src/main/java/io/trygvis/queue/QueueExecutor.java +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -10,11 +10,8 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; import static io.trygvis.queue.Task.TaskState.NEW; public class QueueExecutor { @@ -51,6 +48,10 @@ public class QueueExecutor { } } + public QueueStats getStats() { + return stats.toStats(); + } + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { log.info("Consuming tasks: request={}", req); @@ -69,17 +70,7 @@ public class QueueExecutor { } while (tasks.size() > 0); } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List tasks, - ScheduledThreadPoolExecutor executor) { - executor.execute(new Runnable() { - @Override - public void run() { - applyTasks(req, taskEffect, tasks); - } - }); - } - - private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List tasks) { + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List tasks) { for (Task task : tasks) { TaskExecutionResult result = applyTask(effect, task); @@ -94,17 +85,17 @@ public class QueueExecutor { *

* If the task fails, the status is set to error in a separate transaction. */ - private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { try { Integer count = sqlEffectExecutor.transaction(new SqlEffect() { @Override public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); } }); if (count == 0) { - log.trace("Missed task {}", task.id()); + log.warn("Missed task {}", task.id()); return MISSED; } -- cgit v1.2.3