aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueThread.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java7
1 files changed, 5 insertions, 2 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 33753a3..558e769 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -12,6 +12,9 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -56,14 +59,14 @@ class QueueThread implements Runnable {
List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name);
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
if (tasks.size() > 0) {
- queueService.executeTask(taskEffect, tasks);
+ queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks);
}
} catch (Throwable e) {
log.warn("Error while executing tasks.", e);