diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rwxr-xr-x | src/main/java/io/trygvis/queue/AsyncService.java | 37 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcAsyncService.java | 194 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/JpaAsyncService.java | 257 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/Queue.java | 24 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueDao.java | 34 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/Task.java | 55 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 54 |
7 files changed, 376 insertions, 279 deletions
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java index b08db1f..10f1b79 100755 --- a/src/main/java/io/trygvis/queue/AsyncService.java +++ b/src/main/java/io/trygvis/queue/AsyncService.java @@ -2,32 +2,25 @@ package io.trygvis.queue; import org.quartz.*;
-import java.util.*;
+public interface AsyncService {
-public interface AsyncService<QueueRef extends AsyncService.QueueRef, ExecutionRef extends AsyncService.ExecutionRef> {
+ /**
+ * @param name
+ * @param interval how often the queue should be polled for missed tasks in seconds.
+ * @param callable
+ * @return
+ * @throws SchedulerException
+ */
+ Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
- JpaAsyncService.JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+ Queue getQueue(String name);
- QueueRef getQueue(String name);
+ Task schedule(Queue queue, String... args);
- ExecutionRef schedule(QueueRef queue, String... args);
-
- ExecutionRef update(ExecutionRef ref);
-
- interface QueueRef {
- }
-
- interface ExecutionRef {
- List<String> getArguments();
-
- Date getScheduled();
-
- Date getLastRun();
-
- Date getCompleted();
-
- boolean isDone();
- }
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
interface AsyncCallable {
void run() throws Exception;
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java new file mode 100644 index 0000000..1df0ab6 --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java @@ -0,0 +1,194 @@ +package io.trygvis.queue; + +import org.quartz.*; +import org.slf4j.*; +import org.springframework.beans.factory.annotation.*; +import org.springframework.stereotype.*; +import org.springframework.transaction.*; +import org.springframework.transaction.annotation.*; +import org.springframework.transaction.support.*; + +import java.util.*; +import java.util.concurrent.*; + +import static java.util.Arrays.*; +import static java.util.concurrent.TimeUnit.*; +import static org.springframework.transaction.annotation.Propagation.*; +import static org.springframework.transaction.support.TransactionSynchronizationManager.*; + +@Component +public class JdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + + private final Map<String, QueueThread> queues = new HashMap<>(); + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private QueueDao queueDao; + + @Autowired + private TaskDao taskDao; + + @Transactional(propagation = REQUIRED) + public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException { + log.info("registerQueue: ENTER"); + + Queue q = queueDao.findByName(name); + + log.info("q = {}", q); + + final long interval_; + if (q == null) { + q = new Queue(name, interval * 1000); + queueDao.insert(q); + interval_ = interval; + } else { + // Found an existing queue. Use the Settings from the database. + interval_ = q.interval; + } + + final QueueThread queueThread = new QueueThread(q, callable); + queues.put(name, queueThread); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + executor.scheduleAtFixedRate(new Runnable() { + public void run() { + queueThread.ping(); + } + }, 1000, 1000 * interval_, MILLISECONDS); + Thread thread = new Thread(queueThread, name); + thread.setDaemon(true); + thread.start(); + } + } + }); + + log.info("registerQueue: LEAVE"); + return q; + } + + public Queue getQueue(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + + return queueThread.queue; + } + + @Transactional(propagation = REQUIRED) + public Task schedule(Queue queue, String... args) { + log.info("schedule: ENTER"); + + Date scheduled = new Date(); + + StringBuilder arguments = new StringBuilder(); + for (String arg : args) { + arguments.append(arg).append(' '); + } + + long id = taskDao.insert(queue.name, scheduled, arguments.toString()); + Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args)); + log.info("task = {}", task); + queues.get(queue.name).ping(); + try { + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + log.info("schedule: LEAVE"); + return task; + } + + @Transactional(readOnly = true) + public Task update(Task ref) { + return taskDao.findById(ref.id); + } + + class QueueThread implements Runnable { + public boolean shouldRun = true; + + public final Queue queue; + + private final AsyncCallable callable; + + QueueThread(Queue queue, AsyncCallable callable) { + this.queue = queue; + this.callable = callable; + } + + public void ping() { + log.info("Sending ping to " + queue); + synchronized (this) { + notify(); + } + } + + public void run() { + while (shouldRun) { + List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + try { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } catch (Exception e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } + } + + private static class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } + } +} diff --git a/src/main/java/io/trygvis/queue/JpaAsyncService.java b/src/main/java/io/trygvis/queue/JpaAsyncService.java deleted file mode 100755 index e715ac7..0000000 --- a/src/main/java/io/trygvis/queue/JpaAsyncService.java +++ /dev/null @@ -1,257 +0,0 @@ -package io.trygvis.queue;
-
-import io.trygvis.data.*;
-import io.trygvis.model.Queue;
-import io.trygvis.model.*;
-import org.quartz.*;
-import org.slf4j.*;
-import org.springframework.beans.factory.annotation.*;
-import org.springframework.stereotype.*;
-import org.springframework.transaction.*;
-import org.springframework.transaction.annotation.*;
-import org.springframework.transaction.support.*;
-
-import javax.persistence.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static java.util.concurrent.TimeUnit.*;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.*;
-
-@SuppressWarnings("SpringJavaAutowiringInspection")
-@Component
-public class JpaAsyncService implements AsyncService<JpaAsyncService.JpaQueueRef, JpaAsyncService.JpaExecutionRef>/*, ApplicationContextAware*/ {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- @PersistenceContext
- private EntityManager entityManager;
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueRepository queueRepository;
-
- @Autowired
- private TaskRepository taskRepository;
-
- @Transactional
- public synchronized JpaQueueRef registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException {
- log.info("registerQueue: ENTER");
-
- Queue q = queueRepository.findByName(name);
-
- log.info("q = {}", q);
-
- if (q == null) {
- q = new Queue(name, interval);
- q = queueRepository.save(q);
- } else {
- boolean dirty = false;
- if (interval != q.getInterval()) {
- q.setInterval(interval);
- dirty = true;
- }
-
- if (dirty) {
- q = queueRepository.save(q);
- }
- }
-
- log.info("q = {}", q);
- entityManager.flush();
-// entityManager.detach(q);
- log.info("q = {}", q);
-
- JpaQueueRef queueRef = new JpaQueueRef(q);
-
- log.info("registerQueue: LEAVE");
-
- registerSynchronization(new MyTransactionSynchronization(callable, interval, queueRef));
-
- return queueRef;
- }
-
- @Transactional(readOnly = true)
- public JpaQueueRef getQueue(String name) {
- Queue queue = queueRepository.findByName(name);
-
- if (queue == null) {
- throw new RollbackException("No such queue: '" + name + "'.");
- }
-
- entityManager.detach(queue);
-
- return new JpaQueueRef(queue);
- }
-
- @Transactional
- public JpaExecutionRef schedule(JpaQueueRef queue, String... args) {
- log.info("schedule: ENTER");
- Date scheduled = new Date();
- Task task = new Task(queue.queue, scheduled, args);
- log.info("task = {}", task);
- taskRepository.save(task);
- log.info("task = {}", task);
-// entityManager.detach(task);
- log.info("schedule: LEAVE");
- return new JpaExecutionRef(task);
- }
-
- @Transactional(readOnly = true)
- public JpaExecutionRef update(JpaExecutionRef ref) {
- return new JpaExecutionRef(taskRepository.findOne(ref.task.getId()));
- }
-
- public static class JpaQueueRef implements AsyncService.QueueRef {
- public final Queue queue;
-
- JpaQueueRef(Queue queue) {
- this.queue = queue;
- }
-
- public String toString() {
- return "JpaQueueRef{" +
- "queue=" + queue +
- '}';
- }
- }
-
- public static class JpaExecutionRef implements AsyncService.ExecutionRef {
- private final Task task;
-
- public JpaExecutionRef(Task task) {
- this.task = task;
- }
-
- public List<String> getArguments() {
- return Arrays.asList(task.getArguments());
- }
-
- public Date getScheduled() {
- return task.getScheduled();
- }
-
- public Date getLastRun() {
- return task.getLastRun();
- }
-
- public Date getCompleted() {
- return task.getCompleted();
- }
-
- public boolean isDone() {
- return task.isDone();
- }
-
- public String toString() {
- return "JpaExecutionRef{" +
- "task=" + task +
- '}';
- }
- }
-
- private static class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
- }
-
- private class CheckTimerTask implements Runnable {
- private final AsyncCallable callable;
- private final JpaQueueRef queueRef;
-
- private CheckTimerTask(AsyncCallable callable, JpaQueueRef queueRef) {
- this.callable = callable;
- this.queueRef = queueRef;
- }
-
- public void run() {
- log.info("JpaAsyncService$CheckTimerTask.run");
-
- List<Task> tasks = taskRepository.findByQueueAndCompletedIsNull(queueRef.queue);
-
- System.out.println("tasks.size() = " + tasks.size());
-
- try {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- } catch (Exception e) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- task.registerRun();
- taskRepository.save(task);
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run();
- Date completed = new Date();
- log.info("Setting completed on task. date = {}, task = {}", completed, task);
- task.registerComplete(completed);
- taskRepository.save(task);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
- }
-
- private class MyTransactionSynchronization implements TransactionSynchronization {
-
- private final AsyncCallable callable;
-
- private final int interval;
-
- private final JpaQueueRef queueRef;
-
- public MyTransactionSynchronization(AsyncCallable callable, int interval, JpaQueueRef queueRef) {
- this.callable = callable;
- this.interval = interval;
- this.queueRef = queueRef;
- }
-
- public void suspend() {
- }
-
- public void resume() {
- }
-
- public void flush() {
- }
-
- public void beforeCommit(boolean readOnly) {
- }
-
- public void beforeCompletion() {
- }
-
- public void afterCommit() {
- }
-
- public void afterCompletion(int status) {
- log.info("status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- executor.scheduleAtFixedRate(new CheckTimerTask(callable, queueRef), 1000, 1000 * interval, MILLISECONDS);
- }
- }
- }
-}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + + public final String name; + + public final long interval; + + public Queue(String name, long interval) { + this.name = name; + this.interval = interval; + } + + public Queue withInterval(int interval) { + return new Queue(name, interval); + } + + public String toString() { + return "Queue{" + + "name='" + name + '\'' + + ", interval=" + interval + + '}'; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..a3a79ca --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,34 @@ +package io.trygvis.queue; + +import org.springframework.beans.factory.annotation.*; +import org.springframework.jdbc.core.*; +import org.springframework.stereotype.*; + +import java.sql.*; + +import static org.springframework.dao.support.DataAccessUtils.*; + +@Component +public class QueueDao { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public Queue findByName(String name) { + return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name)); + } + + public void insert(Queue q) { + jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval); + } + + public void update(Queue q) { + jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name); + } + + private class QueueRowMapper implements RowMapper<Queue> { + public Queue mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..7f64c77 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.util.*; + +public class Task { + + public final long id; + + public final String queue; + + public final Date scheduled; + + public final Date lastRun; + + public final int runCount; + + public final Date completed; + + public final List<String> arguments; + + Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { + this.id = id; + this.queue = queue; + this.scheduled = scheduled; + this.lastRun = lastRun; + this.runCount = runCount; + this.completed = completed; + + this.arguments = arguments; + } + + public Task registerRun() { + return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments); + } + + public Task registerComplete(Date completed) { + return new Task(id, queue, scheduled, lastRun, runCount, new Date(), arguments); + } + + public String toString() { + return "Task{" + + "id=" + id + + ", queue=" + queue + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", runCount=" + runCount + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; + } + + public boolean isDone() { + return completed != null; + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..2e407a5 --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import org.springframework.beans.factory.annotation.*; +import org.springframework.jdbc.core.*; +import org.springframework.stereotype.*; + +import java.sql.*; +import java.util.Date; +import java.util.*; + +import static java.util.Arrays.*; + +@Component +public class TaskDao { + + @Autowired + private JdbcTemplate jdbcTemplate; + + public long insert(String queue, Date scheduled, String arguments) { + jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " + + "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments); + return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class); + } + + public Task findById(long id) { + return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?", + new TaskRowMapper(), id); + } + + public List<Task> findByNameAndCompletedIsNull(String name) { + return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL", + new TaskRowMapper(), name); + } + + public void update(Task task) { + jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?", + task.scheduled, task.lastRun, task.runCount, task.completed, task.id); + } + + private class TaskRowMapper implements RowMapper<Task> { + public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments"; + + public Task mapRow(ResultSet rs, int rowNum) throws SQLException { + return new Task( + rs.getLong(1), + rs.getString(2), + rs.getTimestamp(3), + rs.getTimestamp(4), + rs.getInt(5), + rs.getTimestamp(6), + asList(rs.getString(7).split(" "))); + } + } +} |