diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 105 |
1 files changed, 44 insertions, 61 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index c99bf2e..edd6c80 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -10,6 +10,7 @@ import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; import static io.trygvis.queue.Task.TaskState.PROCESSING; @@ -26,97 +27,85 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException { + public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override public List<Task> doInConnection(Connection c) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - log.trace("Got {} tasks.", tasks.size()); - taskDao.setState(tasks, PROCESSING); - return tasks; + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); } }); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - applyTasks(c, effect, queueSystem.createTaskDao(c), tasks); - } - }); + applyTasks(req, effect, tasks); } - public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) throws SQLException { - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection connection) throws SQLException { - for (Task task : tasks) { - final Date run = new Date(); - log.info("Setting last run on task. date = {}, task = {}", run, task); - new TaskDao(connection).update(task.markProcessing()); - } - } - }); - - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - TaskDao taskDao = new TaskDao(c); - - applyTasks(c, taskEffect, taskDao, tasks); - } - }); + public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException { + applyTasks(req, taskEffect, tasks); } /** * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. */ - private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException { - Task task = null; + private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException { + for (Task task : tasks) { + boolean ok = applyTask(effect, task); + + if (!ok && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { try { - for (int i = 0; i < tasks.size(); i++) { - task = tasks.get(i); + final Date run = new Date(); + Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing()); + } + }); + if (count == 1) { log.info("Executing task {}", task.id()); + } else { + log.trace("Missed task {}", task.id()); + } - List<Task> newTasks = effect.apply(task); + final List<Task> newTasks = effect.apply(task); - Date now = new Date(); + final Date now = new Date(); - log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); - task = task.markOk(now); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } - for (Task newTask : newTasks) { - schedule(c, newTask); + queueSystem.createTaskDao(c).update(task.markOk(now)); } + }); - taskDao.update(task); - } - } catch (final Exception e) { - if (task == null) { - return; - } - + return true; + } catch (Exception e) { final Date now = new Date(); log.error("Unable to execute task, id=" + task.id(), e); - final Task t = task; sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); - Task task = t.markFailed(now); - taskDao.update(task); + taskDao.update(task.markFailed(now)); } }); - if(e instanceof SQLException) { + if (e instanceof SQLException) { throw ((SQLException) e); } - throw new RuntimeException("Error while executing task, id=" + task.id(), e); + return false; } } @@ -156,10 +145,4 @@ public class JdbcQueueService { return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } - - public static class TaskExecutionFailed extends Throwable { - public TaskExecutionFailed(Exception e) { - super(e); - } - } } |