aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java7
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java22
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java84
-rw-r--r--src/main/java/io/trygvis/async/SqlEffectExecutor.java55
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java142
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java4
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java58
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java34
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java68
-rw-r--r--src/main/java/io/trygvis/queue/TaskEffect.java7
-rw-r--r--src/main/java/io/trygvis/spring/DefaultConfig.java18
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java13
-rw-r--r--src/main/java/io/trygvis/spring/SpringQueueService.java32
13 files changed, 355 insertions, 189 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index 17d53e9..daf99e4 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -2,6 +2,7 @@ package io.trygvis.async;
import io.trygvis.queue.Queue;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import java.util.List;
@@ -10,7 +11,7 @@ import java.util.List;
*/
public interface AsyncService {
- void registerQueue(Queue queue, final AsyncService.AsyncCallable callable);
+ void registerQueue(Queue queue, TaskEffect processor);
Queue getQueue(String name);
@@ -22,8 +23,4 @@ public interface AsyncService {
* Polls for a new state of the execution.
*/
Task update(Task ref);
-
- interface AsyncCallable {
- void run(List<String> arguments) throws Exception;
- }
}
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 310c59b..6baa56e 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,8 +1,11 @@
package io.trygvis.async;
+import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -23,8 +26,16 @@ public class JdbcAsyncService {
private final Map<String, QueueThread> queues = new HashMap<>();
- public void registerQueue(SqlEffectExecutor sqlEffectExecutor, Queue queue, AsyncService.AsyncCallable callable) {
- final QueueThread queueThread = new QueueThread(sqlEffectExecutor, callable, queue);
+ private final QueueSystem queueSystem;
+ private final JdbcQueueService queueService;
+
+ public JdbcAsyncService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.queueService = queueSystem.createQueueService();
+ }
+
+ public void registerQueue(Queue queue, TaskEffect processor) {
+ final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
queues.put(queue.name, queueThread);
@@ -69,12 +80,11 @@ public class JdbcAsyncService {
}
private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- TaskDao taskDao = new TaskDao(c);
+ TaskDao taskDao = queueSystem.createTaskDao(c);
Date scheduled = new Date();
- long id = taskDao.insert(parent, queue.name, scheduled, args);
- Task task = new Task(id, parent, queue.name, scheduled, null, 0, null, args);
+ Task task = queueService.schedule(c, queue, parent, scheduled, args);
log.info("Created task = {}", task);
return task;
@@ -102,7 +112,7 @@ public class JdbcAsyncService {
}
public Task update(Connection c, Task ref) throws SQLException {
- TaskDao taskDao = new TaskDao(c);
+ TaskDao taskDao = queueSystem.createTaskDao(c);
return taskDao.findById(ref.id());
}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 00c46b4..33753a3 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -1,24 +1,27 @@
package io.trygvis.async;
+import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
-import io.trygvis.queue.TaskDao;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
import java.util.List;
-import static io.trygvis.async.SqlEffectExecutor.SqlExecutionException;
-
class QueueThread implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
+ private final QueueSystem queueSystem;
+
+ private final JdbcQueueService queueService;
+
private final SqlEffectExecutor sqlEffectExecutor;
- private final AsyncService.AsyncCallable callable;
+ private final TaskEffect taskEffect;
public final Queue queue;
@@ -28,9 +31,11 @@ class QueueThread implements Runnable {
private boolean busy;
- QueueThread(SqlEffectExecutor sqlEffectExecutor, AsyncService.AsyncCallable callable, Queue queue) {
- this.sqlEffectExecutor = sqlEffectExecutor;
- this.callable = callable;
+ QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.queueService = queueSystem.createQueueService();
+ this.taskEffect = taskEffect;
this.queue = queue;
}
@@ -48,28 +53,17 @@ class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
try {
-// List<Task> tasks = transactionTemplate.execute(new TransactionCallback<List<Task>>() {
-// public List<Task> doInTransaction(TransactionStatus status) {
-// return taskDao.findByNameAndCompletedIsNull(queue.name);
-// }
-// });
- List<Task> tasks = sqlEffectExecutor.execute(new SqlEffect<List<Task>>() {
+ List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
- public List<Task> doInConnection(Connection connection) throws SQLException {
- return new TaskDao(connection).findByNameAndCompletedIsNull(queue.name);
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByNameAndCompletedIsNull(queue.name);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
if (tasks.size() > 0) {
- for (final Task task : tasks) {
- try {
- executeTask(task);
- } catch (SqlExecutionException | TaskFailureException e) {
- log.warn("Task execution failed", e);
- }
- }
+ queueService.executeTask(taskEffect, tasks);
}
} catch (Throwable e) {
log.warn("Error while executing tasks.", e);
@@ -94,48 +88,4 @@ class QueueThread implements Runnable {
}
}
}
-
- private void executeTask(final Task task) {
- final Date run = new Date();
- log.info("Setting last run on task. date = {}, task = {}", run, task);
- sqlEffectExecutor.execute(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection connection) throws SQLException {
- new TaskDao(connection).update(task.registerRun());
- }
- });
-// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
-// protected void doInTransactionWithoutResult(TransactionStatus status) {
-// taskDao.update(task.registerRun());
-// }
-// });
-
- sqlEffectExecutor.execute(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- try {
- callable.run(task.arguments);
- Date completed = new Date();
- Task t = task.registerComplete(completed);
- log.info("Completed task: {}", t);
- new TaskDao(c).update(t);
- } catch (Exception e) {
- throw new TaskFailureException(e);
- }
- }
- });
-// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
-// protected void doInTransactionWithoutResult(TransactionStatus status) {
-// try {
-// callable.run(task.arguments);
-// Date completed = new Date();
-// Task t = task.registerComplete(completed);
-// log.info("Completed task: {}", t);
-// taskDao.update(t);
-// } catch (Exception e) {
-// throw new TaskFailureException(e);
-// }
-// }
-// });
- }
}
diff --git a/src/main/java/io/trygvis/async/SqlEffectExecutor.java b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
index c8abbd3..e03cf5e 100644
--- a/src/main/java/io/trygvis/async/SqlEffectExecutor.java
+++ b/src/main/java/io/trygvis/async/SqlEffectExecutor.java
@@ -12,22 +12,67 @@ public class SqlEffectExecutor {
this.dataSource = dataSource;
}
- public <A> A execute(SqlEffect<A> effect) {
+ public <A> A transaction(SqlEffect<A> effect) {
+/*
+ int pid;
+
try (Connection c = dataSource.getConnection()) {
- return effect.doInConnection(c);
+
+ try (Statement statement = c.createStatement()) {
+ ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
+ rs.next();
+ pid = rs.getInt(1);
+ }
+
+ System.out.println("pid = " + pid);
+
+ try {
+ effect.doInConnection(c);
+ c.commit();
+ } catch (SQLException e) {
+ c.rollback();
+ e.printStackTrace();
+ } finally {
+ System.out.println("Closing pid=" + pid);
+ try {
+ c.close();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
} catch (SQLException e) {
+ e.printStackTrace();
throw new SqlExecutionException(e);
}
- }
+*/
- public void execute(SqlEffect.Void effect) {
try (Connection c = dataSource.getConnection()) {
- effect.doInConnection(c);
+ boolean ok = false;
+ try {
+ A a = effect.doInConnection(c);
+ c.commit();
+ ok = true;
+ return a;
+ } finally {
+ if (!ok) {
+ c.rollback();
+ }
+ }
} catch (SQLException e) {
throw new SqlExecutionException(e);
}
}
+ public void transaction(final SqlEffect.Void effect) {
+ transaction(new SqlEffect<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException {
+ effect.doInConnection(c);
+ return null;
+ }
+ });
+ }
+
public static class SqlExecutionException extends RuntimeException {
public final SQLException exception;
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index 793333d..d284287 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -1,61 +1,127 @@
package io.trygvis.queue;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
-import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState.NEW;
+import static io.trygvis.queue.Task.TaskState.PROCESSING;
+
public class JdbcQueueService {
- private Logger log = LoggerFactory.getLogger(getClass());
+ private final Logger log = LoggerFactory.getLogger(getClass());
- private JdbcQueueService(Connection c) throws SQLException {
- if (c.getAutoCommit()) {
- throw new SQLException("The connection cannot be in auto-commit mode.");
- }
+ private final QueueSystem queueSystem;
- DatabaseMetaData metaData = c.getMetaData();
- String productName = metaData.getDatabaseProductName();
- String productVersion = metaData.getDatabaseProductVersion();
+ private final SqlEffectExecutor sqlEffectExecutor;
- log.info("productName = " + productName);
- log.info("productVersion = " + productVersion);
+ JdbcQueueService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consume(Connection c, Queue queue, QueueService.TaskEffect effect) throws SQLException {
- TaskDao taskDao = createTaskDao(c);
+ public void consumeAll(final Queue queue, final TaskEffect effect) {
+ final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
- List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
- log.trace("Got {} tasks.", tasks.size());
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+ log.trace("Got {} tasks.", tasks.size());
+ taskDao.setState(tasks, PROCESSING);
+ return tasks;
+ }
+ });
- for (Task task : tasks) {
- log.trace("Executing task {}", task.id());
- try {
- List<Task> newTasks = effect.consume(task);
- log.trace("Executed task {}, newTasks: ", task.id(), newTasks.size());
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ applyTasks(c, effect, queueSystem.createTaskDao(c), tasks);
+ }
+ });
+ }
+
+ public void executeTask(final TaskEffect taskEffect, final List<Task> tasks) {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection connection) throws SQLException {
+ for (Task task : tasks) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ new TaskDao(connection).update(task.markProcessing());
+ }
+ }
+ });
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = new TaskDao(c);
+
+ applyTasks(c, taskEffect, taskDao, tasks);
+ }
+ });
+ }
+
+ /**
+ * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
+ */
+ private void applyTasks(Connection c, TaskEffect effect, final TaskDao taskDao, List<Task> tasks) throws SQLException {
+ Task task = null;
+ try {
+ for (int i = 0; i < tasks.size(); i++) {
+ task = tasks.get(i);
+
+ log.info("Executing task {}", task.id());
+
+ List<Task> newTasks = effect.apply(task);
Date now = new Date();
- task = task.registerComplete(now);
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ task = task.markOk(now);
for (Task newTask : newTasks) {
- taskDao.insert(task.id(), newTask.queue, now, newTask.arguments);
+ schedule(c, newTask);
}
taskDao.update(task);
- } catch (Throwable e) {
- log.error("Unable to execute task, id=" + task.id(), e);
}
- c.commit();
+ } catch (final Exception e) {
+ if (task == null) {
+ return;
+ }
+
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ try {
+ taskDao.rollback();
+ } catch (SQLException e2) {
+ log.error("Error rolling back transaction after failed apply.", e2);
+ }
+
+ final Task t = task;
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ Task task = t.markFailed(now);
+ taskDao.update(task);
+ }
+ });
}
}
- public Queue getQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
- QueueDao queueDao = createQueueDao(c);
+ public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
+ QueueDao queueDao = queueSystem.createQueueDao(c);
Queue q = queueDao.findByName(name);
@@ -71,21 +137,23 @@ public class JdbcQueueService {
return q;
}
- public void schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
- TaskDao taskDao = createTaskDao(c);
-
- taskDao.insert(queue.name, scheduled, arguments);
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
}
- public static JdbcQueueService createQueueService(Connection c) throws SQLException {
- return new JdbcQueueService(c);
+ public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
}
- public QueueDao createQueueDao(Connection c) {
- return new QueueDao(c);
+ public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
}
- public TaskDao createTaskDao(Connection c) {
- return new TaskDao(c);
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index 2111013..9773766 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -10,8 +10,4 @@ public interface QueueService {
Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
-
- public static interface TaskEffect {
- List<Task> consume(Task task) throws Exception;
- }
}
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
new file mode 100644
index 0000000..42c8fd8
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -0,0 +1,58 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+
+public class QueueSystem {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public final SqlEffectExecutor sqlEffectExecutor;
+
+ public final JdbcQueueService queueService;
+
+ private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+ });
+
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ queueService = new JdbcQueueService(this);
+ }
+
+ /**
+ * Initializes the queue system. Use this as the first thing do as it will validate the database.
+ */
+ public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ return new QueueSystem(sqlEffectExecutor);
+ }
+
+ public JdbcQueueService createQueueService() {
+ return queueService;
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
index 1af40d7..29e37ac 100755
--- a/src/main/java/io/trygvis/queue/Task.java
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -3,16 +3,26 @@ package io.trygvis.queue;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState.*;
import static java.util.Arrays.asList;
public class Task {
+ public enum TaskState {
+ NEW,
+ PROCESSING,
+ OK,
+ FAILED
+ }
+
private final long id;
public final Long parent;
public final String queue;
+ public final TaskState state;
+
public final Date scheduled;
public final Date lastRun;
@@ -23,10 +33,11 @@ public class Task {
public final List<String> arguments;
- public Task(long id, Long parent, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
this.id = id;
this.parent = parent;
this.queue = queue;
+ this.state = state;
this.scheduled = scheduled;
this.lastRun = lastRun;
this.runCount = runCount;
@@ -35,12 +46,16 @@ public class Task {
this.arguments = arguments;
}
- public Task registerRun() {
- return new Task(id, parent, queue, scheduled, new Date(), runCount + 1, completed, arguments);
+ public Task markProcessing() {
+ return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments);
}
- public Task registerComplete(Date completed) {
- return new Task(id, parent, queue, scheduled, lastRun, runCount, completed, arguments);
+ public Task markOk(Date completed) {
+ return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments);
+ }
+
+ public Task markFailed(Date now) {
+ return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments);
}
public String toString() {
@@ -48,6 +63,7 @@ public class Task {
"id=" + id +
", parent=" + parent +
", queue=" + queue +
+ ", state=" + state +
", scheduled=" + scheduled +
", lastRun=" + lastRun +
", runCount=" + runCount +
@@ -68,12 +84,16 @@ public class Task {
return completed != null;
}
+ public Task childTask(String name, Date scheduled, String... arguments) {
+ return new Task(0, id(), name, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+
public static Task newTask(String name, Date scheduled, String... arguments) {
- return new Task(0, 0l, name, scheduled, null, 0, null, asList(arguments));
+ return new Task(0, null, name, NEW, scheduled, null, 0, null, asList(arguments));
}
public static Task newTask(String name, Date scheduled, List<String> arguments) {
- return new Task(0, 0l, name, scheduled, null, 0, null, arguments);
+ return new Task(0, null, name, NEW, scheduled, null, 0, null, arguments);
}
public static List<String> stringToArguments(String arguments) {
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 3aa2ac2..025823b 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -11,27 +11,25 @@ import java.util.Collections;
import java.util.Date;
import java.util.List;
+import static io.trygvis.queue.Task.TaskState;
+import static io.trygvis.queue.Task.TaskState.valueOf;
import static io.trygvis.queue.Task.argumentsToString;
import static io.trygvis.queue.Task.stringToArguments;
public class TaskDao {
- private final Connection connection;
+ private final Connection c;
- public static final String fields = "id, parent, queue, scheduled, last_run, run_count, completed, arguments";
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
- public TaskDao(Connection connection) {
- this.connection = connection;
+ TaskDao(Connection c) {
+ this.c = c;
}
- public long insert(String queue, Date scheduled, List<String> arguments) throws SQLException {
- return insert(null, queue, scheduled, arguments);
- }
-
- public long insert(Long parent, String queue, Date scheduled, List<String> arguments) throws SQLException {
- String sql = "INSERT INTO task(id, parent, run_count, queue, scheduled, arguments) " +
- "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?)";
- try (PreparedStatement stmt = connection.prepareStatement(sql)) {
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
int i = 1;
if (parent == null) {
stmt.setNull(i++, Types.BIGINT);
@@ -39,11 +37,12 @@ public class TaskDao {
stmt.setLong(i++, parent);
}
stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
stmt.setString(i, argumentsToString(arguments));
stmt.executeUpdate();
}
- try (PreparedStatement stmt = connection.prepareStatement("SELECT currval('task_seq')")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
ResultSet rs = stmt.executeQuery();
rs.next();
return rs.getLong(1);
@@ -51,7 +50,7 @@ public class TaskDao {
}
public Task findById(long id) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
stmt.setLong(1, id);
ResultSet rs = stmt.executeQuery();
return rs.next() ? mapRow(rs) : null;
@@ -59,7 +58,7 @@ public class TaskDao {
}
public List<Task> findByNameAndCompletedIsNull(String name) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND completed IS NULL")) {
int i = 1;
stmt.setString(i, name);
ResultSet rs = stmt.executeQuery();
@@ -72,8 +71,9 @@ public class TaskDao {
}
public void update(Task task) throws SQLException {
- try (PreparedStatement stmt = connection.prepareStatement("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
int i = 1;
+ stmt.setString(i++, task.state.name());
stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
setTimestamp(stmt, i++, task.lastRun);
stmt.setInt(i++, task.runCount);
@@ -83,6 +83,19 @@ public class TaskDao {
}
}
+ public void setState(List<Task> tasks, TaskState state) throws SQLException {
+ Long[] ids = new Long[tasks.size()];
+ for (int i = 0, tasksSize = tasks.size(); i < tasksSize; i++) {
+ ids[i] = tasks.get(i).id();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ANY (?)")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setObject(i, c.createArrayOf("bigint", ids));
+ stmt.executeUpdate();
+ }
+ }
+
private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
if (date == null) {
stmt.setNull(parameterIndex, Types.TIMESTAMP);
@@ -92,15 +105,22 @@ public class TaskDao {
}
public Task mapRow(ResultSet rs) throws SQLException {
- String arguments = rs.getString(8);
+ String arguments = rs.getString(9);
+ int i = 1;
return new Task(
- rs.getLong(1),
- rs.getLong(2),
- rs.getString(3),
- rs.getTimestamp(4),
- rs.getTimestamp(5),
- rs.getInt(6),
- rs.getTimestamp(7),
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
}
+
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
}
diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java
new file mode 100644
index 0000000..186797f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskEffect.java
@@ -0,0 +1,7 @@
+package io.trygvis.queue;
+
+import java.util.List;
+
+public interface TaskEffect {
+ List<Task> apply(Task task) throws Exception;
+}
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
index 68761f2..3ba28de 100644
--- a/src/main/java/io/trygvis/spring/DefaultConfig.java
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -1,21 +1,31 @@
package io.trygvis.spring;
import io.trygvis.async.AsyncService;
+import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jdbc.core.JdbcTemplate;
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
@Configuration
public class DefaultConfig {
@Bean
- public AsyncService asyncService(JdbcTemplate jdbcTemplate) {
- return new SpringJdbcAsyncService(jdbcTemplate);
+ public QueueSystem queueSystem(DataSource ds) throws SQLException {
+ return QueueSystem.initialize(new SqlEffectExecutor(ds));
+ }
+
+ @Bean
+ public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(queueSystem, jdbcTemplate);
}
@Bean
- public QueueService queueService(JdbcTemplate jdbcTemplate) {
- return new SpringQueueService(jdbcTemplate);
+ public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringQueueService(queueSystem, jdbcTemplate);
}
}
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
index 6702642..96442e6 100644
--- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -4,7 +4,9 @@ import io.trygvis.async.AsyncService;
import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.ConnectionCallback;
@@ -29,19 +31,16 @@ public class SpringJdbcAsyncService implements AsyncService {
private final JdbcTemplate jdbcTemplate;
- private final SqlEffectExecutor sqlEffectExecutor;
-
private final JdbcAsyncService jdbcAsyncService;
- public SpringJdbcAsyncService(JdbcTemplate jdbcTemplate) {
+ public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
- jdbcAsyncService = new JdbcAsyncService();
- sqlEffectExecutor = new SqlEffectExecutor(this.jdbcTemplate.getDataSource());
+ jdbcAsyncService = new JdbcAsyncService(queueSystem);
}
@Transactional(propagation = REQUIRED)
- public void registerQueue(final Queue queue, final AsyncService.AsyncCallable callable) {
- jdbcAsyncService.registerQueue(sqlEffectExecutor, queue, callable);
+ public void registerQueue(final Queue queue, final TaskEffect processor) {
+ jdbcAsyncService.registerQueue(queue, processor);
registerSynchronization(new TransactionSynchronizationAdapter() {
public void afterCompletion(int status) {
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
index 3432e35..21746e5 100644
--- a/src/main/java/io/trygvis/spring/SpringQueueService.java
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -3,48 +3,34 @@ package io.trygvis.spring;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.TaskEffect;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.transaction.annotation.Transactional;
-import javax.annotation.PostConstruct;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
-import static io.trygvis.queue.JdbcQueueService.createQueueService;
-
public class SpringQueueService implements QueueService {
public final JdbcTemplate jdbcTemplate;
public JdbcQueueService queueService;
- public SpringQueueService(JdbcTemplate jdbcTemplate) {
+ public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
+ this.queueService = queueSystem.queueService;
}
- @PostConstruct
- public void postConstruct() {
- queueService = jdbcTemplate.execute(new ConnectionCallback<JdbcQueueService>() {
- @Override
- public JdbcQueueService doInConnection(Connection c) throws SQLException, DataAccessException {
- return createQueueService(c);
- }
- });
- }
-
- @Transactional
+ /**
+ * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.TaskEffect)
+ */
public void consume(final Queue queue, final TaskEffect effect) throws SQLException {
- jdbcTemplate.execute(new ConnectionCallback<Object>() {
- @Override
- public Object doInConnection(Connection c) throws SQLException, DataAccessException {
- queueService.consume(c, queue, effect);
- return null;
- }
- });
+ queueService.consumeAll(queue, effect);
}
@Transactional
@@ -52,7 +38,7 @@ public class SpringQueueService implements QueueService {
return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
@Override
public Queue doInConnection(Connection c) throws SQLException, DataAccessException {
- return queueService.getQueue(c, name, interval, autoCreate);
+ return queueService.lookupQueue(c, name, interval, autoCreate);
}
});
}