diff options
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-23 09:37:57 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-23 09:37:57 +0200
commit7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch)
o Initial import of JDBC queue.HEADmaster
37 files changed, 2542 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..a52a3d3
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,35 @@
+# maven release
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..d13037d
--- /dev/null
+++ b/README.md
@@ -0,0 +1,99 @@
+JDBC Queue (in lack of a better name)
+JDBC queue is a library for writing transactional, messaging-oriented software. It does this by managing `tasks` in a
+set of `queues` in a plain SQL database. It is currently only been tested with PostgreSQL but should be fairly portable.
+It consists of three major parts:
+* A core **queue** part which implements CRUD access to the queues, tasks and configuration.
+* An **async** part works on top of a single queue. It controls a consumer thread and dispatches tasks to an normal
+ Java Executor.
+* A **spring** layer that integrates connection and transaction handing with the standard Spring tools.
+The **queue** interface is indented to be used by:
+* Management code that want to get queue statistics or reconfigure the queues
+* Cron jobs that want to consume everything that has been scheduled
+* Applications that are run just to insert a small number of tasks
+The **async** layer provides the JMS like interface for each queue. It creates a consumer thread that polls the database
+at a specified interval, marks the task for processing and passes it on to the executor. By using a multi-threaded
+executor it can scale up quite easily.
+The **spring** layer makes sure that the parts plays along nicely with the existing JDBC/JPA/Hibernate code that you
+already have.
+Transactionality: each task is performed in an SQL transaction ensuring consistency between the task table and the
+other tables used when processing the task.
+A task has:
+* state
+* parent
+* created_date
+* last_updated
+* completed_date
+Each task has an optional parent reference: this allows you to trace the messages around in your system to see what
+effects each task had.
+"queue system" allowing multiple queue systems to be run in a single JVM
+Push: Intra-JVM notification of new elements on a queue for instant processing.
+Use this library if you want correctness and managebility over speed.
+Possible improvements
+**Batch processing of tasks in a single transaction**: let the consumer thread fetch a batch of N tasks, set all of
+them to PROCESSING in a transaction and send the batch to a processor thread which will process all of them in one
+This will significantly reduce the number of transactions required thus increasing speed. A possible issue is that if
+one of the tasks fails it will abort the entire transaction. If this happens consistenly it can keep all of the tasks
+from completion so some sort of mechanism to only pick tasks that haven't failed before might be useful.
+**Error handling strategies**: Currently there is no retrying or anything smart around tasks that fail. This definitely
+needs to be improved.
+A generic class that re-schedules a task for execution and can be used as a TimerTask might be useful.
+Support locking rows instead of extra states: This might significantly improve performance and write pressure on the
+**Configurable state machine**: Right now the possible states a task can be in is hard-coded.
+**Utilities to do routing**: this library does not intend to compete with normal JMS servers or specialized tools like
+Apache Camel but it might still be useful to have some tools with the package:
+* A consumer that can be configured to replicate the task to a set of other queues creating a classic MQ topic.
+* A consumer that can be configured to replicate the task from this database to another. As this will span two
+ transactions the operation has to be idempotent, but that should be doable. It might be useful to add some fields to
+ a task that points to the remote task.
+* A conumer that take tasks that has failed too many times and move them to a dead letter queue.
+**Optional push notification between JVMs**: use a simple MQ with in-memory storage to provide push notification after
+new tasks has been committed to the database. This will allow the system to behave like a RPC-like system, just with
+proper transactional semantics. The normal database poller can be set to poll at a much lower interval to pick up
+old messages whose notification was lost.
+**Schema dependent features**: JDBC queue does not depend on a very specific schema, it mainly requires two tables
+with a certain set of columns. Features like the parent reference might not be useful for all applications so it might
+be useful for a queue system to look in the task database to see if the column is there and fail if someone tries to
+create a task with a parent reference that is not valid.
+This might also be implemented in a more simple fasion when creating the QueueSystem so the app doesn't have to
+discover anything.
diff --git a/pom.xml b/pom.xml
new file mode 100755
index 0000000..ee319ef
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,134 @@
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>io.trygvis.jdbc-queue</groupId>
+ <artifactId>jdbc-queue</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ <!-- All extra dependencies are provided -->
+ <!-- Spring dependencies -->
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${version.spring}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-tx</artifactId>
+ <version>${version.spring}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jdbc</artifactId>
+ <version>${version.spring}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-beans</artifactId>
+ <version>${version.spring}</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- ActiveMQ Dependencies -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <version>5.8.0</version>
+ <scope>provided</scope>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ <version>${version.spring}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easytesting</groupId>
+ <artifactId>fest-assert</artifactId>
+ <version>1.4</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.jolbox</groupId>
+ <artifactId>bonecp</artifactId>
+ <version>0.7.1.RELEASE</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.9</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${version.slf4j}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${version.slf4j}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>9.2-1002-jdbc4</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <properties>
+ <version.spring>3.2.2.RELEASE</version.spring>
+ <version.slf4j>1.7.2</version.slf4j>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
new file mode 100644
index 0000000..f2cfb6e
--- /dev/null
+++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
@@ -0,0 +1,152 @@
+package io.trygvis.activemq;
+import io.trygvis.async.QueueController;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+import static java.lang.Long.parseLong;
+import static java.lang.System.arraycopy;
+import static java.nio.charset.Charset.forName;
+import static javax.jms.Session.AUTO_ACKNOWLEDGE;
+public class ActiveMqHinter implements AutoCloseable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final QueueConnection c;
+ private static final Charset utf8 = forName("utf-8");
+ public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException {
+ log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL());
+ c = connectionFactory.createQueueConnection();
+ c.start();
+ log.info("Connected, clientId = {}", c.getClientID());
+ }
+ public void createHinter(final QueueController controller) throws JMSException {
+ QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(controller.queue.queue.name);
+ final MessageConsumer consumer = session.createConsumer(queue);
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ if ((message instanceof TextMessage)) {
+ String body;
+ try {
+ TextMessage textMessage = (TextMessage) message;
+ body = textMessage.getText();
+ } catch (JMSException e) {
+ log.warn("Exception while reading body.", e);
+ throw new RuntimeException("Exception while reading body.", e);
+ }
+ consumeString(new StringReader(body), controller);
+ } else if (message instanceof BytesMessage) {
+ final BytesMessage bytesMessage = (BytesMessage) message;
+ consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller);
+ } else {
+ throw new RuntimeException("Unsupported message type: " + message.getClass());
+ }
+ }
+ });
+ controller.addOnStopListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ log.error("Error while closing JMS consumer", e);
+ }
+ }
+ });
+ }
+ private void consumeString(Reader reader, QueueController controller) {
+ try {
+ BufferedReader r = new BufferedReader(reader);
+ String line = r.readLine();
+ while (line != null) {
+ for (String id : line.split(",")) {
+ controller.hint(parseLong(id.trim()));
+ }
+ line = r.readLine();
+ }
+ } catch (IOException | SQLException e) {
+ log.warn("Could not consume body.", e);
+ throw new RuntimeException("Could not consume body.", e);
+ } catch (NumberFormatException e) {
+ log.warn("Could not consume body.", e);
+ throw e;
+ }
+ }
+ public void close() throws JMSException {
+ c.close();
+ }
+ private static class ByteMessageInputStream extends InputStream {
+ private final BytesMessage bytesMessage;
+ public ByteMessageInputStream(BytesMessage bytesMessage) {
+ this.bytesMessage = bytesMessage;
+ }
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return bytesMessage.readBytes(b);
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ public int read(byte[] out, int off, int len) throws IOException {
+ byte[] b = new byte[len];
+ try {
+ int read = bytesMessage.readBytes(b);
+ if (read == -1) {
+ return -1;
+ }
+ arraycopy(b, 0, out, off, read);
+ return read;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ @Override
+ public int read() throws IOException {
+ try {
+ return bytesMessage.readByte();
+ } catch (javax.jms.MessageEOFException e) {
+ return -1;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ }
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
new file mode 100755
index 0000000..9332596
--- /dev/null
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -0,0 +1,30 @@
+package io.trygvis.async;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+ * A simple framework for running tasks.
+ */
+public interface AsyncService {
+ QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException;
+ QueueExecutor getQueue(String name);
+ Task schedule(Queue queue, Date scheduled, List<String> args);
+ Task schedule(Queue queue, long parent, Date scheduled, List<String> args);
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
new file mode 100644
index 0000000..46f1f30
--- /dev/null
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -0,0 +1,75 @@
+package io.trygvis.async;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+public class JdbcAsyncService {
+ private final Map<String, QueueController> queues = new HashMap<>();
+ private final QueueSystem queueSystem;
+ public JdbcAsyncService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ }
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
+ }
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
+ queues.put(queue.queue.name, queueController);
+ return queueController;
+ }
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
+ }
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
+ final long start = currentTimeMillis();
+ final long end = start + timeout;
+ while (currentTimeMillis() < end) {
+ task = update(c, task);
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
+ try {
+ sleep(100);
+ } catch (InterruptedException e) {
+ // break
+ }
+ }
+ return task;
+ }
+ public Task update(Connection c, Task ref) throws SQLException {
+ return queueSystem.createTaskDao(c).findById(ref.id());
+ }
+ private synchronized QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
+ if (queueController == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueController;
+ }
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
new file mode 100644
index 0000000..a343d42
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -0,0 +1,201 @@
+package io.trygvis.async;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+public class QueueController {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final QueueSystem queueSystem;
+ private final SqlEffectExecutor sqlEffectExecutor;
+ private final TaskEffect taskEffect;
+ private final TaskExecutionRequest req;
+ public final QueueExecutor queue;
+ private boolean shouldRun = true;
+ private boolean checkForNewTasks;
+ private boolean running;
+ private Thread thread;
+ private ScheduledThreadPoolExecutor executor;
+ private List<Runnable> stopListeners = new ArrayList<>();
+ public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
+ this.queueSystem = queueSystem;
+ this.req = req;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.taskEffect = taskEffect;
+ this.queue = queue;
+ }
+ public void scheduleAll(final List<Task> tasks) throws InterruptedException {
+ ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor);
+ for (final Task task : tasks) {
+ service.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ }
+ public void invokeAll(final List<Task> tasks) throws InterruptedException {
+ List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size());
+ for (final Task task : tasks) {
+ callables.add(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ executor.invokeAll(callables);
+ }
+ public void hint(final long id) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ Task task = queueSystem.createTaskDao(c).findById(id);
+ try {
+ scheduleAll(Collections.singletonList(task));
+ } catch (InterruptedException e) {
+ throw new SQLException(e);
+ }
+ }
+ });
+ }
+ public synchronized void addOnStopListener(Runnable runnable) {
+ stopListeners.add(runnable);
+ }
+ private class QueueThread implements Runnable {
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = null;
+ try {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize);
+ }
+ });
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
+ if (tasks.size() > 0) {
+ invokeAll(tasks);
+ }
+ } catch (Throwable e) {
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) {
+ log.info("Got a full chunk, continuing directly.");
+ continue;
+ }
+ synchronized (this) {
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ } else {
+ try {
+ wait(req.interval(queue.queue));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ log.info("Thread for queue {} has stopped.", queue.queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
+ log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval);
+ running = true;
+ this.executor = executor;
+ thread = new Thread(new QueueThread(), "queue: " + queue.queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+ log.info("Stopping thread for queue {}", queue.queue.name);
+ for (Runnable runnable : stopListeners) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Error while running stop listener " + runnable, e);
+ }
+ }
+ shouldRun = false;
+ // TODO: empty out the currently executing tasks.
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ thread.interrupt();
+ }
+ thread = null;
+ executor.shutdownNow();
+ }
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
new file mode 100644
index 0000000..8edc720
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -0,0 +1,28 @@
+package io.trygvis.async;
+public class QueueStats {
+ public final int totalMessageCount;
+ public final int okMessageCount;
+ public final int failedMessageCount;
+ public final int missedMessageCount;
+ public final int scheduledMessageCount;
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ this.okMessageCount = okMessageCount;
+ this.failedMessageCount = failedMessageCount;
+ this.missedMessageCount = missedMessageCount;
+ this.scheduledMessageCount = scheduledMessageCount;
+ }
+ @Override
+ public String toString() {
+ return "QueueStats{" +
+ "totalMessageCount=" + totalMessageCount +
+ ", okMessageCount=" + okMessageCount +
+ ", failedMessageCount=" + failedMessageCount +
+ ", missedMessageCount=" + missedMessageCount +
+ ", scheduledMessageCount=" + scheduledMessageCount +
+ '}';
+ }
diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java
new file mode 100644
index 0000000..7278e17
--- /dev/null
+++ b/src/main/java/io/trygvis/async/TaskFailureException.java
@@ -0,0 +1,7 @@
+package io.trygvis.async;
+class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
new file mode 100644
index 0000000..ef0b5bb
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -0,0 +1,55 @@
+package io.trygvis.queue;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+public class JdbcQueueService {
+ private final QueueSystem queueSystem;
+ private final SqlEffectExecutor sqlEffectExecutor;
+ private final Map<String, QueueExecutor> queues = new HashMap<>();
+ JdbcQueueService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ }
+ public synchronized QueueExecutor getQueue(String name) {
+ QueueExecutor queueExecutor = queues.get(name);
+ if (queueExecutor != null) {
+ return queueExecutor;
+ }
+ throw new IllegalArgumentException("No such queue: " + name);
+ }
+ public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException {
+ QueueExecutor queueExecutor = queues.get(name);
+ if (queueExecutor != null) {
+ return queueExecutor;
+ }
+ QueueDao queueDao = queueSystem.createQueueDao(c);
+ Queue q = queueDao.findByName(name);
+ if (q == null) {
+ if (!autoCreate) {
+ throw new SQLException("No such queue: '" + name + "'.");
+ }
+ q = new Queue(name, interval);
+ queueDao.insert(q);
+ }
+ queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q);
+ queues.put(name, queueExecutor);
+ return queueExecutor;
+ }
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java
new file mode 100755
index 0000000..15003f7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Queue.java
@@ -0,0 +1,24 @@
+package io.trygvis.queue;
+public class Queue {
+ public final String name;
+ public final long interval;
+ public Queue(String name, long interval) {
+ this.name = name;
+ this.interval = interval;
+ }
+ public Queue withInterval(int interval) {
+ return new Queue(name, interval);
+ }
+ public String toString() {
+ return "Queue{" +
+ "name='" + name + '\'' +
+ ", interval=" + interval +
+ '}';
+ }
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
new file mode 100644
index 0000000..2f69e11
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -0,0 +1,45 @@
+package io.trygvis.queue;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+public class QueueDao {
+ private final Connection connection;
+ public QueueDao(Connection connection) {
+ this.connection = connection;
+ }
+ public Queue findByName(String name) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) {
+ stmt.setString(1, name);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+ public void insert(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) {
+ int i = 1;
+ stmt.setString(i++, q.name);
+ stmt.setLong(i, q.interval);
+ stmt.executeUpdate();
+ }
+ }
+ public void update(Queue q) throws SQLException {
+ try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) {
+ int i = 1;
+ stmt.setLong(i++, q.interval);
+ stmt.setString(i, q.name);
+ stmt.executeUpdate();
+ }
+ }
+ public Queue mapRow(ResultSet rs) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
new file mode 100644
index 0000000..88e5b46
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -0,0 +1,177 @@
+package io.trygvis.queue;
+import io.trygvis.async.QueueStats;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*;
+import static io.trygvis.queue.Task.TaskState.NEW;
+public class QueueExecutor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final QueueSystem queueSystem;
+ private final SqlEffectExecutor sqlEffectExecutor;
+ public final Queue queue;
+ private final Stats stats = new Stats();
+ public enum TaskExecutionResult {
+ OK,
+ }
+ public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ this.queue = queue;
+ }
+ private static class Stats {
+ public int total;
+ public int ok;
+ public int failed;
+ public int scheduled;
+ public int missed;
+ public synchronized QueueStats toStats() {
+ return new QueueStats(total, ok, failed, missed, scheduled);
+ }
+ }
+ public QueueStats getStats() {
+ return stats.toStats();
+ }
+ public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+ log.info("Consuming chunk with {} tasks", tasks.size());
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
+ }
+ public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ for (Task task : tasks) {
+ TaskExecutionResult result = applyTask(effect, task);
+ if (result == FAILED && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+ /**
+ * Executed each task in its own transaction.
+ * <p/>
+ * If the task fails, the status is set to error in a separate transaction.
+ */
+ public TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ try {
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW);
+ }
+ });
+ if (count == 0) {
+ log.warn("Missed task {}", task.id());
+ synchronized (stats) {
+ stats.missed++;
+ }
+ return MISSED;
+ }
+ log.info("Executing task {}", task.id());
+ final List<Task> newTasks = effect.apply(task);
+ final Date now = new Date();
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
+ queueSystem.createTaskDao(c).update(task.markOk(now));
+ }
+ });
+ synchronized (stats) {
+ stats.total++;
+ stats.ok++;
+ }
+ return OK;
+ } catch (Exception e) {
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+ synchronized (stats) {
+ stats.total++;
+ stats.failed++;
+ }
+ try {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ taskDao.update(task.markFailed(now));
+ }
+ });
+ } catch (SQLException e1) {
+ log.error("Error while marking task as failed.", e1);
+ }
+ return FAILED;
+ }
+ }
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
+ }
+ public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
+ }
+ public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
+ }
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+ synchronized (stats) {
+ stats.scheduled++;
+ }
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ }
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
new file mode 100644
index 0000000..1c38f1f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -0,0 +1,54 @@
+package io.trygvis.queue;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+public interface QueueService {
+ QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+ void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
+ public static class TaskExecutionRequest {
+ public final long chunkSize;
+ public final boolean stopOnError;
+ public final Long interval;
+ public final boolean continueOnFullChunk;
+ // TODO: saveExceptions
+ public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
+ this(chunkSize, stopOnError, null, true);
+ }
+ private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) {
+ this.chunkSize = chunkSize;
+ this.stopOnError = stopOnError;
+ this.interval = interval;
+ this.continueOnFullChunk = continueOnFullChunk;
+ }
+ public TaskExecutionRequest interval(long interval) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+ public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+ @Override
+ public String toString() {
+ return "TaskExecutionRequest{" +
+ "chunkSize=" + chunkSize +
+ ", stopOnError=" + stopOnError +
+ '}';
+ }
+ public long interval(Queue queue) {
+ if (interval != null) {
+ return interval;
+ }
+ return queue.interval;
+ }
+ }
diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java
new file mode 100644
index 0000000..5b048b3
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueStats.java
@@ -0,0 +1,20 @@
+package io.trygvis.queue;
+import java.util.EnumMap;
+public class QueueStats {
+ public final String name;
+ public final long totalTaskCount;
+ public final EnumMap<Task.TaskState, Long> states;
+ public QueueStats(String name, EnumMap<Task.TaskState, Long> states) {
+ this.name = name;
+ this.states = states;
+ long c = 0;
+ for (Long l : states.values()) {
+ c += l;
+ }
+ this.totalTaskCount = c;
+ }
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
new file mode 100644
index 0000000..5954526
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -0,0 +1,61 @@
+package io.trygvis.queue;
+import io.trygvis.async.JdbcAsyncService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.SQLException;
+public class QueueSystem {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ public final SqlEffectExecutor sqlEffectExecutor;
+ private final JdbcQueueService queueService;
+ private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ if (c.getAutoCommit()) {
+ throw new SQLException("The connection cannot be in auto-commit mode.");
+ }
+ DatabaseMetaData metaData = c.getMetaData();
+ String productName = metaData.getDatabaseProductName();
+ String productVersion = metaData.getDatabaseProductVersion();
+ log.info("productName = " + productName);
+ log.info("productVersion = " + productVersion);
+ }
+ });
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ queueService = new JdbcQueueService(this);
+ }
+ /**
+ * Initializes the queue system. Use this as the first thing do as it will validate the database.
+ */
+ public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
+ return new QueueSystem(sqlEffectExecutor);
+ }
+ public JdbcQueueService createQueueService() {
+ return queueService;
+ }
+ public QueueDao createQueueDao(Connection c) {
+ return new QueueDao(c);
+ }
+ public TaskDao createTaskDao(Connection c) {
+ return new TaskDao(c);
+ }
+ public JdbcAsyncService createAsyncService() {
+ return new JdbcAsyncService(this);
+ }
diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java
new file mode 100644
index 0000000..7864bcd
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/SqlEffect.java
@@ -0,0 +1,12 @@
+package io.trygvis.queue;
+import java.sql.Connection;
+import java.sql.SQLException;
+public interface SqlEffect<A> {
+ A doInConnection(Connection c) throws SQLException;
+ interface Void {
+ void doInConnection(Connection c) throws SQLException;
+ }
diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java
new file mode 100644
index 0000000..92802da
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java
@@ -0,0 +1,50 @@
+package io.trygvis.queue;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+public class SqlEffectExecutor {
+ private final DataSource dataSource;
+ public SqlEffectExecutor(DataSource dataSource) {
+ this.dataSource = dataSource;
+ }
+ public <A> A transaction(SqlEffect<A> effect) throws SQLException {
+// int pid;
+ try (Connection c = dataSource.getConnection()) {
+// pid = getPid(c);
+// System.out.println("pid = " + pid);
+ boolean ok = false;
+ try {
+ A a = effect.doInConnection(c);
+ c.commit();
+ ok = true;
+ return a;
+ } finally {
+// System.out.println("Closing, pid = " + pid);
+ if (!ok) {
+ try {
+ c.rollback();
+ } catch (SQLException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+ public void transaction(final SqlEffect.Void effect) throws SQLException {
+ transaction(new SqlEffect<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException {
+ effect.doInConnection(c);
+ return null;
+ }
+ });
+ }
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
new file mode 100755
index 0000000..8038050
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -0,0 +1,117 @@
+package io.trygvis.queue;
+import java.util.Date;
+import java.util.List;
+import static io.trygvis.queue.Task.TaskState.*;
+import static java.util.Arrays.asList;
+public class Task {
+ public enum TaskState {
+ NEW,
+ OK,
+ }
+ private final long id;
+ public final Long parent;
+ public final String queue;
+ public final TaskState state;
+ public final Date scheduled;
+ public final Date lastRun;
+ public final int runCount;
+ public final Date completed;
+ public final List<String> arguments;
+ public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ this.id = id;
+ this.parent = parent;
+ this.queue = queue;
+ this.state = state;
+ this.scheduled = scheduled;
+ this.lastRun = lastRun;
+ this.runCount = runCount;
+ this.completed = completed;
+ this.arguments = arguments;
+ }
+ public Task markProcessing() {
+ return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments);
+ }
+ public Task markOk(Date completed) {
+ return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments);
+ }
+ public Task markFailed(Date now) {
+ return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments);
+ }
+ public String toString() {
+ return "Task{" +
+ "id=" + id +
+ ", parent=" + parent +
+ ", queue=" + queue +
+ ", state=" + state +
+ ", scheduled=" + scheduled +
+ ", lastRun=" + lastRun +
+ ", runCount=" + runCount +
+ ", completed=" + completed +
+ ", arguments='" + arguments + '\'' +
+ '}';
+ }
+ public long id() {
+ if (id == 0) {
+ throw new RuntimeException("This task has not been persisted yet.");
+ }
+ return id;
+ }
+ public boolean isDone() {
+ return completed != null;
+ }
+ public Task childTask(String queue, Date scheduled, String... arguments) {
+ return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+ public static Task newTask(String queue, Date scheduled, String... arguments) {
+ return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments));
+ }
+ public static Task newTask(String queue, Date scheduled, List<String> arguments) {
+ return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+ public static List<String> stringToArguments(String arguments) {
+ return asList(arguments.split(","));
+ }
+ public static String argumentsToString(List<String> arguments) {
+ StringBuilder builder = new StringBuilder();
+ for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) {
+ String argument = arguments.get(i);
+ if (argument.contains(",")) {
+ throw new RuntimeException("The argument string can't contain a comma.");
+ }
+ if (i > 0) {
+ builder.append(',');
+ }
+ builder.append(argument);
+ }
+ return builder.toString();
+ }
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..365b44b
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,148 @@
+package io.trygvis.queue;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Timestamp;
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.EnumMap;
+import java.util.List;
+import static io.trygvis.queue.Task.*;
+public class TaskDao {
+ private final Connection c;
+ public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments";
+ TaskDao(Connection c) {
+ this.c = c;
+ }
+ public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException {
+ String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)";
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ if (parent == null) {
+ stmt.setNull(i++, Types.BIGINT);
+ } else {
+ stmt.setLong(i++, parent);
+ }
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setTimestamp(i++, new Timestamp(scheduled.getTime()));
+ stmt.setString(i, argumentsToString(arguments));
+ stmt.executeUpdate();
+ }
+ try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) {
+ ResultSet rs = stmt.executeQuery();
+ rs.next();
+ return rs.getLong(1);
+ }
+ }
+ public Task findById(long id) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) {
+ stmt.setLong(1, id);
+ ResultSet rs = stmt.executeQuery();
+ return rs.next() ? mapRow(rs) : null;
+ }
+ }
+ public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) {
+ int i = 1;
+ stmt.setString(i++, queue);
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, limit);
+ ResultSet rs = stmt.executeQuery();
+ List<Task> list = new ArrayList<>();
+ while (rs.next()) {
+ list.add(mapRow(rs));
+ }
+ return list;
+ }
+ }
+ public QueueStats findQueueStatsByName(String queue) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) {
+ int i = 1;
+ stmt.setString(i, queue);
+ ResultSet rs = stmt.executeQuery();
+ EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class);
+ while (rs.next()) {
+ states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2));
+ }
+ return new QueueStats(queue, states);
+ }
+ }
+ public int update(Task task) throws SQLException {
+ return update(task, null);
+ }
+ public int update(Task task, TaskState state) throws SQLException {
+ String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?";
+ if (state != null) {
+ sql += " AND state=?";
+ }
+ try (PreparedStatement stmt = c.prepareStatement(sql)) {
+ int i = 1;
+ stmt.setString(i++, task.state.name());
+ stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime()));
+ setTimestamp(stmt, i++, task.lastRun);
+ stmt.setInt(i++, task.runCount);
+ setTimestamp(stmt, i++, task.completed);
+ stmt.setLong(i++, task.id());
+ if (state != null) {
+ stmt.setString(i, state.name());
+ }
+ return stmt.executeUpdate();
+ }
+ }
+ public void setState(Task task, TaskState state) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) {
+ int i = 1;
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, task.id());
+ stmt.executeUpdate();
+ }
+ }
+ private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException {
+ if (date == null) {
+ stmt.setNull(parameterIndex, Types.TIMESTAMP);
+ } else {
+ stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime()));
+ }
+ }
+ public Task mapRow(ResultSet rs) throws SQLException {
+ String arguments = rs.getString(9);
+ int i = 1;
+ return new Task(
+ rs.getLong(i++),
+ rs.getLong(i++),
+ rs.getString(i++),
+ TaskState.valueOf(rs.getString(i++)),
+ rs.getTimestamp(i++),
+ rs.getTimestamp(i++),
+ rs.getInt(i++),
+ rs.getTimestamp(i),
+ arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList());
+ }
+ public void rollback() throws SQLException {
+ c.rollback();
+ c.close();
+ }
diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java
new file mode 100644
index 0000000..186797f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskEffect.java
@@ -0,0 +1,7 @@
+package io.trygvis.queue;
+import java.util.List;
+public interface TaskEffect {
+ List<Task> apply(Task task) throws Exception;
diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java
new file mode 100644
index 0000000..6890d58
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/DefaultConfig.java
@@ -0,0 +1,31 @@
+package io.trygvis.spring;
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.jdbc.core.JdbcTemplate;
+import javax.sql.DataSource;
+import java.sql.SQLException;
+public class DefaultConfig {
+ @Bean
+ public QueueSystem queueSystem(DataSource ds) throws SQLException {
+ return QueueSystem.initialize(new SqlEffectExecutor(ds));
+ }
+ @Bean
+ public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringJdbcAsyncService(queueSystem, jdbcTemplate);
+ }
+ @Bean
+ public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ return new SpringQueueService(queueSystem, jdbcTemplate);
+ }
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
new file mode 100644
index 0000000..12dc71e
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -0,0 +1,110 @@
+package io.trygvis.spring;
+import io.trygvis.async.AsyncService;
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+import org.springframework.transaction.support.TransactionSynchronization;
+import org.springframework.transaction.support.TransactionSynchronizationAdapter;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static org.springframework.transaction.annotation.Propagation.REQUIRED;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+public class SpringJdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final ScheduledThreadPoolExecutor executor;
+ private final JdbcTemplate jdbcTemplate;
+ private final JdbcAsyncService jdbcAsyncService;
+ private final JdbcQueueService queueService;
+ private final QueueSystem queueSystem;
+ public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ this.queueSystem = queueSystem;
+ this.jdbcTemplate = jdbcTemplate;
+ jdbcAsyncService = new JdbcAsyncService(queueSystem);
+ queueService = queueSystem.createQueueService();
+ executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+ }
+ @Transactional(propagation = REQUIRED)
+ public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException {
+ QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor>() {
+ @Override
+ public QueueExecutor doInConnection(Connection c) throws SQLException {
+ return queueService.lookupQueue(c, queue.name, queue.interval, true);
+ }
+ });
+ final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor);
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("Transaction completed with status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ queueController.start(executor);
+ }
+ }
+ });
+ return queueController;
+ }
+ public QueueExecutor getQueue(String name) {
+ return jdbcAsyncService.getQueue(name);
+ }
+ @Transactional(propagation = REQUIRED)
+ public Task schedule(final Queue queue, final Date scheduled, final List<String> args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ QueueExecutor queueExecutor = queueService.getQueue(queue.name);
+ return queueExecutor.schedule(c, scheduled, args);
+ }
+ });
+ }
+ public Task schedule(final Queue queue, final long parent, final Date scheduled, final List<String> args) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ QueueExecutor queueExecutor = queueService.getQueue(queue.name);
+ return queueExecutor.schedule(c, parent, scheduled, args);
+ }
+ });
+ }
+ @Transactional(readOnly = true)
+ public Task update(final Task ref) {
+ return jdbcTemplate.execute(new ConnectionCallback<Task>() {
+ @Override
+ public Task doInConnection(Connection c) throws SQLException {
+ return jdbcAsyncService.update(c, ref);
+ }
+ });
+ }
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
new file mode 100644
index 0000000..2027ab5
--- /dev/null
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -0,0 +1,49 @@
+package io.trygvis.spring;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import org.springframework.dao.DataAccessException;
+import org.springframework.jdbc.core.ConnectionCallback;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.transaction.annotation.Transactional;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+public class SpringQueueService implements QueueService {
+ public final JdbcTemplate jdbcTemplate;
+ public JdbcQueueService queueService;
+ public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ this.queueService = queueSystem.createQueueService();
+ }
+ @Transactional
+ public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException {
+ return jdbcTemplate.execute(new ConnectionCallback<QueueExecutor>() {
+ @Override
+ public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException {
+ return queueService.lookupQueue(c, name, interval, autoCreate);
+ }
+ });
+ }
+ @Transactional
+ public void schedule(final Queue queue, final Date scheduled, final List<String> arguments) throws SQLException {
+ jdbcTemplate.execute(new ConnectionCallback<Object>() {
+ @Override
+ public Object doInConnection(Connection c) throws SQLException, DataAccessException {
+ queueService.getQueue(queue.name).schedule(c, scheduled, arguments);
+ return null;
+ }
+ });
+ }
diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql
new file mode 100644
index 0000000..a0739c2
--- /dev/null
+++ b/src/main/resources/create-postgresql.sql
@@ -0,0 +1,32 @@
+ name VARCHAR(100) NOT NULL,
+ interval INTEGER NOT NULL,
+ CONSTRAINT pk_queue PRIMARY KEY (name)
+ parent BIGINT,
+ queue VARCHAR(100) NOT NULL,
+ state VARCHAR(10) NOT NULL,
+ last_run TIMESTAMP,
+ run_count INT NOT NULL,
+ completed TIMESTAMP,
+ arguments VARCHAR(100),
+ CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name),
+ CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id)
+CREATE INDEX ix_task__queue__state ON task (queue, state);
diff --git a/src/test/java/io/trygvis/test/Article.java b/src/test/java/io/trygvis/test/Article.java
new file mode 100755
index 0000000..bf52e41
--- /dev/null
+++ b/src/test/java/io/trygvis/test/Article.java
@@ -0,0 +1,46 @@
+package io.trygvis.test;
+import java.util.Date;
+public class Article {
+ private Integer id;
+ private Date created;
+ private Date updated;
+ private String title;
+ private String body;
+ @SuppressWarnings("UnusedDeclaration")
+ private Article() {
+ }
+ public Article(Date created, Date updated, String title, String body) {
+ this.created = created;
+ this.updated = updated;
+ this.title = title;
+ this.body = body;
+ }
+ public Integer getId() {
+ return id;
+ }
+ public Date getCreated() {
+ return created;
+ }
+ public Date getUpdated() {
+ return updated;
+ }
+ public void setUpdated(Date updated) {
+ this.updated = updated;
+ }
+ public String getTitle() {
+ return title;
+ }
+ public String getBody() {
+ return body;
+ }
diff --git a/src/test/java/io/trygvis/test/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java
new file mode 100755
index 0000000..396fc89
--- /dev/null
+++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java
@@ -0,0 +1,46 @@
+package io.trygvis.test;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import static java.util.Collections.emptyList;
+import static org.springframework.transaction.annotation.Propagation.MANDATORY;
+@Transactional(propagation = MANDATORY)
+public class CreateArticleCallable implements TaskEffect {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private Random random = new Random();
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+ List<String> arguments = task.arguments;
+ log.info("CreateArticeJob.run: BEGIN");
+ if (random.nextInt() % 3 == 0) {
+ throw new RuntimeException("failing create article");
+ }
+ Date now = new Date();
+ log.info("now = {}", now);
+ Article article = new Article(new Date(), null, "title", "body");
+// entityManager.persist(article);
+ log.info("CreateArticeJob.run: END");
+ return emptyList();
+ }
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
new file mode 100644
index 0000000..33c2807
--- /dev/null
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -0,0 +1,64 @@
+package io.trygvis.test;
+import com.jolbox.bonecp.BoneCPDataSource;
+import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
+import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
+import javax.sql.DataSource;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import static java.lang.System.getProperty;
+public class DbUtil {
+ public static DataSource createDataSource() throws SQLException {
+ String username = getProperty("user.name");
+ String jdbcUrl = getProperty("database.url", "jdbc:postgresql://localhost/" + username);
+ String user = getProperty("database.username", username);
+ String pass = getProperty("database.password", username);
+ return createDataSource(jdbcUrl, user, pass);
+ }
+ public static BoneCPDataSource createDataSource(String jdbcUrl, String username, String password) throws SQLException {
+ BoneCPDataSource ds = new BoneCPDataSource();
+ ds.setLogStatementsEnabled(true);
+ ds.setJdbcUrl(jdbcUrl);
+ ds.setUsername(username);
+ ds.setPassword(password);
+ ds.setConnectionTestStatement("/* ping*/SELECT 1");
+ ds.setDefaultAutoCommit(false);
+ ds.setIdleConnectionTestPeriodInSeconds(60);
+ ds.setIdleMaxAgeInSeconds(240);
+ ds.setMaxConnectionsPerPartition(4);
+ ds.setMinConnectionsPerPartition(1);
+ ds.setPartitionCount(4);
+ ds.setAcquireIncrement(1);
+ ds.setStatementsCacheSize(1000);
+ ds.setReleaseHelperThreads(1);
+ ds.setStatisticsEnabled(true);
+ ds.setLogStatementsEnabled(true);
+ ds.setLogWriter(new PrintWriter(System.err));
+ return ds;
+ }
+ public static DataSource springifyDataSource(DataSource ds) {
+ return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
+ }
+ public static int getPid(Connection c) throws SQLException {
+ int pid;
+ try (Statement statement = c.createStatement()) {
+ ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
+ rs.next();
+ pid = rs.getInt(1);
+ }
+ return pid;
+ }
diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java
new file mode 100755
index 0000000..f03d6fa
--- /dev/null
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -0,0 +1,120 @@
+package io.trygvis.test;
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import static java.lang.System.*;
+import static java.lang.Thread.sleep;
+public class Main {
+ private static final Logger log = LoggerFactory.getLogger(Main.class);
+ public static void main(String[] args) throws Exception {
+ SLF4JBridgeHandler.install();
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+ log.info("Starting context");
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
+ log.info("Started context");
+ try {
+ context.getBean(Main.class).run();
+// log.info("Sleeping");
+// sleep(1000 * 1000);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ log.info("Stopping context");
+ context.stop();
+ log.info("Stopped context");
+ exit(0);
+ }
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+ @Autowired
+ private AsyncService asyncService;
+ @Autowired
+ private QueueService queueService;
+ @Autowired
+ @Qualifier("createArticle")
+ private TaskEffect createArticleCallable;
+ @Autowired
+ @Qualifier("updateArticle")
+ private TaskEffect updateArticleCallable;
+ public void run() throws Exception {
+ log.info("Main.run");
+ final Queue q = null; // queueService.lookupQueue(c, "create-article", 1);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+ asyncService.registerQueue(q, req, createArticleCallable);
+// log.info("queue registered: ref = {}", q);
+// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
+// q = asyncService.lookupQueue("create-queue");
+ final List<Task> tasks = new ArrayList<>();
+ final int count = 1;
+ log.info("Creating {} tasks", count);
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// for (int i = 0; i < count; i++) {
+// tasks.add(asyncService.schedule(q));
+// }
+// }
+// });
+ log.info("Created {} tasks", count);
+ while (true) {
+ sleep(10000);
+ log.info("Checking for status of {} tasks", tasks.size());
+ for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
+ Task task = iterator.next();
+ task = asyncService.update(task);
+// log.info("task = {}", task);
+ if (task.isDone()) {
+ iterator.remove();
+ }
+ }
+ if (tasks.isEmpty()) {
+ log.info("No more tasks");
+ break;
+ }
+ }
+ }
diff --git a/src/test/java/io/trygvis/test/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
new file mode 100755
index 0000000..6aff20f
--- /dev/null
+++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
@@ -0,0 +1,47 @@
+package io.trygvis.test;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+import static java.util.Collections.emptyList;
+public class UpdateArticleCallable implements TaskEffect {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final Random r = new Random();
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+ List<String> arguments = task.arguments;
+ log.info("UpdateArticeJob.run: BEGIN");
+ Date now = new Date();
+ log.info("now = {}", now);
+ TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class));
+ List<Article> list = q.getResultList();
+ log.info("Got {} articles", list.size());
+ Article a = list.get(r.nextInt(list.size()));
+ a.setUpdated(new Date());
+ entityManager.persist(a);
+ log.info("UpdateArticeJob.run: END");
+ return emptyList();
+ }
diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
new file mode 100644
index 0000000..bd86732
--- /dev/null
+++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
@@ -0,0 +1,37 @@
+package io.trygvis.test.activemq;
+import io.trygvis.activemq.ActiveMqHinter;
+import io.trygvis.async.QueueController;
+import io.trygvis.queue.QueueService;
+import io.trygvis.test.jdbc.AsyncConsumerExample;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import javax.jms.JMSException;
+public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable {
+ private final ActiveMqHinter activeMqHinter;
+ public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException {
+ activeMqHinter = new ActiveMqHinter(cf);
+ }
+ public static void main(String[] args) throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true).
+ interval(60 * 6000).
+ continueOnFullChunk(false);
+ try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) {
+ consumer.work(poolSize, req);
+ }
+ }
+ public void close() throws JMSException {
+ activeMqHinter.close();
+ }
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ activeMqHinter.createHinter(input);
+ }
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
new file mode 100644
index 0000000..16640dd
--- /dev/null
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -0,0 +1,107 @@
+package io.trygvis.test.jdbc;
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
+import io.trygvis.async.QueueStats;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import static io.trygvis.queue.Task.newTask;
+import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonList;
+public class AsyncConsumerExample {
+ private static String inputName = "my-input";
+ private static String outputName = "my-output";
+ private static int interval = 1000;
+ private static final TaskEffect adder = new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ Long a = Long.valueOf(task.arguments.get(0));
+ Long b = Long.valueOf(task.arguments.get(1));
+ return singletonList(newTask(outputName, new Date(), Long.toString(a + b)));
+ }
+ };
+ public static void main(String[] args) throws Exception {
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+ new AsyncConsumerExample().work(poolSize, req);
+ }
+ public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception {
+ System.out.println("Starting consumer");
+ DataSource ds = createDataSource();
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.createQueueService();
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ return new QueueExecutor[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
+ QueueController controller = asyncService.registerQueue(input, req, adder);
+ wrapInputQueue(controller);
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ System.out.println(input.getStats());
+ System.out.println(output.getStats());
+ }
+ }, 1000, 1000);
+ long start = currentTimeMillis();
+ controller.start(new ScheduledThreadPoolExecutor(poolSize));
+ Thread.sleep(60 * 1000);
+ controller.stop();
+ long end = currentTimeMillis();
+ timer.cancel();
+ QueueStats stats = input.getStats();
+ System.out.println("Summary:");
+ System.out.println(stats.toString());
+ System.out.println(output.getStats().toString());
+ long duration = end - start;
+ double rate = 1000 * ((double) stats.totalMessageCount) / duration;
+ System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s");
+ }
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ }
diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
new file mode 100644
index 0000000..0b7ba50
--- /dev/null
+++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
@@ -0,0 +1,126 @@
+package io.trygvis.test.jdbc;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueStats;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
+import io.trygvis.queue.TaskEffect;
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import static io.trygvis.queue.Task.TaskState;
+import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+public class PlainJavaExample {
+ private static final Random r = new Random();
+ private static String inputName = "my-input";
+ private static String outputName = "my-output";
+ private static int interval = 10;
+ public static class Consumer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
+ DataSource ds = createDataSource();
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final JdbcQueueService queueService = queueSystem.createQueueService();
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ QueueExecutor[] queues = {
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)};
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ QueueStats stats = taskDao.findQueueStatsByName(inputName);
+ System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount);
+ for (Map.Entry<TaskState, Long> entry : stats.states.entrySet()) {
+ System.out.println(entry.getKey() + " = " + entry.getValue());
+ }
+ return queues;
+ }
+ });
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false);
+ input.consumeAll(req, new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ Long a = Long.valueOf(task.arguments.get(0));
+ Long b = Long.valueOf(task.arguments.get(1));
+ System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
+ if (r.nextInt(3000) > 0) {
+ return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b)));
+ }
+ throw new RuntimeException("Simulated exception while processing task.");
+ }
+ });
+ System.out.println("Done");
+ }
+ }
+ public static class Producer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting producer");
+ int chunks = 100;
+ final int chunk = 10000;
+ DataSource ds = createDataSource();
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final JdbcQueueService queueService = queueSystem.createQueueService();
+ final QueueExecutor queue;
+ try (Connection c = ds.getConnection()) {
+ queue = queueService.lookupQueue(c, inputName, interval, true);
+ c.commit();
+ }
+ for (int i = 0; i < chunks; i++) {
+ long start = currentTimeMillis();
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (int j = 0; j < chunk; j++) {
+ queue.schedule(c, new Date(), asList("10", "20"));
+ }
+ }
+ });
+ long end = currentTimeMillis();
+ long time = end - start;
+ System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second");
+ }
+ }
+ }
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
new file mode 100644
index 0000000..93372f6
--- /dev/null
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -0,0 +1,88 @@
+package io.trygvis.test.spring;
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import io.trygvis.spring.DefaultConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.lang.System.getProperty;
+import static java.lang.System.setProperty;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.fest.assertions.Assertions.assertThat;
+@ContextConfiguration(classes = {TestConfig.class, DefaultConfig.class})
+public class PlainSpringTest {
+ @Autowired
+ private AsyncService asyncService;
+ @Autowired
+ private QueueService queueService;
+ private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+ static {
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+ }
+ @Test
+ public void testBasic() throws SQLException, InterruptedException {
+ QueueExecutor queueA = queueService.getQueue("a", 1000, true);
+// final AtomicReference<List<String>> refA = new AtomicReference<>();
+ asyncService.registerQueue(queueA.queue, req, new TaskEffect() {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+// refA.set(task.arguments);
+// synchronized (refA) {
+// refA.notify();
+// }
+ System.out.println("task.arguments = " + task.arguments);
+ return asList(task.childTask("b", new Date(), task.arguments.get(0), "world"));
+ }
+ });
+ QueueExecutor queueB = queueService.getQueue("b", 1000, true);
+ final AtomicReference<List<String>> refB = new AtomicReference<>();
+ asyncService.registerQueue(queueB.queue, req, new TaskEffect() {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+// System.out.println("task.arguments = " + task.arguments);
+ refB.set(task.arguments);
+ synchronized (refB) {
+ refB.notify();
+ }
+ return emptyList();
+ }
+ });
+ synchronized (refB) {
+ System.out.println("Scheduling task");
+ queueService.schedule(queueA.queue, new Date(), asList("hello"));
+ System.out.println("Task scheduled, waiting");
+ refB.wait(10000);
+ System.out.println("Back!");
+ }
+// System.out.println("refA.get() = " + refA.get());
+ System.out.println("refB.get() = " + refB.get());
+ assertThat(refB.get()).containsExactly("hello", "world");
+ }
diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java
new file mode 100755
index 0000000..4df8ce1
--- /dev/null
+++ b/src/test/java/io/trygvis/test/spring/TestConfig.java
@@ -0,0 +1,57 @@
+package io.trygvis.test.spring;
+import com.jolbox.bonecp.BoneCPDataSource;
+import io.trygvis.test.DbUtil;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
+import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+import org.springframework.transaction.support.TransactionTemplate;
+import javax.sql.DataSource;
+import java.sql.SQLException;
+@ComponentScan(basePackages = "io.trygvis")
+public class TestConfig {
+ @Bean
+ public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception {
+ return new PropertySourcesPlaceholderConfigurer() {{
+ setProperties(System.getProperties());
+ setLocalOverride(true);
+ }};
+ }
+ @Bean
+ public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+ return new JdbcTemplate(dataSource);
+ }
+ @Bean
+ public DataSource dataSource(@Value("${database.url}") String jdbcUrl,
+ @Value("${database.username}") String username,
+ @Value("${database.password}") String password) throws SQLException {
+ BoneCPDataSource ds = DbUtil.createDataSource(jdbcUrl, username, password);
+ return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
+ }
+ @Bean
+ public PlatformTransactionManager transactionManager(DataSource dataSource) {
+ return new DataSourceTransactionManager(dataSource);
+ }
+ @Bean
+ public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
+ return new TransactionTemplate(platformTransactionManager);
+ }
diff --git a/src/test/resources/applicationContext.xml b/src/test/resources/applicationContext.xml
new file mode 100755
index 0000000..5f173b3
--- /dev/null
+++ b/src/test/resources/applicationContext.xml
@@ -0,0 +1,22 @@
+<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:aop="http://www.springframework.org/schema/aop"
+ xmlns:context="http://www.springframework.org/schema/context"
+ xmlns:jdbc="http://www.springframework.org/schema/jdbc"
+ xmlns:jee="http://www.springframework.org/schema/jee"
+ xmlns:tx="http://www.springframework.org/schema/tx"
+ xmlns:jpa="http://www.springframework.org/schema/data/jpa"
+ xsi:schemaLocation="
+ http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd
+ http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd
+ http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd
+ http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd
+ http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd
+ http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd
+ http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd
+ http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd">
+ <context:annotation-config/>
+ <bean class="io.trygvis.test.spring.TestConfig"/>
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100755
index 0000000..f964cd3
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,29 @@
+<configuration debug="false">
+ <logger name="io.trygvis" level="WARN"/>
+ <logger name="org.springframework" level="INFO"/>
+ <!--
+ <logger name="org.springframework" level="DEBUG"/>
+ -->
+ <!--
+ <logger name="org.springframework.jdbc" level="INFO"/>
+ <logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="DEBUG"/>
+ <logger name="org.springframework.orm" level="INFO"/>
+ <logger name="org.springframework.transaction" level="DEBUG"/>
+ -->
+ <logger name="org.hibernate" level="INFO"/>
+ <logger name="org.hibernate.SQL" level="INFO"/>
+ <logger name="com.jolbox" level="INFO"/>
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%-15thread] %-5level %-60logger{60} - %msg%n</pattern>
+ </encoder>
+ </appender>
+ <root level="WARN">
+ <appender-ref ref="STDOUT"/>
+ </root>