diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-09 15:15:46 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-09 15:15:46 +0200 |
commit | 33e3be55dc2d815cbd0208bf59d12a7e727f3105 (patch) | |
tree | a464f750d2cbdd6cdd805e574dd0aa66fa7027fd /src/test | |
parent | 7465fdb9aa847d29dacc56adbe473f1c1ceb298e (diff) | |
download | quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.gz quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.bz2 quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.tar.xz quartz-based-queue-33e3be55dc2d815cbd0208bf59d12a7e727f3105.zip |
wip
Diffstat (limited to 'src/test')
-rwxr-xr-x | src/test/java/io/trygvis/test/CreateArticleCallable.java | 14 | ||||
-rw-r--r-- | src/test/java/io/trygvis/test/DbUtil.java | 5 | ||||
-rwxr-xr-x | src/test/java/io/trygvis/test/Main.java | 9 | ||||
-rw-r--r-- | src/test/java/io/trygvis/test/PlainJavaExample.java | 51 | ||||
-rw-r--r-- | src/test/java/io/trygvis/test/PlainJavaExample2.java | 84 | ||||
-rwxr-xr-x | src/test/java/io/trygvis/test/UpdateArticleCallable.java | 15 | ||||
-rw-r--r-- | src/test/java/io/trygvis/test/spring/PlainSpringTest.java | 11 | ||||
-rwxr-xr-x | src/test/resources/logback.xml | 25 |
8 files changed, 183 insertions, 31 deletions
diff --git a/src/test/java/io/trygvis/test/CreateArticleCallable.java b/src/test/java/io/trygvis/test/CreateArticleCallable.java index a822a51..396fc89 100755 --- a/src/test/java/io/trygvis/test/CreateArticleCallable.java +++ b/src/test/java/io/trygvis/test/CreateArticleCallable.java @@ -1,25 +1,31 @@ package io.trygvis.test; -import io.trygvis.async.AsyncService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import org.springframework.transaction.annotation.Transactional; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.Random; +import static java.util.Collections.emptyList; import static org.springframework.transaction.annotation.Propagation.MANDATORY; @Component("createArticle") @Transactional(propagation = MANDATORY) -public class CreateArticleCallable implements AsyncService.AsyncCallable { +public class CreateArticleCallable implements TaskEffect { private final Logger log = LoggerFactory.getLogger(getClass()); private Random random = new Random(); - public void run(List<String> arguments) throws Exception { + @Override + public List<Task> apply(Task task) throws Exception { + List<String> arguments = task.arguments; + log.info("CreateArticeJob.run: BEGIN"); if (random.nextInt() % 3 == 0) { @@ -34,5 +40,7 @@ public class CreateArticleCallable implements AsyncService.AsyncCallable { // entityManager.persist(article); log.info("CreateArticeJob.run: END"); + + return emptyList(); } } diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java index d0e5b47..2362e65 100644 --- a/src/test/java/io/trygvis/test/DbUtil.java +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -30,15 +30,16 @@ public class DbUtil { ds.setUsername(username); ds.setPassword(password); + ds.setConnectionTestStatement("/* ping*/SELECT 1"); ds.setDefaultAutoCommit(false); ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); - ds.setMaxConnectionsPerPartition(40); + ds.setMaxConnectionsPerPartition(1); ds.setMinConnectionsPerPartition(0); ds.setPartitionCount(1); ds.setAcquireIncrement(1); ds.setStatementsCacheSize(1000); - ds.setReleaseHelperThreads(3); + ds.setReleaseHelperThreads(1); ds.setStatisticsEnabled(true); ds.setLogStatementsEnabled(true); ds.setLogWriter(new PrintWriter(System.err)); diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java index 43ee971..0721ec9 100755 --- a/src/test/java/io/trygvis/test/Main.java +++ b/src/test/java/io/trygvis/test/Main.java @@ -4,6 +4,7 @@ import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.bridge.SLF4JBridgeHandler; @@ -62,22 +63,22 @@ public class Main { @Autowired @Qualifier("createArticle") - private AsyncService.AsyncCallable createArticleCallable; + private TaskEffect createArticleCallable; @Autowired @Qualifier("updateArticle") - private AsyncService.AsyncCallable updateArticleCallable; + private TaskEffect updateArticleCallable; public void run() throws Exception { log.info("Main.run"); - final Queue q = null; // queueService.getQueue(c, "create-article", 1); + final Queue q = null; // queueService.lookupQueue(c, "create-article", 1); asyncService.registerQueue(q, createArticleCallable); // log.info("queue registered: ref = {}", q); // asyncService.registerQueue("update-queue", 1, updateArticleCallable); -// q = asyncService.getQueue("create-queue"); +// q = asyncService.lookupQueue("create-queue"); final List<Task> tasks = new ArrayList<>(); diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java index 338abad..b09d3e9 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -1,22 +1,26 @@ package io.trygvis.test; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.Random; -import static io.trygvis.queue.JdbcQueueService.createQueueService; -import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; public class PlainJavaExample { + private static final Random r = new Random(); private static String inputName = "my-input"; private static String outputName = "my-output"; @@ -28,26 +32,40 @@ public class PlainJavaExample { System.out.println("Starting consumer"); DataSource ds = createDataSource(); - Connection c = ds.getConnection(); - JdbcQueueService queueService = createQueueService(c); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.queueService; + + Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() { + @Override + public Queue[] doInConnection(Connection c) throws SQLException { + return new Queue[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true)}; + } + }); - final Queue input = queueService.getQueue(c, inputName, interval, true); - final Queue output = queueService.getQueue(c, outputName, interval, true); + final Queue input = queues[0]; + final Queue output = queues[1]; - queueService.consume(c, input, new QueueService.TaskEffect() { - public List<Task> consume(Task task) throws Exception { - System.out.println("PlainJavaExample$Consumer.consume"); + queueService.consumeAll(input, new TaskEffect() { + public List<Task> apply(Task task) throws Exception { + System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments); Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); - return singletonList(newTask(output.name, new Date(), Long.toString(a + b))); + if(r.nextInt(3) == 0) { + return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); + } + + throw new RuntimeException("Simulated exception while processing task."); } }); - - c.commit(); + System.out.println("Done"); } } @@ -58,9 +76,12 @@ public class PlainJavaExample { DataSource ds = createDataSource(); Connection c = ds.getConnection(); - JdbcQueueService queueService = createQueueService(c); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcQueueService queueService = queueSystem.queueService; - Queue queue = queueService.getQueue(c, inputName, interval, true); + Queue queue = queueService.lookupQueue(c, inputName, interval, true); queueService.schedule(c, queue, new Date(), asList("10", "20")); diff --git a/src/test/java/io/trygvis/test/PlainJavaExample2.java b/src/test/java/io/trygvis/test/PlainJavaExample2.java new file mode 100644 index 0000000..faeebb2 --- /dev/null +++ b/src/test/java/io/trygvis/test/PlainJavaExample2.java @@ -0,0 +1,84 @@ +package io.trygvis.test; + +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class PlainJavaExample2 { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + private static final TaskEffect adder = new TaskEffect() { + public List<Task> apply(Task task) throws Exception { + System.out.println("PlainJavaExample$Consumer.consumeAll"); + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); + } + }; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + Connection c = ds.getConnection(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcQueueService queueService = queueSystem.queueService; + + final Queue input = queueService.lookupQueue(c, inputName, interval, true); + final Queue output = queueService.lookupQueue(c, outputName, interval, true); + + JdbcAsyncService asyncService = new JdbcAsyncService(queueSystem); + + asyncService.registerQueue(input, adder); + +// queueService.consumeAll(c, input, adder); + + c.commit(); + } + } + + public static class Producer { + public static void main(String[] args) throws Exception { + System.out.println("Starting producer"); + + DataSource ds = createDataSource(); + Connection c = ds.getConnection(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcQueueService queueService = queueSystem.queueService; + + Queue queue = queueService.lookupQueue(c, inputName, interval, true); + + queueService.schedule(c, queue, new Date(), asList("10", "20")); + + c.commit(); + } + } +} diff --git a/src/test/java/io/trygvis/test/UpdateArticleCallable.java b/src/test/java/io/trygvis/test/UpdateArticleCallable.java index f50c10a..6aff20f 100755 --- a/src/test/java/io/trygvis/test/UpdateArticleCallable.java +++ b/src/test/java/io/trygvis/test/UpdateArticleCallable.java @@ -1,6 +1,7 @@ package io.trygvis.test; -import io.trygvis.async.AsyncService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; @@ -9,14 +10,18 @@ import java.util.Date; import java.util.List; import java.util.Random; +import static java.util.Collections.emptyList; + @Component("updateArticle") -public class UpdateArticleCallable - implements AsyncService.AsyncCallable { +public class UpdateArticleCallable implements TaskEffect { private final Logger log = LoggerFactory.getLogger(getClass()); private final Random r = new Random(); - public void run(List<String> arguments) throws Exception { + @Override + public List<Task> apply(Task task) throws Exception { + List<String> arguments = task.arguments; + log.info("UpdateArticeJob.run: BEGIN"); Date now = new Date(); @@ -36,5 +41,7 @@ public class UpdateArticleCallable */ log.info("UpdateArticeJob.run: END"); + + return emptyList(); } } diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java index 07e67fb..d06d8d6 100644 --- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -3,6 +3,8 @@ package io.trygvis.test.spring; import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import io.trygvis.spring.DefaultConfig; import org.junit.Test; import org.junit.runner.RunWith; @@ -17,6 +19,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.lang.System.getProperty; import static java.lang.System.setProperty; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertNotNull; @@ -41,13 +44,15 @@ public class PlainSpringTest { public void testBasic() throws SQLException, InterruptedException { Queue test = queueService.getQueue("test", 10, true); final AtomicReference<List<String>> ref = new AtomicReference<>(); - asyncService.registerQueue(test, new AsyncService.AsyncCallable() { - public void run(List<String> arguments) throws Exception { + asyncService.registerQueue(test, new TaskEffect() { + @Override + public List<Task> apply(Task task) throws Exception { System.out.println("PlainSpringTest.run"); - ref.set(arguments); + ref.set(task.arguments); synchronized (ref) { ref.notify(); } + return emptyList(); } }); diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml new file mode 100755 index 0000000..676eac5 --- /dev/null +++ b/src/test/resources/logback.xml @@ -0,0 +1,25 @@ +<configuration debug="false"> + + <logger name="io.trygvis" level="DEBUG"/> +<!-- <logger name="org.springframework" level="INFO"/> --> + <logger name="org.springframework" level="INFO"/> + <logger name="org.springframework.jdbc" level="INFO"/> + <logger name="org.springframework.jdbc.datasource.DataSourceTransactionManager" level="DEBUG"/> + <logger name="org.springframework.orm" level="INFO"/> + <logger name="org.springframework.transaction" level="DEBUG"/> + + <logger name="org.hibernate" level="INFO"/> + <logger name="org.hibernate.SQL" level="INFO"/> + + <logger name="com.jolbox" level="TRACE"/> + + <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> + <encoder> + <pattern>%d{HH:mm:ss.SSS} [%-15thread] %-5level %-60logger{60} - %msg%n</pattern> + </encoder> + </appender> + + <root level="DEBUG"> + <appender-ref ref="STDOUT"/> + </root> +</configuration> |