package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.transaction.TransactionException; import org.springframework.transaction.TransactionStatus; import org.springframework.transaction.support.TransactionCallback; import org.springframework.transaction.support.TransactionCallbackWithoutResult; import org.springframework.transaction.support.TransactionTemplate; import java.util.Date; import java.util.List; class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); public boolean shouldRun = true; private boolean checkForNewTasks; private boolean busy; public final Queue queue; private final TaskDao taskDao; private final TransactionTemplate transactionTemplate; private final AsyncService.AsyncCallable callable; QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { this.queue = queue; this.taskDao = taskDao; this.transactionTemplate = transactionTemplate; this.callable = callable; } public void ping() { synchronized (this) { if (!busy) { log.info("Sending ping to " + queue); notify(); } else { checkForNewTasks = true; } } } public void run() { while (shouldRun) { try { List tasks = transactionTemplate.execute(new TransactionCallback>() { public List doInTransaction(TransactionStatus status) { return taskDao.findByNameAndCompletedIsNull(queue.name); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if(tasks.size() > 0) { for (final Task task : tasks) { try { executeTask(task); } catch (TransactionException | TaskFailureException e) { log.warn("Task execution failed", e); } } } } catch (Throwable e) { log.warn("Error while executing tasks.", e); } synchronized (this) { busy = false; if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; continue; } try { wait(); } catch (InterruptedException e) { // ignore } busy = true; } } } private void executeTask(final Task task) { final Date run = new Date(); log.info("Setting last run on task. date = {}, task = {}", run, task); transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { taskDao.update(task.registerRun()); } }); transactionTemplate.execute(new TransactionCallbackWithoutResult() { protected void doInTransactionWithoutResult(TransactionStatus status) { try { callable.run(task.arguments); Date completed = new Date(); Task t = task.registerComplete(completed); log.info("Completed task: {}", t); taskDao.update(t); } catch (Exception e) { throw new TaskFailureException(e); } } }); } }