From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- src/test/java/io/trygvis/test/Main.java | 4 +- .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 52 +++++++++++----------- .../io/trygvis/test/jdbc/PlainJavaExample.java | 24 +++++----- .../io/trygvis/test/spring/PlainSpringTest.java | 15 ++++--- 4 files changed, 52 insertions(+), 43 deletions(-) (limited to 'src/test') diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java index 0721ec9..f03d6fa 100755 --- a/src/test/java/io/trygvis/test/Main.java +++ b/src/test/java/io/trygvis/test/Main.java @@ -74,7 +74,9 @@ public class Main { final Queue q = null; // queueService.lookupQueue(c, "create-article", 1); - asyncService.registerQueue(q, createArticleCallable); + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + + asyncService.registerQueue(q, req, createArticleCallable); // log.info("queue registered: ref = {}", q); // asyncService.registerQueue("update-queue", 1, updateArticleCallable); diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index 8d981f2..dd478d7 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -1,10 +1,12 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -38,36 +40,36 @@ public class AsyncConsumerExample { } }; - public static class Consumer { - public static void main(String[] args) throws Exception { - System.out.println("Starting consumer"); + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); - DataSource ds = createDataSource(); + DataSource ds = createDataSource(); - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcAsyncService asyncService = queueSystem.createAsyncService(); - final JdbcQueueService queueService = queueSystem.queueService; + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.createQueueService(); - Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { - @Override - public Queue[] doInConnection(Connection c) throws SQLException { - return new Queue[]{ - queueService.lookupQueue(c, inputName, interval, true), - queueService.lookupQueue(c, outputName, interval, true) - }; - } - }); + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + return new QueueExecutor[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); - final Queue input = queues[0]; - final Queue output = queues[1]; + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; - asyncService.registerQueue(input, adder); + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); - asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2)); - Thread.sleep(5 * 1000); - asyncService.stopQueue(input); - } + QueueController controller = asyncService.registerQueue(input, req, adder); + + controller.start(new ScheduledThreadPoolExecutor(2)); + Thread.sleep(60 * 1000); + controller.stop(); } } diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java index 994c310..0e11ab3 100644 --- a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java @@ -3,7 +3,7 @@ package io.trygvis.test.jdbc; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueStats; import io.trygvis.queue.QueueSystem; @@ -42,12 +42,12 @@ public class PlainJavaExample { SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - final JdbcQueueService queueService = queueSystem.queueService; + final JdbcQueueService queueService = queueSystem.createQueueService(); - Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { @Override - public Queue[] doInConnection(Connection c) throws SQLException { - Queue[] queues = { + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + QueueExecutor[] queues = { queueService.lookupQueue(c, inputName, interval, true), queueService.lookupQueue(c, outputName, interval, true)}; @@ -63,12 +63,12 @@ public class PlainJavaExample { } }); - final Queue input = queues[0]; - final Queue output = queues[1]; + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); - queueService.consumeAll(input, req, new TaskEffect() { + input.consumeAll(req, new TaskEffect() { public List apply(Task task) throws Exception { Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); @@ -76,7 +76,7 @@ public class PlainJavaExample { System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); if (r.nextInt(3000) > 0) { - return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); + return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b))); } throw new RuntimeException("Simulated exception while processing task."); @@ -98,9 +98,9 @@ public class PlainJavaExample { SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - final JdbcQueueService queueService = queueSystem.queueService; + final JdbcQueueService queueService = queueSystem.createQueueService(); - final Queue queue; + final QueueExecutor queue; try (Connection c = ds.getConnection()) { queue = queueService.lookupQueue(c, inputName, interval, true); c.commit(); @@ -112,7 +112,7 @@ public class PlainJavaExample { @Override public void doInConnection(Connection c) throws SQLException { for (int j = 0; j < chunk; j++) { - queueService.schedule(c, queue, new Date(), asList("10", "20")); + queue.schedule(c, new Date(), asList("10", "20")); } } }); diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java index d06d8d6..38d3361 100644 --- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java +++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java @@ -1,7 +1,7 @@ package io.trygvis.test.spring; import io.trygvis.async.AsyncService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.sql.SQLException; +import java.util.Date; import java.util.List; import java.util.concurrent.atomic.AtomicReference; @@ -33,6 +34,8 @@ public class PlainSpringTest { @Autowired private QueueService queueService; + private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + static { String username = getProperty("user.name"); setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username)); @@ -42,9 +45,9 @@ public class PlainSpringTest { @Test public void testBasic() throws SQLException, InterruptedException { - Queue test = queueService.getQueue("test", 10, true); + QueueExecutor test = queueService.getQueue("test", 10, true); final AtomicReference> ref = new AtomicReference<>(); - asyncService.registerQueue(test, new TaskEffect() { + asyncService.registerQueue(test.queue, req, new TaskEffect() { @Override public List apply(Task task) throws Exception { System.out.println("PlainSpringTest.run"); @@ -58,12 +61,14 @@ public class PlainSpringTest { synchronized (ref) { System.out.println("Scheduling task"); - asyncService.schedule(test, asList("hello", "world")); - System.out.println("Waiting"); + queueService.schedule(test.queue, new Date(), asList("hello", "world")); + System.out.println("Task scheduled, waiting"); ref.wait(1000); + System.out.println("Back!"); } List args = ref.get(); + System.out.println("args = " + args); assertNotNull(args); assertThat(args).containsExactly("hello", "world"); } -- cgit v1.2.3