aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/QueueExecutor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueExecutor.java')
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java27
1 files changed, 9 insertions, 18 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
index a1eb3b7..3739532 100644
--- a/src/main/java/io/trygvis/queue/QueueExecutor.java
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -10,11 +10,8 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED;
-import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
import static io.trygvis.queue.Task.TaskState.NEW;
public class QueueExecutor {
@@ -51,6 +48,10 @@ public class QueueExecutor {
}
}
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+
public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
log.info("Consuming tasks: request={}", req);
@@ -69,17 +70,7 @@ public class QueueExecutor {
} while (tasks.size() > 0);
}
- public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks,
- ScheduledThreadPoolExecutor executor) {
- executor.execute(new Runnable() {
- @Override
- public void run() {
- applyTasks(req, taskEffect, tasks);
- }
- });
- }
-
- private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
for (Task task : tasks) {
TaskExecutionResult result = applyTask(effect, task);
@@ -94,17 +85,17 @@ public class QueueExecutor {
* <p/>
* If the task fails, the status is set to error in a separate transaction.
*/
- private TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
try {
Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
@Override
public Integer doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).update(task.markProcessing());
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
}
});
if (count == 0) {
- log.trace("Missed task {}", task.id());
+ log.warn("Missed task {}", task.id());
return MISSED;
}