diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-20 15:43:01 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-04-20 15:43:01 +0200 |
commit | 637dddf11f5d60b35c9696914e1e2658b2ddc611 (patch) | |
tree | ef320c1950d78163dd0cc3247bee172f4e3e32ff /src/main/java/io/trygvis | |
parent | c274d9177e4a495e7b793120dfd1ce12fa5632c7 (diff) | |
download | quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.tar.gz quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.tar.bz2 quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.tar.xz quartz-based-queue-637dddf11f5d60b35c9696914e1e2658b2ddc611.zip |
wip
Diffstat (limited to 'src/main/java/io/trygvis')
-rwxr-xr-x | src/main/java/io/trygvis/Main.java | 45 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/data/QueueRepository.java | 2 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/data/TaskRepository.java | 4 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/model/Queue.java | 58 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/model/Task.java | 95 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/AsyncService.java | 37 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcAsyncService.java | 194 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/JpaAsyncService.java | 257 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/Queue.java | 24 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueDao.java | 34 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/Task.java | 55 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 54 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/spring/Config.java | 14 |
13 files changed, 419 insertions, 454 deletions
diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index 0a31aaa..f2f540f 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -1,17 +1,21 @@ package io.trygvis;
import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
import org.hibernate.dialect.*;
import org.slf4j.*;
import org.slf4j.bridge.*;
import org.springframework.beans.factory.annotation.*;
import org.springframework.context.support.*;
import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
import java.util.*;
import static java.lang.System.*;
import static java.lang.Thread.*;
+import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED;
@Component
public class Main {
@@ -48,7 +52,10 @@ public class Main { }
@Autowired
- private AsyncService<AsyncService.QueueRef, AsyncService.ExecutionRef> asyncService;
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private AsyncService asyncService;
@Autowired
@Qualifier("createArticle")
@@ -61,35 +68,39 @@ public class Main { public void run() throws Exception {
log.info("Main.run");
- JpaAsyncService.JpaQueueRef queueRef = asyncService.registerQueue("create-queue", 10, createArticleCallable);
- log.info("queue registered: ref = {}", queueRef);
+ final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable);
+// log.info("queue registered: ref = {}", q);
// asyncService.registerQueue("update-queue", 1, updateArticeCallable);
- AsyncService.QueueRef queue = asyncService.getQueue("create-queue");
+// q = asyncService.getQueue("create-queue");
- List<AsyncService.ExecutionRef> refs = new ArrayList<>();
+ final List<Task> tasks = new ArrayList<>();
- for (int i = 0; i < 10; i++) {
- refs.add(asyncService.schedule(queue));
- }
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ for (int i = 0; i < 1; i++) {
+ tasks.add(asyncService.schedule(q));
+ }
+ }
+ });
while (true) {
- log.info("size = {}", refs.size());
- for (Iterator<AsyncService.ExecutionRef> iterator = refs.iterator(); iterator.hasNext(); ) {
- AsyncService.ExecutionRef ref = iterator.next();
+ sleep(10000);
+
+ log.info("tasks.size = {}", tasks.size());
+ for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
+ Task task = iterator.next();
- ref = asyncService.update(ref);
+ task = asyncService.update(task);
- log.info("ref = {}", ref);
+ log.info("task = {}", task);
- if (ref.isDone()) {
+ if (task.isDone()) {
iterator.remove();
}
-
- sleep(100);
}
- if (refs.isEmpty()) {
+ if (tasks.isEmpty()) {
break;
}
}
diff --git a/src/main/java/io/trygvis/data/QueueRepository.java b/src/main/java/io/trygvis/data/QueueRepository.java index 143d747..47ed478 100755 --- a/src/main/java/io/trygvis/data/QueueRepository.java +++ b/src/main/java/io/trygvis/data/QueueRepository.java @@ -1,6 +1,6 @@ package io.trygvis.data;
-import io.trygvis.model.*;
+import io.trygvis.queue.*;
import org.springframework.data.jpa.repository.*;
public interface QueueRepository extends JpaRepository<Queue, Long> {
diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java index e24d520..b0710e3 100755 --- a/src/main/java/io/trygvis/data/TaskRepository.java +++ b/src/main/java/io/trygvis/data/TaskRepository.java @@ -1,7 +1,7 @@ package io.trygvis.data;
-import io.trygvis.model.*;
-import io.trygvis.model.Queue;
+import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
import org.springframework.data.jpa.repository.*;
import java.util.*;
diff --git a/src/main/java/io/trygvis/model/Queue.java b/src/main/java/io/trygvis/model/Queue.java deleted file mode 100755 index aeec405..0000000 --- a/src/main/java/io/trygvis/model/Queue.java +++ /dev/null @@ -1,58 +0,0 @@ -package io.trygvis.model;
-
-import javax.persistence.*;
-
-@Entity
-@Table(
- uniqueConstraints = {
- @UniqueConstraint(name = "uq_queue__name", columnNames = "name")
- }
-)
-public class Queue {
-
- @Id
- @SequenceGenerator(name = "queue_seq", sequenceName = "queue_seq")
- @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "queue_seq")
- private Integer id;
-
- private String name;
-
- private long interval;
-
- @SuppressWarnings("UnusedDeclaration")
- private Queue() {
- }
-
- public Queue(String name, long interval) {
- this.name = name;
- this.interval = interval;
- }
-
- public Integer getId() {
- return id;
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public long getInterval() {
- return interval;
- }
-
- public void setInterval(long interval) {
- this.interval = interval;
- }
-
- public String toString() {
- return "Queue{" +
- "id=" + id +
- ", name='" + name + '\'' +
- ", interval=" + interval +
- '}';
- }
-}
diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java deleted file mode 100755 index 24ac83d..0000000 --- a/src/main/java/io/trygvis/model/Task.java +++ /dev/null @@ -1,95 +0,0 @@ -package io.trygvis.model;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.regex.*;
-
-@Entity
-public class Task {
- @Id
- @SequenceGenerator(name = "task_seq", sequenceName = "task_seq")
- @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "task_seq")
- private Long id;
-
- @ManyToOne
- private Queue queue;
-
- private Date scheduled;
-
- private Date lastRun;
-
- private int runCount;
-
- private Date completed;
-
- private String arguments;
-
- private static final Pattern pattern = Pattern.compile(" ");
-
- @SuppressWarnings("UnusedDeclaration")
- private Task() {
- }
-
- public Task(Queue queue, Date scheduled, String... arguments) {
- this.queue = queue;
- this.scheduled = scheduled;
-
- StringBuilder builder = new StringBuilder(arguments.length * 100);
- for (String argument : arguments) {
- if (pattern.matcher(argument).matches()) {
- throw new RuntimeException("Bad argument: '" + argument + "'.");
- }
- builder.append(argument).append(' ');
- }
- this.arguments = builder.toString();
- }
-
- public Long getId() {
- return id;
- }
-
- public String[] getArguments() {
- return arguments.split(" ");
- }
-
- public Date getScheduled() {
- return scheduled;
- }
-
- public Date getLastRun() {
- return lastRun;
- }
-
- public int getRunCount() {
- return runCount;
- }
-
- public Date getCompleted() {
- return completed;
- }
-
- public boolean isDone() {
- return completed != null;
- }
-
- public void registerRun() {
- lastRun = new Date();
- runCount++;
- }
-
- public void registerComplete(Date completed) {
- this.completed = completed;
- }
-
- public String toString() {
- return "Task{" +
- "id=" + id +
- ", queue=" + queue +
- ", scheduled=" + scheduled +
- ", lastRun=" + lastRun +
- ", runCount=" + runCount +
- ", completed=" + completed +
- ", arguments='" + arguments + '\'' +
- '}';
- }
-}
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index b08db1f..10f1b79 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -2,32 +2,25 @@ package io.trygvis.queue; import org.quartz.*;
-import java.util.*;
+public interface AsyncService {
-public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
+ /**
+ * @param name
+ * @param interval how often the queue should be polled for missed tasks in seconds.
+ * @param callable
+ * @return
+ * @throws SchedulerException
+ */
+ Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
- JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+ Queue getQueue(String name);
- QueueRef getQueue(String name);
+ Task schedule(Queue queue, String... args);
- ExecutionRef schedule(QueueRef queue, String... args);
-
- ExecutionRef update(ExecutionRef ref);
-
- interface QueueRef {
- }
-
- interface ExecutionRef {
- List<String> getArguments();
-
- Date getScheduled();
-
- Date getLastRun();
-
- Date getCompleted();
-
- boolean isDone();
- }
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
interface AsyncCallable {
void run() throws Exception;
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java new file mode 100644 index 0000000..1df0ab6 --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -0,0 +1,194 @@ +package io.trygvis.queue; + +import org.quartz.*; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.annotation.*; +import org.springframework.transaction.support.*; + +import java.util.*; +import java.util.concurrent.*; + +import static java.util.Arrays.*; +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.annotation.Propagation.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; + +@Component +public class JdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final Map<String, QueueThread> queues = new HashMap<>(); + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueDao queueDao; + + @Autowired + private TaskDao taskDao; + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueDao.findByName(name); + + log.info("q = {}", q); + + final long interval_; + if (q == null) { + q = new Queue(name, interval * 1000); + queueDao.insert(q); + interval_ = interval; + } else { + // Found an existing queue. Use the Settings from the database. + interval_ = q.interval; + } + + final QueueThread queueThread = new QueueThread(q, callable); + queues.put(name, queueThread); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 1000, 1000 * interval_, MILLISECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + + public Queue getQueue(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + + return queueThread.queue; + } + + @Transactional(propagation = REQUIRED) + public Task schedule(Queue queue, String... args) { + log.info("schedule: ENTER"); + + Date scheduled = new Date(); + + StringBuilder arguments = new StringBuilder(); + for (String arg : args) { + arguments.append(arg).append(' '); + } + + long id = taskDao.insert(queue.name, scheduled, arguments.toString()); + Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); + log.info("task = {}", task); + queues.get(queue.name).ping(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("schedule: LEAVE"); + return task; + } + + @Transactional(readOnly = true) + public Task update(Task ref) { + return taskDao.findById(ref.id); + } + + class QueueThread implements Runnable { + public boolean shouldRun = true; + + public final Queue queue; + + private final AsyncCallable callable; + + QueueThread(Queue queue, AsyncCallable callable) { + this.queue = queue; + this.callable = callable; + } + + public void ping() { + log.info("Sending ping to " + queue); + synchronized (this) { + notify(); + } + } + + public void run() { + while (shouldRun) { + List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } + } + + private static class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } + } +} diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java deleted file mode 100755 index e715ac7..0000000 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ /dev/null @@ -1,257 +0,0 @@ -package io.trygvis.queue;
-
-import io.trygvis.data.*;
-import io.trygvis.model.Queue;
-import io.trygvis.model.*;
-import org.quartz.*;
-import org.slf4j.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.stereotype.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.annotation.*;
-import org.springframework.transaction.support.*;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
-
-@SuppressWarnings("SpringJavaAutowiringInspection")
-@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueRepository queueRepository;
-
- @Autowired
- private TaskRepository taskRepository;
-
- @Transactional
- public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
- log.info("registerQueue: ENTER");
-
- Queue q = queueRepository.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- q = queueRepository.save(q);
- } else {
- boolean dirty = false;
- if (interval != q.getInterval()) {
- q.setInterval(interval);
- dirty = true;
- }
-
- if (dirty) {
- q = queueRepository.save(q);
- }
- }
-
- log.info("q = {}", q);
- entityManager.flush();
-// entityManager.detach(q);
- log.info("q = {}", q);
-
- JpaQueueRef queueRef = new JpaQueueRef(q);
-
- log.info("registerQueue: LEAVE");
-
- registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
-
- return queueRef;
- }
-
- @Transactional(readOnly = true)
- public JpaQueueRef getQueue(String name) {
- Queue queue = queueRepository.findByName(name);
-
- if (queue == null) {
- throw new RollbackException("No such queue: '" + name + "'.");
- }
-
- entityManager.detach(queue);
-
- return new JpaQueueRef(queue);
- }
-
- @Transactional
- public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
- log.info("schedule: ENTER");
- Date scheduled = new Date();
- Task task = new Task(queue.queue, scheduled, args);
- log.info("task = {}", task);
- taskRepository.save(task);
- log.info("task = {}", task);
-// entityManager.detach(task);
- log.info("schedule: LEAVE");
- return new JpaExecutionRef(task);
- }
-
- @Transactional(readOnly = true)
- public JpaExecutionRef update(JpaExecutionRef ref) {
- return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
- }
-
- public static class JpaQueueRef implements AsyncService.QueueRef {
- public final Queue queue;
-
- JpaQueueRef(Queue queue) {
- this.queue = queue;
- }
-
- public String toString() {
- return "JpaQueueRef{" +
- "queue=" + queue +
- '}';
- }
- }
-
- public static class JpaExecutionRef implements AsyncService.ExecutionRef {
- private final Task task;
-
- public JpaExecutionRef(Task task) {
- this.task = task;
- }
-
- public List<String> getArguments() {
- return Arrays.asList(task.getArguments());
- }
-
- public Date getScheduled() {
- return task.getScheduled();
- }
-
- public Date getLastRun() {
- return task.getLastRun();
- }
-
- public Date getCompleted() {
- return task.getCompleted();
- }
-
- public boolean isDone() {
- return task.isDone();
- }
-
- public String toString() {
- return "JpaExecutionRef{" +
- "task=" + task +
- '}';
- }
- }
-
- private static class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
- }
-
- private class CheckTimerTask implements Runnable {
- private final AsyncCallable callable;
- private final JpaQueueRef queueRef;
-
- private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
- this.callable = callable;
- this.queueRef = queueRef;
- }
-
- public void run() {
- log.info("JpaAsyncService$CheckTimerTask.run");
-
- List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
-
- System.out.println("tasks.size() = " + tasks.size());
-
- try {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- } catch (Exception e) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- task.registerRun();
- taskRepository.save(task);
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run();
- Date completed = new Date();
- log.info("Setting completed on task. date = {}, task = {}", completed, task);
- task.registerComplete(completed);
- taskRepository.save(task);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
- }
-
- private class MyTransactionSynchronization implements TransactionSynchronization {
-
- private final AsyncCallable callable;
-
- private final int interval;
-
- private final JpaQueueRef queueRef;
-
- public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
- this.callable = callable;
- this.interval = interval;
- this.queueRef = queueRef;
- }
-
- public void suspend() {
- }
-
- public void resume() {
- }
-
- public void flush() {
- }
-
- public void beforeCommit(boolean readOnly) {
- }
-
- public void beforeCompletion() {
- }
-
- public void afterCommit() {
- }
-
- public void afterCompletion(int status) {
- log.info("status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
- }
- }
- }
-}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + + public final String name; + + public final long interval; + + public Queue(String name, long interval) { + this.name = name; + this.interval = interval; + } + + public Queue withInterval(int interval) { + return new Queue(name, interval); + } + + public String toString() { + return "Queue{" + + "name='" + name + '\'' + + ", interval=" + interval + + '}'; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..a3a79ca --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,34 @@ +package io.trygvis.queue; + +import org.springframework.beans.factory.annotation.*; +import org.springframework.jdbc.core.*; +import org.springframework.stereotype.*; + +import java.sql.*; + +import static org.springframework.dao.support.DataAccessUtils.*; + +@Component +public class QueueDao { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public Queue findByName(String name) { + return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name)); + } + + public void insert(Queue q) { + jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval); + } + + public void update(Queue q) { + jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name); + } + + private class QueueRowMapper implements RowMapper<Queue> { + public Queue mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..7f64c77 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.util.*; + +public class Task { + + public final long id; + + public final String queue; + + public final Date scheduled; + + public final Date lastRun; + + public final int runCount; + + public final Date completed; + + public final List<String> arguments; + + Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { + this.id = id; + this.queue = queue; + this.scheduled = scheduled; + this.lastRun = lastRun; + this.runCount = runCount; + this.completed = completed; + + this.arguments = arguments; + } + + public Task registerRun() { + return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments); + } + + public Task registerComplete(Date completed) { + return new Task(id, queue, scheduled, lastRun, runCount, new Date(), arguments); + } + + public String toString() { + return "Task{" + + "id=" + id + + ", queue=" + queue + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", runCount=" + runCount + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; + } + + public boolean isDone() { + return completed != null; + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..2e407a5 --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import org.springframework.beans.factory.annotation.*; +import org.springframework.jdbc.core.*; +import org.springframework.stereotype.*; + +import java.sql.*; +import java.util.Date; +import java.util.*; + +import static java.util.Arrays.*; + +@Component +public class TaskDao { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public long insert(String queue, Date scheduled, String arguments) { + jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " + + "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments); + return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); + } + + public Task findById(long id) { + return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", + new TaskRowMapper(), id); + } + + public List<Task> findByNameAndCompletedIsNull(String name) { + return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", + new TaskRowMapper(), name); + } + + public void update(Task task) { + jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", + task.scheduled, task.lastRun, task.runCount, task.completed, task.id); + } + + private class TaskRowMapper implements RowMapper<Task> { + public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments"; + + public Task mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Task( + rs.getLong(1), + rs.getString(2), + rs.getTimestamp(3), + rs.getTimestamp(4), + rs.getInt(5), + rs.getTimestamp(6), + asList(rs.getString(7).split(" "))); + } + } +} diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java index 5df4dac..5dd845f 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/main/java/io/trygvis/spring/Config.java @@ -11,6 +11,7 @@ import org.springframework.context.annotation.*; import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.*;
import org.springframework.data.jpa.repository.config.*;
+import org.springframework.jdbc.core.*;
import org.springframework.jdbc.datasource.*;
import org.springframework.orm.hibernate4.*;
import org.springframework.orm.jpa.*;
@@ -18,12 +19,13 @@ import org.springframework.transaction.*; import org.springframework.transaction.annotation.*;
import org.springframework.transaction.support.*;
-import java.util.*;
import javax.persistence.*;
import javax.sql.*;
+import java.util.*;
import static org.hibernate.cfg.AvailableSettings.*;
import static org.hibernate.ejb.AvailableSettings.*;
+import static org.springframework.transaction.TransactionDefinition.*;
@Configuration
@ComponentScan(basePackages = "io.trygvis")
@@ -40,6 +42,11 @@ public class Config { }};
}
+ @Bean
+ public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+ return new JdbcTemplate(dataSource);
+ }
+
// public SpringBeanJobFactory springBeanJobFactory() {
// SpringBeanJobFactory factory = new SpringBeanJobFactory();
// return factory;
@@ -164,6 +171,9 @@ public class Config { @Bean
public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
- return new TransactionTemplate(platformTransactionManager);
+ DefaultTransactionDefinition td = new DefaultTransactionDefinition();
+ td.setPropagationBehavior(PROPAGATION_REQUIRED);
+ td.setIsolationLevel(ISOLATION_READ_COMMITTED);
+ return new TransactionTemplate(platformTransactionManager, td);
}
}
|