diff options
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 131 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueExecutor.java | 184 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 4 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueSystem.java | 2 |
4 files changed, 203 insertions, 118 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index cb7af4b..a366838 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -1,121 +1,42 @@ package io.trygvis.queue; -import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; -import java.util.Date; -import java.util.List; - -import static io.trygvis.queue.QueueService.TaskExecutionRequest; -import static io.trygvis.queue.Task.TaskState.NEW; +import java.util.HashMap; +import java.util.Map; public class JdbcQueueService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final QueueSystem queueSystem; private final SqlEffectExecutor sqlEffectExecutor; + private final Map<String, QueueExecutor> queues = new HashMap<>(); + JdbcQueueService(QueueSystem queueSystem) { this.queueSystem = queueSystem; this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - log.info("Consuming tasks: request={}", req); - - List<Task> tasks; - do { - tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { - @Override - public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); - } - }); - - log.info("Consuming chunk with {} tasks", tasks.size()); - - applyTasks(req, effect, tasks); - } while (tasks.size() > 0); - } - - public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException { - applyTasks(req, taskEffect, tasks); - } + public synchronized QueueExecutor getQueue(String name) { + QueueExecutor queueExecutor = queues.get(name); - /** - * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect. - */ - private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException { - for (Task task : tasks) { - boolean ok = applyTask(effect, task); - - if (!ok && req.stopOnError) { - throw new RuntimeException("Error while executing task, id=" + task.id()); - } + if (queueExecutor != null) { + return queueExecutor; } - } - - private boolean applyTask(TaskEffect effect, final Task task) throws SQLException { - try { - final Date run = new Date(); - Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { - @Override - public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); - } - }); - - if (count == 1) { - log.info("Executing task {}", task.id()); - } else { - log.trace("Missed task {}", task.id()); - } - - final List<Task> newTasks = effect.apply(task); - - final Date now = new Date(); - - log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - for (Task newTask : newTasks) { - schedule(c, newTask); - } - - queueSystem.createTaskDao(c).update(task.markOk(now)); - } - }); - - return true; - } catch (Exception e) { - final Date now = new Date(); - log.error("Unable to execute task, id=" + task.id(), e); - - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - taskDao.update(task.markFailed(now)); - } - }); + throw new IllegalArgumentException("No such queue: " + name); + } - if (e instanceof SQLException) { - throw ((SQLException) e); - } + public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { + QueueExecutor queueExecutor = queues.get(name); - return false; + if (queueExecutor != null) { + return queueExecutor; } - } - public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException { QueueDao queueDao = queueSystem.createQueueDao(c); Queue q = queueDao.findByName(name); @@ -129,26 +50,8 @@ public class JdbcQueueService { queueDao.insert(q); } - return q; - } - - public void schedule(Connection c, Task task) throws SQLException { - schedule(c, task.queue, task.parent, task.scheduled, task.arguments); - } - - public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException { - return schedule(c, queue.name, null, scheduled, arguments); - } - - public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException { - return schedule(c, queue.name, parent, scheduled, arguments); - } - - private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); - - return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); + queues.put(name, queueExecutor); + return queueExecutor; } } diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..a1eb3b7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,184 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + + public QueueStats toStats() { + return new QueueStats(total, ok, failed, scheduled); + } + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List<Task> tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks, + ScheduledThreadPoolExecutor executor) { + executor.execute(new Runnable() { + @Override + public void run() { + applyTasks(req, taskEffect, tasks); + } + }); + } + + private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + * <p/> + * If the task fails, the status is set to error in a separate transaction. + */ + private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing()); + } + }); + + if (count == 0) { + log.trace("Missed task {}", task.id()); + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List<Task> newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index d97eaf0..eee14ed 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -5,9 +5,7 @@ import java.util.Date; import java.util.List; public interface QueueService { - void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException; - - Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException; + QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException; void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java index 3b0c018..6710bf4 100644 --- a/src/main/java/io/trygvis/queue/QueueSystem.java +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -15,7 +15,7 @@ public class QueueSystem { public final SqlEffectExecutor sqlEffectExecutor; - public final JdbcQueueService queueService; + private final JdbcQueueService queueService; private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { sqlEffectExecutor.transaction(new SqlEffect.Void() { |