package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; import static java.util.concurrent.TimeUnit.SECONDS; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; private final JdbcQueueService queueService; public JdbcAsyncService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.queueService = queueSystem.createQueueService(); } public void registerQueue(Queue queue, TaskEffect processor) { final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); log.info("registerQueue: LEAVE"); } public void startQueue(ScheduledThreadPoolExecutor executor, String name) { final QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: " + name); } long interval = queueThread.queue.interval; log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } }, 10, interval, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); } public Queue getQueue(String name) { QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: '" + name + "'."); } return queueThread.queue; } public Task schedule(Connection c, final Queue queue, List args) throws SQLException { return scheduleInner(c, null, queue, args); } public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { return scheduleInner(c, parent, queue, args); } private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); Date scheduled = new Date(); Task task = queueService.schedule(c, queue, parent, scheduled, args); log.info("Created task = {}", task); return task; } public Task await(Connection c, Task task, long timeout) throws SQLException { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { task = update(c, task); if (task == null) { throw new RuntimeException("The task went away."); } try { sleep(100); } catch (InterruptedException e) { // break } } return task; } public Task update(Connection c, Task ref) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); return taskDao.findById(ref.id()); } }