diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 142 |
1 files changed, 105 insertions, 37 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index 793333d..d284287 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -1,61 +1,127 @@ package io.trygvis.queue; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; -import java.sql.DatabaseMetaData; import java.sql.SQLException; import java.util.Date; import java.util.List; +import static io.trygvis.queue.Task.TaskState.NEW; +import static io.trygvis.queue.Task.TaskState.PROCESSING; + public class JdbcQueueService { - private Logger log = LoggerFactory.getLogger(getClass()); + private final Logger log = LoggerFactory.getLogger(getClass()); - private JdbcQueueService(Connection c) throws SQLException { - if (c.getAutoCommit()) { - throw new SQLException("The connection cannot be in auto-commit mode."); - } + private final QueueSystem queueSystem; - DatabaseMetaData metaData = c.getMetaData(); - String productName = metaData.getDatabaseProductName(); - String productVersion = metaData.getDatabaseProductVersion(); + private final SqlEffectExecutor sqlEffectExecutor; - log.info("productName = " + productName); - log.info("productVersion = " + productVersion); + JdbcQueueService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException { - TaskDao taskDao = createTaskDao(c); + public void consumeAll(final Queue queue, final TaskEffect effect) { + final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); - List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); - log.trace("Got {} tasks.", tasks.size()); + List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name); + log.trace("Got {} tasks.", tasks.size()); + taskDao.setState(tasks, PROCESSING); + return tasks; + } + }); - for (Task task : tasks) { - log.trace("Executing task {}", task.id()); - try { - List<Task> newTasks = effect.consume(task); - log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size()); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + applyTasks(c, effect, queueSystem.createTaskDao(c), tasks); + } + }); + } + + public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection connection) throws SQLException { + for (Task task : tasks) { + final Date run = new Date(); + log.info("Setting last run on task. date = {}, task = {}", run, task); + new TaskDao(connection).update(task.markProcessing()); + } + } + }); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = new TaskDao(c); + + applyTasks(c, taskEffect, taskDao, tasks); + } + }); + } + + /** + * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. + */ + private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException { + Task task = null; + try { + for (int i = 0; i < tasks.size(); i++) { + task = tasks.get(i); + + log.info("Executing task {}", task.id()); + + List<Task> newTasks = effect.apply(task); Date now = new Date(); - task = task.registerComplete(now); + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + task = task.markOk(now); for (Task newTask : newTasks) { - taskDao.insert(task.id(), newTask.queue, now, newTask.arguments); + schedule(c, newTask); } taskDao.update(task); - } catch (Throwable e) { - log.error("Unable to execute task, id=" + task.id(), e); } - c.commit(); + } catch (final Exception e) { + if (task == null) { + return; + } + + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + try { + taskDao.rollback(); + } catch (SQLException e2) { + log.error("Error rolling back transaction after failed apply.", e2); + } + + final Task t = task; + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + Task task = t.markFailed(now); + taskDao.update(task); + } + }); } } - public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { - QueueDao queueDao = createQueueDao(c); + public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { + QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); @@ -71,21 +137,23 @@ public class JdbcQueueService { return q; } - public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException { - TaskDao taskDao = createTaskDao(c); - - taskDao.insert(queue.name, scheduled, arguments); + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); } - public static JdbcQueueService createQueueService(Connection c) throws SQLException { - return new JdbcQueueService(c); + public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); } - public QueueDao createQueueDao(Connection c) { - return new QueueDao(c); + public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); } - public TaskDao createTaskDao(Connection c) { - return new TaskDao(c); + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } } |