diff options
Diffstat (limited to 'src/main/java/io')
-rwxr-xr-x | src/main/java/io/trygvis/async/AsyncService.java | 7 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 22 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 84 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/SqlEffectExecutor.java | 55 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 142 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 4 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueSystem.java | 58 | ||||
-rwxr-xr-x | src/main/java/io/trygvis/queue/Task.java | 34 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 68 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskEffect.java | 7 | ||||
-rw-r--r-- | src/main/java/io/trygvis/spring/DefaultConfig.java | 18 | ||||
-rw-r--r-- | src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java | 13 | ||||
-rw-r--r-- | src/main/java/io/trygvis/spring/SpringQueueService.java | 32 |
13 files changed, 355 insertions, 189 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java index 17d53e9..daf99e4 100755 --- a/src/main/java/io/trygvis/async/AsyncService.java +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -2,6 +2,7 @@ package io.trygvis.async; import io.trygvis.queue.Queue; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import java.util.List; @@ -10,7 +11,7 @@ import java.util.List; */ public interface AsyncService { - void registerQueue(Queue queue, final AsyncService.AsyncCallable callable); + void registerQueue(Queue queue, TaskEffect processor); Queue getQueue(String name); @@ -22,8 +23,4 @@ public interface AsyncService { * Polls for a new state of the execution. */ Task update(Task ref); - - interface AsyncCallable { - void run(List<String> arguments) throws Exception; - } } diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 310c59b..6baa56e 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,8 +1,11 @@ package io.trygvis.async; +import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -23,8 +26,16 @@ public class JdbcAsyncService { private final Map<String, QueueThread> queues = new HashMap<>(); - public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) { - final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue); + private final QueueSystem queueSystem; + private final JdbcQueueService queueService; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.queueService = queueSystem.createQueueService(); + } + + public void registerQueue(Queue queue, TaskEffect processor) { + final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); @@ -69,12 +80,11 @@ public class JdbcAsyncService { } private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException { - TaskDao taskDao = new TaskDao(c); + TaskDao taskDao = queueSystem.createTaskDao(c); Date scheduled = new Date(); - long id = taskDao.insert(parent, queue.name, scheduled, args); - Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args); + Task task = queueService.schedule(c, queue, parent, scheduled, args); log.info("Created task = {}", task); return task; @@ -102,7 +112,7 @@ public class JdbcAsyncService { } public Task update(Connection c, Task ref) throws SQLException { - TaskDao taskDao = new TaskDao(c); + TaskDao taskDao = queueSystem.createTaskDao(c); return taskDao.findById(ref.id()); } diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 00c46b4..33753a3 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -1,24 +1,27 @@ package io.trygvis.async; +import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; -import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; import java.util.List; -import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException; - class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); + private final QueueSystem queueSystem; + + private final JdbcQueueService queueService; + private final SqlEffectExecutor sqlEffectExecutor; - private final AsyncService.AsyncCallable callable; + private final TaskEffect taskEffect; public final Queue queue; @@ -28,9 +31,11 @@ class QueueThread implements Runnable { private boolean busy; - QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) { - this.sqlEffectExecutor = sqlEffectExecutor; - this.callable = callable; + QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.queueService = queueSystem.createQueueService(); + this.taskEffect = taskEffect; this.queue = queue; } @@ -48,28 +53,17 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { -// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() { -// public List<Task> doInTransaction(TransactionStatus status) { -// return taskDao.findByNameAndCompletedIsNull(queue.name); -// } -// }); - List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() { + List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override - public List<Task> doInConnection(Connection connection) throws SQLException { - return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name); + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - for (final Task task : tasks) { - try { - executeTask(task); - } catch (SqlExecutionException | TaskFailureException e) { - log.warn("Task execution failed", e); - } - } + queueService.executeTask(taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); @@ -94,48 +88,4 @@ class QueueThread implements Runnable { } } } - - private void executeTask(final Task task) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - sqlEffectExecutor.execute(new SqlEffect.Void() { - @Override - public void doInConnection(Connection connection) throws SQLException { - new TaskDao(connection).update(task.registerRun()); - } - }); -// transactionTemplate.execute(new TransactionCallbackWithoutResult() { -// protected void doInTransactionWithoutResult(TransactionStatus status) { -// taskDao.update(task.registerRun()); -// } -// }); - - sqlEffectExecutor.execute(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - try { - callable.run(task.arguments); - Date completed = new Date(); - Task t = task.registerComplete(completed); - log.info("Completed task: {}", t); - new TaskDao(c).update(t); - } catch (Exception e) { - throw new TaskFailureException(e); - } - } - }); -// transactionTemplate.execute(new TransactionCallbackWithoutResult() { -// protected void doInTransactionWithoutResult(TransactionStatus status) { -// try { -// callable.run(task.arguments); -// Date completed = new Date(); -// Task t = task.registerComplete(completed); -// log.info("Completed task: {}", t); -// taskDao.update(t); -// } catch (Exception e) { -// throw new TaskFailureException(e); -// } -// } -// }); - } } diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java index c8abbd3..e03cf5e 100644 --- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -12,22 +12,67 @@ public class SqlEffectExecutor { this.dataSource = dataSource; } - public <A> A execute(SqlEffect<A> effect) { + public <A> A transaction(SqlEffect<A> effect) { +/* + int pid; + try (Connection c = dataSource.getConnection()) { - return effect.doInConnection(c); + + try (Statement statement = c.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); + rs.next(); + pid = rs.getInt(1); + } + + System.out.println("pid = " + pid); + + try { + effect.doInConnection(c); + c.commit(); + } catch (SQLException e) { + c.rollback(); + e.printStackTrace(); + } finally { + System.out.println("Closing pid=" + pid); + try { + c.close(); + } catch (Exception e) { + e.printStackTrace(); + } + } } catch (SQLException e) { + e.printStackTrace(); throw new SqlExecutionException(e); } - } +*/ - public void execute(SqlEffect.Void effect) { try (Connection c = dataSource.getConnection()) { - effect.doInConnection(c); + boolean ok = false; + try { + A a = effect.doInConnection(c); + c.commit(); + ok = true; + return a; + } finally { + if (!ok) { + c.rollback(); + } + } } catch (SQLException e) { throw new SqlExecutionException(e); } } + public void transaction(final SqlEffect.Void effect) { + transaction(new SqlEffect<Object>() { + @Override + public Object doInConnection(Connection c) throws SQLException { + effect.doInConnection(c); + return null; + } + }); + } + public static class SqlExecutionException extends RuntimeException { public final SQLException exception; diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index 793333d..d284287 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -1,61 +1,127 @@ package io.trygvis.queue; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.queue.Task.TaskState.NEW; +import static io.trygvis.queue.Task.TaskState.PROCESSING; + public class JdbcQueueService { - private Logger log = LoggerFactory.getLogger(getClass()); + private final Logger log = LoggerFactory.getLogger(getClass()); - private JdbcQueueService(Connection c) throws SQLException { - if (c.getAutoCommit()) { - throw new SQLException("The connection cannot be in auto-commit mode."); - } + private final QueueSystem queueSystem; - DatabaseMetaData metaData = c.getMetaData(); - String productName = metaData.getDatabaseProductName(); - String productVersion = metaData.getDatabaseProductVersion(); + private final SqlEffectExecutor sqlEffectExecutor; - log.info("productName = " + productName); - log.info("productVersion = " + productVersion); + JdbcQueueService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException { - TaskDao taskDao = createTaskDao(c); + public void consumeAll(final Queue queue, final TaskEffect effect) { + final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); - List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - log.trace("Got {} tasks.", tasks.size()); + List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + log.trace("Got {} tasks.", tasks.size()); + taskDao.setState(tasks, PROCESSING); + return tasks; + } + }); - for (Task task : tasks) { - log.trace("Executing task {}", task.id()); - try { - List<Task> newTasks = effect.consume(task); - log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size()); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + applyTasks(c, effect, queueSystem.createTaskDao(c), tasks); + } + }); + } + + public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection connection) throws SQLException { + for (Task task : tasks) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + new TaskDao(connection).update(task.markProcessing()); + } + } + }); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = new TaskDao(c); + + applyTasks(c, taskEffect, taskDao, tasks); + } + }); + } + + /** + * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. + */ + private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException { + Task task = null; + try { + for (int i = 0; i < tasks.size(); i++) { + task = tasks.get(i); + + log.info("Executing task {}", task.id()); + + List<Task> newTasks = effect.apply(task); Date now = new Date(); - task = task.registerComplete(now); + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + task = task.markOk(now); for (Task newTask : newTasks) { - taskDao.insert(task.id(), newTask.queue, now, newTask.arguments); + schedule(c, newTask); } taskDao.update(task); - } catch (Throwable e) { - log.error("Unable to execute task, id=" + task.id(), e); } - c.commit(); + } catch (final Exception e) { + if (task == null) { + return; + } + + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + try { + taskDao.rollback(); + } catch (SQLException e2) { + log.error("Error rolling back transaction after failed apply.", e2); + } + + final Task t = task; + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + Task task = t.markFailed(now); + taskDao.update(task); + } + }); } } - public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { - QueueDao queueDao = createQueueDao(c); + public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { + QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); @@ -71,21 +137,23 @@ public class JdbcQueueService { return q; } - public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException { - TaskDao taskDao = createTaskDao(c); - - taskDao.insert(queue.name, scheduled, arguments); + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); } - public static JdbcQueueService createQueueService(Connection c) throws SQLException { - return new JdbcQueueService(c); + public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); } - public QueueDao createQueueDao(Connection c) { - return new QueueDao(c); + public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); } - public TaskDao createTaskDao(Connection c) { - return new TaskDao(c); + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index 2111013..9773766 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -10,8 +10,4 @@ public interface QueueService { Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; - - public static interface TaskEffect { - List<Task> consume(Task task) throws Exception; - } } diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java new file mode 100644 index 0000000..42c8fd8 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -0,0 +1,58 @@ +package io.trygvis.queue; + +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; + +public class QueueSystem { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public final SqlEffectExecutor sqlEffectExecutor; + + public final JdbcQueueService queueService; + + private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + }); + + this.sqlEffectExecutor = sqlEffectExecutor; + queueService = new JdbcQueueService(this); + } + + /** + * Initializes the queue system. Use this as the first thing do as it will validate the database. + */ + public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + return new QueueSystem(sqlEffectExecutor); + } + + public JdbcQueueService createQueueService() { + return queueService; + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java index 1af40d7..29e37ac 100755 --- a/src/main/java/io/trygvis/queue/Task.java +++ b/src/main/java/io/trygvis/queue/Task.java @@ -3,16 +3,26 @@ package io.trygvis.queue; import java.util.Date; import java.util.List; +import static io.trygvis.queue.Task.TaskState.*; import static java.util.Arrays.asList; public class Task { + public enum TaskState { + NEW, + PROCESSING, + OK, + FAILED + } + private final long id; public final Long parent; public final String queue; + public final TaskState state; + public final Date scheduled; public final Date lastRun; @@ -23,10 +33,11 @@ public class Task { public final List<String> arguments; - public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { + public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { this.id = id; this.parent = parent; this.queue = queue; + this.state = state; this.scheduled = scheduled; this.lastRun = lastRun; this.runCount = runCount; @@ -35,12 +46,16 @@ public class Task { this.arguments = arguments; } - public Task registerRun() { - return new Task(id, parent, queue, scheduled, new Date(), runCount + 1, completed, arguments); + public Task markProcessing() { + return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments); } - public Task registerComplete(Date completed) { - return new Task(id, parent, queue, scheduled, lastRun, runCount, completed, arguments); + public Task markOk(Date completed) { + return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments); + } + + public Task markFailed(Date now) { + return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments); } public String toString() { @@ -48,6 +63,7 @@ public class Task { "id=" + id + ", parent=" + parent + ", queue=" + queue + + ", state=" + state + ", scheduled=" + scheduled + ", lastRun=" + lastRun + ", runCount=" + runCount + @@ -68,12 +84,16 @@ public class Task { return completed != null; } + public Task childTask(String name, Date scheduled, String... arguments) { + return new Task(0, id(), name, NEW, scheduled, null, 0, null, asList(arguments)); + } + public static Task newTask(String name, Date scheduled, String... arguments) { - return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments)); + return new Task(0, null, name, NEW, scheduled, null, 0, null, asList(arguments)); } public static Task newTask(String name, Date scheduled, List<String> arguments) { - return new Task(0, 0l, name, scheduled, null, 0, null, arguments); + return new Task(0, null, name, NEW, scheduled, null, 0, null, arguments); } public static List<String> stringToArguments(String arguments) { diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 3aa2ac2..025823b 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -11,27 +11,25 @@ import java.util.Collections; import java.util.Date; import java.util.List; +import static io.trygvis.queue.Task.TaskState; +import static io.trygvis.queue.Task.TaskState.valueOf; import static io.trygvis.queue.Task.argumentsToString; import static io.trygvis.queue.Task.stringToArguments; public class TaskDao { - private final Connection connection; + private final Connection c; - public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments"; + public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; - public TaskDao(Connection connection) { - this.connection = connection; + TaskDao(Connection c) { + this.c = c; } - public long insert(String queue, Date scheduled, List<String> arguments) throws SQLException { - return insert(null, queue, scheduled, arguments); - } - - public long insert(Long parent, String queue, Date scheduled, List<String> arguments) throws SQLException { - String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " + - "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)"; - try (PreparedStatement stmt = connection.prepareStatement(sql)) { + public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException { + String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; + try (PreparedStatement stmt = c.prepareStatement(sql)) { int i = 1; if (parent == null) { stmt.setNull(i++, Types.BIGINT); @@ -39,11 +37,12 @@ public class TaskDao { stmt.setLong(i++, parent); } stmt.setString(i++, queue); + stmt.setString(i++, state.name()); stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); stmt.setString(i, argumentsToString(arguments)); stmt.executeUpdate(); } - try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) { + try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { ResultSet rs = stmt.executeQuery(); rs.next(); return rs.getLong(1); @@ -51,7 +50,7 @@ public class TaskDao { } public Task findById(long id) throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { stmt.setLong(1, id); ResultSet rs = stmt.executeQuery(); return rs.next() ? mapRow(rs) : null; @@ -59,7 +58,7 @@ public class TaskDao { } public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { int i = 1; stmt.setString(i, name); ResultSet rs = stmt.executeQuery(); @@ -72,8 +71,9 @@ public class TaskDao { } public void update(Task task) throws SQLException { - try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; + stmt.setString(i++, task.state.name()); stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); @@ -83,6 +83,19 @@ public class TaskDao { } } + public void setState(List<Task> tasks, TaskState state) throws SQLException { + Long[] ids = new Long[tasks.size()]; + for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) { + ids[i] = tasks.get(i).id(); + } + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) { + int i = 1; + stmt.setString(i++, state.name()); + stmt.setObject(i, c.createArrayOf("bigint", ids)); + stmt.executeUpdate(); + } + } + private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { if (date == null) { stmt.setNull(parameterIndex, Types.TIMESTAMP); @@ -92,15 +105,22 @@ public class TaskDao { } public Task mapRow(ResultSet rs) throws SQLException { - String arguments = rs.getString(8); + String arguments = rs.getString(9); + int i = 1; return new Task( - rs.getLong(1), - rs.getLong(2), - rs.getString(3), - rs.getTimestamp(4), - rs.getTimestamp(5), - rs.getInt(6), - rs.getTimestamp(7), + rs.getLong(i++), + rs.getLong(i++), + rs.getString(i++), + valueOf(rs.getString(i++)), + rs.getTimestamp(i++), + rs.getTimestamp(i++), + rs.getInt(i++), + rs.getTimestamp(i), arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList()); } + + public void rollback() throws SQLException { + c.rollback(); + c.close(); + } } diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java new file mode 100644 index 0000000..186797f --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskEffect.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +import java.util.List; + +public interface TaskEffect { + List<Task> apply(Task task) throws Exception; +} diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java index 68761f2..3ba28de 100644 --- a/src/main/java/io/trygvis/spring/DefaultConfig.java +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -1,21 +1,31 @@ package io.trygvis.spring; import io.trygvis.async.AsyncService; +import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.jdbc.core.JdbcTemplate; +import javax.sql.DataSource; +import java.sql.SQLException; + @Configuration public class DefaultConfig { @Bean - public AsyncService asyncService(JdbcTemplate jdbcTemplate) { - return new SpringJdbcAsyncService(jdbcTemplate); + public QueueSystem queueSystem(DataSource ds) throws SQLException { + return QueueSystem.initialize(new SqlEffectExecutor(ds)); + } + + @Bean + public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + return new SpringJdbcAsyncService(queueSystem, jdbcTemplate); } @Bean - public QueueService queueService(JdbcTemplate jdbcTemplate) { - return new SpringQueueService(jdbcTemplate); + public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + return new SpringQueueService(queueSystem, jdbcTemplate); } } diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java index 6702642..96442e6 100644 --- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -4,7 +4,9 @@ import io.trygvis.async.AsyncService; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.jdbc.core.ConnectionCallback; @@ -29,19 +31,16 @@ public class SpringJdbcAsyncService implements AsyncService { private final JdbcTemplate jdbcTemplate; - private final SqlEffectExecutor sqlEffectExecutor; - private final JdbcAsyncService jdbcAsyncService; - public SpringJdbcAsyncService(JdbcTemplate jdbcTemplate) { + public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; - jdbcAsyncService = new JdbcAsyncService(); - sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource()); + jdbcAsyncService = new JdbcAsyncService(queueSystem); } @Transactional(propagation = REQUIRED) - public void registerQueue(final Queue queue, final AsyncService.AsyncCallable callable) { - jdbcAsyncService.registerQueue(sqlEffectExecutor, queue, callable); + public void registerQueue(final Queue queue, final TaskEffect processor) { + jdbcAsyncService.registerQueue(queue, processor); registerSynchronization(new TransactionSynchronizationAdapter() { public void afterCompletion(int status) { diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java index 3432e35..21746e5 100644 --- a/src/main/java/io/trygvis/spring/SpringQueueService.java +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -3,48 +3,34 @@ package io.trygvis.spring; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.TaskEffect; import org.springframework.dao.DataAccessException; import org.springframework.jdbc.core.ConnectionCallback; import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.transaction.annotation.Transactional; -import javax.annotation.PostConstruct; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; -import static io.trygvis.queue.JdbcQueueService.createQueueService; - public class SpringQueueService implements QueueService { public final JdbcTemplate jdbcTemplate; public JdbcQueueService queueService; - public SpringQueueService(JdbcTemplate jdbcTemplate) { + public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { this.jdbcTemplate = jdbcTemplate; + this.queueService = queueSystem.queueService; } - @PostConstruct - public void postConstruct() { - queueService = jdbcTemplate.execute(new ConnectionCallback<JdbcQueueService>() { - @Override - public JdbcQueueService doInConnection(Connection c) throws SQLException, DataAccessException { - return createQueueService(c); - } - }); - } - - @Transactional + /** + * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.TaskEffect) + */ public void consume(final Queue queue, final TaskEffect effect) throws SQLException { - jdbcTemplate.execute(new ConnectionCallback<Object>() { - @Override - public Object doInConnection(Connection c) throws SQLException, DataAccessException { - queueService.consume(c, queue, effect); - return null; - } - }); + queueService.consumeAll(queue, effect); } @Transactional @@ -52,7 +38,7 @@ public class SpringQueueService implements QueueService { return jdbcTemplate.execute(new ConnectionCallback<Queue>() { @Override public Queue doInConnection(Connection c) throws SQLException, DataAccessException { - return queueService.getQueue(c, name, interval, autoCreate); + return queueService.lookupQueue(c, name, interval, autoCreate); } }); } |