aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java')
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java32
1 files changed, 29 insertions, 3 deletions
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index dd478d7..361c2c5 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -2,6 +2,7 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.QueueController;
+import io.trygvis.async.QueueStats;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
@@ -16,10 +17,13 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.Task.newTask;
import static io.trygvis.test.DbUtil.createDataSource;
+import static java.lang.System.currentTimeMillis;
import static java.util.Collections.singletonList;
public class AsyncConsumerExample {
@@ -27,7 +31,7 @@ public class AsyncConsumerExample {
private static String inputName = "my-input";
private static String outputName = "my-output";
- private static int interval = 10;
+ private static int interval = 1000;
private static final TaskEffect adder = new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
@@ -68,8 +72,30 @@ public class AsyncConsumerExample {
QueueController controller = asyncService.registerQueue(input, req, adder);
- controller.start(new ScheduledThreadPoolExecutor(2));
- Thread.sleep(60 * 1000);
+ Timer timer = new Timer();
+ timer.scheduleAtFixedRate(new TimerTask() {
+ @Override
+ public void run() {
+ System.out.println(input.getStats());
+ System.out.println(output.getStats());
+ }
+ }, 1000, 1000);
+
+ long start = currentTimeMillis();
+ controller.start(new ScheduledThreadPoolExecutor(1));
+ Thread.sleep(5 * 1000);
controller.stop();
+ long end = currentTimeMillis();
+ timer.cancel();
+
+ QueueStats stats = input.getStats();
+
+ System.out.println("Summary:");
+ System.out.println(stats.toString());
+ System.out.println(output.getStats().toString());
+
+ long duration = end - start;
+ double rate = 1000 * ((double) stats.totalMessageCount) / duration;
+ System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s");
}
}