aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/io/trygvis
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-23 09:37:57 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-23 09:37:57 +0200
commit7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 (patch)
treec0bd7202845697207b04d518f613588df17d9e12 /src/test/java/io/trygvis
downloadjdbc-queue-master.tar.gz
jdbc-queue-master.tar.bz2
jdbc-queue-master.tar.xz
jdbc-queue-master.zip
o Initial import of JDBC queue.HEADmaster
Diffstat (limited to 'src/test/java/io/trygvis')
-rwxr-xr-xsrc/test/java/io/trygvis/test/Article.java46
-rwxr-xr-xsrc/test/java/io/trygvis/test/CreateArticleCallable.java46
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java64
-rwxr-xr-xsrc/test/java/io/trygvis/test/Main.java120
-rwxr-xr-xsrc/test/java/io/trygvis/test/UpdateArticleCallable.java47
-rw-r--r--src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java37
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java107
-rw-r--r--src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java126
-rw-r--r--src/test/java/io/trygvis/test/spring/PlainSpringTest.java88
-rwxr-xr-xsrc/test/java/io/trygvis/test/spring/TestConfig.java57
10 files changed, 738 insertions, 0 deletions
diff --git a/src/test/java/io/trygvis/test/Article.java b/src/test/java/io/trygvis/test/Article.java
new file mode 100755
index 0000000..bf52e41
--- /dev/null
+++ b/src/test/java/io/trygvis/test/Article.java
@@ -0,0 +1,46 @@
+package io.trygvis.test;
+
+import java.util.Date;
+
+public class Article {
+ private Integer id;
+ private Date created;
+ private Date updated;
+ private String title;
+ private String body;
+
+ @SuppressWarnings("UnusedDeclaration")
+ private Article() {
+ }
+
+ public Article(Date created, Date updated, String title, String body) {
+ this.created = created;
+ this.updated = updated;
+ this.title = title;
+ this.body = body;
+ }
+
+ public Integer getId() {
+ return id;
+ }
+
+ public Date getCreated() {
+ return created;
+ }
+
+ public Date getUpdated() {
+ return updated;
+ }
+
+ public void setUpdated(Date updated) {
+ this.updated = updated;
+ }
+
+ public String getTitle() {
+ return title;
+ }
+
+ public String getBody() {
+ return body;
+ }
+}
diff --git a/src/test/java/io/trygvis/test/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java
new file mode 100755
index 0000000..396fc89
--- /dev/null
+++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java
@@ -0,0 +1,46 @@
+package io.trygvis.test;
+
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.Collections;
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import static java.util.Collections.emptyList;
+import static org.springframework.transaction.annotation.Propagation.MANDATORY;
+
+@Component("createArticle")
+@Transactional(propagation = MANDATORY)
+public class CreateArticleCallable implements TaskEffect {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private Random random = new Random();
+
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+ List<String> arguments = task.arguments;
+
+ log.info("CreateArticeJob.run: BEGIN");
+
+ if (random.nextInt() % 3 == 0) {
+ throw new RuntimeException("failing create article");
+ }
+
+ Date now = new Date();
+
+ log.info("now = {}", now);
+
+ Article article = new Article(new Date(), null, "title", "body");
+// entityManager.persist(article);
+
+ log.info("CreateArticeJob.run: END");
+
+ return emptyList();
+ }
+}
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
new file mode 100644
index 0000000..33c2807
--- /dev/null
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -0,0 +1,64 @@
+package io.trygvis.test;
+
+import com.jolbox.bonecp.BoneCPDataSource;
+import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
+import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
+
+import javax.sql.DataSource;
+import java.io.PrintWriter;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import static java.lang.System.getProperty;
+
+public class DbUtil {
+ public static DataSource createDataSource() throws SQLException {
+ String username = getProperty("user.name");
+ String jdbcUrl = getProperty("database.url", "jdbc:postgresql://localhost/" + username);
+ String user = getProperty("database.username", username);
+ String pass = getProperty("database.password", username);
+
+ return createDataSource(jdbcUrl, user, pass);
+ }
+
+ public static BoneCPDataSource createDataSource(String jdbcUrl, String username, String password) throws SQLException {
+ BoneCPDataSource ds = new BoneCPDataSource();
+
+ ds.setLogStatementsEnabled(true);
+
+ ds.setJdbcUrl(jdbcUrl);
+ ds.setUsername(username);
+ ds.setPassword(password);
+
+ ds.setConnectionTestStatement("/* ping*/SELECT 1");
+ ds.setDefaultAutoCommit(false);
+ ds.setIdleConnectionTestPeriodInSeconds(60);
+ ds.setIdleMaxAgeInSeconds(240);
+ ds.setMaxConnectionsPerPartition(4);
+ ds.setMinConnectionsPerPartition(1);
+ ds.setPartitionCount(4);
+ ds.setAcquireIncrement(1);
+ ds.setStatementsCacheSize(1000);
+ ds.setReleaseHelperThreads(1);
+ ds.setStatisticsEnabled(true);
+ ds.setLogStatementsEnabled(true);
+ ds.setLogWriter(new PrintWriter(System.err));
+ return ds;
+ }
+
+ public static DataSource springifyDataSource(DataSource ds) {
+ return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
+ }
+
+ public static int getPid(Connection c) throws SQLException {
+ int pid;
+ try (Statement statement = c.createStatement()) {
+ ResultSet rs = statement.executeQuery("SELECT pg_backend_pid()");
+ rs.next();
+ pid = rs.getInt(1);
+ }
+ return pid;
+ }
+}
diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java
new file mode 100755
index 0000000..f03d6fa
--- /dev/null
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -0,0 +1,120 @@
+package io.trygvis.test;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.context.support.ClassPathXmlApplicationContext;
+import org.springframework.stereotype.Component;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import static java.lang.System.*;
+import static java.lang.Thread.sleep;
+
+@Component
+public class Main {
+ private static final Logger log = LoggerFactory.getLogger(Main.class);
+
+ public static void main(String[] args) throws Exception {
+ SLF4JBridgeHandler.install();
+
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+
+ log.info("Starting context");
+ ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml");
+ log.info("Started context");
+
+ try {
+ context.getBean(Main.class).run();
+// log.info("Sleeping");
+// sleep(1000 * 1000);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+
+ log.info("Stopping context");
+ context.stop();
+ log.info("Stopped context");
+
+ exit(0);
+ }
+
+ @Autowired
+ private TransactionTemplate transactionTemplate;
+
+ @Autowired
+ private AsyncService asyncService;
+
+ @Autowired
+ private QueueService queueService;
+
+ @Autowired
+ @Qualifier("createArticle")
+ private TaskEffect createArticleCallable;
+
+ @Autowired
+ @Qualifier("updateArticle")
+ private TaskEffect updateArticleCallable;
+
+ public void run() throws Exception {
+ log.info("Main.run");
+
+ final Queue q = null; // queueService.lookupQueue(c, "create-article", 1);
+
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+
+ asyncService.registerQueue(q, req, createArticleCallable);
+// log.info("queue registered: ref = {}", q);
+// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
+
+// q = asyncService.lookupQueue("create-queue");
+
+ final List<Task> tasks = new ArrayList<>();
+
+ final int count = 1;
+ log.info("Creating {} tasks", count);
+// transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+// protected void doInTransactionWithoutResult(TransactionStatus status) {
+// for (int i = 0; i < count; i++) {
+// tasks.add(asyncService.schedule(q));
+// }
+// }
+// });
+ log.info("Created {} tasks", count);
+
+ while (true) {
+ sleep(10000);
+
+ log.info("Checking for status of {} tasks", tasks.size());
+ for (Iterator<Task> iterator = tasks.iterator(); iterator.hasNext(); ) {
+ Task task = iterator.next();
+
+ task = asyncService.update(task);
+
+// log.info("task = {}", task);
+
+ if (task.isDone()) {
+ iterator.remove();
+ }
+ }
+
+ if (tasks.isEmpty()) {
+ log.info("No more tasks");
+ break;
+ }
+ }
+ }
+}
diff --git a/src/test/java/io/trygvis/test/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
new file mode 100755
index 0000000..6aff20f
--- /dev/null
+++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
@@ -0,0 +1,47 @@
+package io.trygvis.test;
+
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+
+import java.util.Date;
+import java.util.List;
+import java.util.Random;
+
+import static java.util.Collections.emptyList;
+
+@Component("updateArticle")
+public class UpdateArticleCallable implements TaskEffect {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final Random r = new Random();
+
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+ List<String> arguments = task.arguments;
+
+ log.info("UpdateArticeJob.run: BEGIN");
+
+ Date now = new Date();
+
+ log.info("now = {}", now);
+
+/*
+ TypedQuery<Article> q = entityManager.createQuery(entityManager.getCriteriaBuilder().createQuery(Article.class));
+
+ List<Article> list = q.getResultList();
+ log.info("Got {} articles", list.size());
+
+ Article a = list.get(r.nextInt(list.size()));
+ a.setUpdated(new Date());
+
+ entityManager.persist(a);
+*/
+
+ log.info("UpdateArticeJob.run: END");
+
+ return emptyList();
+ }
+}
diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
new file mode 100644
index 0000000..bd86732
--- /dev/null
+++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
@@ -0,0 +1,37 @@
+package io.trygvis.test.activemq;
+
+import io.trygvis.activemq.ActiveMqHinter;
+import io.trygvis.async.QueueController;
+import io.trygvis.queue.QueueService;
+import io.trygvis.test.jdbc.AsyncConsumerExample;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.JMSException;
+
+public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable {
+ private final ActiveMqHinter activeMqHinter;
+
+ public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException {
+ activeMqHinter = new ActiveMqHinter(cf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true).
+ interval(60 * 6000).
+ continueOnFullChunk(false);
+ try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) {
+ consumer.work(poolSize, req);
+ }
+ }
+
+ public void close() throws JMSException {
+ activeMqHinter.close();
+ }
+
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ activeMqHinter.createHinter(input);
+ }
+}
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
new file mode 100644
index 0000000..16640dd
--- /dev/null
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -0,0 +1,107 @@
+package io.trygvis.test.jdbc;
+
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
+import io.trygvis.async.QueueStats;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.Task.newTask;
+import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Collections.singletonList;
+
+public class AsyncConsumerExample {
+
+ private static String inputName = "my-input";
+ private static String outputName = "my-output";
+
+ private static int interval = 1000;
+
+ private static final TaskEffect adder = new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ Long a = Long.valueOf(task.arguments.get(0));
+ Long b = Long.valueOf(task.arguments.get(1));
+
+ return singletonList(newTask(outputName, new Date(), Long.toString(a + b)));
+ }
+ };
+
+ public static void main(String[] args) throws Exception {
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+ new AsyncConsumerExample().work(poolSize, req);
+ }
+
+ public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception {
+ System.out.println("Starting consumer");
+
+ DataSource ds = createDataSource();
+
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.createQueueService();
+
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ return new QueueExecutor[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
+
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
+
+ QueueController controller = asyncService.registerQueue(input, req, adder);
+ wrapInputQueue(controller);
+
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ System.out.println(input.getStats());
+ System.out.println(output.getStats());
+ }
+ }, 1000, 1000);
+
+ long start = currentTimeMillis();
+ controller.start(new ScheduledThreadPoolExecutor(poolSize));
+ Thread.sleep(60 * 1000);
+ controller.stop();
+ long end = currentTimeMillis();
+ timer.cancel();
+
+ QueueStats stats = input.getStats();
+
+ System.out.println("Summary:");
+ System.out.println(stats.toString());
+ System.out.println(output.getStats().toString());
+
+ long duration = end - start;
+ double rate = 1000 * ((double) stats.totalMessageCount) / duration;
+ System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s");
+ }
+
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ }
+}
diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
new file mode 100644
index 0000000..0b7ba50
--- /dev/null
+++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
@@ -0,0 +1,126 @@
+package io.trygvis.test.jdbc;
+
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueStats;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
+import io.trygvis.queue.TaskEffect;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static io.trygvis.queue.Task.TaskState;
+import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+public class PlainJavaExample {
+ private static final Random r = new Random();
+
+ private static String inputName = "my-input";
+ private static String outputName = "my-output";
+
+ private static int interval = 10;
+
+ public static class Consumer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
+
+ DataSource ds = createDataSource();
+
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final JdbcQueueService queueService = queueSystem.createQueueService();
+
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ QueueExecutor[] queues = {
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)};
+
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ QueueStats stats = taskDao.findQueueStatsByName(inputName);
+ System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount);
+ for (Map.Entry<TaskState, Long> entry : stats.states.entrySet()) {
+ System.out.println(entry.getKey() + " = " + entry.getValue());
+ }
+
+ return queues;
+ }
+ });
+
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
+
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false);
+
+ input.consumeAll(req, new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ Long a = Long.valueOf(task.arguments.get(0));
+ Long b = Long.valueOf(task.arguments.get(1));
+
+ System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
+
+ if (r.nextInt(3000) > 0) {
+ return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b)));
+ }
+
+ throw new RuntimeException("Simulated exception while processing task.");
+ }
+ });
+ System.out.println("Done");
+ }
+ }
+
+ public static class Producer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting producer");
+
+ int chunks = 100;
+ final int chunk = 10000;
+
+ DataSource ds = createDataSource();
+
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final JdbcQueueService queueService = queueSystem.createQueueService();
+
+ final QueueExecutor queue;
+ try (Connection c = ds.getConnection()) {
+ queue = queueService.lookupQueue(c, inputName, interval, true);
+ c.commit();
+ }
+
+ for (int i = 0; i < chunks; i++) {
+ long start = currentTimeMillis();
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (int j = 0; j < chunk; j++) {
+ queue.schedule(c, new Date(), asList("10", "20"));
+ }
+ }
+ });
+ long end = currentTimeMillis();
+
+ long time = end - start;
+ System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second");
+ }
+ }
+ }
+}
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
new file mode 100644
index 0000000..93372f6
--- /dev/null
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -0,0 +1,88 @@
+package io.trygvis.test.spring;
+
+import io.trygvis.async.AsyncService;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import io.trygvis.spring.DefaultConfig;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.lang.System.getProperty;
+import static java.lang.System.setProperty;
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
+import static org.fest.assertions.Assertions.assertThat;
+
+@RunWith(SpringJUnit4ClassRunner.class)
+@ContextConfiguration(classes = {TestConfig.class, DefaultConfig.class})
+public class PlainSpringTest {
+
+ @Autowired
+ private AsyncService asyncService;
+
+ @Autowired
+ private QueueService queueService;
+
+ private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+
+ static {
+ String username = getProperty("user.name");
+ setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
+ setProperty("database.username", username);
+ setProperty("database.password", username);
+ }
+
+ @Test
+ public void testBasic() throws SQLException, InterruptedException {
+ QueueExecutor queueA = queueService.getQueue("a", 1000, true);
+// final AtomicReference<List<String>> refA = new AtomicReference<>();
+ asyncService.registerQueue(queueA.queue, req, new TaskEffect() {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+// refA.set(task.arguments);
+// synchronized (refA) {
+// refA.notify();
+// }
+ System.out.println("task.arguments = " + task.arguments);
+ return asList(task.childTask("b", new Date(), task.arguments.get(0), "world"));
+ }
+ });
+
+ QueueExecutor queueB = queueService.getQueue("b", 1000, true);
+ final AtomicReference<List<String>> refB = new AtomicReference<>();
+ asyncService.registerQueue(queueB.queue, req, new TaskEffect() {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+// System.out.println("task.arguments = " + task.arguments);
+ refB.set(task.arguments);
+ synchronized (refB) {
+ refB.notify();
+ }
+ return emptyList();
+ }
+ });
+
+ synchronized (refB) {
+ System.out.println("Scheduling task");
+ queueService.schedule(queueA.queue, new Date(), asList("hello"));
+ System.out.println("Task scheduled, waiting");
+ refB.wait(10000);
+ System.out.println("Back!");
+ }
+
+// System.out.println("refA.get() = " + refA.get());
+ System.out.println("refB.get() = " + refB.get());
+
+ assertThat(refB.get()).containsExactly("hello", "world");
+ }
+}
diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java
new file mode 100755
index 0000000..4df8ce1
--- /dev/null
+++ b/src/test/java/io/trygvis/test/spring/TestConfig.java
@@ -0,0 +1,57 @@
+package io.trygvis.test.spring;
+
+import com.jolbox.bonecp.BoneCPDataSource;
+import io.trygvis.test.DbUtil;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy;
+import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+import org.springframework.transaction.support.TransactionTemplate;
+
+import javax.sql.DataSource;
+import java.sql.SQLException;
+
+@Configuration
+@ComponentScan(basePackages = "io.trygvis")
+@EnableTransactionManagement
+public class TestConfig {
+
+ @Bean
+ public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() throws Exception {
+ return new PropertySourcesPlaceholderConfigurer() {{
+ setProperties(System.getProperties());
+ setLocalOverride(true);
+ }};
+ }
+
+ @Bean
+ public JdbcTemplate jdbcTemplate(DataSource dataSource) {
+ return new JdbcTemplate(dataSource);
+ }
+
+ @Bean
+ public DataSource dataSource(@Value("${database.url}") String jdbcUrl,
+ @Value("${database.username}") String username,
+ @Value("${database.password}") String password) throws SQLException {
+ BoneCPDataSource ds = DbUtil.createDataSource(jdbcUrl, username, password);
+
+ return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds));
+ }
+
+ @Bean
+ public PlatformTransactionManager transactionManager(DataSource dataSource) {
+ return new DataSourceTransactionManager(dataSource);
+ }
+
+ @Bean
+ public TransactionTemplate transactionTemplate(PlatformTransactionManager platformTransactionManager) {
+ return new TransactionTemplate(platformTransactionManager);
+ }
+}