diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 |
commit | 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch) | |
tree | c0bd7202845697207b04d518f613588df17d9e12 /src/main/java/io/trygvis/async/JdbcAsyncService.java | |
download | jdbc-queue-master.tar.gz jdbc-queue-master.tar.bz2 jdbc-queue-master.tar.xz jdbc-queue-master.zip |
Diffstat (limited to 'src/main/java/io/trygvis/async/JdbcAsyncService.java')
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 75 |
1 files changed, 75 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..46f1f30 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,75 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; + +public class JdbcAsyncService { + private final Map<String, QueueController> queues = new HashMap<>(); + + private final QueueSystem queueSystem; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + } + + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); + } + + QueueController queueController = new QueueController(queueSystem, req, processor, queue); + + queues.put(queue.queue.name, queueController); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; + } + + public Task await(Connection c, Task task, long timeout) throws SQLException { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + public Task update(Connection c, Task ref) throws SQLException { + return queueSystem.createTaskDao(c).findById(ref.id()); + } + + private synchronized QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); + + if (queueController == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueController; + } +} |