diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-23 09:37:57 +0200 |
commit | 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch) | |
tree | c0bd7202845697207b04d518f613588df17d9e12 | |
download | jdbc-queue-7caa5b1f1e08f99cfe4465f091f47e2966d78aa7.tar.gz jdbc-queue-7caa5b1f1e08f99cfe4465f091f47e2966d78aa7.tar.bz2 jdbc-queue-7caa5b1f1e08f99cfe4465f091f47e2966d78aa7.tar.xz jdbc-queue-7caa5b1f1e08f99cfe4465f091f47e2966d78aa7.zip |
37 files changed, 2542 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..a52a3d3 --- /dev/null +++ b/.gitignore @@ -0,0 +1,35 @@ +.classpath +.project +.settings +target +.springBeans +cha.db.* +cha.h2.h2.db +cha.h2.lock.db +cha.h2.trace.db +updates.txt + +# maven release +pom.xml.backup +release.properties +pom.xml.branch +pom.xml.next +pom.xml.releaseBackup +pom.xml.tag + +*~ +*.iml +.idea/* +!.idea/dictionaries +!.idea/dictionaries/trygvis.xml +logs/ + +fiken-model/liquibase.properties + +pom.xml.releaseBackup +pom.xml.versionsBackup +release.properties + +rebel.xml +*-actual.xml +*.backup diff --git a/README.md b/README.md new file mode 100644 index 0000000..d13037d --- /dev/null +++ b/README.md @@ -0,0 +1,99 @@ +JDBC Queue (in lack of a better name) +===================================== + +JDBC queue is a library for writing transactional, messaging-oriented software. It does this by managing `tasks` in a +set of `queues` in a plain SQL database. It is currently only been tested with PostgreSQL but should be fairly portable. + +It consists of three major parts: + +* A core **queue** part which implements CRUD access to the queues, tasks and configuration. +* An **async** part works on top of a single queue. It controls a consumer thread and dispatches tasks to an normal + Java Executor. +* A **spring** layer that integrates connection and transaction handing with the standard Spring tools. + +The **queue** interface is indented to be used by: + +* Management code that want to get queue statistics or reconfigure the queues +* Cron jobs that want to consume everything that has been scheduled +* Applications that are run just to insert a small number of tasks + +The **async** layer provides the JMS like interface for each queue. It creates a consumer thread that polls the database +at a specified interval, marks the task for processing and passes it on to the executor. By using a multi-threaded +executor it can scale up quite easily. + +The **spring** layer makes sure that the parts plays along nicely with the existing JDBC/JPA/Hibernate code that you +already have. + +Features +======== + +Transactionality: each task is performed in an SQL transaction ensuring consistency between the task table and the +other tables used when processing the task. + +A task has: + +* state +* parent +* created_date +* last_updated +* completed_date + +Each task has an optional parent reference: this allows you to trace the messages around in your system to see what +effects each task had. + +"queue system" allowing multiple queue systems to be run in a single JVM + +Push: Intra-JVM notification of new elements on a queue for instant processing. + +Implementation +============== + + + +Performance +=========== + +Use this library if you want correctness and managebility over speed. + +Possible improvements +===================== + +**Batch processing of tasks in a single transaction**: let the consumer thread fetch a batch of N tasks, set all of +them to PROCESSING in a transaction and send the batch to a processor thread which will process all of them in one +transaction. + +This will significantly reduce the number of transactions required thus increasing speed. A possible issue is that if +one of the tasks fails it will abort the entire transaction. If this happens consistenly it can keep all of the tasks +from completion so some sort of mechanism to only pick tasks that haven't failed before might be useful. + +**Error handling strategies**: Currently there is no retrying or anything smart around tasks that fail. This definitely +needs to be improved. + +A generic class that re-schedules a task for execution and can be used as a TimerTask might be useful. + +Support locking rows instead of extra states: This might significantly improve performance and write pressure on the +db. + +**Configurable state machine**: Right now the possible states a task can be in is hard-coded. + +**Utilities to do routing**: this library does not intend to compete with normal JMS servers or specialized tools like +Apache Camel but it might still be useful to have some tools with the package: + +* A consumer that can be configured to replicate the task to a set of other queues creating a classic MQ topic. +* A consumer that can be configured to replicate the task from this database to another. As this will span two + transactions the operation has to be idempotent, but that should be doable. It might be useful to add some fields to + a task that points to the remote task. +* A conumer that take tasks that has failed too many times and move them to a dead letter queue. + +**Optional push notification between JVMs**: use a simple MQ with in-memory storage to provide push notification after +new tasks has been committed to the database. This will allow the system to behave like a RPC-like system, just with +proper transactional semantics. The normal database poller can be set to poll at a much lower interval to pick up +old messages whose notification was lost. + +**Schema dependent features**: JDBC queue does not depend on a very specific schema, it mainly requires two tables +with a certain set of columns. Features like the parent reference might not be useful for all applications so it might +be useful for a queue system to look in the task database to see if the column is there and fail if someone tries to +create a task with a parent reference that is not valid. + +This might also be implemented in a more simple fasion when creating the QueueSystem so the app doesn't have to +discover anything. @@ -0,0 +1,134 @@ +<project> + <modelVersion>4.0.0</modelVersion> + <groupId>io.trygvis.jdbc-queue</groupId> + <artifactId>jdbc-queue</artifactId> + <version>1.0-SNAPSHOT</version> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${version.slf4j}</version> + </dependency> + + <!-- All extra dependencies are provided --> + + <!-- Spring dependencies --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-context</artifactId> + <version>${version.spring}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-tx</artifactId> + <version>${version.spring}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-jdbc</artifactId> + <version>${version.spring}</version> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-beans</artifactId> + <version>${version.spring}</version> + <scope>provided</scope> + </dependency> + + <!-- ActiveMQ Dependencies --> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>5.8.0</version> + <scope>provided</scope> + </dependency> + + <!-- Test dependencies --> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-test</artifactId> + <version>${version.spring}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <version>4.11</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.easytesting</groupId> + <artifactId>fest-assert</artifactId> + <version>1.4</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.jolbox</groupId> + <artifactId>bonecp</artifactId> + <version>0.7.1.RELEASE</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + <version>14.0.1</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + <version>1.0.9</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jul-to-slf4j</artifactId> + <version>${version.slf4j}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>jcl-over-slf4j</artifactId> + <version>${version.slf4j}</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.postgresql</groupId> + <artifactId>postgresql</artifactId> + <version>9.2-1002-jdbc4</version> + <scope>test</scope> + </dependency> + </dependencies> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + <version>${version.slf4j}</version> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>log4j-over-slf4j</artifactId> + <version>${version.slf4j}</version> + </dependency> + </dependencies> + </dependencyManagement> + <properties> + <version.spring>3.2.2.RELEASE</version.spring> + <version.slf4j>1.7.2</version.slf4j> + </properties> + <build> + <plugins> + <plugin> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java new file mode 100644 index 0000000..f2cfb6e --- /dev/null +++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java @@ -0,0 +1,152 @@ +package io.trygvis.activemq; + +import io.trygvis.async.QueueController; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.sql.SQLException; + +import static java.lang.Long.parseLong; +import static java.lang.System.arraycopy; +import static java.nio.charset.Charset.forName; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; + +public class ActiveMqHinter implements AutoCloseable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueConnection c; + + private static final Charset utf8 = forName("utf-8"); + + public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException { + log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL()); + c = connectionFactory.createQueueConnection(); + c.start(); + log.info("Connected, clientId = {}", c.getClientID()); + } + + public void createHinter(final QueueController controller) throws JMSException { + QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(controller.queue.queue.name); + final MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + if ((message instanceof TextMessage)) { + String body; + try { + TextMessage textMessage = (TextMessage) message; + body = textMessage.getText(); + } catch (JMSException e) { + log.warn("Exception while reading body.", e); + throw new RuntimeException("Exception while reading body.", e); + } + + consumeString(new StringReader(body), controller); + } else if (message instanceof BytesMessage) { + final BytesMessage bytesMessage = (BytesMessage) message; + consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller); + } else { + throw new RuntimeException("Unsupported message type: " + message.getClass()); + } + } + }); + + controller.addOnStopListener(new Runnable() { + @Override + public void run() { + try { + consumer.close(); + } catch (JMSException e) { + log.error("Error while closing JMS consumer", e); + } + } + }); + } + + private void consumeString(Reader reader, QueueController controller) { + try { + BufferedReader r = new BufferedReader(reader); + + String line = r.readLine(); + + while (line != null) { + for (String id : line.split(",")) { + controller.hint(parseLong(id.trim())); + } + line = r.readLine(); + } + } catch (IOException | SQLException e) { + log.warn("Could not consume body.", e); + throw new RuntimeException("Could not consume body.", e); + } catch (NumberFormatException e) { + log.warn("Could not consume body.", e); + throw e; + } + } + + public void close() throws JMSException { + c.close(); + } + + private static class ByteMessageInputStream extends InputStream { + private final BytesMessage bytesMessage; + + public ByteMessageInputStream(BytesMessage bytesMessage) { + this.bytesMessage = bytesMessage; + } + + @Override + public int read(byte[] b) throws IOException { + try { + return bytesMessage.readBytes(b); + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + byte[] b = new byte[len]; + try { + int read = bytesMessage.readBytes(b); + if (read == -1) { + return -1; + } + arraycopy(b, 0, out, off, read); + return read; + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read() throws IOException { + try { + return bytesMessage.readByte(); + } catch (javax.jms.MessageEOFException e) { + return -1; + } catch (JMSException e) { + throw new IOException(e); + } + } + } +} diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java new file mode 100755 index 0000000..9332596 --- /dev/null +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -0,0 +1,30 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +/** + * A simple framework for running tasks. + */ +public interface AsyncService { + + QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException; + + QueueExecutor getQueue(String name); + + Task schedule(Queue queue, Date scheduled, List<String> args); + + Task schedule(Queue queue, long parent, Date scheduled, List<String> args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..46f1f30 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,75 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; + +public class JdbcAsyncService { + private final Map<String, QueueController> queues = new HashMap<>(); + + private final QueueSystem queueSystem; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + } + + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); + } + + QueueController queueController = new QueueController(queueSystem, req, processor, queue); + + queues.put(queue.queue.name, queueController); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; + } + + public Task await(Connection c, Task task, long timeout) throws SQLException { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + public Task update(Connection c, Task ref) throws SQLException { + return queueSystem.createTaskDao(c).findById(ref.id()); + } + + private synchronized QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); + + if (queueController == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueController; + } +} diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..a343d42 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,201 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final TaskEffect taskEffect; + + private final TaskExecutionRequest req; + + public final QueueExecutor queue; + + private boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean running; + + private Thread thread; + + private ScheduledThreadPoolExecutor executor; + + private List<Runnable> stopListeners = new ArrayList<>(); + + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { + this.queueSystem = queueSystem; + this.req = req; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.taskEffect = taskEffect; + this.queue = queue; + } + + public void scheduleAll(final List<Task> tasks) throws InterruptedException { + ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor); + + for (final Task task : tasks) { + service.submit(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + + public void invokeAll(final List<Task> tasks) throws InterruptedException { + List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + callables.add(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + + try { + scheduleAll(Collections.singletonList(task)); + } catch (InterruptedException e) { + throw new SQLException(e); + } + } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); + } + + private class QueueThread implements Runnable { + public void run() { + while (shouldRun) { + List<Task> tasks = null; + + try { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + + if (tasks.size() > 0) { + invokeAll(tasks); + } + } catch (Throwable e) { + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } + } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); + continue; + } + + synchronized (this) { + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + } else { + try { + wait(req.interval(queue.queue)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + log.info("Thread for queue {} has stopped.", queue.queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } + + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); + + running = true; + this.executor = executor; + + thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.queue.name); + + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + + shouldRun = false; + + // TODO: empty out the currently executing tasks. + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue + } + thread.interrupt(); + } + thread = null; + executor.shutdownNow(); + } +} diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java new file mode 100644 index 0000000..8edc720 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -0,0 +1,28 @@ +package io.trygvis.async; + +public class QueueStats { + public final int totalMessageCount; + public final int okMessageCount; + public final int failedMessageCount; + public final int missedMessageCount; + public final int scheduledMessageCount; + + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { + this.totalMessageCount = totalMessageCount; + this.okMessageCount = okMessageCount; + this.failedMessageCount = failedMessageCount; + this.missedMessageCount = missedMessageCount; + this.scheduledMessageCount = scheduledMessageCount; + } + + @Override + public String toString() { + return "QueueStats{" + + "totalMessageCount=" + totalMessageCount + + ", okMessageCount=" + okMessageCount + + ", failedMessageCount=" + failedMessageCount + + ", missedMessageCount=" + missedMessageCount + + ", scheduledMessageCount=" + scheduledMessageCount + + '}'; + } +} diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java new file mode 100644 index 0000000..7278e17 --- /dev/null +++ b/src/main/java/io/trygvis/async/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.async; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java new file mode 100644 index 0000000..ef0b5bb --- /dev/null +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -0,0 +1,55 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +public class JdbcQueueService { + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final Map<String, QueueExecutor> queues = new HashMap<>(); + + JdbcQueueService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + } + + public synchronized QueueExecutor getQueue(String name) { + QueueExecutor queueExecutor = queues.get(name); + + if (queueExecutor != null) { + return queueExecutor; + } + + throw new IllegalArgumentException("No such queue: " + name); + } + + public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException { + QueueExecutor queueExecutor = queues.get(name); + + if (queueExecutor != null) { + return queueExecutor; + } + + QueueDao queueDao = queueSystem.createQueueDao(c); + + Queue q = queueDao.findByName(name); + + if (q == null) { + if (!autoCreate) { + throw new SQLException("No such queue: '" + name + "'."); + } + + q = new Queue(name, interval); + queueDao.insert(q); + } + + queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q); + queues.put(name, queueExecutor); + return queueExecutor; + } +} diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java new file mode 100755 index 0000000..15003f7 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Queue.java @@ -0,0 +1,24 @@ +package io.trygvis.queue; + +public class Queue { + + public final String name; + + public final long interval; + + public Queue(String name, long interval) { + this.name = name; + this.interval = interval; + } + + public Queue withInterval(int interval) { + return new Queue(name, interval); + } + + public String toString() { + return "Queue{" + + "name='" + name + '\'' + + ", interval=" + interval + + '}'; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java new file mode 100644 index 0000000..2f69e11 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueDao.java @@ -0,0 +1,45 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +public class QueueDao { + + private final Connection connection; + + public QueueDao(Connection connection) { + this.connection = connection; + } + + public Queue findByName(String name) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("SELECT name, interval FROM queue WHERE name=?")) { + stmt.setString(1, name); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public void insert(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO queue(name, interval) VALUES(?, ?)")) { + int i = 1; + stmt.setString(i++, q.name); + stmt.setLong(i, q.interval); + stmt.executeUpdate(); + } + } + + public void update(Queue q) throws SQLException { + try (PreparedStatement stmt = connection.prepareStatement("UPDATE queue SET interval=? WHERE name=?")) { + int i = 1; + stmt.setLong(i++, q.interval); + stmt.setString(i, q.name); + stmt.executeUpdate(); + } + } + + public Queue mapRow(ResultSet rs) throws SQLException { + return new Queue(rs.getString(1), rs.getLong(2)); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java new file mode 100644 index 0000000..88e5b46 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -0,0 +1,177 @@ +package io.trygvis.queue; + +import io.trygvis.async.QueueStats; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueExecutor { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + public final Queue queue; + + private final Stats stats = new Stats(); + + public enum TaskExecutionResult { + OK, + FAILED, + MISSED + } + + public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) { + this.queueSystem = queueSystem; + this.sqlEffectExecutor = sqlEffectExecutor; + this.queue = queue; + } + + private static class Stats { + public int total; + public int ok; + public int failed; + public int scheduled; + public int missed; + + public synchronized QueueStats toStats() { + return new QueueStats(total, ok, failed, missed, scheduled); + } + } + + public QueueStats getStats() { + return stats.toStats(); + } + + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List<Task> tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); + + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); + } + + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + for (Task task : tasks) { + TaskExecutionResult result = applyTask(effect, task); + + if (result == FAILED && req.stopOnError) { + throw new RuntimeException("Error while executing task, id=" + task.id()); + } + } + } + + /** + * Executed each task in its own transaction. + * <p/> + * If the task fails, the status is set to error in a separate transaction. + */ + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + try { + Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { + @Override + public Integer doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); + } + }); + + if (count == 0) { + log.warn("Missed task {}", task.id()); + synchronized (stats) { + stats.missed++; + } + return MISSED; + } + + log.info("Executing task {}", task.id()); + + final List<Task> newTasks = effect.apply(task); + + final Date now = new Date(); + + log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size()); + + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (Task newTask : newTasks) { + schedule(c, newTask); + } + + queueSystem.createTaskDao(c).update(task.markOk(now)); + } + }); + + synchronized (stats) { + stats.total++; + stats.ok++; + } + + return OK; + } catch (Exception e) { + final Date now = new Date(); + log.error("Unable to execute task, id=" + task.id(), e); + + synchronized (stats) { + stats.total++; + stats.failed++; + } + + try { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + taskDao.update(task.markFailed(now)); + } + }); + } catch (SQLException e1) { + log.error("Error while marking task as failed.", e1); + } + + return FAILED; + } + } + + public void schedule(Connection c, Task task) throws SQLException { + schedule(c, task.queue, task.parent, task.scheduled, task.arguments); + } + + public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, null, scheduled, arguments); + } + + public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException { + return schedule(c, queue.name, parent, scheduled, arguments); + } + + private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException { + TaskDao taskDao = queueSystem.createTaskDao(c); + + long id = taskDao.insert(parent, queue, NEW, scheduled, arguments); + + synchronized (stats) { + stats.scheduled++; + } + + return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments); + } +} diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java new file mode 100644 index 0000000..1c38f1f --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -0,0 +1,54 @@ +package io.trygvis.queue; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public interface QueueService { + QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException; + + void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; + + public static class TaskExecutionRequest { + public final long chunkSize; + public final boolean stopOnError; + public final Long interval; + public final boolean continueOnFullChunk; + // TODO: saveExceptions + + public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + this(chunkSize, stopOnError, null, true); + } + + private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) { + this.chunkSize = chunkSize; + this.stopOnError = stopOnError; + this.interval = interval; + this.continueOnFullChunk = continueOnFullChunk; + } + + public TaskExecutionRequest interval(long interval) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + @Override + public String toString() { + return "TaskExecutionRequest{" + + "chunkSize=" + chunkSize + + ", stopOnError=" + stopOnError + + '}'; + } + + public long interval(Queue queue) { + if (interval != null) { + return interval; + } + + return queue.interval; + } + } +} diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { + public final String name; + public final long totalTaskCount; + public final EnumMap<Task.TaskState, Long> states; + + public QueueStats(String name, EnumMap<Task.TaskState, Long> states) { + this.name = name; + this.states = states; + + long c = 0; + for (Long l : states.values()) { + c += l; + } + this.totalTaskCount = c; + } +} diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java new file mode 100644 index 0000000..5954526 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -0,0 +1,61 @@ +package io.trygvis.queue; + +import io.trygvis.async.JdbcAsyncService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.SQLException; + +public class QueueSystem { + private final Logger log = LoggerFactory.getLogger(getClass()); + + public final SqlEffectExecutor sqlEffectExecutor; + + private final JdbcQueueService queueService; + + private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + if (c.getAutoCommit()) { + throw new SQLException("The connection cannot be in auto-commit mode."); + } + + DatabaseMetaData metaData = c.getMetaData(); + String productName = metaData.getDatabaseProductName(); + String productVersion = metaData.getDatabaseProductVersion(); + + log.info("productName = " + productName); + log.info("productVersion = " + productVersion); + } + }); + + this.sqlEffectExecutor = sqlEffectExecutor; + queueService = new JdbcQueueService(this); + } + + /** + * Initializes the queue system. Use this as the first thing do as it will validate the database. + */ + public static QueueSystem initialize(SqlEffectExecutor sqlEffectExecutor) throws SQLException { + return new QueueSystem(sqlEffectExecutor); + } + + public JdbcQueueService createQueueService() { + return queueService; + } + + public QueueDao createQueueDao(Connection c) { + return new QueueDao(c); + } + + public TaskDao createTaskDao(Connection c) { + return new TaskDao(c); + } + + public JdbcAsyncService createAsyncService() { + return new JdbcAsyncService(this); + } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffect.java b/src/main/java/io/trygvis/queue/SqlEffect.java new file mode 100644 index 0000000..7864bcd --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffect.java @@ -0,0 +1,12 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.SQLException; + +public interface SqlEffect<A> { + A doInConnection(Connection c) throws SQLException; + + interface Void { + void doInConnection(Connection c) throws SQLException; + } +} diff --git a/src/main/java/io/trygvis/queue/SqlEffectExecutor.java b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java new file mode 100644 index 0000000..92802da --- /dev/null +++ b/src/main/java/io/trygvis/queue/SqlEffectExecutor.java @@ -0,0 +1,50 @@ +package io.trygvis.queue; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; + +public class SqlEffectExecutor { + + private final DataSource dataSource; + + public SqlEffectExecutor(DataSource dataSource) { + this.dataSource = dataSource; + } + + public <A> A transaction(SqlEffect<A> effect) throws SQLException { +// int pid; + + try (Connection c = dataSource.getConnection()) { +// pid = getPid(c); +// System.out.println("pid = " + pid); + + boolean ok = false; + try { + A a = effect.doInConnection(c); + c.commit(); + ok = true; + return a; + } finally { +// System.out.println("Closing, pid = " + pid); + if (!ok) { + try { + c.rollback(); + } catch (SQLException e) { + // ignore + } + } + } + } + } + + public void transaction(final SqlEffect.Void effect) throws SQLException { + transaction(new SqlEffect<Object>() { + @Override + public Object doInConnection(Connection c) throws SQLException { + effect.doInConnection(c); + return null; + } + }); + } +} diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java new file mode 100755 index 0000000..8038050 --- /dev/null +++ b/src/main/java/io/trygvis/queue/Task.java @@ -0,0 +1,117 @@ +package io.trygvis.queue; + +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.Task.TaskState.*; +import static java.util.Arrays.asList; + +public class Task { + + public enum TaskState { + NEW, + PROCESSING, + OK, + FAILED + } + + private final long id; + + public final Long parent; + + public final String queue; + + public final TaskState state; + + public final Date scheduled; + + public final Date lastRun; + + public final int runCount; + + public final Date completed; + + public final List<String> arguments; + + public Task(long id, Long parent, String queue, TaskState state, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) { + this.id = id; + this.parent = parent; + this.queue = queue; + this.state = state; + this.scheduled = scheduled; + this.lastRun = lastRun; + this.runCount = runCount; + this.completed = completed; + + this.arguments = arguments; + } + + public Task markProcessing() { + return new Task(id, parent, queue, PROCESSING, scheduled, new Date(), runCount + 1, completed, arguments); + } + + public Task markOk(Date completed) { + return new Task(id, parent, queue, OK, scheduled, lastRun, runCount, completed, arguments); + } + + public Task markFailed(Date now) { + return new Task(id, parent, queue, FAILED, scheduled, lastRun, runCount, completed, arguments); + } + + public String toString() { + return "Task{" + + "id=" + id + + ", parent=" + parent + + ", queue=" + queue + + ", state=" + state + + ", scheduled=" + scheduled + + ", lastRun=" + lastRun + + ", runCount=" + runCount + + ", completed=" + completed + + ", arguments='" + arguments + '\'' + + '}'; + } + + public long id() { + if (id == 0) { + throw new RuntimeException("This task has not been persisted yet."); + } + + return id; + } + + public boolean isDone() { + return completed != null; + } + + public Task childTask(String queue, Date scheduled, String... arguments) { + return new Task(0, id(), queue, NEW, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String queue, Date scheduled, String... arguments) { + return new Task(0, null, queue, NEW, scheduled, null, 0, null, asList(arguments)); + } + + public static Task newTask(String queue, Date scheduled, List<String> arguments) { + return new Task(0, null, queue, NEW, scheduled, null, 0, null, arguments); + } + + public static List<String> stringToArguments(String arguments) { + return asList(arguments.split(",")); + } + + public static String argumentsToString(List<String> arguments) { + StringBuilder builder = new StringBuilder(); + for (int i = 0, argumentsLength = arguments.size(); i < argumentsLength; i++) { + String argument = arguments.get(i); + if (argument.contains(",")) { + throw new RuntimeException("The argument string can't contain a comma."); + } + if (i > 0) { + builder.append(','); + } + builder.append(argument); + } + return builder.toString(); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java new file mode 100644 index 0000000..365b44b --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -0,0 +1,148 @@ +package io.trygvis.queue; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Timestamp; +import java.sql.Types; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.EnumMap; +import java.util.List; + +import static io.trygvis.queue.Task.*; + +public class TaskDao { + + private final Connection c; + + public static final String fields = "id, parent, queue, state, scheduled, last_run, run_count, completed, arguments"; + + TaskDao(Connection c) { + this.c = c; + } + + public long insert(Long parent, String queue, TaskState state, Date scheduled, List<String> arguments) throws SQLException { + String sql = "INSERT INTO task(id, parent, run_count, queue, state, scheduled, arguments) " + + "VALUES(nextval('task_seq'), ?, 0, ?, ?, ?, ?)"; + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + if (parent == null) { + stmt.setNull(i++, Types.BIGINT); + } else { + stmt.setLong(i++, parent); + } + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setTimestamp(i++, new Timestamp(scheduled.getTime())); + stmt.setString(i, argumentsToString(arguments)); + stmt.executeUpdate(); + } + try (PreparedStatement stmt = c.prepareStatement("SELECT currval('task_seq')")) { + ResultSet rs = stmt.executeQuery(); + rs.next(); + return rs.getLong(1); + } + } + + public Task findById(long id) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE id=?")) { + stmt.setLong(1, id); + ResultSet rs = stmt.executeQuery(); + return rs.next() ? mapRow(rs) : null; + } + } + + public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { + int i = 1; + stmt.setString(i++, queue); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); + ResultSet rs = stmt.executeQuery(); + List<Task> list = new ArrayList<>(); + while (rs.next()) { + list.add(mapRow(rs)); + } + return list; + } + } + + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + + public int update(Task task) throws SQLException { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { + int i = 1; + stmt.setString(i++, task.state.name()); + stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); + setTimestamp(stmt, i++, task.lastRun); + stmt.setInt(i++, task.runCount); + setTimestamp(stmt, i++, task.completed); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } + return stmt.executeUpdate(); + } + } + + public void setState(Task task, TaskState state) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=? WHERE id = ?")) { + int i = 1; + stmt.setString(i++, state.name()); + stmt.setLong(i, task.id()); + stmt.executeUpdate(); + } + } + + private static void setTimestamp(PreparedStatement stmt, int parameterIndex, Date date) throws SQLException { + if (date == null) { + stmt.setNull(parameterIndex, Types.TIMESTAMP); + } else { + stmt.setTimestamp(parameterIndex, new Timestamp(date.getTime())); + } + } + + public Task mapRow(ResultSet rs) throws SQLException { + String arguments = rs.getString(9); + int i = 1; + return new Task( + rs.getLong(i++), + rs.getLong(i++), + rs.getString(i++), + TaskState.valueOf(rs.getString(i++)), + rs.getTimestamp(i++), + rs.getTimestamp(i++), + rs.getInt(i++), + rs.getTimestamp(i), + arguments != null ? stringToArguments(arguments) : Collections.<String>emptyList()); + } + + public void rollback() throws SQLException { + c.rollback(); + c.close(); + } +} diff --git a/src/main/java/io/trygvis/queue/TaskEffect.java b/src/main/java/io/trygvis/queue/TaskEffect.java new file mode 100644 index 0000000..186797f --- /dev/null +++ b/src/main/java/io/trygvis/queue/TaskEffect.java @@ -0,0 +1,7 @@ +package io.trygvis.queue; + +import java.util.List; + +public interface TaskEffect { + List<Task> apply(Task task) throws Exception; +} diff --git a/src/main/java/io/trygvis/spring/DefaultConfig.java b/src/main/java/io/trygvis/spring/DefaultConfig.java new file mode 100644 index 0000000..6890d58 --- /dev/null +++ b/src/main/java/io/trygvis/spring/DefaultConfig.java @@ -0,0 +1,31 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.jdbc.core.JdbcTemplate; + +import javax.sql.DataSource; +import java.sql.SQLException; + +@Configuration +public class DefaultConfig { + + @Bean + public QueueSystem queueSystem(DataSource ds) throws SQLException { + return QueueSystem.initialize(new SqlEffectExecutor(ds)); + } + + @Bean + public AsyncService asyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + return new SpringJdbcAsyncService(queueSystem, jdbcTemplate); + } + + @Bean + public QueueService queueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + return new SpringQueueService(queueSystem, jdbcTemplate); + } +} diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java new file mode 100644 index 0000000..12dc71e --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -0,0 +1,110 @@ +package io.trygvis.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationAdapter; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static org.springframework.transaction.annotation.Propagation.REQUIRED; +import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization; + +public class SpringJdbcAsyncService implements AsyncService { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final ScheduledThreadPoolExecutor executor; + + private final JdbcTemplate jdbcTemplate; + + private final JdbcAsyncService jdbcAsyncService; + + private final JdbcQueueService queueService; + + private final QueueSystem queueSystem; + + public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + this.queueSystem = queueSystem; + this.jdbcTemplate = jdbcTemplate; + jdbcAsyncService = new JdbcAsyncService(queueSystem); + queueService = queueSystem.createQueueService(); + executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory()); + } + + @Transactional(propagation = REQUIRED) + public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException { + QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor>() { + @Override + public QueueExecutor doInConnection(Connection c) throws SQLException { + return queueService.lookupQueue(c, queue.name, queue.interval, true); + } + }); + + final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor); + + registerSynchronization(new TransactionSynchronizationAdapter() { + public void afterCompletion(int status) { + log.info("Transaction completed with status = {}", status); + if (status == TransactionSynchronization.STATUS_COMMITTED) { + queueController.start(executor); + } + } + }); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return jdbcAsyncService.getQueue(name); + } + + @Transactional(propagation = REQUIRED) + public Task schedule(final Queue queue, final Date scheduled, final List<String> args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + QueueExecutor queueExecutor = queueService.getQueue(queue.name); + return queueExecutor.schedule(c, scheduled, args); + } + }); + } + + public Task schedule(final Queue queue, final long parent, final Date scheduled, final List<String> args) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + QueueExecutor queueExecutor = queueService.getQueue(queue.name); + return queueExecutor.schedule(c, parent, scheduled, args); + } + }); + } + + @Transactional(readOnly = true) + public Task update(final Task ref) { + return jdbcTemplate.execute(new ConnectionCallback<Task>() { + @Override + public Task doInConnection(Connection c) throws SQLException { + return jdbcAsyncService.update(c, ref); + } + }); + } +} diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java new file mode 100644 index 0000000..2027ab5 --- /dev/null +++ b/src/main/java/io/trygvis/spring/SpringQueueService.java @@ -0,0 +1,49 @@ +package io.trygvis.spring; + +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import org.springframework.dao.DataAccessException; +import org.springframework.jdbc.core.ConnectionCallback; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.transaction.annotation.Transactional; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +public class SpringQueueService implements QueueService { + + public final JdbcTemplate jdbcTemplate; + + public JdbcQueueService queueService; + + public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) { + this.jdbcTemplate = jdbcTemplate; + this.queueService = queueSystem.createQueueService(); + } + + @Transactional + public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException { + return jdbcTemplate.execute(new ConnectionCallback<QueueExecutor>() { + @Override + public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException { + return queueService.lookupQueue(c, name, interval, autoCreate); + } + }); + } + + @Transactional + public void schedule(final Queue queue, final Date scheduled, final List<String> arguments) throws SQLException { + jdbcTemplate.execute(new ConnectionCallback<Object>() { + @Override + public Object doInConnection(Connection c) throws SQLException, DataAccessException { + queueService.getQueue(queue.name).schedule(c, scheduled, arguments); + return null; + } + }); + } +} diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql new file mode 100644 index 0000000..a0739c2 --- /dev/null +++ b/src/main/resources/create-postgresql.sql @@ -0,0 +1,32 @@ +BEGIN; + +DROP TABLE IF EXISTS task; +DROP TABLE IF EXISTS queue; +DROP SEQUENCE IF EXISTS task_seq; + +CREATE TABLE queue ( + name VARCHAR(100) NOT NULL, + interval INTEGER NOT NULL, + CONSTRAINT pk_queue PRIMARY KEY (name) +); + +CREATE TABLE task ( + id BIGINT NOT NULL, + parent BIGINT, + queue VARCHAR(100) NOT NULL, + state VARCHAR(10) NOT NULL, + scheduled TIMESTAMP NOT NULL, + last_run TIMESTAMP, + run_count INT NOT NULL, + completed TIMESTAMP, + arguments VARCHAR(100), + CONSTRAINT pk_task PRIMARY KEY (id), + CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name), + CONSTRAINT fk_task__parent FOREIGN KEY (parent) REFERENCES task (id) +); + +CREATE INDEX ix_task__queue__state ON task (queue, state); + +CREATE SEQUENCE task_seq; + +COMMIT; diff --git a/src/test/java/io/trygvis/test/Article.java b/src/test/java/io/trygvis/test/Article.java new file mode 100755 index 0000000..bf52e41 --- /dev/null +++ b/src/test/java/io/trygvis/test/Article.java @@ -0,0 +1,46 @@ +package io.trygvis.test; + +import java.util.Date; + +public class Article { + private Integer id; + private Date created; + private Date updated; + private String title; + private String body; + + @SuppressWarnings("UnusedDeclaration") + private Article() { + } + + public Article(Date created, Date updated, String title, String body) { + this.created = created; + this.updated = updated; + this.title = title; + this.body = body; + } + + public Integer getId() { + return id; + } + + public Date getCreated() { + return created; + } + + public Date getUpdated() { + return updated; + } + + public void setUpdated(Date updated) { + this.updated = updated; + } + + public String getTitle() { + return title; + } + + public String getBody() { + return body; + } +} diff --git a/src/test/java/io/trygvis/test/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java new file mode 100755 index 0000000..396fc89 --- /dev/null +++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java @@ -0,0 +1,46 @@ +package io.trygvis.test; + +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import org.springframework.transaction.annotation.Transactional; + +import java.util.Collections; +import java.util.Date; +import java.util.List; +import java.util.Random; + +import static java.util.Collections.emptyList; +import static org.springframework.transaction.annotation.Propagation.MANDATORY; + +@Component("createArticle") +@Transactional(propagation = MANDATORY) +public class CreateArticleCallable implements TaskEffect { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private Random random = new Random(); + + @Override + public List<Task> apply(Task task) throws Exception { + List<String> arguments = task.arguments; + + log.info("CreateArticeJob.run: BEGIN"); + + if (random.nextInt() % 3 == 0) { + throw new RuntimeException("failing create article"); + } + + Date now = new Date(); + + log.info("now = {}", now); + + Article article = new Article(new Date(), null, "title", "body"); +// entityManager.persist(article); + + log.info("CreateArticeJob.run: END"); + + return emptyList(); + } +} diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java new file mode 100644 index 0000000..33c2807 --- /dev/null +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -0,0 +1,64 @@ +package io.trygvis.test; + +import com.jolbox.bonecp.BoneCPDataSource; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; + +import javax.sql.DataSource; +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; + +import static java.lang.System.getProperty; + +public class DbUtil { + public static DataSource createDataSource() throws SQLException { + String username = getProperty("user.name"); + String jdbcUrl = getProperty("database.url", "jdbc:postgresql://localhost/" + username); + String user = getProperty("database.username", username); + String pass = getProperty("database.password", username); + + return createDataSource(jdbcUrl, user, pass); + } + + public static BoneCPDataSource createDataSource(String jdbcUrl, String username, String password) throws SQLException { + BoneCPDataSource ds = new BoneCPDataSource(); + + ds.setLogStatementsEnabled(true); + + ds.setJdbcUrl(jdbcUrl); + ds.setUsername(username); + ds.setPassword(password); + + ds.setConnectionTestStatement("/* ping*/SELECT 1"); + ds.setDefaultAutoCommit(false); + ds.setIdleConnectionTestPeriodInSeconds(60); + ds.setIdleMaxAgeInSeconds(240); + ds.setMaxConnectionsPerPartition(4); + ds.setMinConnectionsPerPartition(1); + ds.setPartitionCount(4); + ds.setAcquireIncrement(1); + ds.setStatementsCacheSize(1000); + ds.setReleaseHelperThreads(1); + ds.setStatisticsEnabled(true); + ds.setLogStatementsEnabled(true); + ds.setLogWriter(new PrintWriter(System.err)); + return ds; + } + + public static DataSource springifyDataSource(DataSource ds) { + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } + + public static int getPid(Connection c) throws SQLException { + int pid; + try (Statement statement = c.createStatement()) { + ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()"); + rs.next(); + pid = rs.getInt(1); + } + return pid; + } +} diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java new file mode 100755 index 0000000..f03d6fa --- /dev/null +++ b/src/test/java/io/trygvis/test/Main.java @@ -0,0 +1,120 @@ +package io.trygvis.test; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.slf4j.bridge.SLF4JBridgeHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.support.ClassPathXmlApplicationContext; +import org.springframework.stereotype.Component; +import org.springframework.transaction.support.TransactionTemplate; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +import static java.lang.System.*; +import static java.lang.Thread.sleep; + +@Component +public class Main { + private static final Logger log = LoggerFactory.getLogger(Main.class); + + public static void main(String[] args) throws Exception { + SLF4JBridgeHandler.install(); + + String username = getProperty("user.name"); + setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); + setProperty("database.username", username); + setProperty("database.password", username); + + log.info("Starting context"); + ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml"); + log.info("Started context"); + + try { + context.getBean(Main.class).run(); +// log.info("Sleeping"); +// sleep(1000 * 1000); + } catch (Exception e) { + e.printStackTrace(System.out); + } + + log.info("Stopping context"); + context.stop(); + log.info("Stopped context"); + + exit(0); + } + + @Autowired + private TransactionTemplate transactionTemplate; + + @Autowired + private AsyncService asyncService; + + @Autowired + private QueueService queueService; + + @Autowired + @Qualifier("createArticle") + private TaskEffect createArticleCallable; + + @Autowired + @Qualifier("updateArticle") + private TaskEffect updateArticleCallable; + + public void run() throws Exception { + log.info("Main.run"); + + final Queue q = null; // queueService.lookupQueue(c, "create-article", 1); + + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + + asyncService.registerQueue(q, req, createArticleCallable); +// log.info("queue registered: ref = {}", q); +// asyncService.registerQueue("update-queue", 1, updateArticleCallable); + +// q = asyncService.lookupQueue("create-queue"); + + final List<Task> tasks = new ArrayList<>(); + + final int count = 1; + log.info("Creating {} tasks", count); +// transactionTemplate.execute(new TransactionCallbackWithoutResult() { +// protected void doInTransactionWithoutResult(TransactionStatus status) { +// for (int i = 0; i < count; i++) { +// tasks.add(asyncService.schedule(q)); +// } +// } +// }); + log.info("Created {} tasks", count); + + while (true) { + sleep(10000); + + log.info("Checking for status of {} tasks", tasks.size()); + for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) { + Task task = iterator.next(); + + task = asyncService.update(task); + +// log.info("task = {}", task); + + if (task.isDone()) { + iterator.remove(); + } + } + + if (tasks.isEmpty()) { + log.info("No more tasks"); + break; + } + } + } +} diff --git a/src/test/java/io/trygvis/test/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java new file mode 100755 index 0000000..6aff20f --- /dev/null +++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java @@ -0,0 +1,47 @@ +package io.trygvis.test; + +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; + +import java.util.Date; +import java.util.List; +import java.util.Random; + +import static java.util.Collections.emptyList; + +@Component("updateArticle") +public class UpdateArticleCallable implements TaskEffect { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final Random r = new Random(); + + @Override + public List<Task> apply(Task task) throws Exception { + List<String> arguments = task.arguments; + + log.info("UpdateArticeJob.run: BEGIN"); + + Date now = new Date(); + + log.info("now = {}", now); + +/* + TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class)); + + List<Article> list = q.getResultList(); + log.info("Got {} articles", list.size()); + + Article a = list.get(r.nextInt(list.size())); + a.setUpdated(new Date()); + + entityManager.persist(a); +*/ + + log.info("UpdateArticeJob.run: END"); + + return emptyList(); + } +} diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java new file mode 100644 index 0000000..bd86732 --- /dev/null +++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java @@ -0,0 +1,37 @@ +package io.trygvis.test.activemq; + +import io.trygvis.activemq.ActiveMqHinter; +import io.trygvis.async.QueueController; +import io.trygvis.queue.QueueService; +import io.trygvis.test.jdbc.AsyncConsumerExample; +import org.apache.activemq.ActiveMQConnectionFactory; + +import javax.jms.JMSException; + +public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable { + private final ActiveMqHinter activeMqHinter; + + public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException { + activeMqHinter = new ActiveMqHinter(cf); + } + + public static void main(String[] args) throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616"); + + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true). + interval(60 * 6000). + continueOnFullChunk(false); + try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) { + consumer.work(poolSize, req); + } + } + + public void close() throws JMSException { + activeMqHinter.close(); + } + + protected void wrapInputQueue(QueueController input) throws Exception { + activeMqHinter.createHinter(input); + } +} diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java new file mode 100644 index 0000000..16640dd --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -0,0 +1,107 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; +import io.trygvis.async.QueueStats; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; +import static java.util.Collections.singletonList; + +public class AsyncConsumerExample { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 1000; + + private static final TaskEffect adder = new TaskEffect() { + public List<Task> apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); + } + }; + + public static void main(String[] args) throws Exception { + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + new AsyncConsumerExample().work(poolSize, req); + } + + public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.createQueueService(); + + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + return new QueueExecutor[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); + + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; + + QueueController controller = asyncService.registerQueue(input, req, adder); + wrapInputQueue(controller); + + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + System.out.println(input.getStats()); + System.out.println(output.getStats()); + } + }, 1000, 1000); + + long start = currentTimeMillis(); + controller.start(new ScheduledThreadPoolExecutor(poolSize)); + Thread.sleep(60 * 1000); + controller.stop(); + long end = currentTimeMillis(); + timer.cancel(); + + QueueStats stats = input.getStats(); + + System.out.println("Summary:"); + System.out.println(stats.toString()); + System.out.println(output.getStats().toString()); + + long duration = end - start; + double rate = 1000 * ((double) stats.totalMessageCount) / duration; + System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); + } + + protected void wrapInputQueue(QueueController input) throws Exception { + } +} diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java new file mode 100644 index 0000000..0b7ba50 --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java @@ -0,0 +1,126 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueStats; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static io.trygvis.queue.Task.TaskState; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class PlainJavaExample { + private static final Random r = new Random(); + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.createQueueService(); + + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + QueueExecutor[] queues = { + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true)}; + + TaskDao taskDao = queueSystem.createTaskDao(c); + + QueueStats stats = taskDao.findQueueStatsByName(inputName); + System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount); + for (Map.Entry<TaskState, Long> entry : stats.states.entrySet()) { + System.out.println(entry.getKey() + " = " + entry.getValue()); + } + + return queues; + } + }); + + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; + + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); + + input.consumeAll(req, new TaskEffect() { + public List<Task> apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + if (r.nextInt(3000) > 0) { + return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b))); + } + + throw new RuntimeException("Simulated exception while processing task."); + } + }); + System.out.println("Done"); + } + } + + public static class Producer { + public static void main(String[] args) throws Exception { + System.out.println("Starting producer"); + + int chunks = 100; + final int chunk = 10000; + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.createQueueService(); + + final QueueExecutor queue; + try (Connection c = ds.getConnection()) { + queue = queueService.lookupQueue(c, inputName, interval, true); + c.commit(); + } + + for (int i = 0; i < chunks; i++) { + long start = currentTimeMillis(); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (int j = 0; j < chunk; j++) { + queue.schedule(c, new Date(), asList("10", "20")); + } + } + }); + long end = currentTimeMillis(); + + long time = end - start; + System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second"); + } + } + } +} diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java new file mode 100644 index 0000000..93372f6 --- /dev/null +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -0,0 +1,88 @@ +package io.trygvis.test.spring; + +import io.trygvis.async.AsyncService; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import io.trygvis.spring.DefaultConfig; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.atomic.AtomicReference; + +import static java.lang.System.getProperty; +import static java.lang.System.setProperty; +import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; +import static org.fest.assertions.Assertions.assertThat; + +@RunWith(SpringJUnit4ClassRunner.class) +@ContextConfiguration(classes = {TestConfig.class, DefaultConfig.class}) +public class PlainSpringTest { + + @Autowired + private AsyncService asyncService; + + @Autowired + private QueueService queueService; + + private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + + static { + String username = getProperty("user.name"); + setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); + setProperty("database.username", username); + setProperty("database.password", username); + } + + @Test + public void testBasic() throws SQLException, InterruptedException { + QueueExecutor queueA = queueService.getQueue("a", 1000, true); +// final AtomicReference<List<String>> refA = new AtomicReference<>(); + asyncService.registerQueue(queueA.queue, req, new TaskEffect() { + @Override + public List<Task> apply(Task task) throws Exception { +// refA.set(task.arguments); +// synchronized (refA) { +// refA.notify(); +// } + System.out.println("task.arguments = " + task.arguments); + return asList(task.childTask("b", new Date(), task.arguments.get(0), "world")); + } + }); + + QueueExecutor queueB = queueService.getQueue("b", 1000, true); + final AtomicReference<List<String>> refB = new AtomicReference<>(); + asyncService.registerQueue(queueB.queue, req, new TaskEffect() { + @Override + public List<Task> apply(Task task) throws Exception { +// System.out.println("task.arguments = " + task.arguments); + refB.set(task.arguments); + synchronized (refB) { + refB.notify(); + } + return emptyList(); + } + }); + + synchronized (refB) { + System.out.println("Scheduling task"); + queueService.schedule(queueA.queue, new Date(), asList("hello")); + System.out.println("Task scheduled, waiting"); + refB.wait(10000); + System.out.println("Back!"); + } + +// System.out.println("refA.get() = " + refA.get()); + System.out.println("refB.get() = " + refB.get()); + + assertThat(refB.get()).containsExactly("hello", "world"); + } +} diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java new file mode 100755 index 0000000..4df8ce1 --- /dev/null +++ b/src/test/java/io/trygvis/test/spring/TestConfig.java @@ -0,0 +1,57 @@ +package io.trygvis.test.spring; + +import com.jolbox.bonecp.BoneCPDataSource; +import io.trygvis.test.DbUtil; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.datasource.DataSourceTransactionManager; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.annotation.EnableTransactionManagement; +import org.springframework.transaction.support.TransactionTemplate; + +import javax.sql.DataSource; +import java.sql.SQLException; + +@Configuration +@ComponentScan(basePackages = "io.trygvis") +@EnableTransactionManagement +public class TestConfig { + + @Bean + public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception { + return new PropertySourcesPlaceholderConfigurer() {{ + setProperties(System.getProperties()); + setLocalOverride(true); + }}; + } + + @Bean + public JdbcTemplate jdbcTemplate(DataSource dataSource) { + return new JdbcTemplate(dataSource); + } + + @Bean + public DataSource dataSource(@Value("${database.url}") String jdbcUrl, + @Value("${database.username}") String username, + @Value("${database.password}") String password) throws SQLException { + BoneCPDataSource ds = DbUtil.createDataSource(jdbcUrl, username, password); + + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } + + @Bean + public PlatformTransactionManager transactionManager(DataSource dataSource) { + return new DataSourceTransactionManager(dataSource); + } + + @Bean + public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) { + return new TransactionTemplate(platformTransactionManager); + } +} diff --git a/src/test/resources/applicationContext.xml b/src/test/resources/applicationContext.xml new file mode 100755 index 0000000..5f173b3 --- /dev/null +++ b/src/test/resources/applicationContext.xml @@ -0,0 +1,22 @@ +<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns="http://www.springframework.org/schema/beans" + xmlns:aop="http://www.springframework.org/schema/aop" + xmlns:context="http://www.springframework.org/schema/context" + xmlns:jdbc="http://www.springframework.org/schema/jdbc" + xmlns:jee="http://www.springframework.org/schema/jee" + xmlns:tx="http://www.springframework.org/schema/tx" + xmlns:jpa="http://www.springframework.org/schema/data/jpa" + xsi:schemaLocation=" + http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.2.xsd + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.2.xsd + http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.2.xsd + http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.2.xsd + http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.2.xsd + http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd + http://www.springframework.org/schema/jdbc http://www.springframework.org/schema/jdbc/spring-jdbc-3.2.xsd + http://www.springframework.org/schema/data/jpa http://www.springframework.org/schema/data/jpa/spring-jpa.xsd"> + + <context:annotation-config/> + <bean class="io.trygvis.test.spring.TestConfig"/> + +</beans> diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100755 index 0000000..f964cd3 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,29 @@ +<configuration debug="false"> + + <logger name="io.trygvis" level="WARN"/> + <logger name="org.springframework" level="INFO"/> + <!-- + <logger name="org.springframework" level="DEBUG"/> + --> + <!-- + <logger name="org.springframework.jdbc" level="INFO"/> + <logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="DEBUG"/> + <logger name="org.springframework.orm" level="INFO"/> + <logger name="org.springframework.transaction" level="DEBUG"/> + --> + + <logger name="org.hibernate" level="INFO"/> + <logger name="org.hibernate.SQL" level="INFO"/> + + <logger name="com.jolbox" level="INFO"/> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%-15thread] %-5level %-60logger{60} - %msg%n</pattern> + </encoder> + </appender> + + <root level="WARN"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> |