aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcQueueService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java24
1 files changed, 15 insertions, 9 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index edd6c80..cb7af4b 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -12,7 +12,6 @@ import java.util.List;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
-import static io.trygvis.queue.Task.TaskState.PROCESSING;
public class JdbcQueueService {
@@ -27,15 +26,22 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
- final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
- }
- });
+ public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
- applyTasks(req, effect, tasks);
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
}
public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {