diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 91 |
1 files changed, 91 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..793333d --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,91 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public class JdbcQueueService { + + private Logger log = LoggerFactory.getLogger(getClass()); + + private JdbcQueueService(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + + public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException { + TaskDao taskDao = createTaskDao(c); + + List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + log.trace("Got {} tasks.", tasks.size()); + + for (Task task : tasks) { + log.trace("Executing task {}", task.id()); + try { + List<Task> newTasks = effect.consume(task); + log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size()); + + Date now = new Date(); + + task = task.registerComplete(now); + + for (Task newTask : newTasks) { + taskDao.insert(task.id(), newTask.queue, now, newTask.arguments); + } + + taskDao.update(task); + } catch (Throwable e) { + log.error("Unable to execute task, id=" + task.id(), e); + } + c.commit(); + } + } + + public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { + QueueDao queueDao = createQueueDao(c); + + Queue q = queueDao.findByName(name); + + if (q == null) { + if (!autoCreate) { + throw new SQLException("No such queue: '" + name + "'."); + } + + q = new Queue(name, interval); + queueDao.insert(q); + } + + return q; + } + + public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = createTaskDao(c); + + taskDao.insert(queue.name, scheduled, arguments); + } + + public static JdbcQueueService createQueueService(Connection c) throws SQLException { + return new JdbcQueueService(c); + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } +} |