package io.trygvis.async;

import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException;

/**
 * Worker loop for a single {@link Queue}.
 *
 * <p>Each pass loads every task returned by
 * {@code TaskDao.findByNameAndCompletedIsNull(queue.name)}, runs them one by one through the
 * configured callable, and then parks on this object's monitor until {@link #ping()} is called.
 *
 * <p>To stop the loop, set {@link #shouldRun} to {@code false} and then call {@link #ping()}
 * (or interrupt the thread) so that a parked worker wakes up and observes the flag.
 */
class QueueThread implements Runnable {

    private final Logger log = LoggerFactory.getLogger(getClass());

    private final SqlEffectExecutor sqlEffectExecutor;

    private final AsyncService.AsyncCallable callable;

    public final Queue queue;

    /**
     * Loop condition for {@link #run()}. Volatile because it is written by other threads and
     * read by the worker outside of any lock.
     */
    public volatile boolean shouldRun = true;

    /** Set by {@link #ping()} while the worker is busy; guarded by {@code this}. */
    private boolean checkForNewTasks;

    /**
     * {@code false} only while the worker is parked in {@code wait()}; guarded by {@code this}.
     * Starts out {@code true} so that a ping arriving before or during the first pass is
     * recorded in {@link #checkForNewTasks} instead of being sent to a monitor nobody waits on.
     */
    private boolean busy = true;

    QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) {
        this.sqlEffectExecutor = sqlEffectExecutor;
        this.callable = callable;
        this.queue = queue;
    }

    /**
     * Tells the worker that the queue should be polled again.
     *
     * <p>If the worker is parked it is woken up; otherwise the request is remembered and the
     * worker starts another pass as soon as the current one ends.
     */
    public void ping() {
        synchronized (this) {
            if (!busy) {
                log.info("Sending ping to {}", queue);
                notify();
            } else {
                checkForNewTasks = true;
            }
        }
    }

    /**
     * Polls the queue and executes the tasks found until {@link #shouldRun} becomes
     * {@code false} or the thread is interrupted while parked.
     *
     * <p>Failures of individual tasks, and any other error raised during a pass, are logged and
     * do not terminate the loop.
     */
    @Override
    public void run() {
        while (shouldRun) {
            try {
                List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() {
                    @Override
                    public List<Task> doInConnection(Connection connection) throws SQLException {
                        return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);
                    }
                });

                log.info("Found {} tasks on queue {}", tasks.size(), queue.name);

                for (final Task task : tasks) {
                    try {
                        executeTask(task);
                    } catch (SqlExecutionException | TaskFailureException e) {
                        log.warn("Task execution failed", e);
                    }
                }
            } catch (Throwable e) {
                // Loop boundary: nothing thrown during a pass may kill the worker.
                log.warn("Error while executing tasks.", e);
            }

            synchronized (this) {
                if (checkForNewTasks) {
                    // A ping arrived during the pass. Poll again right away; busy stays true so
                    // that pings sent during the next pass are recorded as well.
                    log.info("Ping received!");
                    checkForNewTasks = false;
                    continue;
                }

                if (!shouldRun) {
                    // A stop was requested during the pass; do not park.
                    break;
                }

                busy = false;
                try {
                    // A spurious wakeup only causes one extra poll, so no condition loop is needed.
                    wait();
                } catch (InterruptedException e) {
                    log.info("Interrupted while waiting for tasks on queue {}, stopping.", queue.name);
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    return;
                } finally {
                    busy = true;
                }
            }
        }
    }

    /**
     * Runs a single task in two separate {@code sqlEffectExecutor.execute} calls: the first
     * stores the result of {@code task.registerRun()}, the second invokes the callable with the
     * task's arguments and stores the result of {@code task.registerComplete(...)}.
     *
     * <p>Any exception raised in the second step, whether from the callable or from the update,
     * is wrapped in a {@link TaskFailureException}.
     *
     * @param task the task to execute
     */
    private void executeTask(final Task task) {
        final Date run = new Date();
        log.info("Setting last run on task. date = {}, task = {}", run, task);

        sqlEffectExecutor.execute(new SqlEffect.Void() {
            @Override
            public void doInConnection(Connection connection) throws SQLException {
                new TaskDao(connection).update(task.registerRun());
            }
        });

        sqlEffectExecutor.execute(new SqlEffect.Void() {
            @Override
            public void doInConnection(Connection c) throws SQLException {
                try {
                    callable.run(task.arguments);
                    Date completed = new Date();
                    Task t = task.registerComplete(completed);
                    log.info("Completed task: {}", t);
                    new TaskDao(c).update(t);
                } catch (Exception e) {
                    throw new TaskFailureException(e);
                }
            }
        });
    }
}