package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; import static java.util.concurrent.TimeUnit.SECONDS; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final Map queues = new HashMap<>(); public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) { final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue); queues.put(queue.name, queueThread); log.info("registerQueue: LEAVE"); } public void startQueue(ScheduledThreadPoolExecutor executor, String name) { final QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: " + name); } long interval = queueThread.queue.interval; log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } }, 10, interval, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); } public Queue getQueue(String name) { QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: '" + name + "'."); } return queueThread.queue; } public Task schedule(Connection c, final Queue queue, List args) throws SQLException { return scheduleInner(c, null, queue, args); } public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { return scheduleInner(c, parent, queue, args); } private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { TaskDao taskDao = new TaskDao(c); Date scheduled = new Date(); long id = taskDao.insert(parent, queue.name, scheduled, args); Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args); log.info("Created task = {}", task); return task; } public Task await(Connection c, Task task, long timeout) throws SQLException { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { task = update(c, task); if (task == null) { throw new RuntimeException("The task went away."); } try { sleep(100); } catch (InterruptedException e) { // break } } return task; } public Task update(Connection c, Task ref) throws SQLException { TaskDao taskDao = new TaskDao(c); return taskDao.findById(ref.id()); } }