From 7465fdb9aa847d29dacc56adbe473f1c1ceb298e Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 4 Jun 2013 20:54:56 +0200 Subject: o Creating a QueueService on top of the DAOs. --- .../java/io/trygvis/queue/JdbcQueueService.java | 91 ++++++++++++++++++++++ src/main/java/io/trygvis/queue/QueueService.java | 17 ++++ src/main/java/io/trygvis/queue/Task.java | 39 +++++++++- src/main/java/io/trygvis/queue/TaskDao.java | 14 ++-- 4 files changed, 154 insertions(+), 7 deletions(-) create mode 100644 src/main/java/io/trygvis/queue/JdbcQueueService.java create mode 100644 src/main/java/io/trygvis/queue/QueueService.java (limited to 'src/main/java/io/trygvis/queue') diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..793333d --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,91 @@ +package io.trygvis.queue; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public class JdbcQueueService { + + private Logger log = LoggerFactory.getLogger(getClass()); + + private JdbcQueueService(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + + public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException { + TaskDao taskDao = createTaskDao(c); + + List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + log.trace("Got {} tasks.", tasks.size()); + + for (Task task : tasks) { + log.trace("Executing task {}", task.id()); + try { + List newTasks = effect.consume(task); + log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size()); + + Date now = new Date(); + + task = task.registerComplete(now); + + for (Task newTask : newTasks) { + taskDao.insert(task.id(), newTask.queue, now, newTask.arguments); + } + + taskDao.update(task); + } catch (Throwable e) { + log.error("Unable to execute task, id=" + task.id(), e); + } + c.commit(); + } + } + + public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { + QueueDao queueDao = createQueueDao(c); + + Queue q = queueDao.findByName(name); + + if (q == null) { + if (!autoCreate) { + throw new SQLException("No such queue: '" + name + "'."); + } + + q = new Queue(name, interval); + queueDao.insert(q); + } + + return q; + } + + public void schedule(Connection c, Queue queue, Date scheduled, List arguments) throws SQLException { + TaskDao taskDao = createTaskDao(c); + + taskDao.insert(queue.name, scheduled, arguments); + } + + public static JdbcQueueService createQueueService(Connection c) throws SQLException { + return new JdbcQueueService(c); + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java new file mode 100644 index 0000000..2111013 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -0,0 +1,17 @@ +package io.trygvis.queue; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public interface QueueService { + void consume(Queue queue, TaskEffect effect) throws SQLException; + + Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; + + void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; + + public static interface TaskEffect { + List consume(Task task) throws Exception; + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java index 2b1103b..1af40d7 100755 --- a/src/main/java/io/trygvis/queue/Task.java +++ b/src/main/java/io/trygvis/queue/Task.java @@ -3,9 +3,11 @@ package io.trygvis.queue; import java.util.Date; import java.util.List; +import static java.util.Arrays.asList; + public class Task { - public final long id; + private final long id; public final Long parent; @@ -54,7 +56,42 @@ public class Task { '}'; } + public long id() { + if (id == 0) { + throw new RuntimeException("This task has not been persisted yet."); + } + + return id; + } + public boolean isDone() { return completed != null; } + + public static Task newTask(String name, Date scheduled, String... arguments) { + return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String name, Date scheduled, List arguments) { + return new Task(0, 0l, name, scheduled, null, 0, null, arguments); + } + + public static List stringToArguments(String arguments) { + return asList(arguments.split(",")); + } + + public static String argumentsToString(List arguments) { + StringBuilder builder = new StringBuilder(); + for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) { + String argument = arguments.get(i); + if (argument.contains(",")) { + throw new RuntimeException("The argument string can't contain a comma."); + } + if (i > 0) { + builder.append(','); + } + builder.append(argument); + } + return builder.toString(); + } } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 5459933..3aa2ac2 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -11,7 +11,8 @@ import java.util.Collections; import java.util.Date; import java.util.List; -import static java.util.Arrays.asList; +import static io.trygvis.queue.Task.argumentsToString; +import static io.trygvis.queue.Task.stringToArguments; public class TaskDao { @@ -23,11 +24,11 @@ public class TaskDao { this.connection = connection; } - public long insert(String queue, Date scheduled, String arguments) throws SQLException { + public long insert(String queue, Date scheduled, List arguments) throws SQLException { return insert(null, queue, scheduled, arguments); } - public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException { + public long insert(Long parent, String queue, Date scheduled, List arguments) throws SQLException { String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)"; try (PreparedStatement stmt = connection.prepareStatement(sql)) { @@ -39,7 +40,7 @@ public class TaskDao { } stmt.setString(i++, queue); stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); - stmt.setString(i, arguments); + stmt.setString(i, argumentsToString(arguments)); stmt.executeUpdate(); } try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) { @@ -51,6 +52,7 @@ public class TaskDao { public Task findById(long id) throws SQLException { try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + stmt.setLong(1, id); ResultSet rs = stmt.executeQuery(); return rs.next() ? mapRow(rs) : null; } @@ -76,7 +78,7 @@ public class TaskDao { setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); - stmt.setLong(i, task.id); + stmt.setLong(i, task.id()); stmt.executeUpdate(); } } @@ -99,6 +101,6 @@ public class TaskDao { rs.getTimestamp(5), rs.getInt(6), rs.getTimestamp(7), - arguments != null ? asList(arguments.split(" ")) : Collections.emptyList()); + arguments != null ? stringToArguments(arguments) : Collections.emptyList()); } } -- cgit v1.2.3