package io.trygvis.async; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import java.sql.Connection; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; public class JdbcAsyncService { private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; public JdbcAsyncService(QueueSystem queueSystem) { this.queueSystem = queueSystem; } public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { if (queues.containsKey(queue.queue.name)) { throw new IllegalArgumentException("Queue already exist."); } QueueController queueController = new QueueController(queueSystem, req, processor, queue); queues.put(queue.queue.name, queueController); return queueController; } public QueueExecutor getQueue(String name) { return getQueueThread(name).queue; } public Task await(Connection c, Task task, long timeout) throws SQLException { final long start = currentTimeMillis(); final long end = start + timeout; while (currentTimeMillis() < end) { task = update(c, task); if (task == null) { throw new RuntimeException("The task went away."); } try { sleep(100); } catch (InterruptedException e) { // break } } return task; } public Task update(Connection c, Task ref) throws SQLException { return queueSystem.createTaskDao(c).findById(ref.id()); } private synchronized QueueController getQueueThread(String name) { QueueController queueController = queues.get(name); if (queueController == null) { throw new RuntimeException("No such queue: '" + name + "'."); } return queueController; } }