aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcQueueService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java91
1 files changed, 91 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
new file mode 100644
index 0000000..793333d
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -0,0 +1,91 @@
+package io.trygvis.queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public class JdbcQueueService {
+
+ private Logger log = LoggerFactory.getLogger(getClass());
+
+ private JdbcQueueService(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+
+ public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException {
+ TaskDao taskDao = createTaskDao(c);
+
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+ log.trace("Got {} tasks.", tasks.size());
+
+ for (Task task : tasks) {
+ log.trace("Executing task {}", task.id());
+ try {
+ List<Task> newTasks = effect.consume(task);
+ log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size());
+
+ Date now = new Date();
+
+ task = task.registerComplete(now);
+
+ for (Task newTask : newTasks) {
+ taskDao.insert(task.id(), newTask.queue, now, newTask.arguments);
+ }
+
+ taskDao.update(task);
+ } catch (Throwable e) {
+ log.error("Unable to execute task, id=" + task.id(), e);
+ }
+ c.commit();
+ }
+ }
+
+ public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
+ QueueDao queueDao = createQueueDao(c);
+
+ Queue q = queueDao.findByName(name);
+
+ if (q == null) {
+ if (!autoCreate) {
+ throw new SQLException("No such queue: '" + name + "'.");
+ }
+
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ }
+
+ return q;
+ }
+
+ public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = createTaskDao(c);
+
+ taskDao.insert(queue.name, scheduled, arguments);
+ }
+
+ public static JdbcQueueService createQueueService(Connection c) throws SQLException {
+ return new JdbcQueueService(c);
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+}