diff options
Diffstat (limited to 'JpaAsyncService.java')
-rwxr-xr-x | JpaAsyncService.java | 242 |
1 files changed, 242 insertions, 0 deletions
diff --git a/JpaAsyncService.java b/JpaAsyncService.java new file mode 100755 index 0000000..eaef782 --- /dev/null +++ b/JpaAsyncService.java @@ -0,0 +1,242 @@ +package io.trygvis.queue; + +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; + +@SuppressWarnings("SpringJavaAutowiringInspection") +@Component +public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + @PersistenceContext + private EntityManager entityManager; + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueRepository queueRepository; + + @Autowired + private TaskRepository taskRepository; + + @Transactional + public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueRepository.findByName(name); + + log.info("q = {}", q); + + if (q == null) { + q = new Queue(name, interval); + q = queueRepository.save(q); + } else { + boolean dirty = false; + if (interval != q.getInterval()) { + q.setInterval(interval); + dirty = true; + } + + if (dirty) { + q = queueRepository.save(q); + } + } + + log.info("q = {}", q); + entityManager.flush(); +// entityManager.detach(q); + log.info("q = {}", q); + + JpaQueueRef queueRef = new JpaQueueRef(q); + + log.info("registerQueue: LEAVE"); + + registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); + + return queueRef; + } + + @Transactional(readOnly = true) + public JpaQueueRef getQueue(String name) { + Queue queue = queueRepository.findByName(name); + + if (queue == null) { + throw new RollbackException("No such queue: '" + name + "'."); + } + + entityManager.detach(queue); + + return new JpaQueueRef(queue); + } + + @Transactional + public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { + log.info("schedule: ENTER"); + Date scheduled = new Date(); + Task task = new Task(queue.queue, scheduled, args); + log.info("task = {}", task); + taskRepository.save(task); + log.info("task = {}", task); +// entityManager.detach(task); + log.info("schedule: LEAVE"); + return new JpaExecutionRef(task); + } + + @Transactional(readOnly = true) + public JpaExecutionRef update(JpaExecutionRef ref) { + return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); + } + + public static class JpaQueueRef implements AsyncService.QueueRef { + public final Queue queue; + + JpaQueueRef(Queue queue) { + this.queue = queue; + } + + public String toString() { + return "JpaQueueRef{" + + "queue=" + queue + + '}'; + } + } + + public static class JpaExecutionRef implements AsyncService.ExecutionRef { + private final Task task; + + public JpaExecutionRef(Task task) { + this.task = task; + } + + public List<String> getArguments() { + return Arrays.asList(task.getArguments()); + } + + public Date getScheduled() { + return task.getScheduled(); + } + + public Date getLastRun() { + return task.getLastRun(); + } + + public Date getCompleted() { + return task.getCompleted(); + } + + public boolean isDone() { + return task.isDone(); + } + + public String toString() { + return "JpaExecutionRef{" + + "task=" + task + + '}'; + } + } + + private static class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } + } + + private class CheckTimerTask implements Runnable { + private final AsyncCallable callable; + private final JpaQueueRef queueRef; + + private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { + this.callable = callable; + this.queueRef = queueRef; + } + + public void run() { + log.info("JpaAsyncService$CheckTimerTask.run"); + + List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); + + System.out.println("tasks.size() = " + tasks.size()); + + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while executing tasks.", e); + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + task.registerRun(); + taskRepository.save(task); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + log.info("Setting completed on task. date = {}, task = {}", completed, task); + task.registerComplete(completed); + taskRepository.save(task); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } + } + + private class MyTransactionSynchronization implements TransactionSynchronization { + + private final AsyncCallable callable; + + private final int interval; + + private final JpaQueueRef queueRef; + + public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { + this.callable = callable; + this.interval = interval; + this.queueRef = queueRef; + } + + public void suspend() { + } + + public void resume() { + } + + public void flush() { + } + + public void beforeCommit(boolean readOnly) { + } + + public void beforeCompletion() { + } + + public void afterCommit() { + } + + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); + } + } + } +} |