package io.trygvis.async; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; public class QueueController { private final Logger log = LoggerFactory.getLogger(getClass()); private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; private final TaskEffect taskEffect; private final TaskExecutionRequest req; public final QueueExecutor queue; private boolean shouldRun = true; private boolean checkForNewTasks; private boolean running; private Thread thread; private ScheduledThreadPoolExecutor executor; public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { this.queueSystem = queueSystem; this.req = req; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; this.taskEffect = taskEffect; this.queue = queue; } public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List tasks, ScheduledThreadPoolExecutor executor) { ExecutorCompletionService completionService = new ExecutorCompletionService<>(executor); List> results = new ArrayList<>(tasks.size()); for (final Task task : tasks) { results.add(completionService.submit(new Callable() { @Override public TaskExecutionResult call() throws Exception { return queue.applyTask(effect, task); } })); } for (Future result : results) { while(!result.isDone()) { try { result.get(); break; } catch (InterruptedException e) { if(shouldRun) { continue; } } catch (ExecutionException e) { log.error("Unexpected exception.", e); break; } } } } private class QueueThread implements Runnable { public void run() { while (shouldRun) { List tasks = null; try { tasks = sqlEffectExecutor.transaction(new SqlEffect>() { public List doInConnection(Connection c) throws SQLException { return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { executeTasks(req, taskEffect, tasks, executor); } } catch (Throwable e) { if (shouldRun) { log.warn("Error while executing tasks.", e); } } // If we found exactly the same number of tasks that we asked for, there is most likely more to go. if (tasks != null && tasks.size() == req.chunkSize) { log.info("Got a full chunk, continuing directly."); continue; } synchronized (this) { if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; } else { try { wait(queue.queue.interval); } catch (InterruptedException e) { // ignore } } } } log.info("Thread for queue {} has stopped.", queue.queue.name); running = false; synchronized (this) { this.notifyAll(); } } } public synchronized void start(ScheduledThreadPoolExecutor executor) { if (running) { throw new IllegalStateException("Already running"); } log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); running = true; this.executor = executor; thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); thread.setDaemon(true); thread.start(); } public synchronized void stop() { if (!running) { return; } log.info("Stopping thread for queue {}", queue.queue.name); shouldRun = false; // TODO: empty out the currently executing tasks. thread.interrupt(); while (running) { try { wait(1000); } catch (InterruptedException e) { // continue } thread.interrupt(); } thread = null; executor.shutdownNow(); } }