aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java22
1 files changed, 16 insertions, 6 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 310c59b..6baa56e 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,8 +1,11 @@
package io.trygvis.async;
+import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,8 +26,16 @@ public class JdbcAsyncService {
private final Map<String, QueueThread> queues = new HashMap<>();
- public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) {
- final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue);
+ private final QueueSystem queueSystem;
+ private final JdbcQueueService queueService;
+
+ public JdbcAsyncService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.queueService = queueSystem.createQueueService();
+ }
+
+ public void registerQueue(Queue queue, TaskEffect processor) {
+ final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
queues.put(queue.name, queueThread);
@@ -69,12 +80,11 @@ public class JdbcAsyncService {
}
private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- TaskDao taskDao = new TaskDao(c);
+ TaskDao taskDao = queueSystem.createTaskDao(c);
Date scheduled = new Date();
- long id = taskDao.insert(parent, queue.name, scheduled, args);
- Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args);
+ Task task = queueService.schedule(c, queue, parent, scheduled, args);
log.info("Created task = {}", task);
return task;
@@ -102,7 +112,7 @@ public class JdbcAsyncService {
}
public Task update(Connection c, Task ref) throws SQLException {
- TaskDao taskDao = new TaskDao(c);
+ TaskDao taskDao = queueSystem.createTaskDao(c);
return taskDao.findById(ref.id());
}