diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueExecutor.java')
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueExecutor.java | 177 |
1 files changed, 177 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..88e5b46 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,177 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + public int missed; + + public synchronized QueueStats toStats() { + return new QueueStats(total, ok, failed, missed, scheduled); + } + } + + public QueueStats getStats() { + return stats.toStats(); + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List<Task> tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + * <p/> + * If the task fails, the status is set to error in a separate transaction. + */ + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); + } + }); + + if (count == 0) { + log.warn("Missed task {}", task.id()); + synchronized (stats) { + stats.missed++; + } + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List<Task> newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} |