aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java131
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java184
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java4
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java2
4 files changed, 203 insertions, 118 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index cb7af4b..a366838 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -1,121 +1,42 @@
package io.trygvis.queue;
-import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
-import java.util.List;
-
-import static io.trygvis.queue.QueueService.TaskExecutionRequest;
-import static io.trygvis.queue.Task.TaskState.NEW;
+import java.util.HashMap;
+import java.util.Map;
public class JdbcQueueService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
private final QueueSystem queueSystem;
private final SqlEffectExecutor sqlEffectExecutor;
+ private final Map<String, QueueExecutor> queues = new HashMap<>();
+
JdbcQueueService(QueueSystem queueSystem) {
this.queueSystem = queueSystem;
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
- log.info("Consuming tasks: request={}", req);
-
- List<Task> tasks;
- do {
- tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
- }
- });
-
- log.info("Consuming chunk with {} tasks", tasks.size());
-
- applyTasks(req, effect, tasks);
- } while (tasks.size() > 0);
- }
-
- public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {
- applyTasks(req, taskEffect, tasks);
- }
+ public synchronized QueueExecutor getQueue(String name) {
+ QueueExecutor queueExecutor = queues.get(name);
- /**
- * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
- */
- private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException {
- for (Task task : tasks) {
- boolean ok = applyTask(effect, task);
-
- if (!ok && req.stopOnError) {
- throw new RuntimeException("Error while executing task, id=" + task.id());
- }
+ if (queueExecutor != null) {
+ return queueExecutor;
}
- }
-
- private boolean applyTask(TaskEffect effect, final Task task) throws SQLException {
- try {
- final Date run = new Date();
- Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
- @Override
- public Integer doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).update(task.markProcessing());
- }
- });
-
- if (count == 1) {
- log.info("Executing task {}", task.id());
- } else {
- log.trace("Missed task {}", task.id());
- }
-
- final List<Task> newTasks = effect.apply(task);
-
- final Date now = new Date();
-
- log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- for (Task newTask : newTasks) {
- schedule(c, newTask);
- }
-
- queueSystem.createTaskDao(c).update(task.markOk(now));
- }
- });
-
- return true;
- } catch (Exception e) {
- final Date now = new Date();
- log.error("Unable to execute task, id=" + task.id(), e);
-
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
- taskDao.update(task.markFailed(now));
- }
- });
+ throw new IllegalArgumentException("No such queue: " + name);
+ }
- if (e instanceof SQLException) {
- throw ((SQLException) e);
- }
+ public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException {
+ QueueExecutor queueExecutor = queues.get(name);
- return false;
+ if (queueExecutor != null) {
+ return queueExecutor;
}
- }
- public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
QueueDao queueDao = queueSystem.createQueueDao(c);
Queue q = queueDao.findByName(name);
@@ -129,26 +50,8 @@ public class JdbcQueueService {
queueDao.insert(q);
}
- return q;
- }
-
- public void schedule(Connection c, Task task) throws SQLException {
- schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
- }
-
- public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
- return schedule(c, queue.name, null, scheduled, arguments);
- }
-
- public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException {
- return schedule(c, queue.name, parent, scheduled, arguments);
- }
-
- private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
- long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
-
- return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q);
+ queues.put(name, queueExecutor);
+ return queueExecutor;
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
new file mode 100644
index 0000000..a1eb3b7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -0,0 +1,184 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.QueueStats;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueExecutor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ public final Queue queue;
+
+ private final Stats stats = new Stats();
+
+ public enum TaskExecutionResult {
+ OK,
+ FAILED,
+ MISSED
+ }
+
+ public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ this.queue = queue;
+ }
+
+ private static class Stats {
+ public int total;
+ public int ok;
+ public int failed;
+ public int scheduled;
+
+ public QueueStats toStats() {
+ return new QueueStats(total, ok, failed, scheduled);
+ }
+ }
+
+ public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
+
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
+ }
+
+ public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks,
+ ScheduledThreadPoolExecutor executor) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ applyTasks(req, taskEffect, tasks);
+ }
+ });
+ }
+
+ private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ for (Task task : tasks) {
+ TaskExecutionResult result = applyTask(effect, task);
+
+ if (result == FAILED && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ /**
+ * Executed each task in its own transaction.
+ * <p/>
+ * If the task fails, the status is set to error in a separate transaction.
+ */
+ private TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ try {
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing());
+ }
+ });
+
+ if (count == 0) {
+ log.trace("Missed task {}", task.id());
+ return MISSED;
+ }
+
+ log.info("Executing task {}", task.id());
+
+ final List<Task> newTasks = effect.apply(task);
+
+ final Date now = new Date();
+
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
+
+ queueSystem.createTaskDao(c).update(task.markOk(now));
+ }
+ });
+
+ synchronized (stats) {
+ stats.total++;
+ stats.ok++;
+ }
+
+ return OK;
+ } catch (Exception e) {
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ synchronized (stats) {
+ stats.total++;
+ stats.failed++;
+ }
+
+ try {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ taskDao.update(task.markFailed(now));
+ }
+ });
+ } catch (SQLException e1) {
+ log.error("Error while marking task as failed.", e1);
+ }
+
+ return FAILED;
+ }
+ }
+
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
+ }
+
+ public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
+ }
+
+ public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
+ }
+
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ synchronized (stats) {
+ stats.scheduled++;
+ }
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index d97eaf0..eee14ed 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -5,9 +5,7 @@ import java.util.Date;
import java.util.List;
public interface QueueService {
- void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException;
-
- Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+ QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException;
void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
index 3b0c018..6710bf4 100644
--- a/src/main/java/io/trygvis/queue/QueueSystem.java
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -15,7 +15,7 @@ public class QueueSystem {
public final SqlEffectExecutor sqlEffectExecutor;
- public final JdbcQueueService queueService;
+ private final JdbcQueueService queueService;
private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
sqlEffectExecutor.transaction(new SqlEffect.Void() {