aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-05-29 22:16:50 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-05-29 22:16:50 +0200
commit7d704feb86c44fca57941d223e8605b55fcf68f0 (patch)
tree7a33458a46bcc6e211ef3e833441f80762c61a63
parentb65d39ab617d19ac48f44bc41f04a18803ca75e6 (diff)
downloadquartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.gz
quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.bz2
quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.xz
quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.zip
o Splitting out the parts that implement the "async" features vs the "queue" features.
-rwxr-xr-xJpaAsyncService.java242
-rwxr-xr-xsrc/main/java/io/trygvis/CreateArticleCallable.java2
-rwxr-xr-xsrc/main/java/io/trygvis/Main.java2
-rwxr-xr-xsrc/main/java/io/trygvis/UpdateArticleCallable.java2
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java (renamed from src/main/java/io/trygvis/queue/AsyncService.java)7
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java (renamed from src/main/java/io/trygvis/queue/JdbcAsyncService.java)6
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java (renamed from src/main/java/io/trygvis/queue/QueueThread.java)5
-rw-r--r--src/main/java/io/trygvis/async/TaskFailureException.java (renamed from src/main/java/io/trygvis/queue/TaskFailureException.java)2
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java2
9 files changed, 20 insertions, 250 deletions
diff --git a/JpaAsyncService.java b/JpaAsyncService.java
deleted file mode 100755
index eaef782..0000000
--- a/JpaAsyncService.java
+++ /dev/null
@@ -1,242 +0,0 @@
-package io.trygvis.queue;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
-
-@SuppressWarnings("SpringJavaAutowiringInspection")
-@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueRepository queueRepository;
-
- @Autowired
- private TaskRepository taskRepository;
-
- @Transactional
- public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
- log.info("registerQueue: ENTER");
-
- Queue q = queueRepository.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- q = queueRepository.save(q);
- } else {
- boolean dirty = false;
- if (interval != q.getInterval()) {
- q.setInterval(interval);
- dirty = true;
- }
-
- if (dirty) {
- q = queueRepository.save(q);
- }
- }
-
- log.info("q = {}", q);
- entityManager.flush();
-// entityManager.detach(q);
- log.info("q = {}", q);
-
- JpaQueueRef queueRef = new JpaQueueRef(q);
-
- log.info("registerQueue: LEAVE");
-
- registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
-
- return queueRef;
- }
-
- @Transactional(readOnly = true)
- public JpaQueueRef getQueue(String name) {
- Queue queue = queueRepository.findByName(name);
-
- if (queue == null) {
- throw new RollbackException("No such queue: '" + name + "'.");
- }
-
- entityManager.detach(queue);
-
- return new JpaQueueRef(queue);
- }
-
- @Transactional
- public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
- log.info("schedule: ENTER");
- Date scheduled = new Date();
- Task task = new Task(queue.queue, scheduled, args);
- log.info("task = {}", task);
- taskRepository.save(task);
- log.info("task = {}", task);
-// entityManager.detach(task);
- log.info("schedule: LEAVE");
- return new JpaExecutionRef(task);
- }
-
- @Transactional(readOnly = true)
- public JpaExecutionRef update(JpaExecutionRef ref) {
- return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
- }
-
- public static class JpaQueueRef implements AsyncService.QueueRef {
- public final Queue queue;
-
- JpaQueueRef(Queue queue) {
- this.queue = queue;
- }
-
- public String toString() {
- return "JpaQueueRef{" +
- "queue=" + queue +
- '}';
- }
- }
-
- public static class JpaExecutionRef implements AsyncService.ExecutionRef {
- private final Task task;
-
- public JpaExecutionRef(Task task) {
- this.task = task;
- }
-
- public List<String> getArguments() {
- return Arrays.asList(task.getArguments());
- }
-
- public Date getScheduled() {
- return task.getScheduled();
- }
-
- public Date getLastRun() {
- return task.getLastRun();
- }
-
- public Date getCompleted() {
- return task.getCompleted();
- }
-
- public boolean isDone() {
- return task.isDone();
- }
-
- public String toString() {
- return "JpaExecutionRef{" +
- "task=" + task +
- '}';
- }
- }
-
- private static class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
- }
-
- private class CheckTimerTask implements Runnable {
- private final AsyncCallable callable;
- private final JpaQueueRef queueRef;
-
- private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
- this.callable = callable;
- this.queueRef = queueRef;
- }
-
- public void run() {
- log.info("JpaAsyncService$CheckTimerTask.run");
-
- List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
-
- System.out.println("tasks.size() = " + tasks.size());
-
- try {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- } catch (Exception e) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- task.registerRun();
- taskRepository.save(task);
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run();
- Date completed = new Date();
- log.info("Setting completed on task. date = {}, task = {}", completed, task);
- task.registerComplete(completed);
- taskRepository.save(task);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
- }
-
- private class MyTransactionSynchronization implements TransactionSynchronization {
-
- private final AsyncCallable callable;
-
- private final int interval;
-
- private final JpaQueueRef queueRef;
-
- public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
- this.callable = callable;
- this.interval = interval;
- this.queueRef = queueRef;
- }
-
- public void suspend() {
- }
-
- public void resume() {
- }
-
- public void flush() {
- }
-
- public void beforeCommit(boolean readOnly) {
- }
-
- public void beforeCompletion() {
- }
-
- public void afterCommit() {
- }
-
- public void afterCompletion(int status) {
- log.info("status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
- }
- }
- }
-}
diff --git a/src/main/java/io/trygvis/CreateArticleCallable.java b/src/main/java/io/trygvis/CreateArticleCallable.java
index 420a5b5..471b59d 100755
--- a/src/main/java/io/trygvis/CreateArticleCallable.java
+++ b/src/main/java/io/trygvis/CreateArticleCallable.java
@@ -1,7 +1,7 @@
package io.trygvis;
import io.trygvis.model.Article;
-import io.trygvis.queue.AsyncService;
+import io.trygvis.async.AsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java
index d22cab2..08b9b75 100755
--- a/src/main/java/io/trygvis/Main.java
+++ b/src/main/java/io/trygvis/Main.java
@@ -1,6 +1,6 @@
package io.trygvis;
-import io.trygvis.queue.AsyncService;
+import io.trygvis.async.AsyncService;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
import org.hibernate.dialect.PostgreSQL82Dialect;
diff --git a/src/main/java/io/trygvis/UpdateArticleCallable.java b/src/main/java/io/trygvis/UpdateArticleCallable.java
index f1ea0e2..a910855 100755
--- a/src/main/java/io/trygvis/UpdateArticleCallable.java
+++ b/src/main/java/io/trygvis/UpdateArticleCallable.java
@@ -1,6 +1,6 @@
package io.trygvis;
-import io.trygvis.queue.AsyncService;
+import io.trygvis.async.AsyncService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index c9e5861..e90a0e4 100755
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -1,9 +1,14 @@
-package io.trygvis.queue;
+package io.trygvis.async;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
import org.quartz.SchedulerException;
import java.util.List;
+/**
+ * A simple framework for running tasks.
+ */
public interface AsyncService {
/**
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 276541f..4e78a37 100644
--- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,5 +1,9 @@
-package io.trygvis.queue;
+package io.trygvis.async;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueDao;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index a981c7e..69466df 100644
--- a/src/main/java/io/trygvis/queue/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -1,5 +1,8 @@
-package io.trygvis.queue;
+package io.trygvis.async;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.transaction.TransactionException;
diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java
index d3d8c48..7278e17 100644
--- a/src/main/java/io/trygvis/queue/TaskFailureException.java
+++ b/src/main/java/io/trygvis/async/TaskFailureException.java
@@ -1,4 +1,4 @@
-package io.trygvis.queue;
+package io.trygvis.async;
class TaskFailureException extends RuntimeException {
public TaskFailureException(Exception e) {
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 09d5060..2b1103b 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -21,7 +21,7 @@ public class Task {
public final List<String> arguments;
- Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
this.id = id;
this.parent = parent;
this.queue = queue;