package io.trygvis.queue; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; import static io.trygvis.queue.Task.TaskState.NEW; import static io.trygvis.queue.Task.TaskState.PROCESSING; public class JdbcQueueService { private final Logger log = LoggerFactory.getLogger(getClass()); private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; JdbcQueueService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } public void consumeAll(final Queue queue, final TaskEffect effect) throws SQLException { final List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); List tasks = taskDao.findByNameAndCompletedIsNull(queue.name); log.trace("Got {} tasks.", tasks.size()); taskDao.setState(tasks, PROCESSING); return tasks; } }); sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { applyTasks(c, effect, queueSystem.createTaskDao(c), tasks); } }); } public void executeTask(final TaskEffect taskEffect, final List tasks) throws SQLException { sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection connection) throws SQLException { for (Task task : tasks) { final Date run = new Date(); log.info("Setting last run on task. date = {}, task = {}", run, task); new TaskDao(connection).update(task.markProcessing()); } } }); sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { TaskDao taskDao = new TaskDao(c); applyTasks(c, taskEffect, taskDao, tasks); } }); } /** * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. */ private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List tasks) throws SQLException { Task task = null; try { for (int i = 0; i < tasks.size(); i++) { task = tasks.get(i); log.info("Executing task {}", task.id()); List newTasks = effect.apply(task); Date now = new Date(); log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); task = task.markOk(now); for (Task newTask : newTasks) { schedule(c, newTask); } taskDao.update(task); } } catch (final Exception e) { if (task == null) { return; } final Date now = new Date(); log.error("Unable to execute task, id=" + task.id(), e); final Task t = task; sqlEffectExecutor.transaction(new SqlEffect.Void() { @Override public void doInConnection(Connection c) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); Task task = t.markFailed(now); taskDao.update(task); } }); if(e instanceof SQLException) { throw ((SQLException) e); } throw new RuntimeException("Error while executing task, id=" + task.id(), e); } } public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); if (q == null) { if (!autoCreate) { throw new SQLException("No such queue: '" + name + "'."); } q = new Queue(name, interval); queueDao.insert(q); } return q; } public void schedule(Connection c, Task task) throws SQLException { schedule(c, task.queue, task.parent, task.scheduled, task.arguments); } public Task schedule(Connection c, Queue queue, Date scheduled, List arguments) throws SQLException { return schedule(c, queue.name, null, scheduled, arguments); } public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List arguments) throws SQLException { return schedule(c, queue.name, parent, scheduled, arguments); } private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { TaskDao taskDao = queueSystem.createTaskDao(c); long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); } public static class TaskExecutionFailed extends Throwable { public TaskExecutionFailed(Exception e) { super(e); } } }