aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-05-29 22:16:50 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-05-29 22:16:50 +0200
commit7d704feb86c44fca57941d223e8605b55fcf68f0 (patch)
tree7a33458a46bcc6e211ef3e833441f80762c61a63 /src/main/java/io/trygvis/async
parentb65d39ab617d19ac48f44bc41f04a18803ca75e6 (diff)
downloadquartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.gz
quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.bz2
quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.xz
quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.zip
o Splitting out the parts that implement the "async" features vs the "queue" features.
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java37
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java153
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java119
-rw-r--r--src/main/java/io/trygvis/async/TaskFailureException.java7
4 files changed, 316 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
new file mode 100755
index 0000000..e90a0e4
--- /dev/null
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -0,0 +1,37 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
+import org.quartz.SchedulerException;
+
+import java.util.List;
+
+/**
+ * A simple framework for running tasks.
+ */
+public interface AsyncService {
+
+ /**
+ * @param name
+ * @param interval how often the queue should be polled for missed tasks in seconds.
+ * @param callable
+ * @return
+ * @throws SchedulerException
+ */
+ Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
+
+ Queue getQueue(String name);
+
+ Task schedule(Queue queue, String... args);
+
+ Task schedule(long parent, Queue queue, String... args);
+
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
+
+ interface AsyncCallable {
+ void run(List<String> arguments) throws Exception;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
new file mode 100644
index 0000000..4e78a37
--- /dev/null
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -0,0 +1,153 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueDao;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
+import org.quartz.SchedulerException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+import static java.util.Arrays.asList;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+
+@Component
+public class JdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ private final Map<String, QueueThread> queues = new HashMap<>();
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private QueueDao queueDao;
+
+ @Autowired
+ private TaskDao taskDao;
+
+ @Transactional(propagation = REQUIRED)
+ public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
+ log.info("registerQueue: ENTER");
+
+ Queue q = queueDao.findByName(name);
+
+ log.info("q = {}", q);
+
+ final long interval_;
+ if (q == null) {
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ interval_ = interval;
+ } else {
+ // Found an existing queue. Use the Settings from the database.
+ interval_ = q.interval;
+ }
+
+ final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
+ queues.put(name, queueThread);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 10, interval_, SECONDS);
+ Thread thread = new Thread(queueThread, name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+ }
+ });
+
+ log.info("registerQueue: LEAVE");
+ return q;
+ }
+
+ public Queue getQueue(String name) {
+ QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+
+ return queueThread.queue;
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(final Queue queue, String... args) {
+ return scheduleInner(null, queue, args);
+ }
+
+ public Task schedule(long parent, Queue queue, String... args) {
+ return scheduleInner(parent, queue, args);
+ }
+
+ private Task scheduleInner(Long parent, final Queue queue, String... args) {
+ Date scheduled = new Date();
+
+ StringBuilder arguments = new StringBuilder();
+ for (String arg : args) {
+ arguments.append(arg).append(' ');
+ }
+
+ long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
+ Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
+ log.info("Created task = {}", task);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ queues.get(queue.name).ping();
+ }
+ }
+ });
+
+ return task;
+ }
+
+ @Transactional
+ public Task await(Task task, long timeout) {
+ final long start = currentTimeMillis();
+ final long end = start + timeout;
+
+ while (currentTimeMillis() < end) {
+ task = update(task);
+
+ try {
+ sleep(100);
+ } catch (InterruptedException e) {
+ // break
+ }
+ }
+
+ return task;
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(Task ref) {
+ return taskDao.findById(ref.id);
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
new file mode 100644
index 0000000..69466df
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -0,0 +1,119 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.transaction.TransactionException;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import java.util.Date;
+import java.util.List;
+
+class QueueThread implements Runnable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean busy;
+
+ public final Queue queue;
+
+ private final TaskDao taskDao;
+
+ private final TransactionTemplate transactionTemplate;
+
+ private final AsyncService.AsyncCallable callable;
+
+ QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
+ this.queue = queue;
+ this.taskDao = taskDao;
+ this.transactionTemplate = transactionTemplate;
+ this.callable = callable;
+ }
+
+ public void ping() {
+ synchronized (this) {
+ if (!busy) {
+ log.info("Sending ping to " + queue);
+ notify();
+ } else {
+ checkForNewTasks = true;
+ }
+ }
+ }
+
+ public void run() {
+ while (shouldRun) {
+ try {
+ List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
+ public List<Task> doInTransaction(TransactionStatus status) {
+ return taskDao.findByNameAndCompletedIsNull(queue.name);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
+
+ if(tasks.size() > 0) {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException | TaskFailureException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ log.warn("Error while executing tasks.", e);
+ }
+
+ synchronized (this) {
+ busy = false;
+
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ continue;
+ }
+
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ busy = true;
+ }
+ }
+ }
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ taskDao.update(task.registerRun());
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+ callable.run(task.arguments);
+ Date completed = new Date();
+ Task t = task.registerComplete(completed);
+ log.info("Completed task: {}", t);
+ taskDao.update(t);
+ } catch (Exception e) {
+ throw new TaskFailureException(e);
+ }
+ }
+ });
+ }
+}
diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java
new file mode 100644
index 0000000..7278e17
--- /dev/null
+++ b/src/main/java/io/trygvis/async/TaskFailureException.java
@@ -0,0 +1,7 @@
+package io.trygvis.async;
+
+class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+}