From 52084f7b4e6f50c90b3255cdf2eb9deab560c970 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 2 Jun 2013 12:32:29 +0200 Subject: o Making some test cases. --- src/main/java/io/trygvis/async/QueueThread.java | 78 ++++++++++++++++--------- 1 file changed, 50 insertions(+), 28 deletions(-) (limited to 'src/main/java/io/trygvis/async/QueueThread.java') diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 69466df..00c46b4 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -5,37 +5,33 @@ import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.transaction.TransactionException; -import org.springframework.transaction.TransactionStatus; -import org.springframework.transaction.support.TransactionCallback; -import org.springframework.transaction.support.TransactionCallbackWithoutResult; -import org.springframework.transaction.support.TransactionTemplate; +import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); - public boolean shouldRun = true; - - private boolean checkForNewTasks; + private final SqlEffectExecutor sqlEffectExecutor; - private boolean busy; + private final AsyncService.AsyncCallable callable; public final Queue queue; - private final TaskDao taskDao; + public boolean shouldRun = true; - private final TransactionTemplate transactionTemplate; + private boolean checkForNewTasks; - private final AsyncService.AsyncCallable callable; + private boolean busy; - QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) { - this.queue = queue; - this.taskDao = taskDao; - this.transactionTemplate = transactionTemplate; + QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) { + this.sqlEffectExecutor = sqlEffectExecutor; this.callable = callable; + this.queue = queue; } public void ping() { @@ -52,19 +48,25 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { - List tasks = transactionTemplate.execute(new TransactionCallback>() { - public List doInTransaction(TransactionStatus status) { - return taskDao.findByNameAndCompletedIsNull(queue.name); +// List tasks = transactionTemplate.execute(new TransactionCallback>() { +// public List doInTransaction(TransactionStatus status) { +// return taskDao.findByNameAndCompletedIsNull(queue.name); +// } +// }); + List tasks = sqlEffectExecutor.execute(new SqlEffect>() { + @Override + public List doInConnection(Connection connection) throws SQLException { + return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); - if(tasks.size() > 0) { + if (tasks.size() > 0) { for (final Task task : tasks) { try { executeTask(task); - } catch (TransactionException | TaskFailureException e) { + } catch (SqlExecutionException | TaskFailureException e) { log.warn("Task execution failed", e); } } @@ -96,24 +98,44 @@ class QueueThread implements Runnable { private void executeTask(final Task task) { final Date run = new Date(); log.info("Setting last run on task. date = {}, task = {}", run, task); - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { - taskDao.update(task.registerRun()); + sqlEffectExecutor.execute(new SqlEffect.Void() { + @Override + public void doInConnection(Connection connection) throws SQLException { + new TaskDao(connection).update(task.registerRun()); } }); - - transactionTemplate.execute(new TransactionCallbackWithoutResult() { - protected void doInTransactionWithoutResult(TransactionStatus status) { +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// taskDao.update(task.registerRun()); +// } +// }); + + sqlEffectExecutor.execute(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { try { callable.run(task.arguments); Date completed = new Date(); Task t = task.registerComplete(completed); log.info("Completed task: {}", t); - taskDao.update(t); + new TaskDao(c).update(t); } catch (Exception e) { throw new TaskFailureException(e); } } }); +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// try { +// callable.run(task.arguments); +// Date completed = new Date(); +// Task t = task.registerComplete(completed); +// log.info("Completed task: {}", t); +// taskDao.update(t); +// } catch (Exception e) { +// throw new TaskFailureException(e); +// } +// } +// }); } } -- cgit v1.2.3