package io.trygvis.queue; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; import org.springframework.transaction.support.TransactionSynchronization; import org.springframework.transaction.support.TransactionSynchronizationAdapter; import org.springframework.transaction.support.TransactionTemplate; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.SECONDS; import static org.springframework.transaction.annotation.Propagation.REQUIRED; import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; @Component public class JdbcAsyncService implements AsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); private final Map queues = new HashMap<>(); @Autowired private TransactionTemplate transactionTemplate; @Autowired private QueueDao queueDao; @Autowired private TaskDao taskDao; @Transactional(propagation = REQUIRED) public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { log.info("registerQueue: ENTER"); Queue q = queueDao.findByName(name); log.info("q = {}", q); final long interval_; if (q == null) { q = new Queue(name, interval); queueDao.insert(q); interval_ = interval; } else { // Found an existing queue. Use the Settings from the database. interval_ = q.interval; } final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); queues.put(name, queueThread); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } }, 10, interval_, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); } } }); log.info("registerQueue: LEAVE"); return q; } public Queue getQueue(String name) { QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: '" + name + "'."); } return queueThread.queue; } @Transactional(propagation = REQUIRED) public Task schedule(final Queue queue, String... args) { return scheduleInner(null, queue, args); } public Task schedule(long parent, Queue queue, String... args) { return scheduleInner(parent, queue, args); } private Task scheduleInner(Long parent, final Queue queue, String... args) { Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); for (String arg : args) { arguments.append(arg).append(' '); } long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString()); Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args)); log.info("Created task = {}", task); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { if (status == TransactionSynchronization.STATUS_COMMITTED) { queues.get(queue.name).ping(); } } }); return task; } @Transactional public Task await(Task task, long timeout) { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { task = update(task); try { sleep(100); } catch (InterruptedException e) { // break } } return task; } @Transactional(readOnly = true) public Task update(Task ref) { return taskDao.findById(ref.id); } }