package io.trygvis.queue; import io.trygvis.data.*; import io.trygvis.model.Queue; import io.trygvis.model.*; import org.quartz.*; import org.slf4j.*; import org.springframework.beans.factory.annotation.*; import org.springframework.stereotype.*; import org.springframework.transaction.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; import javax.persistence.*; import java.util.*; import java.util.concurrent.*; import static java.util.concurrent.TimeUnit.*; import static org.springframework.transaction.support.TransactionSynchronizationManager.*; @SuppressWarnings("SpringJavaAutowiringInspection") @Component public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { private final Logger log = LoggerFactory.getLogger(getClass()); private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); @PersistenceContext private EntityManager entityManager; @Autowired private TransactionTemplate transactionTemplate; @Autowired private QueueRepository queueRepository; @Autowired private TaskRepository taskRepository; @Transactional public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { log.info("registerQueue: ENTER"); Queue q = queueRepository.findByName(name); log.info("q = {}", q); if (q == null) { q = new Queue(name, interval); q = queueRepository.save(q); } else { boolean dirty = false; if (interval != q.getInterval()) { q.setInterval(interval); dirty = true; } if (dirty) { q = queueRepository.save(q); } } log.info("q = {}", q); entityManager.flush(); // entityManager.detach(q); log.info("q = {}", q); JpaQueueRef queueRef = new JpaQueueRef(q); log.info("registerQueue: LEAVE"); registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); return queueRef; } @Transactional(readOnly = true) public JpaQueueRef getQueue(String name) { Queue queue = queueRepository.findByName(name); if (queue == null) { throw new RollbackException("No such queue: '" + name + "'."); } entityManager.detach(queue); return new JpaQueueRef(queue); } @Transactional public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { log.info("schedule: ENTER"); Date scheduled = new Date(); Task task = new Task(queue.queue, scheduled, args); log.info("task = {}", task); taskRepository.save(task); log.info("task = {}", task); // entityManager.detach(task); log.info("schedule: LEAVE"); return new JpaExecutionRef(task); } @Transactional(readOnly = true) public JpaExecutionRef update(JpaExecutionRef ref) { return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); } public static class JpaQueueRef implements AsyncService.QueueRef { public final Queue queue; JpaQueueRef(Queue queue) { this.queue = queue; } public String toString() { return "JpaQueueRef{" + "queue=" + queue + '}'; } } public static class JpaExecutionRef implements AsyncService.ExecutionRef { private final Task task; public JpaExecutionRef(Task task) { this.task = task; } public List getArguments() { return Arrays.asList(task.getArguments()); } public Date getScheduled() { return task.getScheduled(); } public Date getLastRun() { return task.getLastRun(); } public Date getCompleted() { return task.getCompleted(); } public boolean isDone() { return task.isDone(); } public String toString() { return "JpaExecutionRef{" + "task=" + task + '}'; } } private static class TaskFailureException extends RuntimeException { public TaskFailureException(Exception e) { super(e); } } private class CheckTimerTask implements Runnable { private final AsyncCallable callable; private final JpaQueueRef queueRef; private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { this.callable = callable; this.queueRef = queueRef; } public void run() { log.info("JpaAsyncService$CheckTimerTask.run"); List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); System.out.println("tasks.size() = " + tasks.size()); try { for (final Task task : tasks) { try { executeTask(task); } catch (TransactionException | TaskFailureException e) { log.warn("Task execution failed", e); } } } catch (Exception e) { log.warn("Error while executing tasks.", e); } } private void executeTask(final Task task) { final Date run = new Date(); log.info("Setting last run on task. date = {}, task = {}", run, task); transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { task.registerRun(); taskRepository.save(task); } }); transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { try { callable.run(); Date completed = new Date(); log.info("Setting completed on task. date = {}, task = {}", completed, task); task.registerComplete(completed); taskRepository.save(task); } catch (Exception e) { throw new TaskFailureException(e); } } }); } } private class MyTransactionSynchronization implements TransactionSynchronization { private final AsyncCallable callable; private final int interval; private final JpaQueueRef queueRef; public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { this.callable = callable; this.interval = interval; this.queueRef = queueRef; } public void suspend() { } public void resume() { } public void flush() { } public void beforeCommit(boolean readOnly) { } public void beforeCompletion() { } public void afterCommit() { } public void afterCompletion(int status) { log.info("status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); } } } }