package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.List; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); private final QueueSystem queueSystem; private final JdbcQueueService queueService; private final SqlEffectExecutor sqlEffectExecutor; private final TaskEffect taskEffect; public final Queue queue; public boolean shouldRun = true; private boolean checkForNewTasks; private boolean busy; QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; this.queueService = queueSystem.createQueueService(); this.taskEffect = taskEffect; this.queue = queue; } public void ping() { synchronized (this) { if (!busy) { log.info("Sending ping to " + queue); notify(); } else { checkForNewTasks = true; } } } public void run() { while (shouldRun) { try { TaskExecutionRequest req = new TaskExecutionRequest(100, true); List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { queueService.executeTask(req, taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); } synchronized (this) { busy = false; if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; continue; } try { wait(); } catch (InterruptedException e) { // ignore } busy = true; } } } }