summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-04-30 07:12:56 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-04-30 07:12:56 +0200
commit0a0b01664cf620f983549999b24a7740594a57d4 (patch)
tree3a226f4e33ec986f3bae598a547690b3f99e2e8d
downloadjdbc-queue-master.tar.gz
jdbc-queue-master.tar.bz2
jdbc-queue-master.tar.xz
jdbc-queue-master.zip
o Initial import.HEADmaster
-rw-r--r--.gitignore3
-rwxr-xr-xpom.xml98
-rwxr-xr-xsrc/main/java/io/trygvis/queue/AsyncService.java27
-rw-r--r--src/main/java/io/trygvis/queue/JdbcAsyncService.java253
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Queue.java28
-rw-r--r--src/main/java/io/trygvis/queue/QueueDao.java36
-rwxr-xr-xsrc/main/java/io/trygvis/queue/Task.java60
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java54
-rw-r--r--src/main/java/io/trygvis/queue/TaskRef.java23
-rw-r--r--src/main/java/io/trygvis/queue/spring/QueueSpringConfig.java17
-rw-r--r--src/main/resources/create.sql27
-rwxr-xr-xsrc/test/java/io/trygvis/Main.java108
-rw-r--r--src/test/java/io/trygvis/queue/QueueTest.java170
-rwxr-xr-xsrc/test/resources/logback.xml24
14 files changed, 928 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..f83e8cf
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.idea
+target
+*.iml
diff --git a/pom.xml b/pom.xml
new file mode 100755
index 0000000..b8fe1fe
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,98 @@
+<project>
+ <modelVersion>4.0.0</modelVersion>
+ <groupId>io.trygvis.jdbc.queue</groupId>
+ <artifactId>jdbc-queue</artifactId>
+ <version>1.0-SNAPSHOT</version>
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-context</artifactId>
+ <version>${version.spring}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jdbc</artifactId>
+ <version>${version.spring}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-tx</artifactId>
+ <version>${version.spring}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.jolbox</groupId>
+ <artifactId>bonecp-spring</artifactId>
+ <version>0.7.1.RELEASE</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- fuck you bonecp -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>14.0.1</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-test</artifactId>
+ <version>${version.spring}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>ch.qos.logback</groupId>
+ <artifactId>logback-classic</artifactId>
+ <version>1.0.9</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jcl-over-slf4j</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.postgresql</groupId>
+ <artifactId>postgresql</artifactId>
+ <version>9.2-1002-jdbc4</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.11</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${version.slf4j}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
+ <properties>
+ <version.slf4j>1.7.2</version.slf4j>
+ <version.spring>3.2.2.RELEASE</version.spring>
+ </properties>
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/src/main/java/io/trygvis/queue/AsyncService.java b/src/main/java/io/trygvis/queue/AsyncService.java
new file mode 100755
index 0000000..c12f794
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/AsyncService.java
@@ -0,0 +1,27 @@
+package io.trygvis.queue;
+
+import java.util.*;
+
+public interface AsyncService {
+
+ /**
+ * @param name
+ * @param interval how often the queue should be polled for missed tasks in seconds.
+ */
+ Queue registerQueue(String name, int interval, AsyncCallable callable);
+
+ void stopQueue(Queue queue);
+
+ Queue getQueue(String name);
+
+ TaskRef schedule(Queue queue, String... args);
+
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
+
+ interface AsyncCallable {
+ void run(List<String> args) throws Exception;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/JdbcAsyncService.java b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
new file mode 100644
index 0000000..ae6cdba
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/JdbcAsyncService.java
@@ -0,0 +1,253 @@
+package io.trygvis.queue;
+
+import org.slf4j.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.annotation.*;
+import org.springframework.transaction.support.*;
+
+import javax.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static java.util.Arrays.*;
+import static java.util.concurrent.TimeUnit.*;
+import static org.springframework.transaction.annotation.Propagation.MANDATORY;
+import static org.springframework.transaction.support.TransactionSynchronizationManager.registerSynchronization;
+
+public class JdbcAsyncService implements AsyncService {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+
+ private final Map<String, QueueThread> queues = new HashMap<>();
+
+ private final TransactionTemplate transactionTemplate;
+
+ private final QueueDao queueDao;
+
+ private final TaskDao taskDao;
+
+ /**
+ * Accessed from all the queue threads.
+ */
+ private final Map<Long, TaskRef> taskRefs = Collections.synchronizedMap(new WeakHashMap<Long, TaskRef>());
+
+ public JdbcAsyncService(DataSource dataSource, PlatformTransactionManager transactionManager) {
+ this.transactionTemplate = new TransactionTemplate(transactionManager);
+ this.queueDao = new QueueDao(new JdbcTemplate(dataSource));
+ this.taskDao = new TaskDao(new JdbcTemplate(dataSource));
+ }
+
+ @Transactional(propagation = MANDATORY)
+ public Queue registerQueue(final String name, final int interval, AsyncCallable callable) {
+ log.info("registerQueue: ENTER");
+
+ Queue q = queueDao.findByName(name);
+
+ log.info("q = {}", q);
+
+ final long interval_;
+ if (q == null) {
+ q = new Queue(name, interval * 1000);
+ queueDao.insert(q);
+ interval_ = interval;
+ } else {
+ // Found an existing queue. Use the Settings from the database.
+ interval_ = q.interval;
+ }
+
+ final QueueThread queueThread = new QueueThread(q, callable);
+ queues.put(name, queueThread);
+
+ registerSynchronization(new TransactionSynchronizationAdapter() {
+ public void afterCompletion(int status) {
+ log.info("status = {}", status);
+ if (status == TransactionSynchronization.STATUS_COMMITTED) {
+ executor.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ queueThread.ping();
+ }
+ }, 1000, 1000 * interval_, MILLISECONDS);
+// Thread thread = new Thread(queueThread, name);
+// thread.setDaemon(true);
+// thread.start();
+ queueThread.start();
+ }
+ }
+ });
+
+ log.info("registerQueue: LEAVE");
+ return q;
+ }
+
+ public void stopQueue(Queue queue) {
+ QueueThread queueThread = queues.get(queue.name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + queue.name + "'.");
+ }
+
+ queueThread.shutdown();
+ }
+
+ public Queue getQueue(String name) {
+ QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+
+ return queueThread.queue;
+ }
+
+ @Transactional(propagation = MANDATORY)
+ public TaskRef schedule(Queue queue, String... args) {
+ log.info("schedule: ENTER");
+
+ Date scheduled = new Date();
+
+ StringBuilder arguments = new StringBuilder();
+ for (String arg : args) {
+ arguments.append(arg).append(' ');
+ }
+
+ long id = taskDao.insert(queue.name, scheduled, arguments.toString());
+ Task task = new Task(id, queue.name, scheduled, null, 0, null, asList(args));
+ log.info("task = {}", task);
+ queues.get(queue.name).ping();
+// try {
+// Thread.sleep(500);
+// } catch (InterruptedException e) {
+// e.printStackTrace();
+// }
+
+ log.info("schedule: LEAVE");
+ TaskRef taskRef = new TaskRef(task);
+ taskRefs.put(task.id, taskRef);
+ return taskRef;
+ }
+
+ @Transactional(readOnly = true)
+ public Task update(Task ref) {
+ return taskDao.findById(ref.id);
+ }
+
+ class QueueThread extends Thread {
+ public boolean shouldRun = true;
+
+ public final Queue queue;
+
+ private final AsyncCallable callable;
+
+ QueueThread(Queue queue, AsyncCallable callable) {
+ super(queue.name);
+ this.queue = queue;
+ this.callable = callable;
+ }
+
+ public void ping() {
+ log.info("Sending ping to " + queue);
+ synchronized (this) {
+ notify();
+ }
+ }
+
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = taskDao.findByNameAndCompletedIsNull(queue.name);
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
+
+ try {
+ for (final Task task : tasks) {
+ try {
+ executeTask(task);
+ } catch (TransactionException | TaskFailureException e) {
+ log.warn("Task execution failed", e);
+ }
+ }
+ } catch (Exception e) {
+ if (!isInterrupted() && !shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ } else {
+ log.warn("Error because queue was signalled to shut down.", e);
+ }
+ }
+
+ synchronized (this) {
+ try {
+ wait();
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ log.info("Queue has stopped");
+
+ synchronized (this) {
+ this.notify();
+ }
+ }
+
+ private void executeTask(final Task task) {
+ final Date run = new Date();
+ log.info("Setting last run on task. date = {}, task = {}", run, task);
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ taskDao.update(task.registerRun());
+ }
+ });
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ Task t;
+
+ try {
+ callable.run(task.arguments);
+ Date completed = new Date();
+ t = task.registerComplete(completed);
+ log.info("Completed task: {}", t);
+ taskDao.update(t);
+ } catch (Exception e) {
+ throw new TaskFailureException(e);
+ }
+
+ TaskRef taskRef = taskRefs.get(task.id);
+
+ if (taskRef != null) {
+ log.info("Notifying listeners on task: {}", t);
+ //noinspection SynchronizationOnLocalVariableOrMethodParameter
+ synchronized (taskRef) {
+ taskRef.notifyAll();
+ }
+ }
+ }
+ });
+ }
+
+ public void shutdown() {
+ log.info("Shutting down queue");
+ shouldRun = false;
+ synchronized (this) {
+ this.interrupt();
+ }
+ while (isAlive()) {
+ synchronized (this) {
+ try {
+ this.wait(100);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+ }
+
+ private static class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Queue.java b/src/main/java/io/trygvis/queue/Queue.java
new file mode 100755
index 0000000..7e1fbff
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Queue.java
@@ -0,0 +1,28 @@
+package io.trygvis.queue;
+
+/**
+ * TODO: concurrency control. min/max number of consumers.
+ * TODO: check interval. How often should the fallback check the queue. Resolution in seconds.
+ */
+public class Queue<A, B> {
+
+ public final String name;
+
+ public final long interval;
+
+ public Queue(String name, long interval) {
+ this.name = name;
+ this.interval = interval;
+ }
+
+ public Queue withInterval(int interval) {
+ return new Queue(name, interval);
+ }
+
+ public String toString() {
+ return "Queue{" +
+ "name='" + name + '\'' +
+ ", interval=" + interval +
+ '}';
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueDao.java b/src/main/java/io/trygvis/queue/QueueDao.java
new file mode 100644
index 0000000..fcf975f
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueDao.java
@@ -0,0 +1,36 @@
+package io.trygvis.queue;
+
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.stereotype.*;
+
+import java.sql.*;
+
+import static org.springframework.dao.support.DataAccessUtils.*;
+
+public class QueueDao {
+
+ private final JdbcTemplate jdbcTemplate;
+
+ public QueueDao(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ public Queue findByName(String name) {
+ return singleResult(jdbcTemplate.query("SELECT name, interval FROM queue WHERE name=?", new QueueRowMapper(), name));
+ }
+
+ public void insert(Queue q) {
+ jdbcTemplate.update("INSERT INTO queue(name, interval) VALUES(?, ?)", q.name, q.interval);
+ }
+
+ public void update(Queue q) {
+ jdbcTemplate.update("UPDATE queue SET interval=? WHERE name=?", q.interval, q.name);
+ }
+
+ private class QueueRowMapper implements RowMapper<Queue> {
+ public Queue mapRow(ResultSet rs, int rowNum) throws SQLException {
+ return new Queue(rs.getString(1), rs.getLong(2));
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/Task.java b/src/main/java/io/trygvis/queue/Task.java
new file mode 100755
index 0000000..2a061bd
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/Task.java
@@ -0,0 +1,60 @@
+package io.trygvis.queue;
+
+import java.util.*;
+
+import static java.util.Collections.unmodifiableList;
+
+/**
+ * TODO: next run on failures. A default strategy from the queue, let the task be able to re-schedule itself.
+ */
+public class Task {
+
+ public final long id;
+
+ public final String queue;
+
+ public final Date scheduled;
+
+ public final Date lastRun;
+
+ public final int runCount;
+
+ public final Date completed;
+
+ public final List<String> arguments;
+
+ Task(long id, String queue, Date scheduled, Date lastRun, int runCount, Date completed, List<String> arguments) {
+ this.id = id;
+ this.queue = queue;
+ this.scheduled = scheduled;
+ this.lastRun = lastRun;
+ this.runCount = runCount;
+ this.completed = completed;
+
+ this.arguments = unmodifiableList(new ArrayList<>(arguments));
+ }
+
+ public Task registerRun() {
+ return new Task(id, queue, scheduled, new Date(), runCount + 1, completed, arguments);
+ }
+
+ public Task registerComplete(Date completed) {
+ return new Task(id, queue, scheduled, lastRun, runCount, new Date(), arguments);
+ }
+
+ public String toString() {
+ return "Task{" +
+ "id=" + id +
+ ", queue=" + queue +
+ ", scheduled=" + scheduled +
+ ", lastRun=" + lastRun +
+ ", runCount=" + runCount +
+ ", completed=" + completed +
+ ", arguments='" + arguments + '\'' +
+ '}';
+ }
+
+ public boolean isDone() {
+ return completed != null;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
new file mode 100644
index 0000000..84ce958
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -0,0 +1,54 @@
+package io.trygvis.queue;
+
+import org.springframework.jdbc.core.*;
+
+import java.sql.*;
+import java.util.Date;
+import java.util.*;
+
+import static java.util.Arrays.*;
+
+public class TaskDao {
+
+ private final JdbcTemplate jdbcTemplate;
+
+ public TaskDao(JdbcTemplate jdbcTemplate) {
+ this.jdbcTemplate = jdbcTemplate;
+ }
+
+ public long insert(String queue, Date scheduled, String arguments) {
+ jdbcTemplate.update("INSERT INTO task(id, run_count, queue, scheduled, arguments) " +
+ "VALUES(nextval('task_seq'), 0, ?, ?, ?)", queue, scheduled, arguments);
+ return jdbcTemplate.queryForObject("SELECT currval('task_seq')", Long.class);
+ }
+
+ public Task findById(long id) {
+ return jdbcTemplate.queryForObject("SELECT " + TaskRowMapper.fields + " FROM task WHERE id=?",
+ new TaskRowMapper(), id);
+ }
+
+ public List<Task> findByNameAndCompletedIsNull(String name) {
+ return jdbcTemplate.query("SELECT " + TaskRowMapper.fields + " FROM task WHERE queue=? AND completed IS NULL",
+ new TaskRowMapper(), name);
+ }
+
+ public void update(Task task) {
+ jdbcTemplate.update("UPDATE task SET scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?",
+ task.scheduled, task.lastRun, task.runCount, task.completed, task.id);
+ }
+
+ private class TaskRowMapper implements RowMapper<Task> {
+ public static final String fields = "id, queue, scheduled, last_run, run_count, completed, arguments";
+
+ public Task mapRow(ResultSet rs, int rowNum) throws SQLException {
+ return new Task(
+ rs.getLong(1),
+ rs.getString(2),
+ rs.getTimestamp(3),
+ rs.getTimestamp(4),
+ rs.getInt(5),
+ rs.getTimestamp(6),
+ asList(rs.getString(7).split(" ")));
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskRef.java b/src/main/java/io/trygvis/queue/TaskRef.java
new file mode 100644
index 0000000..e465ee7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/TaskRef.java
@@ -0,0 +1,23 @@
+package io.trygvis.queue;
+
+/**
+ * TODO: Implement waitForSuccess(). Rename waitFor to waitForAny(). Let waitForAny() throw TargetInvocationException
+ * which contains the exception that the task threw.
+ */
+public class TaskRef {
+ private Task task;
+
+ public TaskRef(Task task) {
+ this.task = task;
+ }
+
+ public Task getTask() {
+ return task;
+ }
+
+ public void waitFor(int timeout) throws InterruptedException {
+ synchronized (this) {
+ wait(timeout);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/spring/QueueSpringConfig.java b/src/main/java/io/trygvis/queue/spring/QueueSpringConfig.java
new file mode 100644
index 0000000..0eb7695
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/spring/QueueSpringConfig.java
@@ -0,0 +1,17 @@
+package io.trygvis.queue.spring;
+
+import io.trygvis.queue.*;
+import org.springframework.context.annotation.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.annotation.*;
+
+import javax.sql.*;
+
+@Configuration
+@EnableTransactionManagement
+public class QueueSpringConfig {
+ @Bean
+ public AsyncService asyncService(DataSource dataSource, PlatformTransactionManager platformTransactionManager) {
+ return new JdbcAsyncService(dataSource, platformTransactionManager);
+ }
+}
diff --git a/src/main/resources/create.sql b/src/main/resources/create.sql
new file mode 100644
index 0000000..ed8913f
--- /dev/null
+++ b/src/main/resources/create.sql
@@ -0,0 +1,27 @@
+BEGIN;
+
+DROP TABLE IF EXISTS task;
+DROP TABLE IF EXISTS queue;
+DROP SEQUENCE IF EXISTS task_id;
+
+CREATE TABLE queue (
+ name VARCHAR(100) NOT NULL,
+ interval INTEGER NOT NULL,
+ CONSTRAINT pk_queue PRIMARY KEY (name)
+);
+
+CREATE TABLE task (
+ id INTEGER NOT NULL,
+ queue VARCHAR(100) NOT NULL,
+ scheduled TIMESTAMP NOT NULL,
+ last_run TIMESTAMP,
+ run_count INT NOT NULL,
+ completed TIMESTAMP,
+ arguments VARCHAR(100),
+ CONSTRAINT pk_task PRIMARY KEY (id),
+ CONSTRAINT fk_task__queue FOREIGN KEY (queue) REFERENCES queue (name)
+);
+
+CREATE SEQUENCE task_id;
+
+COMMIT;
diff --git a/src/test/java/io/trygvis/Main.java b/src/test/java/io/trygvis/Main.java
new file mode 100755
index 0000000..022d8cc
--- /dev/null
+++ b/src/test/java/io/trygvis/Main.java
@@ -0,0 +1,108 @@
+package io.trygvis;
+
+import io.trygvis.queue.*;
+import io.trygvis.queue.Queue;
+import org.slf4j.*;
+import org.slf4j.bridge.*;
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.context.support.*;
+import org.springframework.stereotype.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+import java.util.*;
+
+import static java.lang.System.*;
+import static java.lang.Thread.*;
+
+@Component
+public class Main {
+/*
+ private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+ public static void main(String[] args) throws Exception {
+ SLF4JBridgeHandler.install();
+
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+// setProperty("hibernate.showSql", "true");
+ setProperty("hibernate.hbm2ddl.auto", "create"); // create
+ setProperty("hibernate.dialect", PostgreSQL82Dialect.class.getName());
+
+ log.info("Starting context");
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
+ log.info("Started context");
+
+ try {
+ context.getBean(Main.class).run();
+ log.info("Sleeping");
+ sleep(1000 * 1000);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+
+ log.info("Stopping context");
+ context.stop();
+ log.info("Stopped context");
+
+ exit(0);
+ }
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private AsyncService asyncService;
+
+ @Autowired
+ @Qualifier("createArticle")
+ private AsyncService.AsyncCallable createArticleCallable;
+
+ @Autowired
+ @Qualifier("createArticle")
+ private AsyncService.AsyncCallable updateArticleCallable;
+
+ public void run() throws Exception {
+ log.info("Main.run");
+
+ final Queue q = asyncService.registerQueue("create-queue", 10, createArticleCallable);
+// log.info("queue registered: ref = {}", q);
+// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
+
+// q = asyncService.getQueue("create-queue");
+
+ final List<Task> tasks = new ArrayList<>();
+
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus status) {
+ for (int i = 0; i < 1; i++) {
+ tasks.add(asyncService.schedule(q));
+ }
+ }
+ });
+
+ while (true) {
+ sleep(10000);
+
+ log.info("tasks.size = {}", tasks.size());
+ for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
+ Task task = iterator.next();
+
+ task = asyncService.update(task);
+
+ log.info("task = {}", task);
+
+ if (task.isDone()) {
+ iterator.remove();
+ }
+ }
+
+ if (tasks.isEmpty()) {
+ break;
+ }
+ }
+ }
+*/
+}
diff --git a/src/test/java/io/trygvis/queue/QueueTest.java b/src/test/java/io/trygvis/queue/QueueTest.java
new file mode 100644
index 0000000..1aa77d7
--- /dev/null
+++ b/src/test/java/io/trygvis/queue/QueueTest.java
@@ -0,0 +1,170 @@
+package io.trygvis.queue;
+
+import com.jolbox.bonecp.*;
+import io.trygvis.queue.spring.*;
+import org.junit.*;
+import org.junit.runner.*;
+import org.slf4j.*;
+import org.slf4j.bridge.*;
+import org.springframework.beans.factory.annotation.*;
+import org.springframework.context.annotation.*;
+import org.springframework.context.support.*;
+import org.springframework.jdbc.core.*;
+import org.springframework.jdbc.datasource.*;
+import org.springframework.test.context.*;
+import org.springframework.test.context.junit4.*;
+import org.springframework.transaction.*;
+import org.springframework.transaction.support.*;
+
+import javax.sql.*;
+import java.util.*;
+
+import static java.lang.System.*;
+import static org.junit.Assert.*;
+import static org.springframework.transaction.TransactionDefinition.*;
+
+@SuppressWarnings("SpringJavaAutowiringInspection")
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = TestConfig.class)
+public class QueueTest {
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private JdbcTemplate jdbcTemplate;
+
+ @Autowired
+ private AsyncService asyncService;
+
+ private static Logger log;
+
+ @BeforeClass
+ public static void beforeClass() {
+ SLF4JBridgeHandler.install();
+
+ log = LoggerFactory.getLogger(QueueTest.class);
+
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+ }
+
+ public int counter;
+ AsyncService.AsyncCallable callback = new AsyncService.AsyncCallable() {
+ public void run(List<String> args) throws Exception {
+ System.out.println("QueueTest.run");
+ counter++;
+ }
+ };
+
+ Queue queue;
+
+ @Before
+ public void before() {
+ queue = transactionTemplate.execute(new TransactionCallback<Queue>() {
+ public Queue doInTransaction(TransactionStatus status) {
+ return asyncService.registerQueue("wat", 10, callback);
+ }
+ });
+
+ int count = jdbcTemplate.update("DELETE FROM task");
+ log.info("Deleted {} tasks", count);
+ }
+
+ public void after() {
+ asyncService.stopQueue(queue);
+ }
+
+// @Test
+// public void testQueuePolling() {
+// long start = currentTimeMillis();
+// Task abc = transactionTemplate.execute(new TransactionCallback<Task>() {
+// public Task doInTransaction(TransactionStatus status) {
+// return asyncService.schedule(queue, "abc").getTask();
+// }
+// });
+//
+// while(!abc.isDone()) {
+// abc = asyncService.update(abc);
+// }
+//
+// long end = currentTimeMillis();
+// log.info("Completed in {}ms", end - start);
+//
+// assertEquals(1, counter);
+// }
+
+ @Test
+ public void testQueueWaiting() throws Exception {
+ long start = currentTimeMillis();
+ TaskRef abc = transactionTemplate.execute(new TransactionCallback<TaskRef>() {
+ public TaskRef doInTransaction(TransactionStatus status) {
+ return asyncService.schedule(queue, "abc");
+ }
+ });
+
+ log.info("Waiting for signal");
+ abc.waitFor(1000);
+ long end = currentTimeMillis();
+ log.info("Completed in {}ms", end - start);
+
+ assertEquals(1, counter);
+ }
+}
+
+@Configuration
+@Import(value = QueueSpringConfig.class)
+class TestConfig {
+
+ @Bean
+ public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception {
+ return new PropertySourcesPlaceholderConfigurer() {{
+ setProperties(System.getProperties());
+ setLocalOverride(true);
+ }};
+ }
+
+ @Bean
+ public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+ return new JdbcTemplate(dataSource);
+ }
+
+ @Bean
+ public DataSource dataSource(@Value("${database.url}") String jdbcUrl,
+ @Value("${database.username}") String username,
+ @Value("${database.password}") String password) {
+ BoneCPDataSource ds = new BoneCPDataSource();
+
+ ds.setLogStatementsEnabled(true);
+
+ ds.setJdbcUrl(jdbcUrl);
+ ds.setUsername(username);
+ ds.setPassword(password);
+
+ ds.setIdleConnectionTestPeriodInSeconds(60);
+ ds.setIdleMaxAgeInSeconds(240);
+ ds.setMaxConnectionsPerPartition(40);
+ ds.setMinConnectionsPerPartition(0);
+ ds.setPartitionCount(1);
+ ds.setAcquireIncrement(1);
+ ds.setStatementsCacheSize(1000);
+ ds.setReleaseHelperThreads(3);
+ ds.setStatisticsEnabled(true);
+ return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
+ }
+
+ @Bean
+ public PlatformTransactionManager transactionManager(DataSource dataSource) {
+ return new DataSourceTransactionManager(dataSource);
+ }
+
+ @Bean
+ public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
+ DefaultTransactionDefinition td = new DefaultTransactionDefinition();
+ td.setPropagationBehavior(PROPAGATION_REQUIRED);
+ td.setIsolationLevel(ISOLATION_READ_COMMITTED);
+ return new TransactionTemplate(platformTransactionManager, td);
+ }
+} \ No newline at end of file
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100755
index 0000000..a250c6a
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,24 @@
+<configuration debug="false">
+
+ <logger name="io.trygvis" level="DEBUG"/>
+<!-- <logger name="org.springframework" level="INFO"/> -->
+ <logger name="org.springframework" level="INFO"/>
+ <logger name="org.springframework.jdbc" level="DEBUG"/>
+ <logger name="org.springframework.orm" level="INFO"/>
+ <logger name="org.springframework.transaction" level="DEBUG"/>
+
+ <logger name="org.hibernate" level="INFO"/>
+ <logger name="org.hibernate.SQL" level="INFO"/>
+
+ <logger name="com.jolbox" level="INFO"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%-15thread] %-5level %-50logger{36} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>