diff options
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueThread.java')
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 7 |
1 files changed, 5 insertions, 2 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 33753a3..558e769 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -12,6 +12,9 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -56,14 +59,14 @@ class QueueThread implements Runnable { List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(taskEffect, tasks); + queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); |