From 637dddf11f5d60b35c9696914e1e2658b2ddc611 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 20 Apr 2013 15:43:01 +0200 Subject: wip --- .../java/io/trygvis/queue/JpaAsyncService.java | 257 --------------------- 1 file changed, 257 deletions(-) delete mode 100755 src/main/java/io/trygvis/queue/JpaAsyncService.java (limited to 'src/main/java/io/trygvis/queue/JpaAsyncService.java') diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java deleted file mode 100755 index e715ac7..0000000 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ /dev/null @@ -1,257 +0,0 @@ -package io.trygvis.queue; - -import io.trygvis.data.*; -import io.trygvis.model.Queue; -import io.trygvis.model.*; -import org.quartz.*; -import org.slf4j.*; -import org.springframework.beans.factory.annotation.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.*; -import org.springframework.transaction.annotation.*; -import org.springframework.transaction.support.*; - -import javax.persistence.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.springframework.transaction.support.TransactionSynchronizationManager.*; - -@SuppressWarnings("SpringJavaAutowiringInspection") -@Component -public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - @PersistenceContext - private EntityManager entityManager; - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueRepository queueRepository; - - @Autowired - private TaskRepository taskRepository; - - @Transactional - public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueRepository.findByName(name); - - log.info("q = {}", q); - - if (q == null) { - q = new Queue(name, interval); - q = queueRepository.save(q); - } else { - boolean dirty = false; - if (interval != q.getInterval()) { - q.setInterval(interval); - dirty = true; - } - - if (dirty) { - q = queueRepository.save(q); - } - } - - log.info("q = {}", q); - entityManager.flush(); -// entityManager.detach(q); - log.info("q = {}", q); - - JpaQueueRef queueRef = new JpaQueueRef(q); - - log.info("registerQueue: LEAVE"); - - registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); - - return queueRef; - } - - @Transactional(readOnly = true) - public JpaQueueRef getQueue(String name) { - Queue queue = queueRepository.findByName(name); - - if (queue == null) { - throw new RollbackException("No such queue: '" + name + "'."); - } - - entityManager.detach(queue); - - return new JpaQueueRef(queue); - } - - @Transactional - public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { - log.info("schedule: ENTER"); - Date scheduled = new Date(); - Task task = new Task(queue.queue, scheduled, args); - log.info("task = {}", task); - taskRepository.save(task); - log.info("task = {}", task); -// entityManager.detach(task); - log.info("schedule: LEAVE"); - return new JpaExecutionRef(task); - } - - @Transactional(readOnly = true) - public JpaExecutionRef update(JpaExecutionRef ref) { - return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); - } - - public static class JpaQueueRef implements AsyncService.QueueRef { - public final Queue queue; - - JpaQueueRef(Queue queue) { - this.queue = queue; - } - - public String toString() { - return "JpaQueueRef{" + - "queue=" + queue + - '}'; - } - } - - public static class JpaExecutionRef implements AsyncService.ExecutionRef { - private final Task task; - - public JpaExecutionRef(Task task) { - this.task = task; - } - - public List getArguments() { - return Arrays.asList(task.getArguments()); - } - - public Date getScheduled() { - return task.getScheduled(); - } - - public Date getLastRun() { - return task.getLastRun(); - } - - public Date getCompleted() { - return task.getCompleted(); - } - - public boolean isDone() { - return task.isDone(); - } - - public String toString() { - return "JpaExecutionRef{" + - "task=" + task + - '}'; - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } - - private class CheckTimerTask implements Runnable { - private final AsyncCallable callable; - private final JpaQueueRef queueRef; - - private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { - this.callable = callable; - this.queueRef = queueRef; - } - - public void run() { - log.info("JpaAsyncService$CheckTimerTask.run"); - - List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); - - System.out.println("tasks.size() = " + tasks.size()); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - task.registerRun(); - taskRepository.save(task); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - log.info("Setting completed on task. date = {}, task = {}", completed, task); - task.registerComplete(completed); - taskRepository.save(task); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private class MyTransactionSynchronization implements TransactionSynchronization { - - private final AsyncCallable callable; - - private final int interval; - - private final JpaQueueRef queueRef; - - public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { - this.callable = callable; - this.interval = interval; - this.queueRef = queueRef; - } - - public void suspend() { - } - - public void resume() { - } - - public void flush() { - } - - public void beforeCommit(boolean readOnly) { - } - - public void beforeCompletion() { - } - - public void afterCommit() { - } - - public void afterCompletion(int status) { - log.info("status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); - } - } - } -} -- cgit v1.2.3