aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/QueueThread.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/QueueThread.java')
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java78
1 files changed, 50 insertions, 28 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 69466df..00c46b4 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -5,37 +5,33 @@ import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.transaction.TransactionException;
-import org.springframework.transaction.TransactionStatus;
-import org.springframework.transaction.support.TransactionCallback;
-import org.springframework.transaction.support.TransactionCallbackWithoutResult;
-import org.springframework.transaction.support.TransactionTemplate;
+import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException;
+
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
- public boolean shouldRun = true;
-
- private boolean checkForNewTasks;
+ private final SqlEffectExecutor sqlEffectExecutor;
- private boolean busy;
+ private final AsyncService.AsyncCallable callable;
public final Queue queue;
- private final TaskDao taskDao;
+ public boolean shouldRun = true;
- private final TransactionTemplate transactionTemplate;
+ private boolean checkForNewTasks;
- private final AsyncService.AsyncCallable callable;
+ private boolean busy;
- QueueThread(Queue queue, TaskDao taskDao, TransactionTemplate transactionTemplate, AsyncService.AsyncCallable callable) {
- this.queue = queue;
- this.taskDao = taskDao;
- this.transactionTemplate = transactionTemplate;
+ QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) {
+ this.sqlEffectExecutor = sqlEffectExecutor;
this.callable = callable;
+ this.queue = queue;
}
public void ping() {
@@ -52,19 +48,25 @@ class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
try {
- List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
- public List<Task> doInTransaction(TransactionStatus status) {
- return taskDao.findByNameAndCompletedIsNull(queue.name);
+// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
+// public List<Task> doInTransaction(TransactionStatus status) {
+// return taskDao.findByNameAndCompletedIsNull(queue.name);
+// }
+// });
+ List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection connection) throws SQLException {
+ return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
- if(tasks.size() > 0) {
+ if (tasks.size() > 0) {
for (final Task task : tasks) {
try {
executeTask(task);
- } catch (TransactionException | TaskFailureException e) {
+ } catch (SqlExecutionException | TaskFailureException e) {
log.warn("Task execution failed", e);
}
}
@@ -96,24 +98,44 @@ class QueueThread implements Runnable {
private void executeTask(final Task task) {
final Date run = new Date();
log.info("Setting last run on task. date = {}, task = {}", run, task);
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
- taskDao.update(task.registerRun());
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ new TaskDao(connection).update(task.registerRun());
}
});
-
- transactionTemplate.execute(new TransactionCallbackWithoutResult() {
- protected void doInTransactionWithoutResult(TransactionStatus status) {
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// taskDao.update(task.registerRun());
+// }
+// });
+
+ sqlEffectExecutor.execute(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
try {
callable.run(task.arguments);
Date completed = new Date();
Task t = task.registerComplete(completed);
log.info("Completed task: {}", t);
- taskDao.update(t);
+ new TaskDao(c).update(t);
} catch (Exception e) {
throw new TaskFailureException(e);
}
}
});
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// try {
+// callable.run(task.arguments);
+// Date completed = new Date();
+// Task t = task.registerComplete(completed);
+// log.info("Completed task: {}", t);
+// taskDao.update(t);
+// } catch (Exception e) {
+// throw new TaskFailureException(e);
+// }
+// }
+// });
}
}