aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java55
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Queue.java24
-rw-r--r--src/main/java/io/trygvis/queue/QueueDao.java45
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java177
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java54
-rw-r--r--src/main/java/io/trygvis/queue/QueueStats.java20
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java61
-rw-r--r--src/main/java/io/trygvis/queue/SqlEffect.java12
-rw-r--r--src/main/java/io/trygvis/queue/SqlEffectExecutor.java50
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java117
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java148
-rw-r--r--src/main/java/io/trygvis/queue/TaskEffect.java7
12 files changed, 770 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
new file mode 100644
index 0000000..ef0b5bb
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -0,0 +1,55 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JdbcQueueService {
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final Map<String, QueueExecutor> queues = new HashMap<>();
+
+ JdbcQueueService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ }
+
+ public synchronized QueueExecutor getQueue(String name) {
+ QueueExecutor queueExecutor = queues.get(name);
+
+ if (queueExecutor != null) {
+ return queueExecutor;
+ }
+
+ throw new IllegalArgumentException("No such queue: " + name);
+ }
+
+ public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException {
+ QueueExecutor queueExecutor = queues.get(name);
+
+ if (queueExecutor != null) {
+ return queueExecutor;
+ }
+
+ QueueDao queueDao = queueSystem.createQueueDao(c);
+
+ Queue q = queueDao.findByName(name);
+
+ if (q == null) {
+ if (!autoCreate) {
+ throw new SQLException("No such queue: '" + name + "'.");
+ }
+
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ }
+
+ queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q);
+ queues.put(name, queueExecutor);
+ return queueExecutor;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java
new file mode 100755
index 0000000..15003f7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Queue.java
@@ -0,0 +1,24 @@
+package io.trygvis.queue;
+
+public class Queue {
+
+ public final String name;
+
+ public final long interval;
+
+ public Queue(String name, long interval) {
+ this.name = name;
+ this.interval = interval;
+ }
+
+ public Queue withInterval(int interval) {
+ return new Queue(name, interval);
+ }
+
+ public String toString() {
+ return "Queue{" +
+ "name='" + name + '\'' +
+ ", interval=" + interval +
+ '}';
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
new file mode 100644
index 0000000..2f69e11
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -0,0 +1,45 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class QueueDao {
+
+ private final Connection connection;
+
+ public QueueDao(Connection connection) {
+ this.connection = connection;
+ }
+
+ public Queue findByName(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) {
+ stmt.setString(1, name);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+
+ public void insert(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) {
+ int i = 1;
+ stmt.setString(i++, q.name);
+ stmt.setLong(i, q.interval);
+ stmt.executeUpdate();
+ }
+ }
+
+ public void update(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) {
+ int i = 1;
+ stmt.setLong(i++, q.interval);
+ stmt.setString(i, q.name);
+ stmt.executeUpdate();
+ }
+ }
+
+ public Queue mapRow(ResultSet rs) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
new file mode 100644
index 0000000..88e5b46
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -0,0 +1,177 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.QueueStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueExecutor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ public final Queue queue;
+
+ private final Stats stats = new Stats();
+
+ public enum TaskExecutionResult {
+ OK,
+ FAILED,
+ MISSED
+ }
+
+ public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ this.queue = queue;
+ }
+
+ private static class Stats {
+ public int total;
+ public int ok;
+ public int failed;
+ public int scheduled;
+ public int missed;
+
+ public synchronized QueueStats toStats() {
+ return new QueueStats(total, ok, failed, missed, scheduled);
+ }
+ }
+
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+
+ public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
+
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
+ }
+
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ for (Task task : tasks) {
+ TaskExecutionResult result = applyTask(effect, task);
+
+ if (result == FAILED && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ /**
+ * Executed each task in its own transaction.
+ * <p/>
+ * If the task fails, the status is set to error in a separate transaction.
+ */
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ try {
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
+ }
+ });
+
+ if (count == 0) {
+ log.warn("Missed task {}", task.id());
+ synchronized (stats) {
+ stats.missed++;
+ }
+ return MISSED;
+ }
+
+ log.info("Executing task {}", task.id());
+
+ final List<Task> newTasks = effect.apply(task);
+
+ final Date now = new Date();
+
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
+
+ queueSystem.createTaskDao(c).update(task.markOk(now));
+ }
+ });
+
+ synchronized (stats) {
+ stats.total++;
+ stats.ok++;
+ }
+
+ return OK;
+ } catch (Exception e) {
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ synchronized (stats) {
+ stats.total++;
+ stats.failed++;
+ }
+
+ try {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ taskDao.update(task.markFailed(now));
+ }
+ });
+ } catch (SQLException e1) {
+ log.error("Error while marking task as failed.", e1);
+ }
+
+ return FAILED;
+ }
+ }
+
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
+ }
+
+ public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
+ }
+
+ public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
+ }
+
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ synchronized (stats) {
+ stats.scheduled++;
+ }
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
new file mode 100644
index 0000000..1c38f1f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -0,0 +1,54 @@
+package io.trygvis.queue;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public interface QueueService {
+ QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+
+ void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
+
+ public static class TaskExecutionRequest {
+ public final long chunkSize;
+ public final boolean stopOnError;
+ public final Long interval;
+ public final boolean continueOnFullChunk;
+ // TODO: saveExceptions
+
+ public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
+ this(chunkSize, stopOnError, null, true);
+ }
+
+ private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) {
+ this.chunkSize = chunkSize;
+ this.stopOnError = stopOnError;
+ this.interval = interval;
+ this.continueOnFullChunk = continueOnFullChunk;
+ }
+
+ public TaskExecutionRequest interval(long interval) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+
+ public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutionRequest{" +
+ "chunkSize=" + chunkSize +
+ ", stopOnError=" + stopOnError +
+ '}';
+ }
+
+ public long interval(Queue queue) {
+ if (interval != null) {
+ return interval;
+ }
+
+ return queue.interval;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java
new file mode 100644
index 0000000..5b048b3
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueStats.java
@@ -0,0 +1,20 @@
+package io.trygvis.queue;
+
+import java.util.EnumMap;
+
+public class QueueStats {
+ public final String name;
+ public final long totalTaskCount;
+ public final EnumMap<Task.TaskState, Long> states;
+
+ public QueueStats(String name, EnumMap<Task.TaskState, Long> states) {
+ this.name = name;
+ this.states = states;
+
+ long c = 0;
+ for (Long l : states.values()) {
+ c += l;
+ }
+ this.totalTaskCount = c;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
new file mode 100644
index 0000000..5954526
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -0,0 +1,61 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.JdbcAsyncService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+
+public class QueueSystem {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public final SqlEffectExecutor sqlEffectExecutor;
+
+ private final JdbcQueueService queueService;
+
+ private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+ });
+
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ queueService = new JdbcQueueService(this);
+ }
+
+ /**
+ * Initializes the queue system. Use this as the first thing do as it will validate the database.
+ */
+ public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ return new QueueSystem(sqlEffectExecutor);
+ }
+
+ public JdbcQueueService createQueueService() {
+ return queueService;
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+
+ public JdbcAsyncService createAsyncService() {
+ return new JdbcAsyncService(this);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java
new file mode 100644
index 0000000..7864bcd
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/SqlEffect.java
@@ -0,0 +1,12 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public interface SqlEffect<A> {
+ A doInConnection(Connection c) throws SQLException;
+
+ interface Void {
+ void doInConnection(Connection c) throws SQLException;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java
new file mode 100644
index 0000000..92802da
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java
@@ -0,0 +1,50 @@
+package io.trygvis.queue;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class SqlEffectExecutor {
+
+ private final DataSource dataSource;
+
+ public SqlEffectExecutor(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public <A> A transaction(SqlEffect<A> effect) throws SQLException {
+// int pid;
+
+ try (Connection c = dataSource.getConnection()) {
+// pid = getPid(c);
+// System.out.println("pid = " + pid);
+
+ boolean ok = false;
+ try {
+ A a = effect.doInConnection(c);
+ c.commit();
+ ok = true;
+ return a;
+ } finally {
+// System.out.println("Closing, pid = " + pid);
+ if (!ok) {
+ try {
+ c.rollback();
+ } catch (SQLException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+
+ public void transaction(final SqlEffect.Void effect) throws SQLException {
+ transaction(new SqlEffect<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException {
+ effect.doInConnection(c);
+ return null;
+ }
+ });
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
new file mode 100755
index 0000000..8038050
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -0,0 +1,117 @@
+package io.trygvis.queue;
+
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.Task.TaskState.*;
+import static java.util.Arrays.asList;
+
+public class Task {
+
+ public enum TaskState {
+ NEW,
+ PROCESSING,
+ OK,
+ FAILED
+ }
+
+ private final long id;
+
+ public final Long parent;
+
+ public final String queue;
+
+ public final TaskState state;
+
+ public final Date scheduled;
+
+ public final Date lastRun;
+
+ public final int runCount;
+
+ public final Date completed;
+
+ public final List<String> arguments;
+
+ public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ this.id = id;
+ this.parent = parent;
+ this.queue = queue;
+ this.state = state;
+ this.scheduled = scheduled;
+ this.lastRun = lastRun;
+ this.runCount = runCount;
+ this.completed = completed;
+
+ this.arguments = arguments;
+ }
+
+ public Task markProcessing() {
+ return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments);
+ }
+
+ public Task markOk(Date completed) {
+ return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments);
+ }
+
+ public Task markFailed(Date now) {
+ return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments);
+ }
+
+ public String toString() {
+ return "Task{" +
+ "id=" + id +
+ ", parent=" + parent +
+ ", queue=" + queue +
+ ", state=" + state +
+ ", scheduled=" + scheduled +
+ ", lastRun=" + lastRun +
+ ", runCount=" + runCount +
+ ", completed=" + completed +
+ ", arguments='" + arguments + '\'' +
+ '}';
+ }
+
+ public long id() {
+ if (id == 0) {
+ throw new RuntimeException("This task has not been persisted yet.");
+ }
+
+ return id;
+ }
+
+ public boolean isDone() {
+ return completed != null;
+ }
+
+ public Task childTask(String queue, Date scheduled, String... arguments) {
+ return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+
+ public static Task newTask(String queue, Date scheduled, String... arguments) {
+ return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+
+ public static Task newTask(String queue, Date scheduled, List<String> arguments) {
+ return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+
+ public static List<String> stringToArguments(String arguments) {
+ return asList(arguments.split(","));
+ }
+
+ public static String argumentsToString(List<String> arguments) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) {
+ String argument = arguments.get(i);
+ if (argument.contains(",")) {
+ throw new RuntimeException("The argument string can't contain a comma.");
+ }
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(argument);
+ }
+ return builder.toString();
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..365b44b
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,148 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+import static io.trygvis.queue.Task.*;
+
+public class TaskDao {
+
+ private final Connection c;
+
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
+
+ TaskDao(Connection c) {
+ this.c = c;
+ }
+
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ if (parent == null) {
+ stmt.setNull(i++, Types.BIGINT);
+ } else {
+ stmt.setLong(i++, parent);
+ }
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
+ stmt.setString(i, argumentsToString(arguments));
+ stmt.executeUpdate();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
+ ResultSet rs = stmt.executeQuery();
+ rs.next();
+ return rs.getLong(1);
+ }
+ }
+
+ public Task findById(long id) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ stmt.setLong(1, id);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+
+ public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) {
+ int i = 1;
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, limit);
+ ResultSet rs = stmt.executeQuery();
+ List<Task> list = new ArrayList<>();
+ while (rs.next()) {
+ list.add(mapRow(rs));
+ }
+ return list;
+ }
+ }
+
+ public QueueStats findQueueStatsByName(String queue) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) {
+ int i = 1;
+ stmt.setString(i, queue);
+ ResultSet rs = stmt.executeQuery();
+ EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class);
+ while (rs.next()) {
+ states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2));
+ }
+ return new QueueStats(queue, states);
+ }
+ }
+
+ public int update(Task task) throws SQLException {
+ return update(task, null);
+ }
+
+ public int update(Task task, TaskState state) throws SQLException {
+ String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?";
+
+ if (state != null) {
+ sql += " AND state=?";
+ }
+
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ stmt.setString(i++, task.state.name());
+ stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
+ setTimestamp(stmt, i++, task.lastRun);
+ stmt.setInt(i++, task.runCount);
+ setTimestamp(stmt, i++, task.completed);
+ stmt.setLong(i++, task.id());
+ if (state != null) {
+ stmt.setString(i, state.name());
+ }
+ return stmt.executeUpdate();
+ }
+ }
+
+ public void setState(Task task, TaskState state) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, task.id());
+ stmt.executeUpdate();
+ }
+ }
+
+ private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
+ if (date == null) {
+ stmt.setNull(parameterIndex, Types.TIMESTAMP);
+ } else {
+ stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));
+ }
+ }
+
+ public Task mapRow(ResultSet rs) throws SQLException {
+ String arguments = rs.getString(9);
+ int i = 1;
+ return new Task(
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ TaskState.valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
+ arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
+ }
+
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java
new file mode 100644
index 0000000..186797f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskEffect.java
@@ -0,0 +1,7 @@
+package io.trygvis.queue;
+
+import java.util.List;
+
+public interface TaskEffect {
+ List<Task> apply(Task task) throws Exception;
+}