From 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 23 Jun 2013 09:37:57 +0200 Subject: o Initial import of JDBC queue. --- .../java/io/trygvis/queue/JdbcQueueService.java | 55 +++++++ src/main/java/io/trygvis/queue/Queue.java | 24 +++ src/main/java/io/trygvis/queue/QueueDao.java | 45 ++++++ src/main/java/io/trygvis/queue/QueueExecutor.java | 177 +++++++++++++++++++++ src/main/java/io/trygvis/queue/QueueService.java | 54 +++++++ src/main/java/io/trygvis/queue/QueueStats.java | 20 +++ src/main/java/io/trygvis/queue/QueueSystem.java | 61 +++++++ src/main/java/io/trygvis/queue/SqlEffect.java | 12 ++ .../java/io/trygvis/queue/SqlEffectExecutor.java | 50 ++++++ src/main/java/io/trygvis/queue/Task.java | 117 ++++++++++++++ src/main/java/io/trygvis/queue/TaskDao.java | 148 +++++++++++++++++ src/main/java/io/trygvis/queue/TaskEffect.java | 7 + 12 files changed, 770 insertions(+) create mode 100644 src/main/java/io/trygvis/queue/JdbcQueueService.java create mode 100755 src/main/java/io/trygvis/queue/Queue.java create mode 100644 src/main/java/io/trygvis/queue/QueueDao.java create mode 100644 src/main/java/io/trygvis/queue/QueueExecutor.java create mode 100644 src/main/java/io/trygvis/queue/QueueService.java create mode 100644 src/main/java/io/trygvis/queue/QueueStats.java create mode 100644 src/main/java/io/trygvis/queue/QueueSystem.java create mode 100644 src/main/java/io/trygvis/queue/SqlEffect.java create mode 100644 src/main/java/io/trygvis/queue/SqlEffectExecutor.java create mode 100755 src/main/java/io/trygvis/queue/Task.java create mode 100644 src/main/java/io/trygvis/queue/TaskDao.java create mode 100644 src/main/java/io/trygvis/queue/TaskEffect.java (limited to 'src/main/java/io/trygvis/queue') diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..ef0b5bb --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class JdbcQueueService { + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final Map queues = new HashMap<>(); + + JdbcQueueService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + } + + public synchronized QueueExecutor getQueue(String name) { + QueueExecutor queueExecutor = queues.get(name); + + if (queueExecutor != null) { + return queueExecutor; + } + + throw new IllegalArgumentException("No such queue: " + name); + } + + public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { + QueueExecutor queueExecutor = queues.get(name); + + if (queueExecutor != null) { + return queueExecutor; + } + + QueueDao queueDao = queueSystem.createQueueDao(c); + + Queue q = queueDao.findByName(name); + + if (q == null) { + if (!autoCreate) { + throw new SQLException("No such queue: '" + name + "'."); + } + + q = new Queue(name, interval); + queueDao.insert(q); + } + + queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); + queues.put(name, queueExecutor); + return queueExecutor; + } +} diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + + public final String name; + + public final long interval; + + public Queue(String name, long interval) { + this.name = name; + this.interval = interval; + } + + public Queue withInterval(int interval) { + return new Queue(name, interval); + } + + public String toString() { + return "Queue{" + + "name='" + name + '\'' + + ", interval=" + interval + + '}'; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..2f69e11 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,45 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class QueueDao { + + private final Connection connection; + + public QueueDao(Connection connection) { + this.connection = connection; + } + + public Queue findByName(String name) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) { + stmt.setString(1, name); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public void insert(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) { + int i = 1; + stmt.setString(i++, q.name); + stmt.setLong(i, q.interval); + stmt.executeUpdate(); + } + } + + public void update(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) { + int i = 1; + stmt.setLong(i++, q.interval); + stmt.setString(i, q.name); + stmt.executeUpdate(); + } + } + + public Queue mapRow(ResultSet rs) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..88e5b46 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,177 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + public int missed; + + public synchronized QueueStats toStats() { + return new QueueStats(total, ok, failed, missed, scheduled); + } + } + + public QueueStats getStats() { + return stats.toStats(); + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + @Override + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + *

+ * If the task fails, the status is set to error in a separate transaction. + */ + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); + } + }); + + if (count == 0) { + log.warn("Missed task {}", task.id()); + synchronized (stats) { + stats.missed++; + } + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java new file mode 100644 index 0000000..1c38f1f --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public interface QueueService { + QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException; + + void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; + + public static class TaskExecutionRequest { + public final long chunkSize; + public final boolean stopOnError; + public final Long interval; + public final boolean continueOnFullChunk; + // TODO: saveExceptions + + public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + this(chunkSize, stopOnError, null, true); + } + + private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) { + this.chunkSize = chunkSize; + this.stopOnError = stopOnError; + this.interval = interval; + this.continueOnFullChunk = continueOnFullChunk; + } + + public TaskExecutionRequest interval(long interval) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + @Override + public String toString() { + return "TaskExecutionRequest{" + + "chunkSize=" + chunkSize + + ", stopOnError=" + stopOnError + + '}'; + } + + public long interval(Queue queue) { + if (interval != null) { + return interval; + } + + return queue.interval; + } + } +} diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { + public final String name; + public final long totalTaskCount; + public final EnumMap states; + + public QueueStats(String name, EnumMap states) { + this.name = name; + this.states = states; + + long c = 0; + for (Long l : states.values()) { + c += l; + } + this.totalTaskCount = c; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java new file mode 100644 index 0000000..5954526 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -0,0 +1,61 @@ +package io.trygvis.queue; + +import io.trygvis.async.JdbcAsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; + +public class QueueSystem { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public final SqlEffectExecutor sqlEffectExecutor; + + private final JdbcQueueService queueService; + + private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + }); + + this.sqlEffectExecutor = sqlEffectExecutor; + queueService = new JdbcQueueService(this); + } + + /** + * Initializes the queue system. Use this as the first thing do as it will validate the database. + */ + public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + return new QueueSystem(sqlEffectExecutor); + } + + public JdbcQueueService createQueueService() { + return queueService; + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } + + public JdbcAsyncService createAsyncService() { + return new JdbcAsyncService(this); + } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java new file mode 100644 index 0000000..7864bcd --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect { + A doInConnection(Connection c) throws SQLException; + + interface Void { + void doInConnection(Connection c) throws SQLException; + } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java new file mode 100644 index 0000000..92802da --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java @@ -0,0 +1,50 @@ +package io.trygvis.queue; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + + private final DataSource dataSource; + + public SqlEffectExecutor(DataSource dataSource) { + this.dataSource = dataSource; + } + + public A transaction(SqlEffect effect) throws SQLException { +// int pid; + + try (Connection c = dataSource.getConnection()) { +// pid = getPid(c); +// System.out.println("pid = " + pid); + + boolean ok = false; + try { + A a = effect.doInConnection(c); + c.commit(); + ok = true; + return a; + } finally { +// System.out.println("Closing, pid = " + pid); + if (!ok) { + try { + c.rollback(); + } catch (SQLException e) { + // ignore + } + } + } + } + } + + public void transaction(final SqlEffect.Void effect) throws SQLException { + transaction(new SqlEffect() { + @Override + public Object doInConnection(Connection c) throws SQLException { + effect.doInConnection(c); + return null; + } + }); + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..8038050 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,117 @@ +package io.trygvis.queue; + +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.Task.TaskState.*; +import static java.util.Arrays.asList; + +public class Task { + + public enum TaskState { + NEW, + PROCESSING, + OK, + FAILED + } + + private final long id; + + public final Long parent; + + public final String queue; + + public final TaskState state; + + public final Date scheduled; + + public final Date lastRun; + + public final int runCount; + + public final Date completed; + + public final List arguments; + + public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List arguments) { + this.id = id; + this.parent = parent; + this.queue = queue; + this.state = state; + this.scheduled = scheduled; + this.lastRun = lastRun; + this.runCount = runCount; + this.completed = completed; + + this.arguments = arguments; + } + + public Task markProcessing() { + return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments); + } + + public Task markOk(Date completed) { + return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments); + } + + public Task markFailed(Date now) { + return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments); + } + + public String toString() { + return "Task{" + + "id=" + id + + ", parent=" + parent + + ", queue=" + queue + + ", state=" + state + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", runCount=" + runCount + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; + } + + public long id() { + if (id == 0) { + throw new RuntimeException("This task has not been persisted yet."); + } + + return id; + } + + public boolean isDone() { + return completed != null; + } + + public Task childTask(String queue, Date scheduled, String... arguments) { + return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String queue, Date scheduled, String... arguments) { + return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String queue, Date scheduled, List arguments) { + return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments); + } + + public static List stringToArguments(String arguments) { + return asList(arguments.split(",")); + } + + public static String argumentsToString(List arguments) { + StringBuilder builder = new StringBuilder(); + for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) { + String argument = arguments.get(i); + if (argument.contains(",")) { + throw new RuntimeException("The argument string can't contain a comma."); + } + if (i > 0) { + builder.append(','); + } + builder.append(argument); + } + return builder.toString(); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..365b44b --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,148 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumMap; +import java.util.List; + +import static io.trygvis.queue.Task.*; + +public class TaskDao { + + private final Connection c; + + public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; + + TaskDao(Connection c) { + this.c = c; + } + + public long insert(Long parent, String queue, TaskState state, Date scheduled, List arguments) throws SQLException { + String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + if (parent == null) { + stmt.setNull(i++, Types.BIGINT); + } else { + stmt.setLong(i++, parent); + } + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); + stmt.setString(i, argumentsToString(arguments)); + stmt.executeUpdate(); + } + try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { + ResultSet rs = stmt.executeQuery(); + rs.next(); + return rs.getLong(1); + } + } + + public Task findById(long id) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + stmt.setLong(1, id); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public List findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { + int i = 1; + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); + ResultSet rs = stmt.executeQuery(); + List list = new ArrayList<>(); + while (rs.next()) { + list.add(mapRow(rs)); + } + return list; + } + } + + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + + public int update(Task task) throws SQLException { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + stmt.setString(i++, task.state.name()); + stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); + setTimestamp(stmt, i++, task.lastRun); + stmt.setInt(i++, task.runCount); + setTimestamp(stmt, i++, task.completed); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } + return stmt.executeUpdate(); + } + } + + public void setState(Task task, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { + int i = 1; + stmt.setString(i++, state.name()); + stmt.setLong(i, task.id()); + stmt.executeUpdate(); + } + } + + private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { + if (date == null) { + stmt.setNull(parameterIndex, Types.TIMESTAMP); + } else { + stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); + } + } + + public Task mapRow(ResultSet rs) throws SQLException { + String arguments = rs.getString(9); + int i = 1; + return new Task( + rs.getLong(i++), + rs.getLong(i++), + rs.getString(i++), + TaskState.valueOf(rs.getString(i++)), + rs.getTimestamp(i++), + rs.getTimestamp(i++), + rs.getInt(i++), + rs.getTimestamp(i), + arguments != null ? stringToArguments(arguments) : Collections.emptyList()); + } + + public void rollback() throws SQLException { + c.rollback(); + c.close(); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java new file mode 100644 index 0000000..186797f --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskEffect.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +import java.util.List; + +public interface TaskEffect { + List apply(Task task) throws Exception; +} -- cgit v1.2.3