diff options
Diffstat (limited to 'src/main/java')
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 7 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/SqlEffectExecutor.java | 18 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 105 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 11 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 21 | ||||
-rw-r--r-- | src/main/java/io/trygvis/spring/SpringQueueService.java | 6 |
6 files changed, 75 insertions, 93 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 33753a3..558e769 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -12,6 +12,9 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + class QueueThread implements Runnable { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -56,14 +59,14 @@ class QueueThread implements Runnable { List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(taskEffect, tasks); + queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java index 51ad31d..3da2cd3 100644 --- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java +++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java @@ -15,11 +15,11 @@ public class SqlEffectExecutor { } public <A> A transaction(SqlEffect<A> effect) throws SQLException { - int pid; +// int pid; try (Connection c = dataSource.getConnection()) { - pid = getPid(c); - System.out.println("pid = " + pid); +// pid = getPid(c); +// System.out.println("pid = " + pid); boolean ok = false; try { @@ -28,7 +28,7 @@ public class SqlEffectExecutor { ok = true; return a; } finally { - System.out.println("Closing, pid = " + pid); +// System.out.println("Closing, pid = " + pid); if (!ok) { try { c.rollback(); @@ -49,14 +49,4 @@ public class SqlEffectExecutor { } }); } - - private int getPid(Connection c) throws SQLException { - int pid; - try (Statement statement = c.createStatement()) { - ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); - rs.next(); - pid = rs.getInt(1); - } - return pid; - } } diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index c99bf2e..edd6c80 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -10,6 +10,7 @@ import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; import static io.trygvis.queue.Task.TaskState.PROCESSING; @@ -26,97 +27,85 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException { + public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override public List<Task> doInConnection(Connection c) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - log.trace("Got {} tasks.", tasks.size()); - taskDao.setState(tasks, PROCESSING); - return tasks; + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - applyTasks(c, effect, queueSystem.createTaskDao(c), tasks); - } - }); + applyTasks(req, effect, tasks); } - public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException { - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection connection) throws SQLException { - for (Task task : tasks) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - new TaskDao(connection).update(task.markProcessing()); - } - } - }); - - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - TaskDao taskDao = new TaskDao(c); - - applyTasks(c, taskEffect, taskDao, tasks); - } - }); + public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException { + applyTasks(req, taskEffect, tasks); } /** * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. */ - private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException { - Task task = null; + private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException { + for (Task task : tasks) { + boolean ok = applyTask(effect, task); + + if (!ok && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { try { - for (int i = 0; i < tasks.size(); i++) { - task = tasks.get(i); + final Date run = new Date(); + Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing()); + } + }); + if (count == 1) { log.info("Executing task {}", task.id()); + } else { + log.trace("Missed task {}", task.id()); + } - List<Task> newTasks = effect.apply(task); + final List<Task> newTasks = effect.apply(task); - Date now = new Date(); + final Date now = new Date(); - log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); - task = task.markOk(now); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } - for (Task newTask : newTasks) { - schedule(c, newTask); + queueSystem.createTaskDao(c).update(task.markOk(now)); } + }); - taskDao.update(task); - } - } catch (final Exception e) { - if (task == null) { - return; - } - + return true; + } catch (Exception e) { final Date now = new Date(); log.error("Unable to execute task, id=" + task.id(), e); - final Task t = task; sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); - Task task = t.markFailed(now); - taskDao.update(task); + taskDao.update(task.markFailed(now)); } }); - if(e instanceof SQLException) { + if (e instanceof SQLException) { throw ((SQLException) e); } - throw new RuntimeException("Error while executing task, id=" + task.id(), e); + return false; } } @@ -156,10 +145,4 @@ public class JdbcQueueService { return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } - - public static class TaskExecutionFailed extends Throwable { - public TaskExecutionFailed(Exception e) { - super(e); - } - } } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index 9773766..ee74bf4 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -5,9 +5,18 @@ import java.util.Date; import java.util.List; public interface QueueService { - void consume(Queue queue, TaskEffect effect) throws SQLException; + void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException; Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; + + public static class TaskExecutionRequest { + public final boolean stopOnError; + // TODO: saveExceptions + + public TaskExecutionRequest(boolean stopOnError) { + this.stopOnError = stopOnError; + } + } } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 8b58585..9adec8f 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -57,10 +57,11 @@ public class TaskDao { } } - public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) { + public List<Task> findByQueueAndState(String queue, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) { int i = 1; - stmt.setString(i, name); + stmt.setString(i++, queue); + stmt.setString(i, state.name()); ResultSet rs = stmt.executeQuery(); List<Task> list = new ArrayList<>(); while (rs.next()) { @@ -70,7 +71,7 @@ public class TaskDao { } } - public void update(Task task) throws SQLException { + public int update(Task task) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; stmt.setString(i++, task.state.name()); @@ -79,19 +80,15 @@ public class TaskDao { stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); stmt.setLong(i, task.id()); - stmt.executeUpdate(); + return stmt.executeUpdate(); } } - public void setState(List<Task> tasks, TaskState state) throws SQLException { - Long[] ids = new Long[tasks.size()]; - for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) { - ids[i] = tasks.get(i).id(); - } - try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) { + public void setState(Task task, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { int i = 1; stmt.setString(i++, state.name()); - stmt.setObject(i, c.createArrayOf("bigint", ids)); + stmt.setLong(i, task.id()); stmt.executeUpdate(); } } diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java index 21746e5..271e9bf 100644 --- a/src/main/java/io/trygvis/spring/SpringQueueService.java +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -27,10 +27,10 @@ public class SpringQueueService implements QueueService { } /** - * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.TaskEffect) + * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.QueueService.TaskExecutionRequest, io.trygvis.queue.TaskEffect) */ - public void consume(final Queue queue, final TaskEffect effect) throws SQLException { - queueService.consumeAll(queue, effect); + public void consume(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + queueService.consumeAll(queue, req, effect); } @Transactional |