aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue/JdbcQueueService.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/queue/JdbcQueueService.java')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java142
1 files changed, 105 insertions, 37 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index 793333d..d284287 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -1,61 +1,127 @@
package io.trygvis.queue;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState.NEW;
+import static io.trygvis.queue.Task.TaskState.PROCESSING;
+
public class JdbcQueueService {
- private Logger log = LoggerFactory.getLogger(getClass());
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private JdbcQueueService(Connection c) throws SQLException {
- if (c.getAutoCommit()) {
- throw new SQLException("The connection cannot be in auto-commit mode.");
- }
+ private final QueueSystem queueSystem;
- DatabaseMetaData metaData = c.getMetaData();
- String productName = metaData.getDatabaseProductName();
- String productVersion = metaData.getDatabaseProductVersion();
+ private final SqlEffectExecutor sqlEffectExecutor;
- log.info("productName = " + productName);
- log.info("productVersion = " + productVersion);
+ JdbcQueueService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException {
- TaskDao taskDao = createTaskDao(c);
+ public void consumeAll(final Queue queue, final TaskEffect effect) {
+ final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
- List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
- log.trace("Got {} tasks.", tasks.size());
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+ log.trace("Got {} tasks.", tasks.size());
+ taskDao.setState(tasks, PROCESSING);
+ return tasks;
+ }
+ });
- for (Task task : tasks) {
- log.trace("Executing task {}", task.id());
- try {
- List<Task> newTasks = effect.consume(task);
- log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size());
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ applyTasks(c, effect, queueSystem.createTaskDao(c), tasks);
+ }
+ });
+ }
+
+ public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ for (Task task : tasks) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ new TaskDao(connection).update(task.markProcessing());
+ }
+ }
+ });
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
+ applyTasks(c, taskEffect, taskDao, tasks);
+ }
+ });
+ }
+
+ /**
+ * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
+ */
+ private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException {
+ Task task = null;
+ try {
+ for (int i = 0; i < tasks.size(); i++) {
+ task = tasks.get(i);
+
+ log.info("Executing task {}", task.id());
+
+ List<Task> newTasks = effect.apply(task);
Date now = new Date();
- task = task.registerComplete(now);
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ task = task.markOk(now);
for (Task newTask : newTasks) {
- taskDao.insert(task.id(), newTask.queue, now, newTask.arguments);
+ schedule(c, newTask);
}
taskDao.update(task);
- } catch (Throwable e) {
- log.error("Unable to execute task, id=" + task.id(), e);
}
- c.commit();
+ } catch (final Exception e) {
+ if (task == null) {
+ return;
+ }
+
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ try {
+ taskDao.rollback();
+ } catch (SQLException e2) {
+ log.error("Error rolling back transaction after failed apply.", e2);
+ }
+
+ final Task t = task;
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ Task task = t.markFailed(now);
+ taskDao.update(task);
+ }
+ });
}
}
- public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
- QueueDao queueDao = createQueueDao(c);
+ public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
+ QueueDao queueDao = queueSystem.createQueueDao(c);
Queue q = queueDao.findByName(name);
@@ -71,21 +137,23 @@ public class JdbcQueueService {
return q;
}
- public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
- TaskDao taskDao = createTaskDao(c);
-
- taskDao.insert(queue.name, scheduled, arguments);
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
}
- public static JdbcQueueService createQueueService(Connection c) throws SQLException {
- return new JdbcQueueService(c);
+ public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
}
- public QueueDao createQueueDao(Connection c) {
- return new QueueDao(c);
+ public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
}
- public TaskDao createTaskDao(Connection c) {
- return new TaskDao(c);
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
}