aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueThread.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java149
1 files changed, 0 insertions, 149 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
deleted file mode 100644
index 61196b6..0000000
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package io.trygvis.async;
-
-import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueSystem;
-import io.trygvis.queue.Task;
-import io.trygvis.queue.TaskEffect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static io.trygvis.queue.QueueService.TaskExecutionRequest;
-import static io.trygvis.queue.Task.TaskState.NEW;
-
-class QueueThread implements Runnable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final QueueSystem queueSystem;
-
- private final JdbcQueueService queueService;
-
- private final SqlEffectExecutor sqlEffectExecutor;
-
- private final TaskEffect taskEffect;
-
- public final Queue queue;
-
- private boolean shouldRun = true;
-
- private boolean checkForNewTasks;
-
- private boolean available;
-
- private boolean running;
-
- private Thread thread;
-
- QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
- this.queueSystem = queueSystem;
- this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
- this.queueService = queueSystem.createQueueService();
- this.taskEffect = taskEffect;
- this.queue = queue;
- }
-
- public void ping() {
- synchronized (this) {
- System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks);
- if (available) {
- log.info("Sending ping to " + queue);
- notifyAll();
- } else {
- checkForNewTasks = true;
- }
- }
- }
-
- public void run() {
- while (shouldRun) {
- try {
- TaskExecutionRequest req = new TaskExecutionRequest(100, true);
-
- List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100);
- }
- });
-
- log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
-
- if (tasks.size() > 0) {
- queueService.executeTask(req, taskEffect, tasks);
- }
-
- // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
- if (tasks.size() == req.chunkSize) {
- continue;
- }
- } catch (Throwable e) {
- if (shouldRun) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- synchronized (this) {
- available = true;
-
- if (checkForNewTasks) {
- log.info("Ping received!");
- checkForNewTasks = false;
- } else {
- try {
- wait(queue.interval);
- } catch (InterruptedException e) {
- // ignore
- }
- }
-
- available = false;
- }
- }
-
- log.info("Thread for queue {} has stopped.", queue.name);
- running = false;
- synchronized (this) {
- this.notifyAll();
- }
- }
-
- public synchronized void start(ScheduledThreadPoolExecutor executor) {
- if (running) {
- throw new IllegalStateException("Already running");
- }
-
- log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval);
-
- running = true;
-
- thread = new Thread(this, queue.name);
- thread.setDaemon(true);
- thread.start();
- }
-
- public synchronized void stop() {
- if (!running) {
- return;
- }
-
- log.info("Stopping thread for queue {}", queue.name);
-
- shouldRun = false;
-
- thread.interrupt();
- while (running) {
- try {
- wait(1000);
- } catch (InterruptedException e) {
- // continue
- }
- thread.interrupt();
- }
- thread = null;
- }
-}