aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueController.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueController.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java46
1 files changed, 44 insertions, 2 deletions
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
index ea1c42d..863d6a5 100644
--- a/src/main/java/io/trygvis/async/QueueController.java
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -1,6 +1,7 @@
package io.trygvis.async;
import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -9,9 +10,15 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
@@ -46,6 +53,38 @@ public class QueueController {
this.queue = queue;
}
+ public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks,
+ ScheduledThreadPoolExecutor executor) {
+ ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor);
+
+ List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size());
+
+ for (final Task task : tasks) {
+ results.add(completionService.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(effect, task);
+ }
+ }));
+ }
+
+ for (Future<TaskExecutionResult> result : results) {
+ while(!result.isDone()) {
+ try {
+ result.get();
+ break;
+ } catch (InterruptedException e) {
+ if(shouldRun) {
+ continue;
+ }
+ } catch (ExecutionException e) {
+ log.error("Unexpected exception.", e);
+ break;
+ }
+ }
+ }
+ }
+
private class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
@@ -61,7 +100,7 @@ public class QueueController {
log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
if (tasks.size() > 0) {
- queue.executeTasks(req, taskEffect, tasks, executor);
+ executeTasks(req, taskEffect, tasks, executor);
}
} catch (Throwable e) {
if (shouldRun) {
@@ -71,6 +110,7 @@ public class QueueController {
// If we found exactly the same number of tasks that we asked for, there is most likely more to go.
if (tasks != null && tasks.size() == req.chunkSize) {
+ log.info("Got a full chunk, continuing directly.");
continue;
}
@@ -101,7 +141,7 @@ public class QueueController {
throw new IllegalStateException("Already running");
}
- log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval);
+ log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval);
running = true;
this.executor = executor;
@@ -120,6 +160,8 @@ public class QueueController {
shouldRun = false;
+ // TODO: empty out the currently executing tasks.
+
thread.interrupt();
while (running) {
try {