diff options
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..46f1f30 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,75 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; + +public class JdbcAsyncService { + private final Map<String, QueueController> queues = new HashMap<>(); + + private final QueueSystem queueSystem; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + } + + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); + } + + QueueController queueController = new QueueController(queueSystem, req, processor, queue); + + queues.put(queue.queue.name, queueController); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; + } + + public Task await(Connection c, Task task, long timeout) throws SQLException { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + public Task update(Connection c, Task ref) throws SQLException { + return queueSystem.createTaskDao(c).findById(ref.id()); + } + + private synchronized QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); + + if (queueController == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueController; + } +} |