diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-05-29 22:16:50 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-05-29 22:16:50 +0200 |
commit | 7d704feb86c44fca57941d223e8605b55fcf68f0 (patch) | |
tree | 7a33458a46bcc6e211ef3e833441f80762c61a63 /src/main/java/io/trygvis/queue/QueueThread.java | |
parent | b65d39ab617d19ac48f44bc41f04a18803ca75e6 (diff) | |
download | quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.gz quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.bz2 quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.tar.xz quartz-based-queue-7d704feb86c44fca57941d223e8605b55fcf68f0.zip |
o Splitting out the parts that implement the "async" features vs the "queue" features.
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueThread.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueThread.java | 116 |
1 files changed, 0 insertions, 116 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java deleted file mode 100644 index a981c7e..0000000 --- a/src/main/java/io/trygvis/queue/QueueThread.java +++ /dev/null @@ -1,116 +0,0 @@ -package io.trygvis.queue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; - -import java.util.Date; -import java.util.List; - -class QueueThread implements Runnable { - private final Logger log = LoggerFactory.getLogger(getClass()); - - public boolean shouldRun = true; - - private boolean checkForNewTasks; - - private boolean busy; - - public final Queue queue; - - private final TaskDao taskDao; - - private final TransactionTemplate transactionTemplate; - - private final AsyncService.AsyncCallable callable; - - QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { - this.queue = queue; - this.taskDao = taskDao; - this.transactionTemplate = transactionTemplate; - this.callable = callable; - } - - public void ping() { - synchronized (this) { - if (!busy) { - log.info("Sending ping to " + queue); - notify(); - } else { - checkForNewTasks = true; - } - } - } - - public void run() { - while (shouldRun) { - try { - List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { - public List<Task> doInTransaction(TransactionStatus status) { - return taskDao.findByNameAndCompletedIsNull(queue.name); - } - }); - - log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - - if(tasks.size() > 0) { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (TransactionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } - } - } catch (Throwable e) { - log.warn("Error while executing tasks.", e); - } - - synchronized (this) { - busy = false; - - if (checkForNewTasks) { - log.info("Ping received!"); - checkForNewTasks = false; - continue; - } - - try { - wait(); - } catch (InterruptedException e) { - // ignore - } - - busy = true; - } - } - } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); - } - }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - try { - callable.run(task.arguments); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - taskDao.update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); - } -} |