diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 24 |
1 files changed, 15 insertions, 9 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index edd6c80..cb7af4b 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -12,7 +12,6 @@ import java.util.List; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; -import static io.trygvis.queue.Task.TaskState.PROCESSING; public class JdbcQueueService { @@ -27,15 +26,22 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { - @Override - public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); - } - }); + public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List<Task> tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); - applyTasks(req, effect, tasks); + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); } public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException { |