package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); private final QueueSystem queueSystem; private final JdbcQueueService queueService; private final SqlEffectExecutor sqlEffectExecutor; private final TaskEffect taskEffect; public final Queue queue; private boolean shouldRun = true; private boolean checkForNewTasks; private boolean available; private boolean running; private Thread thread; QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; this.queueService = queueSystem.createQueueService(); this.taskEffect = taskEffect; this.queue = queue; } public void ping() { synchronized (this) { System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks); if (available) { log.info("Sending ping to " + queue); notifyAll(); } else { checkForNewTasks = true; } } } public void run() { while (shouldRun) { try { TaskExecutionRequest req = new TaskExecutionRequest(100, true); List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { queueService.executeTask(req, taskEffect, tasks); } // If we found exactly the same number of tasks that we asked for, there is most likely more to go. if (tasks.size() == req.chunkSize) { continue; } } catch (Throwable e) { if (shouldRun) { log.warn("Error while executing tasks.", e); } } synchronized (this) { available = true; if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; } else { try { wait(queue.interval); } catch (InterruptedException e) { // ignore } } available = false; } } log.info("Thread for queue {} has stopped.", queue.name); running = false; synchronized (this) { this.notifyAll(); } } public synchronized void start(ScheduledThreadPoolExecutor executor) { if (running) { throw new IllegalStateException("Already running"); } log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval); running = true; thread = new Thread(this, queue.name); thread.setDaemon(true); thread.start(); } public synchronized void stop() { if (!running) { return; } log.info("Stopping thread for queue {}", queue.name); shouldRun = false; thread.interrupt(); while (running) { try { wait(1000); } catch (InterruptedException e) { // continue } thread.interrupt(); } thread = null; } }