From e57f313713ee0fc52d7dd1247c51914d1462dfc2 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 21:26:17 +0200 Subject: wip --- .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 32 ++++++++++++++++++++-- 1 file changed, 29 insertions(+), 3 deletions(-) (limited to 'src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java') diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index dd478d7..361c2c5 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -2,6 +2,7 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; +import io.trygvis.async.QueueStats; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; @@ -16,10 +17,13 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; import static java.util.Collections.singletonList; public class AsyncConsumerExample { @@ -27,7 +31,7 @@ public class AsyncConsumerExample { private static String inputName = "my-input"; private static String outputName = "my-output"; - private static int interval = 10; + private static int interval = 1000; private static final TaskEffect adder = new TaskEffect() { public List apply(Task task) throws Exception { @@ -68,8 +72,30 @@ public class AsyncConsumerExample { QueueController controller = asyncService.registerQueue(input, req, adder); - controller.start(new ScheduledThreadPoolExecutor(2)); - Thread.sleep(60 * 1000); + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + System.out.println(input.getStats()); + System.out.println(output.getStats()); + } + }, 1000, 1000); + + long start = currentTimeMillis(); + controller.start(new ScheduledThreadPoolExecutor(1)); + Thread.sleep(5 * 1000); controller.stop(); + long end = currentTimeMillis(); + timer.cancel(); + + QueueStats stats = input.getStats(); + + System.out.println("Summary:"); + System.out.println(stats.toString()); + System.out.println(output.getStats().toString()); + + long duration = end - start; + double rate = 1000 * ((double) stats.totalMessageCount) / duration; + System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); } } -- cgit v1.2.3