aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/queue
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-09 15:15:46 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-09 15:15:46 +0200
commit33e3be55dc2d815cbd0208bf59d12a7e727f3105 (patch)
treea464f750d2cbdd6cdd805e574dd0aa66fa7027fd /src/main/java/io/trygvis/queue
parent7465fdb9aa847d29dacc56adbe473f1c1ceb298e (diff)
downloadquartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.gz
quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.bz2
quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.xz
quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.zip
wip
Diffstat (limited to 'src/main/java/io/trygvis/queue')
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java142
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java4
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java58
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java34
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java68
-rw-r--r--src/main/java/io/trygvis/queue/TaskEffect.java7
6 files changed, 241 insertions, 72 deletions
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index 793333d..d284287 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -1,61 +1,127 @@
package io.trygvis.queue;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState.NEW;
+import static io.trygvis.queue.Task.TaskState.PROCESSING;
+
public class JdbcQueueService {
- private Logger log = LoggerFactory.getLogger(getClass());
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private JdbcQueueService(Connection c) throws SQLException {
- if (c.getAutoCommit()) {
- throw new SQLException("The connection cannot be in auto-commit mode.");
- }
+ private final QueueSystem queueSystem;
- DatabaseMetaData metaData = c.getMetaData();
- String productName = metaData.getDatabaseProductName();
- String productVersion = metaData.getDatabaseProductVersion();
+ private final SqlEffectExecutor sqlEffectExecutor;
- log.info("productName = " + productName);
- log.info("productVersion = " + productVersion);
+ JdbcQueueService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException {
- TaskDao taskDao = createTaskDao(c);
+ public void consumeAll(final Queue queue, final TaskEffect effect) {
+ final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
- List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
- log.trace("Got {} tasks.", tasks.size());
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+ log.trace("Got {} tasks.", tasks.size());
+ taskDao.setState(tasks, PROCESSING);
+ return tasks;
+ }
+ });
- for (Task task : tasks) {
- log.trace("Executing task {}", task.id());
- try {
- List<Task> newTasks = effect.consume(task);
- log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size());
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ applyTasks(c, effect, queueSystem.createTaskDao(c), tasks);
+ }
+ });
+ }
+
+ public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ for (Task task : tasks) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ new TaskDao(connection).update(task.markProcessing());
+ }
+ }
+ });
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
+ applyTasks(c, taskEffect, taskDao, tasks);
+ }
+ });
+ }
+
+ /**
+ * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
+ */
+ private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException {
+ Task task = null;
+ try {
+ for (int i = 0; i < tasks.size(); i++) {
+ task = tasks.get(i);
+
+ log.info("Executing task {}", task.id());
+
+ List<Task> newTasks = effect.apply(task);
Date now = new Date();
- task = task.registerComplete(now);
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ task = task.markOk(now);
for (Task newTask : newTasks) {
- taskDao.insert(task.id(), newTask.queue, now, newTask.arguments);
+ schedule(c, newTask);
}
taskDao.update(task);
- } catch (Throwable e) {
- log.error("Unable to execute task, id=" + task.id(), e);
}
- c.commit();
+ } catch (final Exception e) {
+ if (task == null) {
+ return;
+ }
+
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ try {
+ taskDao.rollback();
+ } catch (SQLException e2) {
+ log.error("Error rolling back transaction after failed apply.", e2);
+ }
+
+ final Task t = task;
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ Task task = t.markFailed(now);
+ taskDao.update(task);
+ }
+ });
}
}
- public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
- QueueDao queueDao = createQueueDao(c);
+ public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
+ QueueDao queueDao = queueSystem.createQueueDao(c);
Queue q = queueDao.findByName(name);
@@ -71,21 +137,23 @@ public class JdbcQueueService {
return q;
}
- public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
- TaskDao taskDao = createTaskDao(c);
-
- taskDao.insert(queue.name, scheduled, arguments);
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
}
- public static JdbcQueueService createQueueService(Connection c) throws SQLException {
- return new JdbcQueueService(c);
+ public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
}
- public QueueDao createQueueDao(Connection c) {
- return new QueueDao(c);
+ public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
}
- public TaskDao createTaskDao(Connection c) {
- return new TaskDao(c);
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index 2111013..9773766 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -10,8 +10,4 @@ public interface QueueService {
Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
-
- public static interface TaskEffect {
- List<Task> consume(Task task) throws Exception;
- }
}
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
new file mode 100644
index 0000000..42c8fd8
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -0,0 +1,58 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+
+public class QueueSystem {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public final SqlEffectExecutor sqlEffectExecutor;
+
+ public final JdbcQueueService queueService;
+
+ private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+ });
+
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ queueService = new JdbcQueueService(this);
+ }
+
+ /**
+ * Initializes the queue system. Use this as the first thing do as it will validate the database.
+ */
+ public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ return new QueueSystem(sqlEffectExecutor);
+ }
+
+ public JdbcQueueService createQueueService() {
+ return queueService;
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 1af40d7..29e37ac 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -3,16 +3,26 @@ package io.trygvis.queue;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState.*;
import static java.util.Arrays.asList;
public class Task {
+ public enum TaskState {
+ NEW,
+ PROCESSING,
+ OK,
+ FAILED
+ }
+
private final long id;
public final Long parent;
public final String queue;
+ public final TaskState state;
+
public final Date scheduled;
public final Date lastRun;
@@ -23,10 +33,11 @@ public class Task {
public final List<String> arguments;
- public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
this.id = id;
this.parent = parent;
this.queue = queue;
+ this.state = state;
this.scheduled = scheduled;
this.lastRun = lastRun;
this.runCount = runCount;
@@ -35,12 +46,16 @@ public class Task {
this.arguments = arguments;
}
- public Task registerRun() {
- return new Task(id, parent, queue, scheduled, new Date(), runCount + 1, completed, arguments);
+ public Task markProcessing() {
+ return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments);
}
- public Task registerComplete(Date completed) {
- return new Task(id, parent, queue, scheduled, lastRun, runCount, completed, arguments);
+ public Task markOk(Date completed) {
+ return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments);
+ }
+
+ public Task markFailed(Date now) {
+ return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments);
}
public String toString() {
@@ -48,6 +63,7 @@ public class Task {
"id=" + id +
", parent=" + parent +
", queue=" + queue +
+ ", state=" + state +
", scheduled=" + scheduled +
", lastRun=" + lastRun +
", runCount=" + runCount +
@@ -68,12 +84,16 @@ public class Task {
return completed != null;
}
+ public Task childTask(String name, Date scheduled, String... arguments) {
+ return new Task(0, id(), name, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+
public static Task newTask(String name, Date scheduled, String... arguments) {
- return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments));
+ return new Task(0, null, name, NEW, scheduled, null, 0, null, asList(arguments));
}
public static Task newTask(String name, Date scheduled, List<String> arguments) {
- return new Task(0, 0l, name, scheduled, null, 0, null, arguments);
+ return new Task(0, null, name, NEW, scheduled, null, 0, null, arguments);
}
public static List<String> stringToArguments(String arguments) {
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 3aa2ac2..025823b 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -11,27 +11,25 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState;
+import static io.trygvis.queue.Task.TaskState.valueOf;
import static io.trygvis.queue.Task.argumentsToString;
import static io.trygvis.queue.Task.stringToArguments;
public class TaskDao {
- private final Connection connection;
+ private final Connection c;
- public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
- public TaskDao(Connection connection) {
- this.connection = connection;
+ TaskDao(Connection c) {
+ this.c = c;
}
- public long insert(String queue, Date scheduled, List<String> arguments) throws SQLException {
- return insert(null, queue, scheduled, arguments);
- }
-
- public long insert(Long parent, String queue, Date scheduled, List<String> arguments) throws SQLException {
- String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
- "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
- try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
int i = 1;
if (parent == null) {
stmt.setNull(i++, Types.BIGINT);
@@ -39,11 +37,12 @@ public class TaskDao {
stmt.setLong(i++, parent);
}
stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
stmt.setString(i, argumentsToString(arguments));
stmt.executeUpdate();
}
- try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
ResultSet rs = stmt.executeQuery();
rs.next();
return rs.getLong(1);
@@ -51,7 +50,7 @@ public class TaskDao {
}
public Task findById(long id) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
stmt.setLong(1, id);
ResultSet rs = stmt.executeQuery();
return rs.next() ? mapRow(rs) : null;
@@ -59,7 +58,7 @@ public class TaskDao {
}
public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
int i = 1;
stmt.setString(i, name);
ResultSet rs = stmt.executeQuery();
@@ -72,8 +71,9 @@ public class TaskDao {
}
public void update(Task task) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
int i = 1;
+ stmt.setString(i++, task.state.name());
stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
@@ -83,6 +83,19 @@ public class TaskDao {
}
}
+ public void setState(List<Task> tasks, TaskState state) throws SQLException {
+ Long[] ids = new Long[tasks.size()];
+ for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
+ ids[i] = tasks.get(i).id();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setObject(i, c.createArrayOf("bigint", ids));
+ stmt.executeUpdate();
+ }
+ }
+
private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
if (date == null) {
stmt.setNull(parameterIndex, Types.TIMESTAMP);
@@ -92,15 +105,22 @@ public class TaskDao {
}
public Task mapRow(ResultSet rs) throws SQLException {
- String arguments = rs.getString(8);
+ String arguments = rs.getString(9);
+ int i = 1;
return new Task(
- rs.getLong(1),
- rs.getLong(2),
- rs.getString(3),
- rs.getTimestamp(4),
- rs.getTimestamp(5),
- rs.getInt(6),
- rs.getTimestamp(7),
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
}
+
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java
new file mode 100644
index 0000000..186797f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskEffect.java
@@ -0,0 +1,7 @@
+package io.trygvis.queue;
+
+import java.util.List;
+
+public interface TaskEffect {
+ List<Task> apply(Task task) throws Exception;
+}