package io.trygvis.async; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; public class QueueController { private final Logger log = LoggerFactory.getLogger(getClass()); private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; private final TaskEffect taskEffect; private final TaskExecutionRequest req; public final QueueExecutor queue; private boolean shouldRun = true; private boolean checkForNewTasks; private boolean running; private Thread thread; private ScheduledThreadPoolExecutor executor; private List stopListeners = new ArrayList<>(); public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { this.queueSystem = queueSystem; this.req = req; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; this.taskEffect = taskEffect; this.queue = queue; } public void scheduleAll(final List tasks) throws InterruptedException { ExecutorCompletionService service = new ExecutorCompletionService<>(executor); for (final Task task : tasks) { service.submit(new Callable() { @Override public TaskExecutionResult call() throws Exception { return queue.applyTask(taskEffect, task); } }); } } public void invokeAll(final List tasks) throws InterruptedException { List> callables = new ArrayList<>(tasks.size()); for (final Task task : tasks) { callables.add(new Callable() { @Override public TaskExecutionResult call() throws Exception { return queue.applyTask(taskEffect, task); } }); } executor.invokeAll(callables); } public void hint(final long id) throws SQLException { sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { Task task = queueSystem.createTaskDao(c).findById(id); try { scheduleAll(Collections.singletonList(task)); } catch (InterruptedException e) { throw new SQLException(e); } } }); } public synchronized void addOnStopListener(Runnable runnable) { stopListeners.add(runnable); } private class QueueThread implements Runnable { public void run() { while (shouldRun) { List tasks = null; try { tasks = sqlEffectExecutor.transaction(new SqlEffect>() { public List doInConnection(Connection c) throws SQLException { return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { invokeAll(tasks); } } catch (Throwable e) { if (shouldRun) { log.warn("Error while executing tasks.", e); } } // If we found exactly the same number of tasks that we asked for, there is most likely more to go. if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { log.info("Got a full chunk, continuing directly."); continue; } synchronized (this) { if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; } else { try { wait(req.interval(queue.queue)); } catch (InterruptedException e) { // ignore } } } } log.info("Thread for queue {} has stopped.", queue.queue.name); running = false; synchronized (this) { this.notifyAll(); } } } public synchronized void start(ScheduledThreadPoolExecutor executor) { if (running) { throw new IllegalStateException("Already running"); } log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); running = true; this.executor = executor; thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); thread.setDaemon(true); thread.start(); } public synchronized void stop() { if (!running) { return; } log.info("Stopping thread for queue {}", queue.queue.name); for (Runnable runnable : stopListeners) { try { runnable.run(); } catch (Throwable e) { log.error("Error while running stop listener " + runnable, e); } } shouldRun = false; // TODO: empty out the currently executing tasks. thread.interrupt(); while (running) { try { wait(1000); } catch (InterruptedException e) { // continue } thread.interrupt(); } thread = null; executor.shutdownNow(); } }