aboutsummaryrefslogtreecommitdiff
path: root/src/test
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-09 15:15:46 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-09 15:15:46 +0200
commit33e3be55dc2d815cbd0208bf59d12a7e727f3105 (patch)
treea464f750d2cbdd6cdd805e574dd0aa66fa7027fd /src/test
parent7465fdb9aa847d29dacc56adbe473f1c1ceb298e (diff)
downloadquartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.gz
quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.bz2
quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.xz
quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.zip
wip
Diffstat (limited to 'src/test')
-rwxr-xr-xsrc/test/java/io/trygvis/test/CreateArticleCallable.java14
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java5
-rwxr-xr-xsrc/test/java/io/trygvis/test/Main.java9
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample.java51
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample2.java84
-rwxr-xr-xsrc/test/java/io/trygvis/test/UpdateArticleCallable.java15
-rw-r--r--src/test/java/io/trygvis/test/spring/PlainSpringTest.java11
-rwxr-xr-xsrc/test/resources/logback.xml25
8 files changed, 183 insertions, 31 deletions
diff --git a/src/test/java/io/trygvis/test/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java
index a822a51..396fc89 100755
--- a/src/test/java/io/trygvis/test/CreateArticleCallable.java
+++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java
@@ -1,25 +1,31 @@
package io.trygvis.test;
-import io.trygvis.async.AsyncService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Random;
+import static java.util.Collections.emptyList;
import static org.springframework.transaction.annotation.Propagation.MANDATORY;
@Component("createArticle")
@Transactional(propagation = MANDATORY)
-public class CreateArticleCallable implements AsyncService.AsyncCallable {
+public class CreateArticleCallable implements TaskEffect {
private final Logger log = LoggerFactory.getLogger(getClass());
private Random random = new Random();
- public void run(List<String> arguments) throws Exception {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+ List<String> arguments = task.arguments;
+
log.info("CreateArticeJob.run: BEGIN");
if (random.nextInt() % 3 == 0) {
@@ -34,5 +40,7 @@ public class CreateArticleCallable implements AsyncService.AsyncCallable {
// entityManager.persist(article);
log.info("CreateArticeJob.run: END");
+
+ return emptyList();
}
}
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
index d0e5b47..2362e65 100644
--- a/src/test/java/io/trygvis/test/DbUtil.java
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -30,15 +30,16 @@ public class DbUtil {
ds.setUsername(username);
ds.setPassword(password);
+ ds.setConnectionTestStatement("/* ping*/SELECT 1");
ds.setDefaultAutoCommit(false);
ds.setIdleConnectionTestPeriodInSeconds(60);
ds.setIdleMaxAgeInSeconds(240);
- ds.setMaxConnectionsPerPartition(40);
+ ds.setMaxConnectionsPerPartition(1);
ds.setMinConnectionsPerPartition(0);
ds.setPartitionCount(1);
ds.setAcquireIncrement(1);
ds.setStatementsCacheSize(1000);
- ds.setReleaseHelperThreads(3);
+ ds.setReleaseHelperThreads(1);
ds.setStatisticsEnabled(true);
ds.setLogStatementsEnabled(true);
ds.setLogWriter(new PrintWriter(System.err));
diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java
index 43ee971..0721ec9 100755
--- a/src/test/java/io/trygvis/test/Main.java
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -4,6 +4,7 @@ import io.trygvis.async.AsyncService;
import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
@@ -62,22 +63,22 @@ public class Main {
@Autowired
@Qualifier("createArticle")
- private AsyncService.AsyncCallable createArticleCallable;
+ private TaskEffect createArticleCallable;
@Autowired
@Qualifier("updateArticle")
- private AsyncService.AsyncCallable updateArticleCallable;
+ private TaskEffect updateArticleCallable;
public void run() throws Exception {
log.info("Main.run");
- final Queue q = null; // queueService.getQueue(c, "create-article", 1);
+ final Queue q = null; // queueService.lookupQueue(c, "create-article", 1);
asyncService.registerQueue(q, createArticleCallable);
// log.info("queue registered: ref = {}", q);
// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
-// q = asyncService.getQueue("create-queue");
+// q = asyncService.lookupQueue("create-queue");
final List<Task> tasks = new ArrayList<>();
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java
index 338abad..b09d3e9 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/PlainJavaExample.java
@@ -1,22 +1,26 @@
package io.trygvis.test;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.Random;
-import static io.trygvis.queue.JdbcQueueService.createQueueService;
-import static io.trygvis.queue.Task.newTask;
import static io.trygvis.test.DbUtil.createDataSource;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
public class PlainJavaExample {
+ private static final Random r = new Random();
private static String inputName = "my-input";
private static String outputName = "my-output";
@@ -28,26 +32,40 @@ public class PlainJavaExample {
System.out.println("Starting consumer");
DataSource ds = createDataSource();
- Connection c = ds.getConnection();
- JdbcQueueService queueService = createQueueService(c);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final JdbcQueueService queueService = queueSystem.queueService;
+
+ Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
+ @Override
+ public Queue[] doInConnection(Connection c) throws SQLException {
+ return new Queue[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)};
+ }
+ });
- final Queue input = queueService.getQueue(c, inputName, interval, true);
- final Queue output = queueService.getQueue(c, outputName, interval, true);
+ final Queue input = queues[0];
+ final Queue output = queues[1];
- queueService.consume(c, input, new QueueService.TaskEffect() {
- public List<Task> consume(Task task) throws Exception {
- System.out.println("PlainJavaExample$Consumer.consume");
+ queueService.consumeAll(input, new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments);
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
- return singletonList(newTask(output.name, new Date(), Long.toString(a + b)));
+ if(r.nextInt(3) == 0) {
+ return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
+ }
+
+ throw new RuntimeException("Simulated exception while processing task.");
}
});
-
- c.commit();
+ System.out.println("Done");
}
}
@@ -58,9 +76,12 @@ public class PlainJavaExample {
DataSource ds = createDataSource();
Connection c = ds.getConnection();
- JdbcQueueService queueService = createQueueService(c);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcQueueService queueService = queueSystem.queueService;
- Queue queue = queueService.getQueue(c, inputName, interval, true);
+ Queue queue = queueService.lookupQueue(c, inputName, interval, true);
queueService.schedule(c, queue, new Date(), asList("10", "20"));
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample2.java b/src/test/java/io/trygvis/test/PlainJavaExample2.java
new file mode 100644
index 0000000..faeebb2
--- /dev/null
+++ b/src/test/java/io/trygvis/test/PlainJavaExample2.java
@@ -0,0 +1,84 @@
+package io.trygvis.test;
+
+import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.SqlEffectExecutor;
+import io.trygvis.queue.JdbcQueueService;
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.util.Date;
+import java.util.List;
+
+import static io.trygvis.queue.Task.newTask;
+import static io.trygvis.test.DbUtil.createDataSource;
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+
+public class PlainJavaExample2 {
+
+ private static String inputName = "my-input";
+ private static String outputName = "my-output";
+
+ private static int interval = 10;
+
+ private static final TaskEffect adder = new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ System.out.println("PlainJavaExample$Consumer.consumeAll");
+ Long a = Long.valueOf(task.arguments.get(0));
+ Long b = Long.valueOf(task.arguments.get(1));
+
+ System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
+
+ return singletonList(newTask(outputName, new Date(), Long.toString(a + b)));
+ }
+ };
+
+ public static class Consumer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
+
+ DataSource ds = createDataSource();
+ Connection c = ds.getConnection();
+
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcQueueService queueService = queueSystem.queueService;
+
+ final Queue input = queueService.lookupQueue(c, inputName, interval, true);
+ final Queue output = queueService.lookupQueue(c, outputName, interval, true);
+
+ JdbcAsyncService asyncService = new JdbcAsyncService(queueSystem);
+
+ asyncService.registerQueue(input, adder);
+
+// queueService.consumeAll(c, input, adder);
+
+ c.commit();
+ }
+ }
+
+ public static class Producer {
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting producer");
+
+ DataSource ds = createDataSource();
+ Connection c = ds.getConnection();
+
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcQueueService queueService = queueSystem.queueService;
+
+ Queue queue = queueService.lookupQueue(c, inputName, interval, true);
+
+ queueService.schedule(c, queue, new Date(), asList("10", "20"));
+
+ c.commit();
+ }
+ }
+}
diff --git a/src/test/java/io/trygvis/test/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
index f50c10a..6aff20f 100755
--- a/src/test/java/io/trygvis/test/UpdateArticleCallable.java
+++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java
@@ -1,6 +1,7 @@
package io.trygvis.test;
-import io.trygvis.async.AsyncService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
@@ -9,14 +10,18 @@ import java.util.Date;
import java.util.List;
import java.util.Random;
+import static java.util.Collections.emptyList;
+
@Component("updateArticle")
-public class UpdateArticleCallable
- implements AsyncService.AsyncCallable {
+public class UpdateArticleCallable implements TaskEffect {
private final Logger log = LoggerFactory.getLogger(getClass());
private final Random r = new Random();
- public void run(List<String> arguments) throws Exception {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
+ List<String> arguments = task.arguments;
+
log.info("UpdateArticeJob.run: BEGIN");
Date now = new Date();
@@ -36,5 +41,7 @@ public class UpdateArticleCallable
*/
log.info("UpdateArticeJob.run: END");
+
+ return emptyList();
}
}
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
index 07e67fb..d06d8d6 100644
--- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -3,6 +3,8 @@ package io.trygvis.test.spring;
import io.trygvis.async.AsyncService;
import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import io.trygvis.spring.DefaultConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -17,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.lang.System.getProperty;
import static java.lang.System.setProperty;
import static java.util.Arrays.asList;
+import static java.util.Collections.emptyList;
import static org.fest.assertions.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
@@ -41,13 +44,15 @@ public class PlainSpringTest {
public void testBasic() throws SQLException, InterruptedException {
Queue test = queueService.getQueue("test", 10, true);
final AtomicReference<List<String>> ref = new AtomicReference<>();
- asyncService.registerQueue(test, new AsyncService.AsyncCallable() {
- public void run(List<String> arguments) throws Exception {
+ asyncService.registerQueue(test, new TaskEffect() {
+ @Override
+ public List<Task> apply(Task task) throws Exception {
System.out.println("PlainSpringTest.run");
- ref.set(arguments);
+ ref.set(task.arguments);
synchronized (ref) {
ref.notify();
}
+ return emptyList();
}
});
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
new file mode 100755
index 0000000..676eac5
--- /dev/null
+++ b/src/test/resources/logback.xml
@@ -0,0 +1,25 @@
+<configuration debug="false">
+
+ <logger name="io.trygvis" level="DEBUG"/>
+<!-- <logger name="org.springframework" level="INFO"/> -->
+ <logger name="org.springframework" level="INFO"/>
+ <logger name="org.springframework.jdbc" level="INFO"/>
+ <logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="DEBUG"/>
+ <logger name="org.springframework.orm" level="INFO"/>
+ <logger name="org.springframework.transaction" level="DEBUG"/>
+
+ <logger name="org.hibernate" level="INFO"/>
+ <logger name="org.hibernate.SQL" level="INFO"/>
+
+ <logger name="com.jolbox" level="TRACE"/>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>%d{HH:mm:ss.SSS} [%-15thread] %-5level %-60logger{60} - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <root level="DEBUG">
+ <appender-ref ref="STDOUT"/>
+ </root>
+</configuration>