From 7d704feb86c44fca57941d223e8605b55fcf68f0 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 29 May 2013 22:16:50 +0200 Subject: o Splitting out the parts that implement the "async" features vs the "queue" features. --- src/main/java/io/trygvis/async/QueueThread.java | 119 ++++++++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 src/main/java/io/trygvis/async/QueueThread.java (limited to 'src/main/java/io/trygvis/async/QueueThread.java') diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java new file mode 100644 index 0000000..69466df --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -0,0 +1,119 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.Date; +import java.util.List; + +class QueueThread implements Runnable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean busy; + + public final Queue queue; + + private final TaskDao taskDao; + + private final TransactionTemplate transactionTemplate; + + private final AsyncService.AsyncCallable callable; + + QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { + this.queue = queue; + this.taskDao = taskDao; + this.transactionTemplate = transactionTemplate; + this.callable = callable; + } + + public void ping() { + synchronized (this) { + if (!busy) { + log.info("Sending ping to " + queue); + notify(); + } else { + checkForNewTasks = true; + } + } + } + + public void run() { + while (shouldRun) { + try { + List tasks = transactionTemplate.execute(new TransactionCallback>() { + public List doInTransaction(TransactionStatus status) { + return taskDao.findByNameAndCompletedIsNull(queue.name); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.name); + + if(tasks.size() > 0) { + for (final Task task : tasks) { + try { + executeTask(task); + } catch (TransactionException | TaskFailureException e) { + log.warn("Task execution failed", e); + } + } + } + } catch (Throwable e) { + log.warn("Error while executing tasks.", e); + } + + synchronized (this) { + busy = false; + + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + continue; + } + + try { + wait(); + } catch (InterruptedException e) { + // ignore + } + + busy = true; + } + } + } + + private void executeTask(final Task task) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + taskDao.update(task.registerRun()); + } + }); + + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + try { + callable.run(task.arguments); + Date completed = new Date(); + Task t = task.registerComplete(completed); + log.info("Completed task: {}", t); + taskDao.update(t); + } catch (Exception e) { + throw new TaskFailureException(e); + } + } + }); + } +} -- cgit v1.2.3