From 29dc40a0f7fa765d6f66e7a1bdd31083f71286de Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 22 Jun 2013 09:39:49 +0200 Subject: o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer. --- src/test/java/io/trygvis/test/DbUtil.java | 4 +-- .../test/activemq/AsyncConsumerWithActiveMq.java | 37 ++++++++++++++++++++++ .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 18 ++++++++--- 3 files changed, 52 insertions(+), 7 deletions(-) create mode 100644 src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java (limited to 'src/test/java/io/trygvis') diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java index 025442e..33c2807 100644 --- a/src/test/java/io/trygvis/test/DbUtil.java +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -37,8 +37,8 @@ public class DbUtil { ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); ds.setMaxConnectionsPerPartition(4); - ds.setMinConnectionsPerPartition(2); - ds.setPartitionCount(2); + ds.setMinConnectionsPerPartition(1); + ds.setPartitionCount(4); ds.setAcquireIncrement(1); ds.setStatementsCacheSize(1000); ds.setReleaseHelperThreads(1); diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java new file mode 100644 index 0000000..bd86732 --- /dev/null +++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java @@ -0,0 +1,37 @@ +package io.trygvis.test.activemq; + +import io.trygvis.activemq.ActiveMqHinter; +import io.trygvis.async.QueueController; +import io.trygvis.queue.QueueService; +import io.trygvis.test.jdbc.AsyncConsumerExample; +import org.apache.activemq.ActiveMQConnectionFactory; + +import javax.jms.JMSException; + +public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable { + private final ActiveMqHinter activeMqHinter; + + public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException { + activeMqHinter = new ActiveMqHinter(cf); + } + + public static void main(String[] args) throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616"); + + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true). + interval(60 * 6000). + continueOnFullChunk(false); + try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) { + consumer.work(poolSize, req); + } + } + + public void close() throws JMSException { + activeMqHinter.close(); + } + + protected void wrapInputQueue(QueueController input) throws Exception { + activeMqHinter.createHinter(input); + } +} diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index 14db21b..16640dd 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -3,12 +3,12 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; import io.trygvis.async.QueueStats; -import io.trygvis.queue.SqlEffect; -import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -43,6 +43,12 @@ public class AsyncConsumerExample { }; public static void main(String[] args) throws Exception { + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + new AsyncConsumerExample().work(poolSize, req); + } + + public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception { System.out.println("Starting consumer"); DataSource ds = createDataSource(); @@ -66,9 +72,8 @@ public class AsyncConsumerExample { final QueueExecutor input = queues[0]; final QueueExecutor output = queues[1]; - QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); - QueueController controller = asyncService.registerQueue(input, req, adder); + wrapInputQueue(controller); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @@ -80,7 +85,7 @@ public class AsyncConsumerExample { }, 1000, 1000); long start = currentTimeMillis(); - controller.start(new ScheduledThreadPoolExecutor(4)); + controller.start(new ScheduledThreadPoolExecutor(poolSize)); Thread.sleep(60 * 1000); controller.stop(); long end = currentTimeMillis(); @@ -96,4 +101,7 @@ public class AsyncConsumerExample { double rate = 1000 * ((double) stats.totalMessageCount) / duration; System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); } + + protected void wrapInputQueue(QueueController input) throws Exception { + } } -- cgit v1.2.3