aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-04-20 15:43:01 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-04-20 15:43:01 +0200
commit637dddf11f5d60b35c9696914e1e2658b2ddc611 (patch)
treeef320c1950d78163dd0cc3247bee172f4e3e32ff
parentc274d9177e4a495e7b793120dfd1ce12fa5632c7 (diff)
downloadquartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.tar.gz
quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.tar.bz2
quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.tar.xz
quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.zip
wip
-rwxr-xr-xJpaAsyncService.java (renamed from src/main/java/io/trygvis/queue/JpaAsyncService.java)499
-rwxr-xr-xsrc/main/java/io/trygvis/Main.java45
-rwxr-xr-xsrc/main/java/io/trygvis/data/QueueRepository.java2
-rwxr-xr-xsrc/main/java/io/trygvis/data/TaskRepository.java4
-rwxr-xr-xsrc/main/java/io/trygvis/model/Queue.java58
-rwxr-xr-xsrc/main/java/io/trygvis/model/Task.java95
-rwxr-xr-xsrc/main/java/io/trygvis/queue/AsyncService.java37
-rw-r--r--src/main/java/io/trygvis/queue/JdbcAsyncService.java194
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Queue.java24
-rw-r--r--src/main/java/io/trygvis/queue/QueueDao.java34
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java55
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java54
-rwxr-xr-xsrc/main/java/io/trygvis/spring/Config.java14
-rw-r--r--src/main/resources/create.sql27
-rwxr-xr-xsrc/main/resources/logback.xml3
15 files changed, 690 insertions, 455 deletions
diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/JpaAsyncService.java
index e715ac7..eaef782 100755
--- a/src/main/java/io/trygvis/queue/JpaAsyncService.java
+++ b/JpaAsyncService.java
@@ -1,257 +1,242 @@
-package io.trygvis.queue;
-
-import io.trygvis.data.*;
-import io.trygvis.model.Queue;
-import io.trygvis.model.*;
-import org.quartz.*;
-import org.slf4j.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.stereotype.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.annotation.*;
-import org.springframework.transaction.support.*;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
-
-@SuppressWarnings("SpringJavaAutowiringInspection")
-@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueRepository queueRepository;
-
- @Autowired
- private TaskRepository taskRepository;
-
- @Transactional
- public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
- log.info("registerQueue: ENTER");
-
- Queue q = queueRepository.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- q = queueRepository.save(q);
- } else {
- boolean dirty = false;
- if (interval != q.getInterval()) {
- q.setInterval(interval);
- dirty = true;
- }
-
- if (dirty) {
- q = queueRepository.save(q);
- }
- }
-
- log.info("q = {}", q);
- entityManager.flush();
-// entityManager.detach(q);
- log.info("q = {}", q);
-
- JpaQueueRef queueRef = new JpaQueueRef(q);
-
- log.info("registerQueue: LEAVE");
-
- registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
-
- return queueRef;
- }
-
- @Transactional(readOnly = true)
- public JpaQueueRef getQueue(String name) {
- Queue queue = queueRepository.findByName(name);
-
- if (queue == null) {
- throw new RollbackException("No such queue: '" + name + "'.");
- }
-
- entityManager.detach(queue);
-
- return new JpaQueueRef(queue);
- }
-
- @Transactional
- public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
- log.info("schedule: ENTER");
- Date scheduled = new Date();
- Task task = new Task(queue.queue, scheduled, args);
- log.info("task = {}", task);
- taskRepository.save(task);
- log.info("task = {}", task);
-// entityManager.detach(task);
- log.info("schedule: LEAVE");
- return new JpaExecutionRef(task);
- }
-
- @Transactional(readOnly = true)
- public JpaExecutionRef update(JpaExecutionRef ref) {
- return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
- }
-
- public static class JpaQueueRef implements AsyncService.QueueRef {
- public final Queue queue;
-
- JpaQueueRef(Queue queue) {
- this.queue = queue;
- }
-
- public String toString() {
- return "JpaQueueRef{" +
- "queue=" + queue +
- '}';
- }
- }
-
- public static class JpaExecutionRef implements AsyncService.ExecutionRef {
- private final Task task;
-
- public JpaExecutionRef(Task task) {
- this.task = task;
- }
-
- public List<String> getArguments() {
- return Arrays.asList(task.getArguments());
- }
-
- public Date getScheduled() {
- return task.getScheduled();
- }
-
- public Date getLastRun() {
- return task.getLastRun();
- }
-
- public Date getCompleted() {
- return task.getCompleted();
- }
-
- public boolean isDone() {
- return task.isDone();
- }
-
- public String toString() {
- return "JpaExecutionRef{" +
- "task=" + task +
- '}';
- }
- }
-
- private static class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
- }
-
- private class CheckTimerTask implements Runnable {
- private final AsyncCallable callable;
- private final JpaQueueRef queueRef;
-
- private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
- this.callable = callable;
- this.queueRef = queueRef;
- }
-
- public void run() {
- log.info("JpaAsyncService$CheckTimerTask.run");
-
- List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
-
- System.out.println("tasks.size() = " + tasks.size());
-
- try {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- } catch (Exception e) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- task.registerRun();
- taskRepository.save(task);
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run();
- Date completed = new Date();
- log.info("Setting completed on task. date = {}, task = {}", completed, task);
- task.registerComplete(completed);
- taskRepository.save(task);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
- }
-
- private class MyTransactionSynchronization implements TransactionSynchronization {
-
- private final AsyncCallable callable;
-
- private final int interval;
-
- private final JpaQueueRef queueRef;
-
- public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
- this.callable = callable;
- this.interval = interval;
- this.queueRef = queueRef;
- }
-
- public void suspend() {
- }
-
- public void resume() {
- }
-
- public void flush() {
- }
-
- public void beforeCommit(boolean readOnly) {
- }
-
- public void beforeCompletion() {
- }
-
- public void afterCommit() {
- }
-
- public void afterCompletion(int status) {
- log.info("status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
- }
- }
- }
-}
+package io.trygvis.queue;
+
+import static java.util.concurrent.TimeUnit.*;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
+
+@SuppressWarnings("SpringJavaAutowiringInspection")
+@Component
+public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ @PersistenceContext
+ private EntityManager entityManager;
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private QueueRepository queueRepository;
+
+ @Autowired
+ private TaskRepository taskRepository;
+
+ @Transactional
+ public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
+ log.info("registerQueue: ENTER");
+
+ Queue q = queueRepository.findByName(name);
+
+ log.info("q = {}", q);
+
+ if (q == null) {
+ q = new Queue(name, interval);
+ q = queueRepository.save(q);
+ } else {
+ boolean dirty = false;
+ if (interval != q.getInterval()) {
+ q.setInterval(interval);
+ dirty = true;
+ }
+
+ if (dirty) {
+ q = queueRepository.save(q);
+ }
+ }
+
+ log.info("q = {}", q);
+ entityManager.flush();
+// entityManager.detach(q);
+ log.info("q = {}", q);
+
+ JpaQueueRef queueRef = new JpaQueueRef(q);
+
+ log.info("registerQueue: LEAVE");
+
+ registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
+
+ return queueRef;
+ }
+
+ @Transactional(readOnly = true)
+ public JpaQueueRef getQueue(String name) {
+ Queue queue = queueRepository.findByName(name);
+
+ if (queue == null) {
+ throw new RollbackException("No such queue: '" + name + "'.");
+ }
+
+ entityManager.detach(queue);
+
+ return new JpaQueueRef(queue);
+ }
+
+ @Transactional
+ public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
+ log.info("schedule: ENTER");
+ Date scheduled = new Date();
+ Task task = new Task(queue.queue, scheduled, args);
+ log.info("task = {}", task);
+ taskRepository.save(task);
+ log.info("task = {}", task);
+// entityManager.detach(task);
+ log.info("schedule: LEAVE");
+ return new JpaExecutionRef(task);
+ }
+
+ @Transactional(readOnly = true)
+ public JpaExecutionRef update(JpaExecutionRef ref) {
+ return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
+ }
+
+ public static class JpaQueueRef implements AsyncService.QueueRef {
+ public final Queue queue;
+
+ JpaQueueRef(Queue queue) {
+ this.queue = queue;
+ }
+
+ public String toString() {
+ return "JpaQueueRef{" +
+ "queue=" + queue +
+ '}';
+ }
+ }
+
+ public static class JpaExecutionRef implements AsyncService.ExecutionRef {
+ private final Task task;
+
+ public JpaExecutionRef(Task task) {
+ this.task = task;
+ }
+
+ public List<String> getArguments() {
+ return Arrays.asList(task.getArguments());
+ }
+
+ public Date getScheduled() {
+ return task.getScheduled();
+ }
+
+ public Date getLastRun() {
+ return task.getLastRun();
+ }
+
+ public Date getCompleted() {
+ return task.getCompleted();
+ }
+
+ public boolean isDone() {
+ return task.isDone();
+ }
+
+ public String toString() {
+ return "JpaExecutionRef{" +
+ "task=" + task +
+ '}';
+ }
+ }
+
+ private static class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+ }
+
+ private class CheckTimerTask implements Runnable {
+ private final AsyncCallable callable;
+ private final JpaQueueRef queueRef;
+
+ private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
+ this.callable = callable;
+ this.queueRef = queueRef;
+ }
+
+ public void run() {
+ log.info("JpaAsyncService$CheckTimerTask.run");
+
+ List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
+
+ System.out.println("tasks.size() = " + tasks.size());
+
+ try {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException | TaskFailureException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ task.registerRun();
+ taskRepository.save(task);
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+ callable.run();
+ Date completed = new Date();
+ log.info("Setting completed on task. date = {}, task = {}", completed, task);
+ task.registerComplete(completed);
+ taskRepository.save(task);
+ } catch (Exception e) {
+ throw new TaskFailureException(e);
+ }
+ }
+ });
+ }
+ }
+
+ private class MyTransactionSynchronization implements TransactionSynchronization {
+
+ private final AsyncCallable callable;
+
+ private final int interval;
+
+ private final JpaQueueRef queueRef;
+
+ public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
+ this.callable = callable;
+ this.interval = interval;
+ this.queueRef = queueRef;
+ }
+
+ public void suspend() {
+ }
+
+ public void resume() {
+ }
+
+ public void flush() {
+ }
+
+ public void beforeCommit(boolean readOnly) {
+ }
+
+ public void beforeCompletion() {
+ }
+
+ public void afterCommit() {
+ }
+
+ public void afterCompletion(int status) {
+ log.info("status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java
index 0a31aaa..f2f540f 100755
--- a/src/main/java/io/trygvis/Main.java
+++ b/src/main/java/io/trygvis/Main.java
@@ -1,17 +1,21 @@
package io.trygvis;
import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
import org.hibernate.dialect.*;
import org.slf4j.*;
import org.slf4j.bridge.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.context.support.*;
import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
import java.util.*;
import static java.lang.System.*;
import static java.lang.Thread.*;
+import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED;
@Component
public class Main {
@@ -48,7 +52,10 @@ public class Main {
}
@Autowired
- private AsyncService<AsyncService.QueueRef, AsyncService.ExecutionRef> asyncService;
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private AsyncService asyncService;
@Autowired
@Qualifier("createArticle")
@@ -61,35 +68,39 @@ public class Main {
public void run() throws Exception {
log.info("Main.run");
- JpaAsyncService.JpaQueueRef queueRef = asyncService.registerQueue("create-queue", 10, createArticleCallable);
- log.info("queue registered: ref = {}", queueRef);
+ final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable);
+// log.info("queue registered: ref = {}", q);
// asyncService.registerQueue("update-queue", 1, updateArticeCallable);
- AsyncService.QueueRef queue = asyncService.getQueue("create-queue");
+// q = asyncService.getQueue("create-queue");
- List<AsyncService.ExecutionRef> refs = new ArrayList<>();
+ final List<Task> tasks = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- refs.add(asyncService.schedule(queue));
- }
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ for (int i = 0; i < 1; i++) {
+ tasks.add(asyncService.schedule(q));
+ }
+ }
+ });
while (true) {
- log.info("size = {}", refs.size());
- for (Iterator<AsyncService.ExecutionRef> iterator = refs.iterator(); iterator.hasNext(); ) {
- AsyncService.ExecutionRef ref = iterator.next();
+ sleep(10000);
+
+ log.info("tasks.size = {}", tasks.size());
+ for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
+ Task task = iterator.next();
- ref = asyncService.update(ref);
+ task = asyncService.update(task);
- log.info("ref = {}", ref);
+ log.info("task = {}", task);
- if (ref.isDone()) {
+ if (task.isDone()) {
iterator.remove();
}
-
- sleep(100);
}
- if (refs.isEmpty()) {
+ if (tasks.isEmpty()) {
break;
}
}
diff --git a/src/main/java/io/trygvis/data/QueueRepository.java b/src/main/java/io/trygvis/data/QueueRepository.java
index 143d747..47ed478 100755
--- a/src/main/java/io/trygvis/data/QueueRepository.java
+++ b/src/main/java/io/trygvis/data/QueueRepository.java
@@ -1,6 +1,6 @@
package io.trygvis.data;
-import io.trygvis.model.*;
+import io.trygvis.queue.*;
import org.springframework.data.jpa.repository.*;
public interface QueueRepository extends JpaRepository<Queue, Long> {
diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java
index e24d520..b0710e3 100755
--- a/src/main/java/io/trygvis/data/TaskRepository.java
+++ b/src/main/java/io/trygvis/data/TaskRepository.java
@@ -1,7 +1,7 @@
package io.trygvis.data;
-import io.trygvis.model.*;
-import io.trygvis.model.Queue;
+import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
import org.springframework.data.jpa.repository.*;
import java.util.*;
diff --git a/src/main/java/io/trygvis/model/Queue.java b/src/main/java/io/trygvis/model/Queue.java
deleted file mode 100755
index aeec405..0000000
--- a/src/main/java/io/trygvis/model/Queue.java
+++ /dev/null
@@ -1,58 +0,0 @@
-package io.trygvis.model;
-
-import javax.persistence.*;
-
-@Entity
-@Table(
- uniqueConstraints = {
- @UniqueConstraint(name = "uq_queue__name", columnNames = "name")
- }
-)
-public class Queue {
-
- @Id
- @SequenceGenerator(name = "queue_seq", sequenceName = "queue_seq")
- @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "queue_seq")
- private Integer id;
-
- private String name;
-
- private long interval;
-
- @SuppressWarnings("UnusedDeclaration")
- private Queue() {
- }
-
- public Queue(String name, long interval) {
- this.name = name;
- this.interval = interval;
- }
-
- public Integer getId() {
- return id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public long getInterval() {
- return interval;
- }
-
- public void setInterval(long interval) {
- this.interval = interval;
- }
-
- public String toString() {
- return "Queue{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", interval=" + interval +
- '}';
- }
-}
diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java
deleted file mode 100755
index 24ac83d..0000000
--- a/src/main/java/io/trygvis/model/Task.java
+++ /dev/null
@@ -1,95 +0,0 @@
-package io.trygvis.model;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.regex.*;
-
-@Entity
-public class Task {
- @Id
- @SequenceGenerator(name = "task_seq", sequenceName = "task_seq")
- @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "task_seq")
- private Long id;
-
- @ManyToOne
- private Queue queue;
-
- private Date scheduled;
-
- private Date lastRun;
-
- private int runCount;
-
- private Date completed;
-
- private String arguments;
-
- private static final Pattern pattern = Pattern.compile(" ");
-
- @SuppressWarnings("UnusedDeclaration")
- private Task() {
- }
-
- public Task(Queue queue, Date scheduled, String... arguments) {
- this.queue = queue;
- this.scheduled = scheduled;
-
- StringBuilder builder = new StringBuilder(arguments.length * 100);
- for (String argument : arguments) {
- if (pattern.matcher(argument).matches()) {
- throw new RuntimeException("Bad argument: '" + argument + "'.");
- }
- builder.append(argument).append(' ');
- }
- this.arguments = builder.toString();
- }
-
- public Long getId() {
- return id;
- }
-
- public String[] getArguments() {
- return arguments.split(" ");
- }
-
- public Date getScheduled() {
- return scheduled;
- }
-
- public Date getLastRun() {
- return lastRun;
- }
-
- public int getRunCount() {
- return runCount;
- }
-
- public Date getCompleted() {
- return completed;
- }
-
- public boolean isDone() {
- return completed != null;
- }
-
- public void registerRun() {
- lastRun = new Date();
- runCount++;
- }
-
- public void registerComplete(Date completed) {
- this.completed = completed;
- }
-
- public String toString() {
- return "Task{" +
- "id=" + id +
- ", queue=" + queue +
- ", scheduled=" + scheduled +
- ", lastRun=" + lastRun +
- ", runCount=" + runCount +
- ", completed=" + completed +
- ", arguments='" + arguments + '\'' +
- '}';
- }
-}
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
index b08db1f..10f1b79 100755
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ b/src/main/java/io/trygvis/queue/AsyncService.java
@@ -2,32 +2,25 @@ package io.trygvis.queue;
import org.quartz.*;
-import java.util.*;
+public interface AsyncService {
-public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
+ /**
+ * @param name
+ * @param interval how often the queue should be polled for missed tasks in seconds.
+ * @param callable
+ * @return
+ * @throws SchedulerException
+ */
+ Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
- JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+ Queue getQueue(String name);
- QueueRef getQueue(String name);
+ Task schedule(Queue queue, String... args);
- ExecutionRef schedule(QueueRef queue, String... args);
-
- ExecutionRef update(ExecutionRef ref);
-
- interface QueueRef {
- }
-
- interface ExecutionRef {
- List<String> getArguments();
-
- Date getScheduled();
-
- Date getLastRun();
-
- Date getCompleted();
-
- boolean isDone();
- }
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
interface AsyncCallable {
void run() throws Exception;
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
new file mode 100644
index 0000000..1df0ab6
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
@@ -0,0 +1,194 @@
+package io.trygvis.queue;
+
+import org.quartz.*;
+import org.slf4j.*;
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.annotation.*;
+import org.springframework.transaction.support.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.Arrays.*;
+import static java.util.concurrent.TimeUnit.*;
+import static org.springframework.transaction.annotation.Propagation.*;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
+
+@Component
+public class JdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ private final Map<String, QueueThread> queues = new HashMap<>();
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private QueueDao queueDao;
+
+ @Autowired
+ private TaskDao taskDao;
+
+ @Transactional(propagation = REQUIRED)
+ public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
+ log.info("registerQueue: ENTER");
+
+ Queue q = queueDao.findByName(name);
+
+ log.info("q = {}", q);
+
+ final long interval_;
+ if (q == null) {
+ q = new Queue(name, interval * 1000);
+ queueDao.insert(q);
+ interval_ = interval;
+ } else {
+ // Found an existing queue. Use the Settings from the database.
+ interval_ = q.interval;
+ }
+
+ final QueueThread queueThread = new QueueThread(q, callable);
+ queues.put(name, queueThread);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 1000, 1000 * interval_, MILLISECONDS);
+ Thread thread = new Thread(queueThread, name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+ }
+ });
+
+ log.info("registerQueue: LEAVE");
+ return q;
+ }
+
+ public Queue getQueue(String name) {
+ QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+
+ return queueThread.queue;
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(Queue queue, String... args) {
+ log.info("schedule: ENTER");
+
+ Date scheduled = new Date();
+
+ StringBuilder arguments = new StringBuilder();
+ for (String arg : args) {
+ arguments.append(arg).append(' ');
+ }
+
+ long id = taskDao.insert(queue.name, scheduled, arguments.toString());
+ Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args));
+ log.info("task = {}", task);
+ queues.get(queue.name).ping();
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ log.info("schedule: LEAVE");
+ return task;
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(Task ref) {
+ return taskDao.findById(ref.id);
+ }
+
+ class QueueThread implements Runnable {
+ public boolean shouldRun = true;
+
+ public final Queue queue;
+
+ private final AsyncCallable callable;
+
+ QueueThread(Queue queue, AsyncCallable callable) {
+ this.queue = queue;
+ this.callable = callable;
+ }
+
+ public void ping() {
+ log.info("Sending ping to " + queue);
+ synchronized (this) {
+ notify();
+ }
+ }
+
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
+
+ try {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException | TaskFailureException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ } catch (Exception e) {
+ log.warn("Error while executing tasks.", e);
+ }
+
+ synchronized (this) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ taskDao.update(task.registerRun());
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+ callable.run();
+ Date completed = new Date();
+ Task t = task.registerComplete(completed);
+ log.info("Completed task: {}", t);
+ taskDao.update(t);
+ } catch (Exception e) {
+ throw new TaskFailureException(e);
+ }
+ }
+ });
+ }
+ }
+
+ private static class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java
new file mode 100755
index 0000000..15003f7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Queue.java
@@ -0,0 +1,24 @@
+package io.trygvis.queue;
+
+public class Queue {
+
+ public final String name;
+
+ public final long interval;
+
+ public Queue(String name, long interval) {
+ this.name = name;
+ this.interval = interval;
+ }
+
+ public Queue withInterval(int interval) {
+ return new Queue(name, interval);
+ }
+
+ public String toString() {
+ return "Queue{" +
+ "name='" + name + '\'' +
+ ", interval=" + interval +
+ '}';
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
new file mode 100644
index 0000000..a3a79ca
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -0,0 +1,34 @@
+package io.trygvis.queue;
+
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.stereotype.*;
+
+import java.sql.*;
+
+import static org.springframework.dao.support.DataAccessUtils.*;
+
+@Component
+public class QueueDao {
+
+ @Autowired
+ private JdbcTemplate jdbcTemplate;
+
+ public Queue findByName(String name) {
+ return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name));
+ }
+
+ public void insert(Queue q) {
+ jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval);
+ }
+
+ public void update(Queue q) {
+ jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name);
+ }
+
+ private class QueueRowMapper implements RowMapper<Queue> {
+ public Queue mapRow(ResultSet rs, int rowNum) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
new file mode 100755
index 0000000..7f64c77
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -0,0 +1,55 @@
+package io.trygvis.queue;
+
+import java.util.*;
+
+public class Task {
+
+ public final long id;
+
+ public final String queue;
+
+ public final Date scheduled;
+
+ public final Date lastRun;
+
+ public final int runCount;
+
+ public final Date completed;
+
+ public final List<String> arguments;
+
+ Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ this.id = id;
+ this.queue = queue;
+ this.scheduled = scheduled;
+ this.lastRun = lastRun;
+ this.runCount = runCount;
+ this.completed = completed;
+
+ this.arguments = arguments;
+ }
+
+ public Task registerRun() {
+ return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments);
+ }
+
+ public Task registerComplete(Date completed) {
+ return new Task(id, queue, scheduled, lastRun, runCount, new Date(), arguments);
+ }
+
+ public String toString() {
+ return "Task{" +
+ "id=" + id +
+ ", queue=" + queue +
+ ", scheduled=" + scheduled +
+ ", lastRun=" + lastRun +
+ ", runCount=" + runCount +
+ ", completed=" + completed +
+ ", arguments='" + arguments + '\'' +
+ '}';
+ }
+
+ public boolean isDone() {
+ return completed != null;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..2e407a5
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,54 @@
+package io.trygvis.queue;
+
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.stereotype.*;
+
+import java.sql.*;
+import java.util.Date;
+import java.util.*;
+
+import static java.util.Arrays.*;
+
+@Component
+public class TaskDao {
+
+ @Autowired
+ private JdbcTemplate jdbcTemplate;
+
+ public long insert(String queue, Date scheduled, String arguments) {
+ jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments);
+ return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class);
+ }
+
+ public Task findById(long id) {
+ return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?",
+ new TaskRowMapper(), id);
+ }
+
+ public List<Task> findByNameAndCompletedIsNull(String name) {
+ return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL",
+ new TaskRowMapper(), name);
+ }
+
+ public void update(Task task) {
+ jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?",
+ task.scheduled, task.lastRun, task.runCount, task.completed, task.id);
+ }
+
+ private class TaskRowMapper implements RowMapper<Task> {
+ public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments";
+
+ public Task mapRow(ResultSet rs, int rowNum) throws SQLException {
+ return new Task(
+ rs.getLong(1),
+ rs.getString(2),
+ rs.getTimestamp(3),
+ rs.getTimestamp(4),
+ rs.getInt(5),
+ rs.getTimestamp(6),
+ asList(rs.getString(7).split(" ")));
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java
index 5df4dac..5dd845f 100755
--- a/src/main/java/io/trygvis/spring/Config.java
+++ b/src/main/java/io/trygvis/spring/Config.java
@@ -11,6 +11,7 @@ import org.springframework.context.annotation.*;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.*;
import org.springframework.data.jpa.repository.config.*;
+import org.springframework.jdbc.core.*;
import org.springframework.jdbc.datasource.*;
import org.springframework.orm.hibernate4.*;
import org.springframework.orm.jpa.*;
@@ -18,12 +19,13 @@ import org.springframework.transaction.*;
import org.springframework.transaction.annotation.*;
import org.springframework.transaction.support.*;
-import java.util.*;
import javax.persistence.*;
import javax.sql.*;
+import java.util.*;
import static org.hibernate.cfg.AvailableSettings.*;
import static org.hibernate.ejb.AvailableSettings.*;
+import static org.springframework.transaction.TransactionDefinition.*;
@Configuration
@ComponentScan(basePackages = "io.trygvis")
@@ -40,6 +42,11 @@ public class Config {
}};
}
+ @Bean
+ public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+ return new JdbcTemplate(dataSource);
+ }
+
// public SpringBeanJobFactory springBeanJobFactory() {
// SpringBeanJobFactory factory = new SpringBeanJobFactory();
// return factory;
@@ -164,6 +171,9 @@ public class Config {
@Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
- return new TransactionTemplate(platformTransactionManager);
+ DefaultTransactionDefinition td = new DefaultTransactionDefinition();
+ td.setPropagationBehavior(PROPAGATION_REQUIRED);
+ td.setIsolationLevel(ISOLATION_READ_COMMITTED);
+ return new TransactionTemplate(platformTransactionManager, td);
}
}
diff --git a/src/main/resources/create.sql b/src/main/resources/create.sql
new file mode 100644
index 0000000..ed8913f
--- /dev/null
+++ b/src/main/resources/create.sql
@@ -0,0 +1,27 @@
+BEGIN;
+
+DROP TABLE IF EXISTS task;
+DROP TABLE IF EXISTS queue;
+DROP SEQUENCE IF EXISTS task_id;
+
+CREATE TABLE queue (
+ name VARCHAR(100) NOT NULL,
+ interval INTEGER NOT NULL,
+ CONSTRAINT pk_queue PRIMARY KEY (name)
+);
+
+CREATE TABLE task (
+ id INTEGER NOT NULL,
+ queue VARCHAR(100) NOT NULL,
+ scheduled TIMESTAMP NOT NULL,
+ last_run TIMESTAMP,
+ run_count INT NOT NULL,
+ completed TIMESTAMP,
+ arguments VARCHAR(100),
+ CONSTRAINT pk_task PRIMARY KEY (id),
+ CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name)
+);
+
+CREATE SEQUENCE task_id;
+
+COMMIT;
diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml
index 39e3dbd..a25644b 100755
--- a/src/main/resources/logback.xml
+++ b/src/main/resources/logback.xml
@@ -5,6 +5,7 @@
<logger name="org.springframework" level="INFO"/>
<logger name="org.springframework.jdbc" level="INFO"/>
<logger name="org.springframework.orm" level="INFO"/>
+ <logger name="org.springframework.transaction" level="DEBUG"/>
<logger name="org.hibernate" level="INFO"/>
<logger name="org.hibernate.SQL" level="INFO"/>
@@ -13,7 +14,7 @@
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n</pattern>
+ <pattern>%d{HH:mm:ss.SSS} [%-15thread] %-5level %-50logger{36} - %msg%n</pattern>
</encoder>
</appender>