aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java61
1 files changed, 16 insertions, 45 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index ddfa150..fd4b38b 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,7 +1,8 @@
package io.trygvis.async;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
@@ -11,11 +12,8 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
@@ -23,7 +21,7 @@ import static java.lang.Thread.sleep;
public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final Map<String, QueueThread> queues = new HashMap<>();
+ private final Map<String, QueueController> queues = new HashMap<>();
private final QueueSystem queueSystem;
private final JdbcQueueService queueService;
@@ -33,49 +31,22 @@ public class JdbcAsyncService {
this.queueService = queueSystem.createQueueService();
}
- public synchronized void registerQueue(Queue queue, TaskEffect processor) {
- final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
-
- queues.put(queue.name, queueThread);
-
- log.info("registerQueue: LEAVE");
- }
-
- public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) {
- getQueueThread(queue.name).start(executor);
- }
-
- public synchronized void stopQueue(Queue queue) {
- QueueThread queueThread = queues.remove(queue.name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + queue.name + "'.");
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
}
- queueThread.stop();
- }
-
- public Queue getQueue(String name) {
- QueueThread queueThread = getQueueThread(name);
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
- return queueThread.queue;
- }
+ queues.put(queue.queue.name, queueController);
- public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException {
- return scheduleInner(c, null, queue, args);
- }
+ log.info("registerQueue: LEAVE");
- public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException {
- return scheduleInner(c, parent, queue, args);
+ return queueController;
}
- private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- Date scheduled = new Date();
-
- Task task = queueService.schedule(c, queue, parent, scheduled, args);
- log.info("Created task = {}", task);
-
- return task;
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
}
public Task await(Connection c, Task task, long timeout) throws SQLException {
@@ -105,12 +76,12 @@ public class JdbcAsyncService {
return taskDao.findById(ref.id());
}
- private QueueThread getQueueThread(String name) {
- QueueThread queueThread = queues.get(name);
+ private QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
- if (queueThread == null) {
+ if (queueController == null) {
throw new RuntimeException("No such queue: '" + name + "'.");
}
- return queueThread;
+ return queueController;
}
}