package io.trygvis.queue;

import io.trygvis.async.QueueStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;

import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
import static io.trygvis.queue.Task.TaskState.NEW;

/**
 * Executes the tasks of a single {@link Queue}: fetches tasks in state {@code NEW}, applies a
 * {@link TaskEffect} to each of them and records the outcome through the {@link TaskDao}.
 * <p>
 * Execution counters are kept in memory and are always accessed while holding the monitor of
 * the internal stats object.
 */
public class QueueExecutor {
    private final Logger log = LoggerFactory.getLogger(getClass());

    private final QueueSystem queueSystem;

    private final SqlEffectExecutor sqlEffectExecutor;

    public final Queue queue;

    private final Stats stats = new Stats();

    /**
     * Outcome of a single {@link #applyTask(TaskEffect, Task)} call.
     */
    public enum TaskExecutionResult {
        /** The effect ran and the task was marked as ok. */
        OK,
        /** An exception was thrown while processing the task. */
        FAILED,
        /** The update that marks the task as processing affected no rows. */
        MISSED
    }

    public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
        this.queueSystem = queueSystem;
        this.sqlEffectExecutor = sqlEffectExecutor;
        this.queue = queue;
    }

    /**
     * Mutable counters. Every access must hold the monitor of the instance.
     */
    private static class Stats {
        public int total;
        public int ok;
        public int failed;
        public int scheduled;

        public QueueStats toStats() {
            return new QueueStats(total, ok, failed, scheduled);
        }
    }

    /**
     * Returns a snapshot of the counters.
     * <p>
     * The snapshot is taken under the same lock that guards the counter updates, so the four
     * values are read together and concurrent updates are visible to the caller.
     */
    public QueueStats getStats() {
        synchronized (stats) {
            return stats.toStats();
        }
    }

    /**
     * Fetches {@code NEW} tasks for this queue chunk by chunk and applies {@code effect} to them
     * until a fetch returns no tasks.
     *
     * @param req    execution request; {@code req.chunkSize} is handed to the DAO query
     *               (presumably the maximum number of tasks per chunk - confirm against
     *               {@code TaskDao}) and {@code req.stopOnError} is honoured by
     *               {@link #applyTasks}
     * @param effect the effect to apply to every task
     * @throws SQLException if fetching a chunk fails
     */
    public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
        log.info("Consuming tasks: request={}", req);

        List<Task> tasks;
        do {
            // Each chunk is read in its own transaction.
            tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
                @Override
                public List<Task> doInConnection(Connection c) throws SQLException {
                    return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
                }
            });

            log.info("Consuming chunk with {} tasks", tasks.size());

            applyTasks(req, effect, tasks);
        } while (!tasks.isEmpty());
    }

    /**
     * Applies {@code effect} to every task in {@code tasks}, in order.
     *
     * @throws RuntimeException if a task fails and {@code req.stopOnError} is set
     */
    public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
        for (Task task : tasks) {
            TaskExecutionResult result = applyTask(effect, task);

            if (result == FAILED && req.stopOnError) {
                throw new RuntimeException("Error while executing task, id=" + task.id());
            }
        }
    }

    /**
     * Executes the task, using separate transactions for the individual steps.
     * <p>
     * The task is first marked as processing; if that update affects no rows the task is
     * reported as {@link TaskExecutionResult#MISSED}. After the effect has run, the tasks it
     * returned are scheduled and the task is marked as ok in one transaction.
     * <p>
     * If any step throws, the failure is counted and the task is marked as failed in a separate
     * transaction. A {@link SQLException} from that last transaction is only logged.
     *
     * @return the outcome of the execution
     */
    public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
        try {
            // Only claims the task if it is still NEW.
            Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
                @Override
                public Integer doInConnection(Connection c) throws SQLException {
                    return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
                }
            });

            if (count == 0) {
                log.warn("Missed task {}", task.id());
                return MISSED;
            }

            log.info("Executing task {}", task.id());

            final List<Task> newTasks = effect.apply(task);

            final Date now = new Date();

            log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());

            // Follow-up tasks and the ok-marker are written in the same transaction.
            sqlEffectExecutor.transaction(new SqlEffect.Void() {
                @Override
                public void doInConnection(Connection c) throws SQLException {
                    for (Task newTask : newTasks) {
                        schedule(c, newTask);
                    }

                    queueSystem.createTaskDao(c).update(task.markOk(now));
                }
            });

            synchronized (stats) {
                stats.total++;
                stats.ok++;
            }

            return OK;
        } catch (Exception e) {
            final Date now = new Date();
            log.error("Unable to execute task, id=" + task.id(), e);

            synchronized (stats) {
                stats.total++;
                stats.failed++;
            }

            try {
                sqlEffectExecutor.transaction(new SqlEffect.Void() {
                    @Override
                    public void doInConnection(Connection c) throws SQLException {
                        TaskDao taskDao = queueSystem.createTaskDao(c);
                        taskDao.update(task.markFailed(now));
                    }
                });
            } catch (SQLException e1) {
                // Best effort: the task has already been counted and reported as failed.
                log.error("Error while marking task as failed.", e1);
            }

            return FAILED;
        }
    }

    // NOTE(review): the element type of the "arguments" lists below is not visible in this file
    // (presumably List<String>); they are left as raw types until confirmed against Task/TaskDao.

    /**
     * Schedules a new task with the queue, parent, scheduled time and arguments of {@code task}.
     */
    public void schedule(Connection c, Task task) throws SQLException {
        schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
    }

    /**
     * Schedules a new task without a parent on this executor's queue.
     *
     * @return the newly inserted task
     */
    public Task schedule(Connection c, Date scheduled, List arguments) throws SQLException {
        return schedule(c, queue.name, null, scheduled, arguments);
    }

    /**
     * Schedules a new task with the given parent on this executor's queue.
     *
     * @return the newly inserted task
     */
    public Task schedule(Connection c, long parent, Date scheduled, List arguments) throws SQLException {
        return schedule(c, queue.name, parent, scheduled, arguments);
    }

    /**
     * Inserts a {@code NEW} task through the DAO, bumps the scheduled counter and returns a
     * {@link Task} carrying the generated id.
     */
    private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments)
            throws SQLException {
        TaskDao taskDao = queueSystem.createTaskDao(c);

        long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);

        synchronized (stats) {
            stats.scheduled++;
        }

        return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
    }
}