package io.trygvis.queue; import org.quartz.*; import org.slf4j.*; import org.springframework.beans.factory.annotation.*; import org.springframework.stereotype.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; import java.util.*; import java.util.concurrent.*; import static java.util.Arrays.*; import static java.util.concurrent.TimeUnit.*; import static org.springframework.transaction.annotation.Propagation.*; import static org.springframework.transaction.support.TransactionSynchronizationManager.*; @Component public class JdbcAsyncService implements AsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); private final Map queues = new HashMap<>(); @Autowired private TransactionTemplate transactionTemplate; @Autowired private QueueDao queueDao; @Autowired private TaskDao taskDao; @Transactional(propagation = REQUIRED) public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { log.info("registerQueue: ENTER"); Queue q = queueDao.findByName(name); log.info("q = {}", q); final long interval_; if (q == null) { q = new Queue(name, interval); queueDao.insert(q); interval_ = interval; } else { // Found an existing queue. Use the Settings from the database. interval_ = q.interval; } final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable); queues.put(name, queueThread); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { log.info("Starting thread for queue {} with poll interval = {}s", name, interval); executor.scheduleAtFixedRate(new Runnable() { public void run() { queueThread.ping(); } }, 10, interval_, SECONDS); Thread thread = new Thread(queueThread, name); thread.setDaemon(true); thread.start(); } } }); log.info("registerQueue: LEAVE"); return q; } public Queue getQueue(String name) { QueueThread queueThread = queues.get(name); if (queueThread == null) { throw new RuntimeException("No such queue: '" + name + "'."); } return queueThread.queue; } @Transactional(propagation = REQUIRED) public Task schedule(final Queue queue, String... args) { Date scheduled = new Date(); StringBuilder arguments = new StringBuilder(); for (String arg : args) { arguments.append(arg).append(' '); } long id = taskDao.insert(queue.name, scheduled, arguments.toString()); Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); log.info("Created task = {}", task); // queues.get(queue.name).ping(); // try { // Thread.sleep(500); // } catch (InterruptedException e) { // e.printStackTrace(); // } registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { if (status == TransactionSynchronization.STATUS_COMMITTED) { queues.get(queue.name).ping(); } } }); return task; } @Transactional(readOnly = true) public Task update(Task ref) { return taskDao.findById(ref.id); } }