aboutsummaryrefslogtreecommitdiff
path: root/JpaAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'JpaAsyncService.java')
-rwxr-xr-xJpaAsyncService.java242
1 files changed, 242 insertions, 0 deletions
diff --git a/JpaAsyncService.java b/JpaAsyncService.java
new file mode 100755
index 0000000..eaef782
--- /dev/null
+++ b/JpaAsyncService.java
@@ -0,0 +1,242 @@
+package io.trygvis.queue;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
+
+@SuppressWarnings("SpringJavaAutowiringInspection")
+@Component
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ @PersistenceContext
+ private EntityManager entityManager;
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private QueueRepository queueRepository;
+
+ @Autowired
+ private TaskRepository taskRepository;
+
+ @Transactional
+ public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
+ log.info("registerQueue: ENTER");
+
+ Queue q = queueRepository.findByName(name);
+
+ log.info("q = {}", q);
+
+ if (q == null) {
+ q = new Queue(name, interval);
+ q = queueRepository.save(q);
+ } else {
+ boolean dirty = false;
+ if (interval != q.getInterval()) {
+ q.setInterval(interval);
+ dirty = true;
+ }
+
+ if (dirty) {
+ q = queueRepository.save(q);
+ }
+ }
+
+ log.info("q = {}", q);
+ entityManager.flush();
+// entityManager.detach(q);
+ log.info("q = {}", q);
+
+ JpaQueueRef queueRef = new JpaQueueRef(q);
+
+ log.info("registerQueue: LEAVE");
+
+ registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
+
+ return queueRef;
+ }
+
+ @Transactional(readOnly = true)
+ public JpaQueueRef getQueue(String name) {
+ Queue queue = queueRepository.findByName(name);
+
+ if (queue == null) {
+ throw new RollbackException("No such queue: '" + name + "'.");
+ }
+
+ entityManager.detach(queue);
+
+ return new JpaQueueRef(queue);
+ }
+
+ @Transactional
+ public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
+ log.info("schedule: ENTER");
+ Date scheduled = new Date();
+ Task task = new Task(queue.queue, scheduled, args);
+ log.info("task = {}", task);
+ taskRepository.save(task);
+ log.info("task = {}", task);
+// entityManager.detach(task);
+ log.info("schedule: LEAVE");
+ return new JpaExecutionRef(task);
+ }
+
+ @Transactional(readOnly = true)
+ public JpaExecutionRef update(JpaExecutionRef ref) {
+ return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
+ }
+
+ public static class JpaQueueRef implements AsyncService.QueueRef {
+ public final Queue queue;
+
+ JpaQueueRef(Queue queue) {
+ this.queue = queue;
+ }
+
+ public String toString() {
+ return "JpaQueueRef{" +
+ "queue=" + queue +
+ '}';
+ }
+ }
+
+ public static class JpaExecutionRef implements AsyncService.ExecutionRef {
+ private final Task task;
+
+ public JpaExecutionRef(Task task) {
+ this.task = task;
+ }
+
+ public List<String> getArguments() {
+ return Arrays.asList(task.getArguments());
+ }
+
+ public Date getScheduled() {
+ return task.getScheduled();
+ }
+
+ public Date getLastRun() {
+ return task.getLastRun();
+ }
+
+ public Date getCompleted() {
+ return task.getCompleted();
+ }
+
+ public boolean isDone() {
+ return task.isDone();
+ }
+
+ public String toString() {
+ return "JpaExecutionRef{" +
+ "task=" + task +
+ '}';
+ }
+ }
+
+ private static class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+ }
+
+ private class CheckTimerTask implements Runnable {
+ private final AsyncCallable callable;
+ private final JpaQueueRef queueRef;
+
+ private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
+ this.callable = callable;
+ this.queueRef = queueRef;
+ }
+
+ public void run() {
+ log.info("JpaAsyncService$CheckTimerTask.run");
+
+ List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
+
+ System.out.println("tasks.size() = " + tasks.size());
+
+ try {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException | TaskFailureException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ task.registerRun();
+ taskRepository.save(task);
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+ callable.run();
+ Date completed = new Date();
+ log.info("Setting completed on task. date = {}, task = {}", completed, task);
+ task.registerComplete(completed);
+ taskRepository.save(task);
+ } catch (Exception e) {
+ throw new TaskFailureException(e);
+ }
+ }
+ });
+ }
+ }
+
+ private class MyTransactionSynchronization implements TransactionSynchronization {
+
+ private final AsyncCallable callable;
+
+ private final int interval;
+
+ private final JpaQueueRef queueRef;
+
+ public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
+ this.callable = callable;
+ this.interval = interval;
+ this.queueRef = queueRef;
+ }
+
+ public void suspend() {
+ }
+
+ public void resume() {
+ }
+
+ public void flush() {
+ }
+
+ public void beforeCommit(boolean readOnly) {
+ }
+
+ public void beforeCompletion() {
+ }
+
+ public void afterCommit() {
+ }
+
+ public void afterCompletion(int status) {
+ log.info("status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
+ }
+ }
+ }
+}