diff options
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueController.java')
-rw-r--r-- | src/main/java/io/trygvis/async/QueueController.java | 46 |
1 files changed, 44 insertions, 2 deletions
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java index ea1c42d..863d6a5 100644 --- a/src/main/java/io/trygvis/async/QueueController.java +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -1,6 +1,7 @@ package io.trygvis.async; import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -9,9 +10,15 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; @@ -46,6 +53,38 @@ public class QueueController { this.queue = queue; } + public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks, + ScheduledThreadPoolExecutor executor) { + ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor); + + List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size()); + + for (final Task task : tasks) { + results.add(completionService.submit(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(effect, task); + } + })); + } + + for (Future<TaskExecutionResult> result : results) { + while(!result.isDone()) { + try { + result.get(); + break; + } catch (InterruptedException e) { + if(shouldRun) { + continue; + } + } catch (ExecutionException e) { + log.error("Unexpected exception.", e); + break; + } + } + } + } + private class QueueThread implements Runnable { public void run() { while (shouldRun) { @@ -61,7 +100,7 @@ public class QueueController { log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { - queue.executeTasks(req, taskEffect, tasks, executor); + executeTasks(req, taskEffect, tasks, executor); } } catch (Throwable e) { if (shouldRun) { @@ -71,6 +110,7 @@ public class QueueController { // If we found exactly the same number of tasks that we asked for, there is most likely more to go. if (tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); continue; } @@ -101,7 +141,7 @@ public class QueueController { throw new IllegalStateException("Already running"); } - log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval); + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); running = true; this.executor = executor; @@ -120,6 +160,8 @@ public class QueueController { shouldRun = false; + // TODO: empty out the currently executing tasks. + thread.interrupt(); while (running) { try { |