aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/QueueExecutor.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/QueueExecutor.java')
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java177
1 files changed, 177 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
new file mode 100644
index 0000000..88e5b46
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -0,0 +1,177 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.QueueStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueExecutor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ public final Queue queue;
+
+ private final Stats stats = new Stats();
+
+ public enum TaskExecutionResult {
+ OK,
+ FAILED,
+ MISSED
+ }
+
+ public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ this.queue = queue;
+ }
+
+ private static class Stats {
+ public int total;
+ public int ok;
+ public int failed;
+ public int scheduled;
+ public int missed;
+
+ public synchronized QueueStats toStats() {
+ return new QueueStats(total, ok, failed, missed, scheduled);
+ }
+ }
+
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+
+ public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
+
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
+ }
+
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ for (Task task : tasks) {
+ TaskExecutionResult result = applyTask(effect, task);
+
+ if (result == FAILED && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ /**
+ * Executed each task in its own transaction.
+ * <p/>
+ * If the task fails, the status is set to error in a separate transaction.
+ */
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ try {
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
+ }
+ });
+
+ if (count == 0) {
+ log.warn("Missed task {}", task.id());
+ synchronized (stats) {
+ stats.missed++;
+ }
+ return MISSED;
+ }
+
+ log.info("Executing task {}", task.id());
+
+ final List<Task> newTasks = effect.apply(task);
+
+ final Date now = new Date();
+
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
+
+ queueSystem.createTaskDao(c).update(task.markOk(now));
+ }
+ });
+
+ synchronized (stats) {
+ stats.total++;
+ stats.ok++;
+ }
+
+ return OK;
+ } catch (Exception e) {
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ synchronized (stats) {
+ stats.total++;
+ stats.failed++;
+ }
+
+ try {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ taskDao.update(task.markFailed(now));
+ }
+ });
+ } catch (SQLException e1) {
+ log.error("Error while marking task as failed.", e1);
+ }
+
+ return FAILED;
+ }
+ }
+
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
+ }
+
+ public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
+ }
+
+ public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
+ }
+
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ synchronized (stats) {
+ stats.scheduled++;
+ }
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+}