aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueThread.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java79
1 files changed, 66 insertions, 13 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index ea77911..61196b6 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
@@ -28,11 +29,15 @@ class QueueThread implements Runnable {
public final Queue queue;
- public boolean shouldRun = true;
+ private boolean shouldRun = true;
private boolean checkForNewTasks;
- private boolean busy;
+ private boolean available;
+
+ private boolean running;
+
+ private Thread thread;
QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
this.queueSystem = queueSystem;
@@ -44,9 +49,10 @@ class QueueThread implements Runnable {
public void ping() {
synchronized (this) {
- if (!busy) {
+ System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks);
+ if (available) {
log.info("Sending ping to " + queue);
- notify();
+ notifyAll();
} else {
checkForNewTasks = true;
}
@@ -70,27 +76,74 @@ class QueueThread implements Runnable {
if (tasks.size() > 0) {
queueService.executeTask(req, taskEffect, tasks);
}
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (tasks.size() == req.chunkSize) {
+ continue;
+ }
} catch (Throwable e) {
- log.warn("Error while executing tasks.", e);
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
}
synchronized (this) {
- busy = false;
+ available = true;
if (checkForNewTasks) {
log.info("Ping received!");
checkForNewTasks = false;
- continue;
+ } else {
+ try {
+ wait(queue.interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- try {
- wait();
- } catch (InterruptedException e) {
- // ignore
- }
+ available = false;
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
- busy = true;
+ log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval);
+
+ running = true;
+
+ thread = new Thread(this, queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.name);
+
+ shouldRun = false;
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
}
+ thread.interrupt();
}
+ thread = null;
}
}