aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/io
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-22 09:39:49 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-22 10:16:37 +0200
commit29dc40a0f7fa765d6f66e7a1bdd31083f71286de (patch)
tree9270bc21dc8d7982a9b761b40261db9b7bd4a41c /src/test/java/io
parent49c70dd5bdafe3461c03a4ce45ec7e78a1a479a5 (diff)
downloadquartz-based-queue-master.tar.gz
quartz-based-queue-master.tar.bz2
quartz-based-queue-master.tar.xz
quartz-based-queue-master.zip
o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer.HEADmaster
Diffstat (limited to 'src/test/java/io')
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java4
-rw-r--r--src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java37
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java18
3 files changed, 52 insertions, 7 deletions
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
index 025442e..33c2807 100644
--- a/src/test/java/io/trygvis/test/DbUtil.java
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -37,8 +37,8 @@ public class DbUtil {
ds.setIdleConnectionTestPeriodInSeconds(60);
ds.setIdleMaxAgeInSeconds(240);
ds.setMaxConnectionsPerPartition(4);
- ds.setMinConnectionsPerPartition(2);
- ds.setPartitionCount(2);
+ ds.setMinConnectionsPerPartition(1);
+ ds.setPartitionCount(4);
ds.setAcquireIncrement(1);
ds.setStatementsCacheSize(1000);
ds.setReleaseHelperThreads(1);
diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
new file mode 100644
index 0000000..bd86732
--- /dev/null
+++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
@@ -0,0 +1,37 @@
+package io.trygvis.test.activemq;
+
+import io.trygvis.activemq.ActiveMqHinter;
+import io.trygvis.async.QueueController;
+import io.trygvis.queue.QueueService;
+import io.trygvis.test.jdbc.AsyncConsumerExample;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.JMSException;
+
+public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable {
+ private final ActiveMqHinter activeMqHinter;
+
+ public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException {
+ activeMqHinter = new ActiveMqHinter(cf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true).
+ interval(60 * 6000).
+ continueOnFullChunk(false);
+ try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) {
+ consumer.work(poolSize, req);
+ }
+ }
+
+ public void close() throws JMSException {
+ activeMqHinter.close();
+ }
+
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ activeMqHinter.createHinter(input);
+ }
+}
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index 14db21b..16640dd 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -3,12 +3,12 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.QueueController;
import io.trygvis.async.QueueStats;
-import io.trygvis.queue.SqlEffect;
-import io.trygvis.queue.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -43,6 +43,12 @@ public class AsyncConsumerExample {
};
public static void main(String[] args) throws Exception {
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+ new AsyncConsumerExample().work(poolSize, req);
+ }
+
+ public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception {
System.out.println("Starting consumer");
DataSource ds = createDataSource();
@@ -66,9 +72,8 @@ public class AsyncConsumerExample {
final QueueExecutor input = queues[0];
final QueueExecutor output = queues[1];
- QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
-
QueueController controller = asyncService.registerQueue(input, req, adder);
+ wrapInputQueue(controller);
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@@ -80,7 +85,7 @@ public class AsyncConsumerExample {
}, 1000, 1000);
long start = currentTimeMillis();
- controller.start(new ScheduledThreadPoolExecutor(4));
+ controller.start(new ScheduledThreadPoolExecutor(poolSize));
Thread.sleep(60 * 1000);
controller.stop();
long end = currentTimeMillis();
@@ -96,4 +101,7 @@ public class AsyncConsumerExample {
double rate = 1000 * ((double) stats.totalMessageCount) / duration;
System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s");
}
+
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ }
}