From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- .../java/io/trygvis/async/JdbcAsyncService.java | 61 ++++++---------------- 1 file changed, 16 insertions(+), 45 deletions(-) (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java') diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index ddfa150..fd4b38b 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,7 +1,8 @@ package io.trygvis.async; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; @@ -11,11 +12,8 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; import java.util.HashMap; -import java.util.List; import java.util.Map; -import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; @@ -23,7 +21,7 @@ import static java.lang.Thread.sleep; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map queues = new HashMap<>(); + private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; private final JdbcQueueService queueService; @@ -33,49 +31,22 @@ public class JdbcAsyncService { this.queueService = queueSystem.createQueueService(); } - public synchronized void registerQueue(Queue queue, TaskEffect processor) { - final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); - - queues.put(queue.name, queueThread); - - log.info("registerQueue: LEAVE"); - } - - public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) { - getQueueThread(queue.name).start(executor); - } - - public synchronized void stopQueue(Queue queue) { - QueueThread queueThread = queues.remove(queue.name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + queue.name + "'."); + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); } - queueThread.stop(); - } - - public Queue getQueue(String name) { - QueueThread queueThread = getQueueThread(name); + QueueController queueController = new QueueController(queueSystem, req, processor, queue); - return queueThread.queue; - } + queues.put(queue.queue.name, queueController); - public Task schedule(Connection c, final Queue queue, List args) throws SQLException { - return scheduleInner(c, null, queue, args); - } + log.info("registerQueue: LEAVE"); - public Task schedule(Connection c, long parent, Queue queue, List args) throws SQLException { - return scheduleInner(c, parent, queue, args); + return queueController; } - private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { - Date scheduled = new Date(); - - Task task = queueService.schedule(c, queue, parent, scheduled, args); - log.info("Created task = {}", task); - - return task; + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; } public Task await(Connection c, Task task, long timeout) throws SQLException { @@ -105,12 +76,12 @@ public class JdbcAsyncService { return taskDao.findById(ref.id()); } - private QueueThread getQueueThread(String name) { - QueueThread queueThread = queues.get(name); + private QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); - if (queueThread == null) { + if (queueController == null) { throw new RuntimeException("No such queue: '" + name + "'."); } - return queueThread; + return queueController; } } -- cgit v1.2.3