aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java41
1 files changed, 19 insertions, 22 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 6baa56e..ddfa150 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -19,7 +19,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static java.util.concurrent.TimeUnit.SECONDS;
public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -34,7 +33,7 @@ public class JdbcAsyncService {
this.queueService = queueSystem.createQueueService();
}
- public void registerQueue(Queue queue, TaskEffect processor) {
+ public synchronized void registerQueue(Queue queue, TaskEffect processor) {
final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
queues.put(queue.name, queueThread);
@@ -42,31 +41,22 @@ public class JdbcAsyncService {
log.info("registerQueue: LEAVE");
}
- public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
- final QueueThread queueThread = queues.get(name);
+ public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) {
+ getQueueThread(queue.name).start(executor);
+ }
+
+ public synchronized void stopQueue(Queue queue) {
+ QueueThread queueThread = queues.remove(queue.name);
if (queueThread == null) {
- throw new RuntimeException("No such queue: " + name);
+ throw new RuntimeException("No such queue: '" + queue.name + "'.");
}
- long interval = queueThread.queue.interval;
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
+ queueThread.stop();
}
public Queue getQueue(String name) {
- QueueThread queueThread = queues.get(name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + name + "'.");
- }
+ QueueThread queueThread = getQueueThread(name);
return queueThread.queue;
}
@@ -80,8 +70,6 @@ public class JdbcAsyncService {
}
private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
Date scheduled = new Date();
Task task = queueService.schedule(c, queue, parent, scheduled, args);
@@ -116,4 +104,13 @@ public class JdbcAsyncService {
return taskDao.findById(ref.id());
}
+
+ private QueueThread getQueueThread(String name) {
+ QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueThread;
+ }
}