aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/QueueThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueThread.java')
-rw-r--r--src/main/java/io/trygvis/queue/QueueThread.java116
1 files changed, 116 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java
new file mode 100644
index 0000000..a981c7e
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueThread.java
@@ -0,0 +1,116 @@
+package io.trygvis.queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.transaction.TransactionException;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import java.util.Date;
+import java.util.List;
+
+class QueueThread implements Runnable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean busy;
+
+ public final Queue queue;
+
+ private final TaskDao taskDao;
+
+ private final TransactionTemplate transactionTemplate;
+
+ private final AsyncService.AsyncCallable callable;
+
+ QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
+ this.queue = queue;
+ this.taskDao = taskDao;
+ this.transactionTemplate = transactionTemplate;
+ this.callable = callable;
+ }
+
+ public void ping() {
+ synchronized (this) {
+ if (!busy) {
+ log.info("Sending ping to " + queue);
+ notify();
+ } else {
+ checkForNewTasks = true;
+ }
+ }
+ }
+
+ public void run() {
+ while (shouldRun) {
+ try {
+ List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
+ public List<Task> doInTransaction(TransactionStatus status) {
+ return taskDao.findByNameAndCompletedIsNull(queue.name);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
+
+ if(tasks.size() > 0) {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException | TaskFailureException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ log.warn("Error while executing tasks.", e);
+ }
+
+ synchronized (this) {
+ busy = false;
+
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ continue;
+ }
+
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+
+ busy = true;
+ }
+ }
+ }
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ taskDao.update(task.registerRun());
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ try {
+ callable.run(task.arguments);
+ Date completed = new Date();
+ Task t = task.registerComplete(completed);
+ log.info("Completed task: {}", t);
+ taskDao.update(t);
+ } catch (Exception e) {
+ throw new TaskFailureException(e);
+ }
+ }
+ });
+ }
+}