diff options
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueThread.java')
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 149 |
1 files changed, 0 insertions, 149 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java deleted file mode 100644 index 61196b6..0000000 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ /dev/null @@ -1,149 +0,0 @@ -package io.trygvis.async; - -import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueSystem; -import io.trygvis.queue.Task; -import io.trygvis.queue.TaskEffect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.SQLException; -import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import static io.trygvis.queue.QueueService.TaskExecutionRequest; -import static io.trygvis.queue.Task.TaskState.NEW; - -class QueueThread implements Runnable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final QueueSystem queueSystem; - - private final JdbcQueueService queueService; - - private final SqlEffectExecutor sqlEffectExecutor; - - private final TaskEffect taskEffect; - - public final Queue queue; - - private boolean shouldRun = true; - - private boolean checkForNewTasks; - - private boolean available; - - private boolean running; - - private Thread thread; - - QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { - this.queueSystem = queueSystem; - this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; - this.queueService = queueSystem.createQueueService(); - this.taskEffect = taskEffect; - this.queue = queue; - } - - public void ping() { - synchronized (this) { - System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks); - if (available) { - log.info("Sending ping to " + queue); - notifyAll(); - } else { - checkForNewTasks = true; - } - } - } - - public void run() { - while (shouldRun) { - try { - TaskExecutionRequest req = new TaskExecutionRequest(100, true); - - List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { - @Override - public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); - } - }); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - if (tasks.size() > 0) { - queueService.executeTask(req, taskEffect, tasks); - } - - // If we found exactly the same number of tasks that we asked for, there is most likely more to go. - if (tasks.size() == req.chunkSize) { - continue; - } - } catch (Throwable e) { - if (shouldRun) { - log.warn("Error while executing tasks.", e); - } - } - - synchronized (this) { - available = true; - - if (checkForNewTasks) { - log.info("Ping received!"); - checkForNewTasks = false; - } else { - try { - wait(queue.interval); - } catch (InterruptedException e) { - // ignore - } - } - - available = false; - } - } - - log.info("Thread for queue {} has stopped.", queue.name); - running = false; - synchronized (this) { - this.notifyAll(); - } - } - - public synchronized void start(ScheduledThreadPoolExecutor executor) { - if (running) { - throw new IllegalStateException("Already running"); - } - - log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval); - - running = true; - - thread = new Thread(this, queue.name); - thread.setDaemon(true); - thread.start(); - } - - public synchronized void stop() { - if (!running) { - return; - } - - log.info("Stopping thread for queue {}", queue.name); - - shouldRun = false; - - thread.interrupt(); - while (running) { - try { - wait(1000); - } catch (InterruptedException e) { - // continue - } - thread.interrupt(); - } - thread = null; - } -} |