aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueController.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueController.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java70
1 files changed, 46 insertions, 24 deletions
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
index 8aed91f..a343d42 100644
--- a/src/main/java/io/trygvis/async/QueueController.java
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -1,7 +1,6 @@
package io.trygvis.async;
import io.trygvis.queue.QueueExecutor;
-import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.SqlEffect;
import io.trygvis.queue.SqlEffectExecutor;
@@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
@@ -47,6 +45,8 @@ public class QueueController {
private ScheduledThreadPoolExecutor executor;
+ private List<Runnable> stopListeners = new ArrayList<>();
+
public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
this.queueSystem = queueSystem;
this.req = req;
@@ -55,36 +55,50 @@ public class QueueController {
this.queue = queue;
}
- public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks,
- ScheduledThreadPoolExecutor executor) {
- ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor);
+ public void scheduleAll(final List<Task> tasks) throws InterruptedException {
+ ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor);
- List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size());
+ for (final Task task : tasks) {
+ service.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ }
+ public void invokeAll(final List<Task> tasks) throws InterruptedException {
+ List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size());
for (final Task task : tasks) {
- results.add(completionService.submit(new Callable<TaskExecutionResult>() {
+ callables.add(new Callable<TaskExecutionResult>() {
@Override
public TaskExecutionResult call() throws Exception {
- return queue.applyTask(effect, task);
+ return queue.applyTask(taskEffect, task);
}
- }));
+ });
}
- for (Future<TaskExecutionResult> result : results) {
- while(!result.isDone()) {
+ executor.invokeAll(callables);
+ }
+
+ public void hint(final long id) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ Task task = queueSystem.createTaskDao(c).findById(id);
+
try {
- result.get();
- break;
+ scheduleAll(Collections.singletonList(task));
} catch (InterruptedException e) {
- if(shouldRun) {
- continue;
- }
- } catch (ExecutionException e) {
- log.error("Unexpected exception.", e);
- break;
+ throw new SQLException(e);
}
}
- }
+ });
+ }
+
+ public synchronized void addOnStopListener(Runnable runnable) {
+ stopListeners.add(runnable);
}
private class QueueThread implements Runnable {
@@ -102,7 +116,7 @@ public class QueueController {
log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
if (tasks.size() > 0) {
- executeTasks(req, taskEffect, tasks, executor);
+ invokeAll(tasks);
}
} catch (Throwable e) {
if (shouldRun) {
@@ -111,7 +125,7 @@ public class QueueController {
}
// If we found exactly the same number of tasks that we asked for, there is most likely more to go.
- if (tasks != null && tasks.size() == req.chunkSize) {
+ if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) {
log.info("Got a full chunk, continuing directly.");
continue;
}
@@ -122,7 +136,7 @@ public class QueueController {
checkForNewTasks = false;
} else {
try {
- wait(queue.queue.interval);
+ wait(req.interval(queue.queue));
} catch (InterruptedException e) {
// ignore
}
@@ -160,6 +174,14 @@ public class QueueController {
log.info("Stopping thread for queue {}", queue.queue.name);
+ for (Runnable runnable : stopListeners) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Error while running stop listener " + runnable, e);
+ }
+ }
+
shouldRun = false;
// TODO: empty out the currently executing tasks.