diff options
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 22 |
1 files changed, 16 insertions, 6 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 310c59b..6baa56e 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,8 +1,11 @@ package io.trygvis.async; +import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,8 +26,16 @@ public class JdbcAsyncService { private final Map<String, QueueThread> queues = new HashMap<>(); - public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) { - final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue); + private final QueueSystem queueSystem; + private final JdbcQueueService queueService; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.queueService = queueSystem.createQueueService(); + } + + public void registerQueue(Queue queue, TaskEffect processor) { + final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); @@ -69,12 +80,11 @@ public class JdbcAsyncService { } private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException { - TaskDao taskDao = new TaskDao(c); + TaskDao taskDao = queueSystem.createTaskDao(c); Date scheduled = new Date(); - long id = taskDao.insert(parent, queue.name, scheduled, args); - Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args); + Task task = queueService.schedule(c, queue, parent, scheduled, args); log.info("Created task = {}", task); return task; @@ -102,7 +112,7 @@ public class JdbcAsyncService { } public Task update(Connection c, Task ref) throws SQLException { - TaskDao taskDao = new TaskDao(c); + TaskDao taskDao = queueSystem.createTaskDao(c); return taskDao.findById(ref.id()); } |