From 54d7b2ce520e57cc0ffb9582546b80a32fa00682 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 12 Jun 2013 22:55:18 +0200 Subject: wip --- .../java/io/trygvis/async/JdbcAsyncService.java | 41 ++++++++++------------ 1 file changed, 19 insertions(+), 22 deletions(-) (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java') diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 6baa56e..ddfa150 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -19,7 +19,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; -import static java.util.concurrent.TimeUnit.SECONDS; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -34,7 +33,7 @@ public class JdbcAsyncService { this.queueService = queueSystem.createQueueService(); } - public void registerQueue(Queue queue, TaskEffect processor) { + public synchronized void registerQueue(Queue queue, TaskEffect processor) { final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); @@ -42,31 +41,22 @@ public class JdbcAsyncService { log.info("registerQueue: LEAVE"); } - public void startQueue(ScheduledThreadPoolExecutor executor, String name) { - final QueueThread queueThread = queues.get(name); + public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) { + getQueueThread(queue.name).start(executor); + } + + public synchronized void stopQueue(Queue queue) { + QueueThread queueThread = queues.remove(queue.name); if (queueThread == null) { - throw new RuntimeException("No such queue: " + name); + throw new RuntimeException("No such queue: '" + queue.name + "'."); } - long interval = queueThread.queue.interval; - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); + queueThread.stop(); } public Queue getQueue(String name) { - QueueThread queueThread = queues.get(name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + name + "'."); - } + QueueThread queueThread = getQueueThread(name); return queueThread.queue; } @@ -80,8 +70,6 @@ public class JdbcAsyncService { } private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - Date scheduled = new Date(); Task task = queueService.schedule(c, queue, parent, scheduled, args); @@ -116,4 +104,13 @@ public class JdbcAsyncService { return taskDao.findById(ref.id()); } + + private QueueThread getQueueThread(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueThread; + } } -- cgit v1.2.3