aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async/JdbcAsyncService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java75
1 files changed, 75 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
new file mode 100644
index 0000000..46f1f30
--- /dev/null
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -0,0 +1,75 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+
+public class JdbcAsyncService {
+ private final Map<String, QueueController> queues = new HashMap<>();
+
+ private final QueueSystem queueSystem;
+
+ public JdbcAsyncService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ }
+
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
+ }
+
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
+
+ queues.put(queue.queue.name, queueController);
+
+ return queueController;
+ }
+
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
+ }
+
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
+ final long start = currentTimeMillis();
+ final long end = start + timeout;
+
+ while (currentTimeMillis() < end) {
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
+
+ try {
+ sleep(100);
+ } catch (InterruptedException e) {
+ // break
+ }
+ }
+
+ return task;
+ }
+
+ public Task update(Connection c, Task ref) throws SQLException {
+ return queueSystem.createTaskDao(c).findById(ref.id());
+ }
+
+ private synchronized QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
+
+ if (queueController == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueController;
+ }
+}