aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-23 09:37:57 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-23 09:37:57 +0200
commit7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch)
treec0bd7202845697207b04d518f613588df17d9e12 /src/main/java
downloadjdbc-queue-master.tar.gz
jdbc-queue-master.tar.bz2
jdbc-queue-master.tar.xz
jdbc-queue-master.zip
o Initial import of JDBC queue.HEADmaster
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/io/trygvis/activemq/ActiveMqHinter.java152
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java30
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java75
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java201
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java28
-rw-r--r--src/main/java/io/trygvis/async/TaskFailureException.java7
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java55
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Queue.java24
-rw-r--r--src/main/java/io/trygvis/queue/QueueDao.java45
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java177
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java54
-rw-r--r--src/main/java/io/trygvis/queue/QueueStats.java20
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java61
-rw-r--r--src/main/java/io/trygvis/queue/SqlEffect.java12
-rw-r--r--src/main/java/io/trygvis/queue/SqlEffectExecutor.java50
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java117
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java148
-rw-r--r--src/main/java/io/trygvis/queue/TaskEffect.java7
-rw-r--r--src/main/java/io/trygvis/spring/DefaultConfig.java31
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java110
-rw-r--r--src/main/java/io/trygvis/spring/SpringQueueService.java49
21 files changed, 1453 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
new file mode 100644
index 0000000..f2cfb6e
--- /dev/null
+++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
@@ -0,0 +1,152 @@
+package io.trygvis.activemq;
+
+import io.trygvis.async.QueueController;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+
+import static java.lang.Long.parseLong;
+import static java.lang.System.arraycopy;
+import static java.nio.charset.Charset.forName;
+import static javax.jms.Session.AUTO_ACKNOWLEDGE;
+
+public class ActiveMqHinter implements AutoCloseable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueConnection c;
+
+ private static final Charset utf8 = forName("utf-8");
+
+ public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException {
+ log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL());
+ c = connectionFactory.createQueueConnection();
+ c.start();
+ log.info("Connected, clientId = {}", c.getClientID());
+ }
+
+ public void createHinter(final QueueController controller) throws JMSException {
+ QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(controller.queue.queue.name);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ if ((message instanceof TextMessage)) {
+ String body;
+ try {
+ TextMessage textMessage = (TextMessage) message;
+ body = textMessage.getText();
+ } catch (JMSException e) {
+ log.warn("Exception while reading body.", e);
+ throw new RuntimeException("Exception while reading body.", e);
+ }
+
+ consumeString(new StringReader(body), controller);
+ } else if (message instanceof BytesMessage) {
+ final BytesMessage bytesMessage = (BytesMessage) message;
+ consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller);
+ } else {
+ throw new RuntimeException("Unsupported message type: " + message.getClass());
+ }
+ }
+ });
+
+ controller.addOnStopListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ log.error("Error while closing JMS consumer", e);
+ }
+ }
+ });
+ }
+
+ private void consumeString(Reader reader, QueueController controller) {
+ try {
+ BufferedReader r = new BufferedReader(reader);
+
+ String line = r.readLine();
+
+ while (line != null) {
+ for (String id : line.split(",")) {
+ controller.hint(parseLong(id.trim()));
+ }
+ line = r.readLine();
+ }
+ } catch (IOException | SQLException e) {
+ log.warn("Could not consume body.", e);
+ throw new RuntimeException("Could not consume body.", e);
+ } catch (NumberFormatException e) {
+ log.warn("Could not consume body.", e);
+ throw e;
+ }
+ }
+
+ public void close() throws JMSException {
+ c.close();
+ }
+
+ private static class ByteMessageInputStream extends InputStream {
+ private final BytesMessage bytesMessage;
+
+ public ByteMessageInputStream(BytesMessage bytesMessage) {
+ this.bytesMessage = bytesMessage;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return bytesMessage.readBytes(b);
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read(byte[] out, int off, int len) throws IOException {
+ byte[] b = new byte[len];
+ try {
+ int read = bytesMessage.readBytes(b);
+ if (read == -1) {
+ return -1;
+ }
+ arraycopy(b, 0, out, off, read);
+ return read;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return bytesMessage.readByte();
+ } catch (javax.jms.MessageEOFException e) {
+ return -1;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
new file mode 100755
index 0000000..9332596
--- /dev/null
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -0,0 +1,30 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * A simple framework for running tasks.
+ */
+public interface AsyncService {
+
+ QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException;
+
+ QueueExecutor getQueue(String name);
+
+ Task schedule(Queue queue, Date scheduled, List<String> args);
+
+ Task schedule(Queue queue, long parent, Date scheduled, List<String> args);
+
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
+}
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
new file mode 100644
index 0000000..46f1f30
--- /dev/null
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -0,0 +1,75 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+
+public class JdbcAsyncService {
+ private final Map<String, QueueController> queues = new HashMap<>();
+
+ private final QueueSystem queueSystem;
+
+ public JdbcAsyncService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ }
+
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
+ }
+
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
+
+ queues.put(queue.queue.name, queueController);
+
+ return queueController;
+ }
+
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
+ }
+
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
+ final long start = currentTimeMillis();
+ final long end = start + timeout;
+
+ while (currentTimeMillis() < end) {
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
+
+ try {
+ sleep(100);
+ } catch (InterruptedException e) {
+ // break
+ }
+ }
+
+ return task;
+ }
+
+ public Task update(Connection c, Task ref) throws SQLException {
+ return queueSystem.createTaskDao(c).findById(ref.id());
+ }
+
+ private synchronized QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
+
+ if (queueController == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueController;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
new file mode 100644
index 0000000..a343d42
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -0,0 +1,201 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueController {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final TaskEffect taskEffect;
+
+ private final TaskExecutionRequest req;
+
+ public final QueueExecutor queue;
+
+ private boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean running;
+
+ private Thread thread;
+
+ private ScheduledThreadPoolExecutor executor;
+
+ private List<Runnable> stopListeners = new ArrayList<>();
+
+ public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
+ this.queueSystem = queueSystem;
+ this.req = req;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.taskEffect = taskEffect;
+ this.queue = queue;
+ }
+
+ public void scheduleAll(final List<Task> tasks) throws InterruptedException {
+ ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor);
+
+ for (final Task task : tasks) {
+ service.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ }
+
+ public void invokeAll(final List<Task> tasks) throws InterruptedException {
+ List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size());
+ for (final Task task : tasks) {
+ callables.add(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+
+ executor.invokeAll(callables);
+ }
+
+ public void hint(final long id) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ Task task = queueSystem.createTaskDao(c).findById(id);
+
+ try {
+ scheduleAll(Collections.singletonList(task));
+ } catch (InterruptedException e) {
+ throw new SQLException(e);
+ }
+ }
+ });
+ }
+
+ public synchronized void addOnStopListener(Runnable runnable) {
+ stopListeners.add(runnable);
+ }
+
+ private class QueueThread implements Runnable {
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = null;
+
+ try {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
+
+ if (tasks.size() > 0) {
+ invokeAll(tasks);
+ }
+ } catch (Throwable e) {
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) {
+ log.info("Got a full chunk, continuing directly.");
+ continue;
+ }
+
+ synchronized (this) {
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ } else {
+ try {
+ wait(req.interval(queue.queue));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
+
+ log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval);
+
+ running = true;
+ this.executor = executor;
+
+ thread = new Thread(new QueueThread(), "queue: " + queue.queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.queue.name);
+
+ for (Runnable runnable : stopListeners) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Error while running stop listener " + runnable, e);
+ }
+ }
+
+ shouldRun = false;
+
+ // TODO: empty out the currently executing tasks.
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ thread.interrupt();
+ }
+ thread = null;
+ executor.shutdownNow();
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
new file mode 100644
index 0000000..8edc720
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -0,0 +1,28 @@
+package io.trygvis.async;
+
+public class QueueStats {
+ public final int totalMessageCount;
+ public final int okMessageCount;
+ public final int failedMessageCount;
+ public final int missedMessageCount;
+ public final int scheduledMessageCount;
+
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ this.okMessageCount = okMessageCount;
+ this.failedMessageCount = failedMessageCount;
+ this.missedMessageCount = missedMessageCount;
+ this.scheduledMessageCount = scheduledMessageCount;
+ }
+
+ @Override
+ public String toString() {
+ return "QueueStats{" +
+ "totalMessageCount=" + totalMessageCount +
+ ", okMessageCount=" + okMessageCount +
+ ", failedMessageCount=" + failedMessageCount +
+ ", missedMessageCount=" + missedMessageCount +
+ ", scheduledMessageCount=" + scheduledMessageCount +
+ '}';
+ }
+}
diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java
new file mode 100644
index 0000000..7278e17
--- /dev/null
+++ b/src/main/java/io/trygvis/async/TaskFailureException.java
@@ -0,0 +1,7 @@
+package io.trygvis.async;
+
+class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
new file mode 100644
index 0000000..ef0b5bb
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -0,0 +1,55 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+public class JdbcQueueService {
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final Map<String, QueueExecutor> queues = new HashMap<>();
+
+ JdbcQueueService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ }
+
+ public synchronized QueueExecutor getQueue(String name) {
+ QueueExecutor queueExecutor = queues.get(name);
+
+ if (queueExecutor != null) {
+ return queueExecutor;
+ }
+
+ throw new IllegalArgumentException("No such queue: " + name);
+ }
+
+ public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException {
+ QueueExecutor queueExecutor = queues.get(name);
+
+ if (queueExecutor != null) {
+ return queueExecutor;
+ }
+
+ QueueDao queueDao = queueSystem.createQueueDao(c);
+
+ Queue q = queueDao.findByName(name);
+
+ if (q == null) {
+ if (!autoCreate) {
+ throw new SQLException("No such queue: '" + name + "'.");
+ }
+
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ }
+
+ queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q);
+ queues.put(name, queueExecutor);
+ return queueExecutor;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java
new file mode 100755
index 0000000..15003f7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Queue.java
@@ -0,0 +1,24 @@
+package io.trygvis.queue;
+
+public class Queue {
+
+ public final String name;
+
+ public final long interval;
+
+ public Queue(String name, long interval) {
+ this.name = name;
+ this.interval = interval;
+ }
+
+ public Queue withInterval(int interval) {
+ return new Queue(name, interval);
+ }
+
+ public String toString() {
+ return "Queue{" +
+ "name='" + name + '\'' +
+ ", interval=" + interval +
+ '}';
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
new file mode 100644
index 0000000..2f69e11
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -0,0 +1,45 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+
+public class QueueDao {
+
+ private final Connection connection;
+
+ public QueueDao(Connection connection) {
+ this.connection = connection;
+ }
+
+ public Queue findByName(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) {
+ stmt.setString(1, name);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+
+ public void insert(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) {
+ int i = 1;
+ stmt.setString(i++, q.name);
+ stmt.setLong(i, q.interval);
+ stmt.executeUpdate();
+ }
+ }
+
+ public void update(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) {
+ int i = 1;
+ stmt.setLong(i++, q.interval);
+ stmt.setString(i, q.name);
+ stmt.executeUpdate();
+ }
+ }
+
+ public Queue mapRow(ResultSet rs) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
new file mode 100644
index 0000000..88e5b46
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -0,0 +1,177 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.QueueStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueExecutor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ public final Queue queue;
+
+ private final Stats stats = new Stats();
+
+ public enum TaskExecutionResult {
+ OK,
+ FAILED,
+ MISSED
+ }
+
+ public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ this.queue = queue;
+ }
+
+ private static class Stats {
+ public int total;
+ public int ok;
+ public int failed;
+ public int scheduled;
+ public int missed;
+
+ public synchronized QueueStats toStats() {
+ return new QueueStats(total, ok, failed, missed, scheduled);
+ }
+ }
+
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+
+ public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
+
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
+ }
+
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ for (Task task : tasks) {
+ TaskExecutionResult result = applyTask(effect, task);
+
+ if (result == FAILED && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ /**
+ * Executed each task in its own transaction.
+ * <p/>
+ * If the task fails, the status is set to error in a separate transaction.
+ */
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ try {
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
+ }
+ });
+
+ if (count == 0) {
+ log.warn("Missed task {}", task.id());
+ synchronized (stats) {
+ stats.missed++;
+ }
+ return MISSED;
+ }
+
+ log.info("Executing task {}", task.id());
+
+ final List<Task> newTasks = effect.apply(task);
+
+ final Date now = new Date();
+
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
+
+ queueSystem.createTaskDao(c).update(task.markOk(now));
+ }
+ });
+
+ synchronized (stats) {
+ stats.total++;
+ stats.ok++;
+ }
+
+ return OK;
+ } catch (Exception e) {
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ synchronized (stats) {
+ stats.total++;
+ stats.failed++;
+ }
+
+ try {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ taskDao.update(task.markFailed(now));
+ }
+ });
+ } catch (SQLException e1) {
+ log.error("Error while marking task as failed.", e1);
+ }
+
+ return FAILED;
+ }
+ }
+
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
+ }
+
+ public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
+ }
+
+ public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
+ }
+
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ synchronized (stats) {
+ stats.scheduled++;
+ }
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
new file mode 100644
index 0000000..1c38f1f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -0,0 +1,54 @@
+package io.trygvis.queue;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public interface QueueService {
+ QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+
+ void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
+
+ public static class TaskExecutionRequest {
+ public final long chunkSize;
+ public final boolean stopOnError;
+ public final Long interval;
+ public final boolean continueOnFullChunk;
+ // TODO: saveExceptions
+
+ public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
+ this(chunkSize, stopOnError, null, true);
+ }
+
+ private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) {
+ this.chunkSize = chunkSize;
+ this.stopOnError = stopOnError;
+ this.interval = interval;
+ this.continueOnFullChunk = continueOnFullChunk;
+ }
+
+ public TaskExecutionRequest interval(long interval) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+
+ public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+
+ @Override
+ public String toString() {
+ return "TaskExecutionRequest{" +
+ "chunkSize=" + chunkSize +
+ ", stopOnError=" + stopOnError +
+ '}';
+ }
+
+ public long interval(Queue queue) {
+ if (interval != null) {
+ return interval;
+ }
+
+ return queue.interval;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java
new file mode 100644
index 0000000..5b048b3
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueStats.java
@@ -0,0 +1,20 @@
+package io.trygvis.queue;
+
+import java.util.EnumMap;
+
+public class QueueStats {
+ public final String name;
+ public final long totalTaskCount;
+ public final EnumMap<Task.TaskState, Long> states;
+
+ public QueueStats(String name, EnumMap<Task.TaskState, Long> states) {
+ this.name = name;
+ this.states = states;
+
+ long c = 0;
+ for (Long l : states.values()) {
+ c += l;
+ }
+ this.totalTaskCount = c;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
new file mode 100644
index 0000000..5954526
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -0,0 +1,61 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.JdbcAsyncService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+
+public class QueueSystem {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ public final SqlEffectExecutor sqlEffectExecutor;
+
+ private final JdbcQueueService queueService;
+
+ private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+ });
+
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ queueService = new JdbcQueueService(this);
+ }
+
+ /**
+ * Initializes the queue system. Use this as the first thing do as it will validate the database.
+ */
+ public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ return new QueueSystem(sqlEffectExecutor);
+ }
+
+ public JdbcQueueService createQueueService() {
+ return queueService;
+ }
+
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+
+ public JdbcAsyncService createAsyncService() {
+ return new JdbcAsyncService(this);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java
new file mode 100644
index 0000000..7864bcd
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/SqlEffect.java
@@ -0,0 +1,12 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public interface SqlEffect<A> {
+ A doInConnection(Connection c) throws SQLException;
+
+ interface Void {
+ void doInConnection(Connection c) throws SQLException;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java
new file mode 100644
index 0000000..92802da
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java
@@ -0,0 +1,50 @@
+package io.trygvis.queue;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+
+public class SqlEffectExecutor {
+
+ private final DataSource dataSource;
+
+ public SqlEffectExecutor(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+
+ public <A> A transaction(SqlEffect<A> effect) throws SQLException {
+// int pid;
+
+ try (Connection c = dataSource.getConnection()) {
+// pid = getPid(c);
+// System.out.println("pid = " + pid);
+
+ boolean ok = false;
+ try {
+ A a = effect.doInConnection(c);
+ c.commit();
+ ok = true;
+ return a;
+ } finally {
+// System.out.println("Closing, pid = " + pid);
+ if (!ok) {
+ try {
+ c.rollback();
+ } catch (SQLException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+
+ public void transaction(final SqlEffect.Void effect) throws SQLException {
+ transaction(new SqlEffect<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException {
+ effect.doInConnection(c);
+ return null;
+ }
+ });
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
new file mode 100755
index 0000000..8038050
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -0,0 +1,117 @@
+package io.trygvis.queue;
+
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.Task.TaskState.*;
+import static java.util.Arrays.asList;
+
+public class Task {
+
+ public enum TaskState {
+ NEW,
+ PROCESSING,
+ OK,
+ FAILED
+ }
+
+ private final long id;
+
+ public final Long parent;
+
+ public final String queue;
+
+ public final TaskState state;
+
+ public final Date scheduled;
+
+ public final Date lastRun;
+
+ public final int runCount;
+
+ public final Date completed;
+
+ public final List<String> arguments;
+
+ public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ this.id = id;
+ this.parent = parent;
+ this.queue = queue;
+ this.state = state;
+ this.scheduled = scheduled;
+ this.lastRun = lastRun;
+ this.runCount = runCount;
+ this.completed = completed;
+
+ this.arguments = arguments;
+ }
+
+ public Task markProcessing() {
+ return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments);
+ }
+
+ public Task markOk(Date completed) {
+ return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments);
+ }
+
+ public Task markFailed(Date now) {
+ return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments);
+ }
+
+ public String toString() {
+ return "Task{" +
+ "id=" + id +
+ ", parent=" + parent +
+ ", queue=" + queue +
+ ", state=" + state +
+ ", scheduled=" + scheduled +
+ ", lastRun=" + lastRun +
+ ", runCount=" + runCount +
+ ", completed=" + completed +
+ ", arguments='" + arguments + '\'' +
+ '}';
+ }
+
+ public long id() {
+ if (id == 0) {
+ throw new RuntimeException("This task has not been persisted yet.");
+ }
+
+ return id;
+ }
+
+ public boolean isDone() {
+ return completed != null;
+ }
+
+ public Task childTask(String queue, Date scheduled, String... arguments) {
+ return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+
+ public static Task newTask(String queue, Date scheduled, String... arguments) {
+ return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+
+ public static Task newTask(String queue, Date scheduled, List<String> arguments) {
+ return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+
+ public static List<String> stringToArguments(String arguments) {
+ return asList(arguments.split(","));
+ }
+
+ public static String argumentsToString(List<String> arguments) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) {
+ String argument = arguments.get(i);
+ if (argument.contains(",")) {
+ throw new RuntimeException("The argument string can't contain a comma.");
+ }
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(argument);
+ }
+ return builder.toString();
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..365b44b
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,148 @@
+package io.trygvis.queue;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+
+import static io.trygvis.queue.Task.*;
+
+public class TaskDao {
+
+ private final Connection c;
+
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
+
+ TaskDao(Connection c) {
+ this.c = c;
+ }
+
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ if (parent == null) {
+ stmt.setNull(i++, Types.BIGINT);
+ } else {
+ stmt.setLong(i++, parent);
+ }
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
+ stmt.setString(i, argumentsToString(arguments));
+ stmt.executeUpdate();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
+ ResultSet rs = stmt.executeQuery();
+ rs.next();
+ return rs.getLong(1);
+ }
+ }
+
+ public Task findById(long id) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ stmt.setLong(1, id);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+
+ public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) {
+ int i = 1;
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, limit);
+ ResultSet rs = stmt.executeQuery();
+ List<Task> list = new ArrayList<>();
+ while (rs.next()) {
+ list.add(mapRow(rs));
+ }
+ return list;
+ }
+ }
+
+ public QueueStats findQueueStatsByName(String queue) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) {
+ int i = 1;
+ stmt.setString(i, queue);
+ ResultSet rs = stmt.executeQuery();
+ EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class);
+ while (rs.next()) {
+ states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2));
+ }
+ return new QueueStats(queue, states);
+ }
+ }
+
+ public int update(Task task) throws SQLException {
+ return update(task, null);
+ }
+
+ public int update(Task task, TaskState state) throws SQLException {
+ String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?";
+
+ if (state != null) {
+ sql += " AND state=?";
+ }
+
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ stmt.setString(i++, task.state.name());
+ stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
+ setTimestamp(stmt, i++, task.lastRun);
+ stmt.setInt(i++, task.runCount);
+ setTimestamp(stmt, i++, task.completed);
+ stmt.setLong(i++, task.id());
+ if (state != null) {
+ stmt.setString(i, state.name());
+ }
+ return stmt.executeUpdate();
+ }
+ }
+
+ public void setState(Task task, TaskState state) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, task.id());
+ stmt.executeUpdate();
+ }
+ }
+
+ private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
+ if (date == null) {
+ stmt.setNull(parameterIndex, Types.TIMESTAMP);
+ } else {
+ stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));
+ }
+ }
+
+ public Task mapRow(ResultSet rs) throws SQLException {
+ String arguments = rs.getString(9);
+ int i = 1;
+ return new Task(
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ TaskState.valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
+ arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
+ }
+
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java
new file mode 100644
index 0000000..186797f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskEffect.java
@@ -0,0 +1,7 @@
+package io.trygvis.queue;
+
+import java.util.List;
+
+public interface TaskEffect {
+ List<Task> apply(Task task) throws Exception;
+}
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
new file mode 100644
index 0000000..6890d58
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -0,0 +1,31 @@
+package io.trygvis.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.JdbcTemplate;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
+@Configuration
+public class DefaultConfig {
+
+ @Bean
+ public QueueSystem queueSystem(DataSource ds) throws SQLException {
+ return QueueSystem.initialize(new SqlEffectExecutor(ds));
+ }
+
+ @Bean
+ public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(queueSystem, jdbcTemplate);
+ }
+
+ @Bean
+ public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringQueueService(queueSystem, jdbcTemplate);
+ }
+}
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
new file mode 100644
index 0000000..12dc71e
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -0,0 +1,110 @@
+package io.trygvis.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+
+public class SpringJdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor;
+
+ private final JdbcTemplate jdbcTemplate;
+
+ private final JdbcAsyncService jdbcAsyncService;
+
+ private final JdbcQueueService queueService;
+
+ private final QueueSystem queueSystem;
+
+ public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ this.queueSystem = queueSystem;
+ this.jdbcTemplate = jdbcTemplate;
+ jdbcAsyncService = new JdbcAsyncService(queueSystem);
+ queueService = queueSystem.createQueueService();
+ executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException {
+ QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor>() {
+ @Override
+ public QueueExecutor doInConnection(Connection c) throws SQLException {
+ return queueService.lookupQueue(c, queue.name, queue.interval, true);
+ }
+ });
+
+ final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ queueController.start(executor);
+ }
+ }
+ });
+
+ return queueController;
+ }
+
+ public QueueExecutor getQueue(String name) {
+ return jdbcAsyncService.getQueue(name);
+ }
+
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(final Queue queue, final Date scheduled, final List<String> args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ QueueExecutor queueExecutor = queueService.getQueue(queue.name);
+ return queueExecutor.schedule(c, scheduled, args);
+ }
+ });
+ }
+
+ public Task schedule(final Queue queue, final long parent, final Date scheduled, final List<String> args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ QueueExecutor queueExecutor = queueService.getQueue(queue.name);
+ return queueExecutor.schedule(c, parent, scheduled, args);
+ }
+ });
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(final Task ref) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.update(c, ref);
+ }
+ });
+ }
+}
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
new file mode 100644
index 0000000..2027ab5
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -0,0 +1,49 @@
+package io.trygvis.spring;
+
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+public class SpringQueueService implements QueueService {
+
+ public final JdbcTemplate jdbcTemplate;
+
+ public JdbcQueueService queueService;
+
+ public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ this.queueService = queueSystem.createQueueService();
+ }
+
+ @Transactional
+ public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException {
+ return jdbcTemplate.execute(new ConnectionCallback<QueueExecutor>() {
+ @Override
+ public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException {
+ return queueService.lookupQueue(c, name, interval, autoCreate);
+ }
+ });
+ }
+
+ @Transactional
+ public void schedule(final Queue queue, final Date scheduled, final List<String> arguments) throws SQLException {
+ jdbcTemplate.execute(new ConnectionCallback<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException, DataAccessException {
+ queueService.getQueue(queue.name).schedule(c, scheduled, arguments);
+ return null;
+ }
+ });
+ }
+}