aboutsummaryrefslogtreecommitdiff
path: root/JpaAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'JpaAsyncService.java')
-rwxr-xr-xJpaAsyncService.java242
1 files changed, 0 insertions, 242 deletions
diff --git a/JpaAsyncService.java b/JpaAsyncService.java
deleted file mode 100755
index eaef782..0000000
--- a/JpaAsyncService.java
+++ /dev/null
@@ -1,242 +0,0 @@
-package io.trygvis.queue;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
-
-@SuppressWarnings("SpringJavaAutowiringInspection")
-@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueRepository queueRepository;
-
- @Autowired
- private TaskRepository taskRepository;
-
- @Transactional
- public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
- log.info("registerQueue: ENTER");
-
- Queue q = queueRepository.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- q = queueRepository.save(q);
- } else {
- boolean dirty = false;
- if (interval != q.getInterval()) {
- q.setInterval(interval);
- dirty = true;
- }
-
- if (dirty) {
- q = queueRepository.save(q);
- }
- }
-
- log.info("q = {}", q);
- entityManager.flush();
-// entityManager.detach(q);
- log.info("q = {}", q);
-
- JpaQueueRef queueRef = new JpaQueueRef(q);
-
- log.info("registerQueue: LEAVE");
-
- registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
-
- return queueRef;
- }
-
- @Transactional(readOnly = true)
- public JpaQueueRef getQueue(String name) {
- Queue queue = queueRepository.findByName(name);
-
- if (queue == null) {
- throw new RollbackException("No such queue: '" + name + "'.");
- }
-
- entityManager.detach(queue);
-
- return new JpaQueueRef(queue);
- }
-
- @Transactional
- public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
- log.info("schedule: ENTER");
- Date scheduled = new Date();
- Task task = new Task(queue.queue, scheduled, args);
- log.info("task = {}", task);
- taskRepository.save(task);
- log.info("task = {}", task);
-// entityManager.detach(task);
- log.info("schedule: LEAVE");
- return new JpaExecutionRef(task);
- }
-
- @Transactional(readOnly = true)
- public JpaExecutionRef update(JpaExecutionRef ref) {
- return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
- }
-
- public static class JpaQueueRef implements AsyncService.QueueRef {
- public final Queue queue;
-
- JpaQueueRef(Queue queue) {
- this.queue = queue;
- }
-
- public String toString() {
- return "JpaQueueRef{" +
- "queue=" + queue +
- '}';
- }
- }
-
- public static class JpaExecutionRef implements AsyncService.ExecutionRef {
- private final Task task;
-
- public JpaExecutionRef(Task task) {
- this.task = task;
- }
-
- public List<String> getArguments() {
- return Arrays.asList(task.getArguments());
- }
-
- public Date getScheduled() {
- return task.getScheduled();
- }
-
- public Date getLastRun() {
- return task.getLastRun();
- }
-
- public Date getCompleted() {
- return task.getCompleted();
- }
-
- public boolean isDone() {
- return task.isDone();
- }
-
- public String toString() {
- return "JpaExecutionRef{" +
- "task=" + task +
- '}';
- }
- }
-
- private static class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
- }
-
- private class CheckTimerTask implements Runnable {
- private final AsyncCallable callable;
- private final JpaQueueRef queueRef;
-
- private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
- this.callable = callable;
- this.queueRef = queueRef;
- }
-
- public void run() {
- log.info("JpaAsyncService$CheckTimerTask.run");
-
- List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
-
- System.out.println("tasks.size() = " + tasks.size());
-
- try {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- } catch (Exception e) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- task.registerRun();
- taskRepository.save(task);
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run();
- Date completed = new Date();
- log.info("Setting completed on task. date = {}, task = {}", completed, task);
- task.registerComplete(completed);
- taskRepository.save(task);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
- }
-
- private class MyTransactionSynchronization implements TransactionSynchronization {
-
- private final AsyncCallable callable;
-
- private final int interval;
-
- private final JpaQueueRef queueRef;
-
- public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
- this.callable = callable;
- this.interval = interval;
- this.queueRef = queueRef;
- }
-
- public void suspend() {
- }
-
- public void resume() {
- }
-
- public void flush() {
- }
-
- public void beforeCommit(boolean readOnly) {
- }
-
- public void beforeCompletion() {
- }
-
- public void afterCommit() {
- }
-
- public void afterCompletion(int status) {
- log.info("status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
- }
- }
- }
-}