package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; private final JdbcQueueService queueService; public JdbcAsyncService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.queueService = queueSystem.createQueueService(); } public synchronized void registerQueue(Queue queue, TaskEffect processor) { final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); log.info("registerQueue: LEAVE"); } public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) { getQueueThread(queue.name).start(executor); } public synchronized void stopQueue(Queue queue) { QueueThread queueThread = queues.remove(queue.name); if (queueThread == null) { throw new RuntimeException("No such queue: '" + queue.name + "'."); } queueThread.stop(); } public Queue getQueue(String name) { QueueThread queueThread = getQueueThread(name); return queueThread.queue; } public Task schedule(Connection c, final Queue queue, List args) throws SQLException { return scheduleInner(c, null, queue, args); } public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { return scheduleInner(c, parent, queue, args); } private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { Date scheduled = new Date(); Task task = queueService.schedule(c, queue, parent, scheduled, args); log.info("Created task = {}", task); return task; } public Task await(Connection c, Task task, long timeout) throws SQLException { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { task = update(c, task); if (task == null) { throw new RuntimeException("The task went away."); } try { sleep(100); } catch (InterruptedException e) { // break } } return task; } public Task update(Connection c, Task ref) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); return taskDao.findById(ref.id()); } private QueueThread getQueueThread(String name) { QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: '" + name + "'."); } return queueThread; } }