diff options
-rwxr-xr-x | JpaAsyncService.java | 242 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/CreateArticleCallable.java | 2 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/Main.java | 2 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/UpdateArticleCallable.java | 2 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/async/AsyncService.java (renamed from src/main/java/io/trygvis/queue/AsyncService.java) | 7 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java (renamed from src/main/java/io/trygvis/queue/JdbcAsyncService.java) | 6 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java (renamed from src/main/java/io/trygvis/queue/QueueThread.java) | 5 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/TaskFailureException.java (renamed from src/main/java/io/trygvis/queue/TaskFailureException.java) | 2 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/Task.java | 2 |
9 files changed, 20 insertions, 250 deletions
diff --git a/JpaAsyncService.java b/JpaAsyncService.java deleted file mode 100755 index eaef782..0000000 --- a/JpaAsyncService.java +++ /dev/null @@ -1,242 +0,0 @@ -package io.trygvis.queue; - -import static java.util.concurrent.TimeUnit.*; -import static org.springframework.transaction.support.TransactionSynchronizationManager.*; - -@SuppressWarnings("SpringJavaAutowiringInspection") -@Component -public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - @PersistenceContext - private EntityManager entityManager; - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueRepository queueRepository; - - @Autowired - private TaskRepository taskRepository; - - @Transactional - public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueRepository.findByName(name); - - log.info("q = {}", q); - - if (q == null) { - q = new Queue(name, interval); - q = queueRepository.save(q); - } else { - boolean dirty = false; - if (interval != q.getInterval()) { - q.setInterval(interval); - dirty = true; - } - - if (dirty) { - q = queueRepository.save(q); - } - } - - log.info("q = {}", q); - entityManager.flush(); -// entityManager.detach(q); - log.info("q = {}", q); - - JpaQueueRef queueRef = new JpaQueueRef(q); - - log.info("registerQueue: LEAVE"); - - registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); - - return queueRef; - } - - @Transactional(readOnly = true) - public JpaQueueRef getQueue(String name) { - Queue queue = queueRepository.findByName(name); - - if (queue == null) { - throw new RollbackException("No such queue: '" + name + "'."); - } - - entityManager.detach(queue); - - return new JpaQueueRef(queue); - } - - @Transactional - public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { - log.info("schedule: ENTER"); - Date scheduled = new Date(); - Task task = new Task(queue.queue, scheduled, args); - log.info("task = {}", task); - taskRepository.save(task); - log.info("task = {}", task); -// entityManager.detach(task); - log.info("schedule: LEAVE"); - return new JpaExecutionRef(task); - } - - @Transactional(readOnly = true) - public JpaExecutionRef update(JpaExecutionRef ref) { - return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); - } - - public static class JpaQueueRef implements AsyncService.QueueRef { - public final Queue queue; - - JpaQueueRef(Queue queue) { - this.queue = queue; - } - - public String toString() { - return "JpaQueueRef{" + - "queue=" + queue + - '}'; - } - } - - public static class JpaExecutionRef implements AsyncService.ExecutionRef { - private final Task task; - - public JpaExecutionRef(Task task) { - this.task = task; - } - - public List<String> getArguments() { - return Arrays.asList(task.getArguments()); - } - - public Date getScheduled() { - return task.getScheduled(); - } - - public Date getLastRun() { - return task.getLastRun(); - } - - public Date getCompleted() { - return task.getCompleted(); - } - - public boolean isDone() { - return task.isDone(); - } - - public String toString() { - return "JpaExecutionRef{" + - "task=" + task + - '}'; - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } - - private class CheckTimerTask implements Runnable { - private final AsyncCallable callable; - private final JpaQueueRef queueRef; - - private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { - this.callable = callable; - this.queueRef = queueRef; - } - - public void run() { - log.info("JpaAsyncService$CheckTimerTask.run"); - - List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); - - System.out.println("tasks.size() = " + tasks.size()); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - task.registerRun(); - taskRepository.save(task); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - log.info("Setting completed on task. date = {}, task = {}", completed, task); - task.registerComplete(completed); - taskRepository.save(task); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private class MyTransactionSynchronization implements TransactionSynchronization { - - private final AsyncCallable callable; - - private final int interval; - - private final JpaQueueRef queueRef; - - public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { - this.callable = callable; - this.interval = interval; - this.queueRef = queueRef; - } - - public void suspend() { - } - - public void resume() { - } - - public void flush() { - } - - public void beforeCommit(boolean readOnly) { - } - - public void beforeCompletion() { - } - - public void afterCommit() { - } - - public void afterCompletion(int status) { - log.info("status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); - } - } - } -} diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java index 420a5b5..471b59d 100755 --- a/src/main/java/io/trygvis/CreateArticleCallable.java +++ b/src/main/java/io/trygvis/CreateArticleCallable.java @@ -1,7 +1,7 @@ package io.trygvis; import io.trygvis.model.Article; -import io.trygvis.queue.AsyncService; +import io.trygvis.async.AsyncService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index d22cab2..08b9b75 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -1,6 +1,6 @@ package io.trygvis; -import io.trygvis.queue.AsyncService; +import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; import org.hibernate.dialect.PostgreSQL82Dialect; diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java index f1ea0e2..a910855 100755 --- a/src/main/java/io/trygvis/UpdateArticleCallable.java +++ b/src/main/java/io/trygvis/UpdateArticleCallable.java @@ -1,6 +1,6 @@ package io.trygvis; -import io.trygvis.queue.AsyncService; +import io.trygvis.async.AsyncService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index c9e5861..e90a0e4 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -1,9 +1,14 @@ -package io.trygvis.queue; +package io.trygvis.async; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; import org.quartz.SchedulerException; import java.util.List; +/** + * A simple framework for running tasks. + */ public interface AsyncService { /** diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 276541f..4e78a37 100644 --- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,5 +1,9 @@ -package io.trygvis.queue; +package io.trygvis.async; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueDao; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index a981c7e..69466df 100644 --- a/src/main/java/io/trygvis/queue/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -1,5 +1,8 @@ -package io.trygvis.queue; +package io.trygvis.async; +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.transaction.TransactionException; diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java index d3d8c48..7278e17 100644 --- a/src/main/java/io/trygvis/queue/TaskFailureException.java +++ b/src/main/java/io/trygvis/async/TaskFailureException.java @@ -1,4 +1,4 @@ -package io.trygvis.queue; +package io.trygvis.async; class TaskFailureException extends RuntimeException { public TaskFailureException(Exception e) { diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java index 09d5060..2b1103b 100755 --- a/src/main/java/io/trygvis/queue/Task.java +++ b/src/main/java/io/trygvis/queue/Task.java @@ -21,7 +21,7 @@ public class Task { public final List<String> arguments; - Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { + public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { this.id = id; this.parent = parent; this.queue = queue; |