aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java91
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java17
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java39
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java14
4 files changed, 154 insertions, 7 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
new file mode 100644
index 0000000..793333d
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -0,0 +1,91 @@
+package io.trygvis.queue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public class JdbcQueueService {
+
+ private Logger log = LoggerFactory.getLogger(getClass());
+
+ private JdbcQueueService(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+
+ public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException {
+ TaskDao taskDao = createTaskDao(c);
+
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+ log.trace("Got {} tasks.", tasks.size());
+
+ for (Task task : tasks) {
+ log.trace("Executing task {}", task.id());
+ try {
+ List<Task> newTasks = effect.consume(task);
+ log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size());
+
+ Date now = new Date();
+
+ task = task.registerComplete(now);
+
+ for (Task newTask : newTasks) {
+ taskDao.insert(task.id(), newTask.queue, now, newTask.arguments);
+ }
+
+ taskDao.update(task);
+ } catch (Throwable e) {
+ log.error("Unable to execute task, id=" + task.id(), e);
+ }
+ c.commit();
+ }
+ }
+
+ public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
+ QueueDao queueDao = createQueueDao(c);
+
+ Queue q = queueDao.findByName(name);
+
+ if (q == null) {
+ if (!autoCreate) {
+ throw new SQLException("No such queue: '" + name + "'.");
+ }
+
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ }
+
+ return q;
+ }
+
+ public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = createTaskDao(c);
+
+ taskDao.insert(queue.name, scheduled, arguments);
+ }
+
+ public static JdbcQueueService createQueueService(Connection c) throws SQLException {
+ return new JdbcQueueService(c);
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
new file mode 100644
index 0000000..2111013
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -0,0 +1,17 @@
+package io.trygvis.queue;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public interface QueueService {
+ void consume(Queue queue, TaskEffect effect) throws SQLException;
+
+ Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+
+ void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
+
+ public static interface TaskEffect {
+ List<Task> consume(Task task) throws Exception;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 2b1103b..1af40d7 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -3,9 +3,11 @@ package io.trygvis.queue;
import java.util.Date;
import java.util.List;
+import static java.util.Arrays.asList;
+
public class Task {
- public final long id;
+ private final long id;
public final Long parent;
@@ -54,7 +56,42 @@ public class Task {
'}';
}
+ public long id() {
+ if (id == 0) {
+ throw new RuntimeException("This task has not been persisted yet.");
+ }
+
+ return id;
+ }
+
public boolean isDone() {
return completed != null;
}
+
+ public static Task newTask(String name, Date scheduled, String... arguments) {
+ return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments));
+ }
+
+ public static Task newTask(String name, Date scheduled, List<String> arguments) {
+ return new Task(0, 0l, name, scheduled, null, 0, null, arguments);
+ }
+
+ public static List<String> stringToArguments(String arguments) {
+ return asList(arguments.split(","));
+ }
+
+ public static String argumentsToString(List<String> arguments) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) {
+ String argument = arguments.get(i);
+ if (argument.contains(",")) {
+ throw new RuntimeException("The argument string can't contain a comma.");
+ }
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(argument);
+ }
+ return builder.toString();
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 5459933..3aa2ac2 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -11,7 +11,8 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
-import static java.util.Arrays.asList;
+import static io.trygvis.queue.Task.argumentsToString;
+import static io.trygvis.queue.Task.stringToArguments;
public class TaskDao {
@@ -23,11 +24,11 @@ public class TaskDao {
this.connection = connection;
}
- public long insert(String queue, Date scheduled, String arguments) throws SQLException {
+ public long insert(String queue, Date scheduled, List<String> arguments) throws SQLException {
return insert(null, queue, scheduled, arguments);
}
- public long insert(Long parent, String queue, Date scheduled, String arguments) throws SQLException {
+ public long insert(Long parent, String queue, Date scheduled, List<String> arguments) throws SQLException {
String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
"VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
@@ -39,7 +40,7 @@ public class TaskDao {
}
stmt.setString(i++, queue);
stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
- stmt.setString(i, arguments);
+ stmt.setString(i, argumentsToString(arguments));
stmt.executeUpdate();
}
try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
@@ -51,6 +52,7 @@ public class TaskDao {
public Task findById(long id) throws SQLException {
try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ stmt.setLong(1, id);
ResultSet rs = stmt.executeQuery();
return rs.next() ? mapRow(rs) : null;
}
@@ -76,7 +78,7 @@ public class TaskDao {
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
setTimestamp(stmt, i++, task.completed);
- stmt.setLong(i, task.id);
+ stmt.setLong(i, task.id());
stmt.executeUpdate();
}
}
@@ -99,6 +101,6 @@ public class TaskDao {
rs.getTimestamp(5),
rs.getInt(6),
rs.getTimestamp(7),
- arguments != null ? asList(arguments.split(" ")) : Collections.<String>emptyList());
+ arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
}
}