From 7465fdb9aa847d29dacc56adbe473f1c1ceb298e Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 4 Jun 2013 20:54:56 +0200 Subject: o Creating a QueueService on top of the DAOs. --- src/test/java/io/trygvis/test/DbUtil.java | 47 +++++++++++++++ src/test/java/io/trygvis/test/Main.java | 8 ++- .../java/io/trygvis/test/PlainJavaExample.java | 70 ++++++++++++++++++++++ .../io/trygvis/test/spring/PlainSpringTest.java | 10 +++- .../java/io/trygvis/test/spring/TestConfig.java | 1 + 5 files changed, 133 insertions(+), 3 deletions(-) create mode 100644 src/test/java/io/trygvis/test/DbUtil.java create mode 100644 src/test/java/io/trygvis/test/PlainJavaExample.java (limited to 'src/test') diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java new file mode 100644 index 0000000..d0e5b47 --- /dev/null +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -0,0 +1,47 @@ +package io.trygvis.test; + +import com.jolbox.bonecp.BoneCPDataSource; +import org.springframework.jdbc.datasource.LazyConnectionDataSourceProxy; +import org.springframework.jdbc.datasource.TransactionAwareDataSourceProxy; + +import javax.sql.DataSource; + +import java.io.PrintWriter; +import java.sql.SQLException; + +import static java.lang.System.getProperty; + +public class DbUtil { + public static DataSource createDataSource() throws SQLException { + String username = getProperty("user.name"); + String jdbcUrl = getProperty("database.url", "jdbc:postgresql://localhost/" + username); + String user = getProperty("database.username", username); + String pass = getProperty("database.password", username); + + return createDataSource(jdbcUrl, user, pass); + } + + public static DataSource createDataSource(String jdbcUrl, String username, String password) throws SQLException { + BoneCPDataSource ds = new BoneCPDataSource(); + + ds.setLogStatementsEnabled(true); + + ds.setJdbcUrl(jdbcUrl); + ds.setUsername(username); + ds.setPassword(password); + + ds.setDefaultAutoCommit(false); + ds.setIdleConnectionTestPeriodInSeconds(60); + ds.setIdleMaxAgeInSeconds(240); + ds.setMaxConnectionsPerPartition(40); + ds.setMinConnectionsPerPartition(0); + ds.setPartitionCount(1); + ds.setAcquireIncrement(1); + ds.setStatementsCacheSize(1000); + ds.setReleaseHelperThreads(3); + ds.setStatisticsEnabled(true); + ds.setLogStatementsEnabled(true); + ds.setLogWriter(new PrintWriter(System.err)); + return new TransactionAwareDataSourceProxy(new LazyConnectionDataSourceProxy(ds)); + } +} diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java index d274101..43ee971 100755 --- a/src/test/java/io/trygvis/test/Main.java +++ b/src/test/java/io/trygvis/test/Main.java @@ -2,6 +2,7 @@ package io.trygvis.test; import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,6 +57,9 @@ public class Main { @Autowired private AsyncService asyncService; + @Autowired + private QueueService queueService; + @Autowired @Qualifier("createArticle") private AsyncService.AsyncCallable createArticleCallable; @@ -67,7 +71,9 @@ public class Main { public void run() throws Exception { log.info("Main.run"); - final Queue q = asyncService.registerQueue("create-article", 1, createArticleCallable); + final Queue q = null; // queueService.getQueue(c, "create-article", 1); + + asyncService.registerQueue(q, createArticleCallable); // log.info("queue registered: ref = {}", q); // asyncService.registerQueue("update-queue", 1, updateArticleCallable); diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java new file mode 100644 index 0000000..338abad --- /dev/null +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -0,0 +1,70 @@ +package io.trygvis.test; + +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.util.Date; +import java.util.List; + +import static io.trygvis.queue.JdbcQueueService.createQueueService; +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class PlainJavaExample { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + Connection c = ds.getConnection(); + + JdbcQueueService queueService = createQueueService(c); + + final Queue input = queueService.getQueue(c, inputName, interval, true); + final Queue output = queueService.getQueue(c, outputName, interval, true); + + queueService.consume(c, input, new QueueService.TaskEffect() { + public List consume(Task task) throws Exception { + System.out.println("PlainJavaExample$Consumer.consume"); + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + return singletonList(newTask(output.name, new Date(), Long.toString(a + b))); + } + }); + + c.commit(); + } + } + + public static class Producer { + public static void main(String[] args) throws Exception { + System.out.println("Starting producer"); + + DataSource ds = createDataSource(); + Connection c = ds.getConnection(); + + JdbcQueueService queueService = createQueueService(c); + + Queue queue = queueService.getQueue(c, inputName, interval, true); + + queueService.schedule(c, queue, new Date(), asList("10", "20")); + + c.commit(); + } + } +} diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java index 9a7a436..07e67fb 100644 --- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -2,6 +2,7 @@ package io.trygvis.test.spring; import io.trygvis.async.AsyncService; import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; import io.trygvis.spring.DefaultConfig; import org.junit.Test; import org.junit.runner.RunWith; @@ -15,6 +16,7 @@ import java.util.concurrent.atomic.AtomicReference; import static java.lang.System.getProperty; import static java.lang.System.setProperty; +import static java.util.Arrays.asList; import static org.fest.assertions.Assertions.assertThat; import static org.junit.Assert.assertNotNull; @@ -25,6 +27,9 @@ public class PlainSpringTest { @Autowired private AsyncService asyncService; + @Autowired + private QueueService queueService; + static { String username = getProperty("user.name"); setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); @@ -34,8 +39,9 @@ public class PlainSpringTest { @Test public void testBasic() throws SQLException, InterruptedException { + Queue test = queueService.getQueue("test", 10, true); final AtomicReference> ref = new AtomicReference<>(); - Queue test = asyncService.registerQueue("test", 10, new AsyncService.AsyncCallable() { + asyncService.registerQueue(test, new AsyncService.AsyncCallable() { public void run(List arguments) throws Exception { System.out.println("PlainSpringTest.run"); ref.set(arguments); @@ -47,7 +53,7 @@ public class PlainSpringTest { synchronized (ref) { System.out.println("Scheduling task"); - asyncService.schedule(test, "hello", "world"); + asyncService.schedule(test, asList("hello", "world")); System.out.println("Waiting"); ref.wait(1000); } diff --git a/src/test/java/io/trygvis/test/spring/TestConfig.java b/src/test/java/io/trygvis/test/spring/TestConfig.java index 7853cb5..a1d95c0 100755 --- a/src/test/java/io/trygvis/test/spring/TestConfig.java +++ b/src/test/java/io/trygvis/test/spring/TestConfig.java @@ -100,6 +100,7 @@ public class TestConfig { ds.setUsername(username); ds.setPassword(password); + ds.setDefaultAutoCommit(false); ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); ds.setMaxConnectionsPerPartition(40); -- cgit v1.2.3