diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueThread.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueThread.java | 116 |
1 files changed, 116 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueThread.java b/src/main/java/io/trygvis/queue/QueueThread.java new file mode 100644 index 0000000..a981c7e --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueThread.java @@ -0,0 +1,116 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.List; + +class QueueThread implements Runnable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean busy; + + public final Queue queue; + + private final TaskDao taskDao; + + private final TransactionTemplate transactionTemplate; + + private final AsyncService.AsyncCallable callable; + + QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { + this.queue = queue; + this.taskDao = taskDao; + this.transactionTemplate = transactionTemplate; + this.callable = callable; + } + + public void ping() { + synchronized (this) { + if (!busy) { + log.info("Sending ping to " + queue); + notify(); + } else { + checkForNewTasks = true; + } + } + } + + public void run() { + while (shouldRun) { + try { + List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { + public List<Task> doInTransaction(TransactionStatus status) { + return taskDao.findByNameAndCompletedIsNull(queue.name); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + if(tasks.size() > 0) { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } + } catch (Throwable e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + busy = false; + + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + continue; + } + + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + + busy = true; + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(task.arguments); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } +} |