From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- src/main/java/io/trygvis/queue/QueueExecutor.java | 184 ++++++++++++++++++++++ 1 file changed, 184 insertions(+) create mode 100644 src/main/java/io/trygvis/queue/QueueExecutor.java (limited to 'src/main/java/io/trygvis/queue/QueueExecutor.java') diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..a1eb3b7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,184 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + + public QueueStats toStats() { + return new QueueStats(total, ok, failed, scheduled); + } + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + @Override + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List tasks, + ScheduledThreadPoolExecutor executor) { + executor.execute(new Runnable() { + @Override + public void run() { + applyTasks(req, taskEffect, tasks); + } + }); + } + + private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + *

+ * If the task fails, the status is set to error in a separate transaction. + */ + private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing()); + } + }); + + if (count == 0) { + log.trace("Missed task {}", task.id()); + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} -- cgit v1.2.3