From 637dddf11f5d60b35c9696914e1e2658b2ddc611 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 20 Apr 2013 15:43:01 +0200 Subject: wip --- JpaAsyncService.java | 242 +++++++++++++++++++ src/main/java/io/trygvis/Main.java | 45 ++-- src/main/java/io/trygvis/data/QueueRepository.java | 2 +- src/main/java/io/trygvis/data/TaskRepository.java | 4 +- src/main/java/io/trygvis/model/Queue.java | 58 ----- src/main/java/io/trygvis/model/Task.java | 95 -------- src/main/java/io/trygvis/queue/AsyncService.java | 37 ++- .../java/io/trygvis/queue/JdbcAsyncService.java | 194 ++++++++++++++++ .../java/io/trygvis/queue/JpaAsyncService.java | 257 --------------------- src/main/java/io/trygvis/queue/Queue.java | 24 ++ src/main/java/io/trygvis/queue/QueueDao.java | 34 +++ src/main/java/io/trygvis/queue/Task.java | 55 +++++ src/main/java/io/trygvis/queue/TaskDao.java | 54 +++++ src/main/java/io/trygvis/spring/Config.java | 14 +- src/main/resources/create.sql | 27 +++ src/main/resources/logback.xml | 3 +- 16 files changed, 690 insertions(+), 455 deletions(-) create mode 100755 JpaAsyncService.java delete mode 100755 src/main/java/io/trygvis/model/Queue.java delete mode 100755 src/main/java/io/trygvis/model/Task.java create mode 100644 src/main/java/io/trygvis/queue/JdbcAsyncService.java delete mode 100755 src/main/java/io/trygvis/queue/JpaAsyncService.java create mode 100755 src/main/java/io/trygvis/queue/Queue.java create mode 100644 src/main/java/io/trygvis/queue/QueueDao.java create mode 100755 src/main/java/io/trygvis/queue/Task.java create mode 100644 src/main/java/io/trygvis/queue/TaskDao.java create mode 100644 src/main/resources/create.sql diff --git a/JpaAsyncService.java b/JpaAsyncService.java new file mode 100755 index 0000000..eaef782 --- /dev/null +++ b/JpaAsyncService.java @@ -0,0 +1,242 @@ +package io.trygvis.queue; + +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; + +@SuppressWarnings("SpringJavaAutowiringInspection") +@Component +public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + @PersistenceContext + private EntityManager entityManager; + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueRepository queueRepository; + + @Autowired + private TaskRepository taskRepository; + + @Transactional + public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueRepository.findByName(name); + + log.info("q = {}", q); + + if (q == null) { + q = new Queue(name, interval); + q = queueRepository.save(q); + } else { + boolean dirty = false; + if (interval != q.getInterval()) { + q.setInterval(interval); + dirty = true; + } + + if (dirty) { + q = queueRepository.save(q); + } + } + + log.info("q = {}", q); + entityManager.flush(); +// entityManager.detach(q); + log.info("q = {}", q); + + JpaQueueRef queueRef = new JpaQueueRef(q); + + log.info("registerQueue: LEAVE"); + + registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); + + return queueRef; + } + + @Transactional(readOnly = true) + public JpaQueueRef getQueue(String name) { + Queue queue = queueRepository.findByName(name); + + if (queue == null) { + throw new RollbackException("No such queue: '" + name + "'."); + } + + entityManager.detach(queue); + + return new JpaQueueRef(queue); + } + + @Transactional + public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { + log.info("schedule: ENTER"); + Date scheduled = new Date(); + Task task = new Task(queue.queue, scheduled, args); + log.info("task = {}", task); + taskRepository.save(task); + log.info("task = {}", task); +// entityManager.detach(task); + log.info("schedule: LEAVE"); + return new JpaExecutionRef(task); + } + + @Transactional(readOnly = true) + public JpaExecutionRef update(JpaExecutionRef ref) { + return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); + } + + public static class JpaQueueRef implements AsyncService.QueueRef { + public final Queue queue; + + JpaQueueRef(Queue queue) { + this.queue = queue; + } + + public String toString() { + return "JpaQueueRef{" + + "queue=" + queue + + '}'; + } + } + + public static class JpaExecutionRef implements AsyncService.ExecutionRef { + private final Task task; + + public JpaExecutionRef(Task task) { + this.task = task; + } + + public List getArguments() { + return Arrays.asList(task.getArguments()); + } + + public Date getScheduled() { + return task.getScheduled(); + } + + public Date getLastRun() { + return task.getLastRun(); + } + + public Date getCompleted() { + return task.getCompleted(); + } + + public boolean isDone() { + return task.isDone(); + } + + public String toString() { + return "JpaExecutionRef{" + + "task=" + task + + '}'; + } + } + + private static class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } + } + + private class CheckTimerTask implements Runnable { + private final AsyncCallable callable; + private final JpaQueueRef queueRef; + + private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { + this.callable = callable; + this.queueRef = queueRef; + } + + public void run() { + log.info("JpaAsyncService$CheckTimerTask.run"); + + List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); + + System.out.println("tasks.size() = " + tasks.size()); + + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while executing tasks.", e); + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + task.registerRun(); + taskRepository.save(task); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + log.info("Setting completed on task. date = {}, task = {}", completed, task); + task.registerComplete(completed); + taskRepository.save(task); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } + } + + private class MyTransactionSynchronization implements TransactionSynchronization { + + private final AsyncCallable callable; + + private final int interval; + + private final JpaQueueRef queueRef; + + public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { + this.callable = callable; + this.interval = interval; + this.queueRef = queueRef; + } + + public void suspend() { + } + + public void resume() { + } + + public void flush() { + } + + public void beforeCommit(boolean readOnly) { + } + + public void beforeCompletion() { + } + + public void afterCommit() { + } + + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); + } + } + } +} diff --git a/src/main/java/io/trygvis/Main.java b/src/main/java/io/trygvis/Main.java index 0a31aaa..f2f540f 100755 --- a/src/main/java/io/trygvis/Main.java +++ b/src/main/java/io/trygvis/Main.java @@ -1,17 +1,21 @@ package io.trygvis; import io.trygvis.queue.*; +import io.trygvis.queue.Queue; import org.hibernate.dialect.*; import org.slf4j.*; import org.slf4j.bridge.*; import org.springframework.beans.factory.annotation.*; import org.springframework.context.support.*; import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.support.*; import java.util.*; import static java.lang.System.*; import static java.lang.Thread.*; +import static org.springframework.transaction.TransactionDefinition.PROPAGATION_REQUIRED; @Component public class Main { @@ -48,7 +52,10 @@ public class Main { } @Autowired - private AsyncService asyncService; + private TransactionTemplate transactionTemplate; + + @Autowired + private AsyncService asyncService; @Autowired @Qualifier("createArticle") @@ -61,35 +68,39 @@ public class Main { public void run() throws Exception { log.info("Main.run"); - JpaAsyncService.JpaQueueRef queueRef = asyncService.registerQueue("create-queue", 10, createArticleCallable); - log.info("queue registered: ref = {}", queueRef); + final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable); +// log.info("queue registered: ref = {}", q); // asyncService.registerQueue("update-queue", 1, updateArticeCallable); - AsyncService.QueueRef queue = asyncService.getQueue("create-queue"); +// q = asyncService.getQueue("create-queue"); - List refs = new ArrayList<>(); + final List tasks = new ArrayList<>(); - for (int i = 0; i < 10; i++) { - refs.add(asyncService.schedule(queue)); - } + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + for (int i = 0; i < 1; i++) { + tasks.add(asyncService.schedule(q)); + } + } + }); while (true) { - log.info("size = {}", refs.size()); - for (Iterator iterator = refs.iterator(); iterator.hasNext(); ) { - AsyncService.ExecutionRef ref = iterator.next(); + sleep(10000); + + log.info("tasks.size = {}", tasks.size()); + for (Iterator iterator = tasks.iterator(); iterator.hasNext(); ) { + Task task = iterator.next(); - ref = asyncService.update(ref); + task = asyncService.update(task); - log.info("ref = {}", ref); + log.info("task = {}", task); - if (ref.isDone()) { + if (task.isDone()) { iterator.remove(); } - - sleep(100); } - if (refs.isEmpty()) { + if (tasks.isEmpty()) { break; } } diff --git a/src/main/java/io/trygvis/data/QueueRepository.java b/src/main/java/io/trygvis/data/QueueRepository.java index 143d747..47ed478 100755 --- a/src/main/java/io/trygvis/data/QueueRepository.java +++ b/src/main/java/io/trygvis/data/QueueRepository.java @@ -1,6 +1,6 @@ package io.trygvis.data; -import io.trygvis.model.*; +import io.trygvis.queue.*; import org.springframework.data.jpa.repository.*; public interface QueueRepository extends JpaRepository { diff --git a/src/main/java/io/trygvis/data/TaskRepository.java b/src/main/java/io/trygvis/data/TaskRepository.java index e24d520..b0710e3 100755 --- a/src/main/java/io/trygvis/data/TaskRepository.java +++ b/src/main/java/io/trygvis/data/TaskRepository.java @@ -1,7 +1,7 @@ package io.trygvis.data; -import io.trygvis.model.*; -import io.trygvis.model.Queue; +import io.trygvis.queue.*; +import io.trygvis.queue.Queue; import org.springframework.data.jpa.repository.*; import java.util.*; diff --git a/src/main/java/io/trygvis/model/Queue.java b/src/main/java/io/trygvis/model/Queue.java deleted file mode 100755 index aeec405..0000000 --- a/src/main/java/io/trygvis/model/Queue.java +++ /dev/null @@ -1,58 +0,0 @@ -package io.trygvis.model; - -import javax.persistence.*; - -@Entity -@Table( - uniqueConstraints = { - @UniqueConstraint(name = "uq_queue__name", columnNames = "name") - } -) -public class Queue { - - @Id - @SequenceGenerator(name = "queue_seq", sequenceName = "queue_seq") - @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "queue_seq") - private Integer id; - - private String name; - - private long interval; - - @SuppressWarnings("UnusedDeclaration") - private Queue() { - } - - public Queue(String name, long interval) { - this.name = name; - this.interval = interval; - } - - public Integer getId() { - return id; - } - - public String getName() { - return name; - } - - public void setName(String name) { - this.name = name; - } - - public long getInterval() { - return interval; - } - - public void setInterval(long interval) { - this.interval = interval; - } - - public String toString() { - return "Queue{" + - "id=" + id + - ", name='" + name + '\'' + - ", interval=" + interval + - '}'; - } -} diff --git a/src/main/java/io/trygvis/model/Task.java b/src/main/java/io/trygvis/model/Task.java deleted file mode 100755 index 24ac83d..0000000 --- a/src/main/java/io/trygvis/model/Task.java +++ /dev/null @@ -1,95 +0,0 @@ -package io.trygvis.model; - -import javax.persistence.*; -import java.util.*; -import java.util.regex.*; - -@Entity -public class Task { - @Id - @SequenceGenerator(name = "task_seq", sequenceName = "task_seq") - @GeneratedValue(strategy = GenerationType.SEQUENCE, generator = "task_seq") - private Long id; - - @ManyToOne - private Queue queue; - - private Date scheduled; - - private Date lastRun; - - private int runCount; - - private Date completed; - - private String arguments; - - private static final Pattern pattern = Pattern.compile(" "); - - @SuppressWarnings("UnusedDeclaration") - private Task() { - } - - public Task(Queue queue, Date scheduled, String... arguments) { - this.queue = queue; - this.scheduled = scheduled; - - StringBuilder builder = new StringBuilder(arguments.length * 100); - for (String argument : arguments) { - if (pattern.matcher(argument).matches()) { - throw new RuntimeException("Bad argument: '" + argument + "'."); - } - builder.append(argument).append(' '); - } - this.arguments = builder.toString(); - } - - public Long getId() { - return id; - } - - public String[] getArguments() { - return arguments.split(" "); - } - - public Date getScheduled() { - return scheduled; - } - - public Date getLastRun() { - return lastRun; - } - - public int getRunCount() { - return runCount; - } - - public Date getCompleted() { - return completed; - } - - public boolean isDone() { - return completed != null; - } - - public void registerRun() { - lastRun = new Date(); - runCount++; - } - - public void registerComplete(Date completed) { - this.completed = completed; - } - - public String toString() { - return "Task{" + - "id=" + id + - ", queue=" + queue + - ", scheduled=" + scheduled + - ", lastRun=" + lastRun + - ", runCount=" + runCount + - ", completed=" + completed + - ", arguments='" + arguments + '\'' + - '}'; - } -} diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index b08db1f..10f1b79 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -2,32 +2,25 @@ package io.trygvis.queue; import org.quartz.*; -import java.util.*; +public interface AsyncService { -public interface AsyncService { + /** + * @param name + * @param interval how often the queue should be polled for missed tasks in seconds. + * @param callable + * @return + * @throws SchedulerException + */ + Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; - JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException; + Queue getQueue(String name); - QueueRef getQueue(String name); + Task schedule(Queue queue, String... args); - ExecutionRef schedule(QueueRef queue, String... args); - - ExecutionRef update(ExecutionRef ref); - - interface QueueRef { - } - - interface ExecutionRef { - List getArguments(); - - Date getScheduled(); - - Date getLastRun(); - - Date getCompleted(); - - boolean isDone(); - } + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); interface AsyncCallable { void run() throws Exception; diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java new file mode 100644 index 0000000..1df0ab6 --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -0,0 +1,194 @@ +package io.trygvis.queue; + +import org.quartz.*; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.annotation.*; +import org.springframework.transaction.support.*; + +import java.util.*; +import java.util.concurrent.*; + +import static java.util.Arrays.*; +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.annotation.Propagation.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; + +@Component +public class JdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final Map queues = new HashMap<>(); + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueDao queueDao; + + @Autowired + private TaskDao taskDao; + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueDao.findByName(name); + + log.info("q = {}", q); + + final long interval_; + if (q == null) { + q = new Queue(name, interval * 1000); + queueDao.insert(q); + interval_ = interval; + } else { + // Found an existing queue. Use the Settings from the database. + interval_ = q.interval; + } + + final QueueThread queueThread = new QueueThread(q, callable); + queues.put(name, queueThread); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 1000, 1000 * interval_, MILLISECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + + public Queue getQueue(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + + return queueThread.queue; + } + + @Transactional(propagation = REQUIRED) + public Task schedule(Queue queue, String... args) { + log.info("schedule: ENTER"); + + Date scheduled = new Date(); + + StringBuilder arguments = new StringBuilder(); + for (String arg : args) { + arguments.append(arg).append(' '); + } + + long id = taskDao.insert(queue.name, scheduled, arguments.toString()); + Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); + log.info("task = {}", task); + queues.get(queue.name).ping(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("schedule: LEAVE"); + return task; + } + + @Transactional(readOnly = true) + public Task update(Task ref) { + return taskDao.findById(ref.id); + } + + class QueueThread implements Runnable { + public boolean shouldRun = true; + + public final Queue queue; + + private final AsyncCallable callable; + + QueueThread(Queue queue, AsyncCallable callable) { + this.queue = queue; + this.callable = callable; + } + + public void ping() { + log.info("Sending ping to " + queue); + synchronized (this) { + notify(); + } + } + + public void run() { + while (shouldRun) { + List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } + } + + private static class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } + } +} diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java deleted file mode 100755 index e715ac7..0000000 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ /dev/null @@ -1,257 +0,0 @@ -package io.trygvis.queue; - -import io.trygvis.data.*; -import io.trygvis.model.Queue; -import io.trygvis.model.*; -import org.quartz.*; -import org.slf4j.*; -import org.springframework.beans.factory.annotation.*; -import org.springframework.stereotype.*; -import org.springframework.transaction.*; -import org.springframework.transaction.annotation.*; -import org.springframework.transaction.support.*; - -import javax.persistence.*; -import java.util.*; -import java.util.concurrent.*; - -import static java.util.concurrent.TimeUnit.*; -import static org.springframework.transaction.support.TransactionSynchronizationManager.*; - -@SuppressWarnings("SpringJavaAutowiringInspection") -@Component -public class JpaAsyncService implements AsyncService/*, ApplicationContextAware*/ { - - private final Logger log = LoggerFactory.getLogger(getClass()); - - private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); - - @PersistenceContext - private EntityManager entityManager; - - @Autowired - private TransactionTemplate transactionTemplate; - - @Autowired - private QueueRepository queueRepository; - - @Autowired - private TaskRepository taskRepository; - - @Transactional - public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException { - log.info("registerQueue: ENTER"); - - Queue q = queueRepository.findByName(name); - - log.info("q = {}", q); - - if (q == null) { - q = new Queue(name, interval); - q = queueRepository.save(q); - } else { - boolean dirty = false; - if (interval != q.getInterval()) { - q.setInterval(interval); - dirty = true; - } - - if (dirty) { - q = queueRepository.save(q); - } - } - - log.info("q = {}", q); - entityManager.flush(); -// entityManager.detach(q); - log.info("q = {}", q); - - JpaQueueRef queueRef = new JpaQueueRef(q); - - log.info("registerQueue: LEAVE"); - - registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef)); - - return queueRef; - } - - @Transactional(readOnly = true) - public JpaQueueRef getQueue(String name) { - Queue queue = queueRepository.findByName(name); - - if (queue == null) { - throw new RollbackException("No such queue: '" + name + "'."); - } - - entityManager.detach(queue); - - return new JpaQueueRef(queue); - } - - @Transactional - public JpaExecutionRef schedule(JpaQueueRef queue, String... args) { - log.info("schedule: ENTER"); - Date scheduled = new Date(); - Task task = new Task(queue.queue, scheduled, args); - log.info("task = {}", task); - taskRepository.save(task); - log.info("task = {}", task); -// entityManager.detach(task); - log.info("schedule: LEAVE"); - return new JpaExecutionRef(task); - } - - @Transactional(readOnly = true) - public JpaExecutionRef update(JpaExecutionRef ref) { - return new JpaExecutionRef(taskRepository.findOne(ref.task.getId())); - } - - public static class JpaQueueRef implements AsyncService.QueueRef { - public final Queue queue; - - JpaQueueRef(Queue queue) { - this.queue = queue; - } - - public String toString() { - return "JpaQueueRef{" + - "queue=" + queue + - '}'; - } - } - - public static class JpaExecutionRef implements AsyncService.ExecutionRef { - private final Task task; - - public JpaExecutionRef(Task task) { - this.task = task; - } - - public List getArguments() { - return Arrays.asList(task.getArguments()); - } - - public Date getScheduled() { - return task.getScheduled(); - } - - public Date getLastRun() { - return task.getLastRun(); - } - - public Date getCompleted() { - return task.getCompleted(); - } - - public boolean isDone() { - return task.isDone(); - } - - public String toString() { - return "JpaExecutionRef{" + - "task=" + task + - '}'; - } - } - - private static class TaskFailureException extends RuntimeException { - public TaskFailureException(Exception e) { - super(e); - } - } - - private class CheckTimerTask implements Runnable { - private final AsyncCallable callable; - private final JpaQueueRef queueRef; - - private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) { - this.callable = callable; - this.queueRef = queueRef; - } - - public void run() { - log.info("JpaAsyncService$CheckTimerTask.run"); - - List tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue); - - System.out.println("tasks.size() = " + tasks.size()); - - try { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } catch (Exception e) { - log.warn("Error while executing tasks.", e); - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - task.registerRun(); - taskRepository.save(task); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(); - Date completed = new Date(); - log.info("Setting completed on task. date = {}, task = {}", completed, task); - task.registerComplete(completed); - taskRepository.save(task); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } - } - - private class MyTransactionSynchronization implements TransactionSynchronization { - - private final AsyncCallable callable; - - private final int interval; - - private final JpaQueueRef queueRef; - - public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) { - this.callable = callable; - this.interval = interval; - this.queueRef = queueRef; - } - - public void suspend() { - } - - public void resume() { - } - - public void flush() { - } - - public void beforeCommit(boolean readOnly) { - } - - public void beforeCompletion() { - } - - public void afterCommit() { - } - - public void afterCompletion(int status) { - log.info("status = {}", status); - if (status == TransactionSynchronization.STATUS_COMMITTED) { - executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS); - } - } - } -} diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + + public final String name; + + public final long interval; + + public Queue(String name, long interval) { + this.name = name; + this.interval = interval; + } + + public Queue withInterval(int interval) { + return new Queue(name, interval); + } + + public String toString() { + return "Queue{" + + "name='" + name + '\'' + + ", interval=" + interval + + '}'; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..a3a79ca --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,34 @@ +package io.trygvis.queue; + +import org.springframework.beans.factory.annotation.*; +import org.springframework.jdbc.core.*; +import org.springframework.stereotype.*; + +import java.sql.*; + +import static org.springframework.dao.support.DataAccessUtils.*; + +@Component +public class QueueDao { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public Queue findByName(String name) { + return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name)); + } + + public void insert(Queue q) { + jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval); + } + + public void update(Queue q) { + jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name); + } + + private class QueueRowMapper implements RowMapper { + public Queue mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..7f64c77 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.util.*; + +public class Task { + + public final long id; + + public final String queue; + + public final Date scheduled; + + public final Date lastRun; + + public final int runCount; + + public final Date completed; + + public final List arguments; + + Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List arguments) { + this.id = id; + this.queue = queue; + this.scheduled = scheduled; + this.lastRun = lastRun; + this.runCount = runCount; + this.completed = completed; + + this.arguments = arguments; + } + + public Task registerRun() { + return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments); + } + + public Task registerComplete(Date completed) { + return new Task(id, queue, scheduled, lastRun, runCount, new Date(), arguments); + } + + public String toString() { + return "Task{" + + "id=" + id + + ", queue=" + queue + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", runCount=" + runCount + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; + } + + public boolean isDone() { + return completed != null; + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..2e407a5 --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import org.springframework.beans.factory.annotation.*; +import org.springframework.jdbc.core.*; +import org.springframework.stereotype.*; + +import java.sql.*; +import java.util.Date; +import java.util.*; + +import static java.util.Arrays.*; + +@Component +public class TaskDao { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public long insert(String queue, Date scheduled, String arguments) { + jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " + + "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments); + return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); + } + + public Task findById(long id) { + return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", + new TaskRowMapper(), id); + } + + public List findByNameAndCompletedIsNull(String name) { + return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", + new TaskRowMapper(), name); + } + + public void update(Task task) { + jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", + task.scheduled, task.lastRun, task.runCount, task.completed, task.id); + } + + private class TaskRowMapper implements RowMapper { + public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments"; + + public Task mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Task( + rs.getLong(1), + rs.getString(2), + rs.getTimestamp(3), + rs.getTimestamp(4), + rs.getInt(5), + rs.getTimestamp(6), + asList(rs.getString(7).split(" "))); + } + } +} diff --git a/src/main/java/io/trygvis/spring/Config.java b/src/main/java/io/trygvis/spring/Config.java index 5df4dac..5dd845f 100755 --- a/src/main/java/io/trygvis/spring/Config.java +++ b/src/main/java/io/trygvis/spring/Config.java @@ -11,6 +11,7 @@ import org.springframework.context.annotation.*; import org.springframework.context.annotation.Configuration; import org.springframework.context.support.*; import org.springframework.data.jpa.repository.config.*; +import org.springframework.jdbc.core.*; import org.springframework.jdbc.datasource.*; import org.springframework.orm.hibernate4.*; import org.springframework.orm.jpa.*; @@ -18,12 +19,13 @@ import org.springframework.transaction.*; import org.springframework.transaction.annotation.*; import org.springframework.transaction.support.*; -import java.util.*; import javax.persistence.*; import javax.sql.*; +import java.util.*; import static org.hibernate.cfg.AvailableSettings.*; import static org.hibernate.ejb.AvailableSettings.*; +import static org.springframework.transaction.TransactionDefinition.*; @Configuration @ComponentScan(basePackages = "io.trygvis") @@ -40,6 +42,11 @@ public class Config { }}; } + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + // public SpringBeanJobFactory springBeanJobFactory() { // SpringBeanJobFactory factory = new SpringBeanJobFactory(); // return factory; @@ -164,6 +171,9 @@ public class Config { @Bean public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { - return new TransactionTemplate(platformTransactionManager); + DefaultTransactionDefinition td = new DefaultTransactionDefinition(); + td.setPropagationBehavior(PROPAGATION_REQUIRED); + td.setIsolationLevel(ISOLATION_READ_COMMITTED); + return new TransactionTemplate(platformTransactionManager, td); } } diff --git a/src/main/resources/create.sql b/src/main/resources/create.sql new file mode 100644 index 0000000..ed8913f --- /dev/null +++ b/src/main/resources/create.sql @@ -0,0 +1,27 @@ +BEGIN; + +DROP TABLE IF EXISTS task; +DROP TABLE IF EXISTS queue; +DROP SEQUENCE IF EXISTS task_id; + +CREATE TABLE queue ( + name VARCHAR(100) NOT NULL, + interval INTEGER NOT NULL, + CONSTRAINT pk_queue PRIMARY KEY (name) +); + +CREATE TABLE task ( + id INTEGER NOT NULL, + queue VARCHAR(100) NOT NULL, + scheduled TIMESTAMP NOT NULL, + last_run TIMESTAMP, + run_count INT NOT NULL, + completed TIMESTAMP, + arguments VARCHAR(100), + CONSTRAINT pk_task PRIMARY KEY (id), + CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name) +); + +CREATE SEQUENCE task_id; + +COMMIT; diff --git a/src/main/resources/logback.xml b/src/main/resources/logback.xml index 39e3dbd..a25644b 100755 --- a/src/main/resources/logback.xml +++ b/src/main/resources/logback.xml @@ -5,6 +5,7 @@ + @@ -13,7 +14,7 @@ - %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + %d{HH:mm:ss.SSS} [%-15thread] %-5level %-50logger{36} - %msg%n -- cgit v1.2.3