aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueController.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueController.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java135
1 files changed, 135 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
new file mode 100644
index 0000000..ea1c42d
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -0,0 +1,135 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueController {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final TaskEffect taskEffect;
+
+ private final TaskExecutionRequest req;
+
+ public final QueueExecutor queue;
+
+ private boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean running;
+
+ private Thread thread;
+
+ private ScheduledThreadPoolExecutor executor;
+
+ public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
+ this.queueSystem = queueSystem;
+ this.req = req;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.taskEffect = taskEffect;
+ this.queue = queue;
+ }
+
+ private class QueueThread implements Runnable {
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = null;
+
+ try {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
+
+ if (tasks.size() > 0) {
+ queue.executeTasks(req, taskEffect, tasks, executor);
+ }
+ } catch (Throwable e) {
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (tasks != null && tasks.size() == req.chunkSize) {
+ continue;
+ }
+
+ synchronized (this) {
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ } else {
+ try {
+ wait(queue.queue.interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
+
+ log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval);
+
+ running = true;
+ this.executor = executor;
+
+ thread = new Thread(new QueueThread(), "queue: " + queue.queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.queue.name);
+
+ shouldRun = false;
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ thread.interrupt();
+ }
+ thread = null;
+ executor.shutdownNow();
+ }
+}