aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rwxr-xr-xsrc/main/java/io/trygvis/queue/AsyncService.java32
-rw-r--r--src/main/java/io/trygvis/queue/JdbcAsyncService.java149
-rw-r--r--src/main/java/io/trygvis/queue/QueueThread.java116
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java2
-rw-r--r--src/main/java/io/trygvis/queue/TaskFailureException.java7
5 files changed, 1 insertions, 305 deletions
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
deleted file mode 100755
index c9e5861..0000000
--- a/src/main/java/io/trygvis/queue/AsyncService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package io.trygvis.queue;
-
-import org.quartz.SchedulerException;
-
-import java.util.List;
-
-public interface AsyncService {
-
- /**
- * @param name
- * @param interval how often the queue should be polled for missed tasks in seconds.
- * @param callable
- * @return
- * @throws SchedulerException
- */
- Queue registerQueue(String name, int interval, AsyncCallable callable) throws SchedulerException;
-
- Queue getQueue(String name);
-
- Task schedule(Queue queue, String... args);
-
- Task schedule(long parent, Queue queue, String... args);
-
- /**
- * Polls for a new state of the execution.
- */
- Task update(Task ref);
-
- interface AsyncCallable {
- void run(List<String> arguments) throws Exception;
- }
-}
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
deleted file mode 100644
index 276541f..0000000
--- a/src/main/java/io/trygvis/queue/JdbcAsyncService.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package io.trygvis.queue;
-
-import org.quartz.SchedulerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-import org.springframework.transaction.annotation.Transactional;
-import org.springframework.transaction.support.TransactionSynchronization;
-import org.springframework.transaction.support.TransactionSynchronizationAdapter;
-import org.springframework.transaction.support.TransactionTemplate;
-
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static java.lang.System.currentTimeMillis;
-import static java.lang.Thread.sleep;
-import static java.util.Arrays.asList;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import static org.springframework.transaction.annotation.Propagation.REQUIRED;
-import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
-
-@Component
-public class JdbcAsyncService implements AsyncService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
-
- private final Map<String, QueueThread> queues = new HashMap<>();
-
- @Autowired
- private TransactionTemplate transactionTemplate;
-
- @Autowired
- private QueueDao queueDao;
-
- @Autowired
- private TaskDao taskDao;
-
- @Transactional(propagation = REQUIRED)
- public Queue registerQueue(final String name, final int interval, AsyncCallable callable) throws SchedulerException {
- log.info("registerQueue: ENTER");
-
- Queue q = queueDao.findByName(name);
-
- log.info("q = {}", q);
-
- final long interval_;
- if (q == null) {
- q = new Queue(name, interval);
- queueDao.insert(q);
- interval_ = interval;
- } else {
- // Found an existing queue. Use the Settings from the database.
- interval_ = q.interval;
- }
-
- final QueueThread queueThread = new QueueThread(q, taskDao, transactionTemplate, callable);
- queues.put(name, queueThread);
-
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- log.info("Transaction completed with status = {}", status);
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval_, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
- }
- }
- });
-
- log.info("registerQueue: LEAVE");
- return q;
- }
-
- public Queue getQueue(String name) {
- QueueThread queueThread = queues.get(name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + name + "'.");
- }
-
- return queueThread.queue;
- }
-
- @Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, String... args) {
- return scheduleInner(null, queue, args);
- }
-
- public Task schedule(long parent, Queue queue, String... args) {
- return scheduleInner(parent, queue, args);
- }
-
- private Task scheduleInner(Long parent, final Queue queue, String... args) {
- Date scheduled = new Date();
-
- StringBuilder arguments = new StringBuilder();
- for (String arg : args) {
- arguments.append(arg).append(' ');
- }
-
- long id = taskDao.insert(parent, queue.name, scheduled, arguments.toString());
- Task task = new Task(parent, id, queue.name, scheduled, null, 0, null, asList(args));
- log.info("Created task = {}", task);
-
- registerSynchronization(new TransactionSynchronizationAdapter() {
- public void afterCompletion(int status) {
- if (status == TransactionSynchronization.STATUS_COMMITTED) {
- queues.get(queue.name).ping();
- }
- }
- });
-
- return task;
- }
-
- @Transactional
- public Task await(Task task, long timeout) {
- final long start = currentTimeMillis();
- final long end = start + timeout;
-
- while (currentTimeMillis() < end) {
- task = update(task);
-
- try {
- sleep(100);
- } catch (InterruptedException e) {
- // break
- }
- }
-
- return task;
- }
-
- @Transactional(readOnly = true)
- public Task update(Task ref) {
- return taskDao.findById(ref.id);
- }
-}
diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java
deleted file mode 100644
index a981c7e..0000000
--- a/src/main/java/io/trygvis/queue/QueueThread.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package io.trygvis.queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
-
-import java.util.Date;
-import java.util.List;
-
-class QueueThread implements Runnable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- public boolean shouldRun = true;
-
- private boolean checkForNewTasks;
-
- private boolean busy;
-
- public final Queue queue;
-
- private final TaskDao taskDao;
-
- private final TransactionTemplate transactionTemplate;
-
- private final AsyncService.AsyncCallable callable;
-
- QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
- this.queue = queue;
- this.taskDao = taskDao;
- this.transactionTemplate = transactionTemplate;
- this.callable = callable;
- }
-
- public void ping() {
- synchronized (this) {
- if (!busy) {
- log.info("Sending ping to " + queue);
- notify();
- } else {
- checkForNewTasks = true;
- }
- }
- }
-
- public void run() {
- while (shouldRun) {
- try {
- List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
- public List<Task> doInTransaction(TransactionStatus status) {
- return taskDao.findByNameAndCompletedIsNull(queue.name);
- }
- });
-
- log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
-
- if(tasks.size() > 0) {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- }
- } catch (Throwable e) {
- log.warn("Error while executing tasks.", e);
- }
-
- synchronized (this) {
- busy = false;
-
- if (checkForNewTasks) {
- log.info("Ping received!");
- checkForNewTasks = false;
- continue;
- }
-
- try {
- wait();
- } catch (InterruptedException e) {
- // ignore
- }
-
- busy = true;
- }
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run(task.arguments);
- Date completed = new Date();
- Task t = task.registerComplete(completed);
- log.info("Completed task: {}", t);
- taskDao.update(t);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
-}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 09d5060..2b1103b 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -21,7 +21,7 @@ public class Task {
public final List<String> arguments;
- Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
this.id = id;
this.parent = parent;
this.queue = queue;
diff --git a/src/main/java/io/trygvis/queue/TaskFailureException.java b/src/main/java/io/trygvis/queue/TaskFailureException.java
deleted file mode 100644
index d3d8c48..0000000
--- a/src/main/java/io/trygvis/queue/TaskFailureException.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package io.trygvis.queue;
-
-class TaskFailureException extends RuntimeException {
- public TaskFailureException(Exception e) {
- super(e);
- }
-}