aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/QueueThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueThread.java')
-rw-r--r--src/main/java/io/trygvis/queue/QueueThread.java116
1 files changed, 0 insertions, 116 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java
deleted file mode 100644
index a981c7e..0000000
--- a/src/main/java/io/trygvis/queue/QueueThread.java
+++ /dev/null
@@ -1,116 +0,0 @@
-package io.trygvis.queue;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
-
-import java.util.Date;
-import java.util.List;
-
-class QueueThread implements Runnable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- public boolean shouldRun = true;
-
- private boolean checkForNewTasks;
-
- private boolean busy;
-
- public final Queue queue;
-
- private final TaskDao taskDao;
-
- private final TransactionTemplate transactionTemplate;
-
- private final AsyncService.AsyncCallable callable;
-
- QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
- this.queue = queue;
- this.taskDao = taskDao;
- this.transactionTemplate = transactionTemplate;
- this.callable = callable;
- }
-
- public void ping() {
- synchronized (this) {
- if (!busy) {
- log.info("Sending ping to " + queue);
- notify();
- } else {
- checkForNewTasks = true;
- }
- }
- }
-
- public void run() {
- while (shouldRun) {
- try {
- List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
- public List<Task> doInTransaction(TransactionStatus status) {
- return taskDao.findByNameAndCompletedIsNull(queue.name);
- }
- });
-
- log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
-
- if(tasks.size() > 0) {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
- }
- } catch (Throwable e) {
- log.warn("Error while executing tasks.", e);
- }
-
- synchronized (this) {
- busy = false;
-
- if (checkForNewTasks) {
- log.info("Ping received!");
- checkForNewTasks = false;
- continue;
- }
-
- try {
- wait();
- } catch (InterruptedException e) {
- // ignore
- }
-
- busy = true;
- }
- }
- }
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
- }
- });
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- try {
- callable.run(task.arguments);
- Date completed = new Date();
- Task t = task.registerComplete(completed);
- log.info("Completed task: {}", t);
- taskDao.update(t);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
- }
-}