aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-22 09:39:49 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-22 10:16:37 +0200
commit29dc40a0f7fa765d6f66e7a1bdd31083f71286de (patch)
tree9270bc21dc8d7982a9b761b40261db9b7bd4a41c
parent49c70dd5bdafe3461c03a4ce45ec7e78a1a479a5 (diff)
downloadquartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.tar.gz
quartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.tar.bz2
quartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.tar.xz
quartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.zip
o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer.HEADmaster
-rwxr-xr-xpom.xml9
-rw-r--r--src/main/java/io/trygvis/activemq/ActiveMqHinter.java152
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java10
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java70
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java5
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java8
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java26
-rw-r--r--src/test/java/io/trygvis/test/DbUtil.java4
-rw-r--r--src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java37
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java18
-rwxr-xr-xsrc/test/resources/logback.xml2
11 files changed, 293 insertions, 48 deletions
diff --git a/pom.xml b/pom.xml
index a86ce8d..5c5fb02 100755
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,15 @@
<scope>test</scope>
</dependency>
+ <!-- ActiveMQ Dependencies -->
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-client</artifactId>
+ <version>5.8.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Other dependencies -->
<dependency>
<groupId>com.jolbox</groupId>
<artifactId>bonecp</artifactId>
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
new file mode 100644
index 0000000..f2cfb6e
--- /dev/null
+++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
@@ -0,0 +1,152 @@
+package io.trygvis.activemq;
+
+import io.trygvis.async.QueueController;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+
+import static java.lang.Long.parseLong;
+import static java.lang.System.arraycopy;
+import static java.nio.charset.Charset.forName;
+import static javax.jms.Session.AUTO_ACKNOWLEDGE;
+
+public class ActiveMqHinter implements AutoCloseable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueConnection c;
+
+ private static final Charset utf8 = forName("utf-8");
+
+ public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException {
+ log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL());
+ c = connectionFactory.createQueueConnection();
+ c.start();
+ log.info("Connected, clientId = {}", c.getClientID());
+ }
+
+ public void createHinter(final QueueController controller) throws JMSException {
+ QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(controller.queue.queue.name);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ if ((message instanceof TextMessage)) {
+ String body;
+ try {
+ TextMessage textMessage = (TextMessage) message;
+ body = textMessage.getText();
+ } catch (JMSException e) {
+ log.warn("Exception while reading body.", e);
+ throw new RuntimeException("Exception while reading body.", e);
+ }
+
+ consumeString(new StringReader(body), controller);
+ } else if (message instanceof BytesMessage) {
+ final BytesMessage bytesMessage = (BytesMessage) message;
+ consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller);
+ } else {
+ throw new RuntimeException("Unsupported message type: " + message.getClass());
+ }
+ }
+ });
+
+ controller.addOnStopListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ log.error("Error while closing JMS consumer", e);
+ }
+ }
+ });
+ }
+
+ private void consumeString(Reader reader, QueueController controller) {
+ try {
+ BufferedReader r = new BufferedReader(reader);
+
+ String line = r.readLine();
+
+ while (line != null) {
+ for (String id : line.split(",")) {
+ controller.hint(parseLong(id.trim()));
+ }
+ line = r.readLine();
+ }
+ } catch (IOException | SQLException e) {
+ log.warn("Could not consume body.", e);
+ throw new RuntimeException("Could not consume body.", e);
+ } catch (NumberFormatException e) {
+ log.warn("Could not consume body.", e);
+ throw e;
+ }
+ }
+
+ public void close() throws JMSException {
+ c.close();
+ }
+
+ private static class ByteMessageInputStream extends InputStream {
+ private final BytesMessage bytesMessage;
+
+ public ByteMessageInputStream(BytesMessage bytesMessage) {
+ this.bytesMessage = bytesMessage;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return bytesMessage.readBytes(b);
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read(byte[] out, int off, int len) throws IOException {
+ byte[] b = new byte[len];
+ try {
+ int read = bytesMessage.readBytes(b);
+ if (read == -1) {
+ return -1;
+ }
+ arraycopy(b, 0, out, off, read);
+ return read;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return bytesMessage.readByte();
+ } catch (javax.jms.MessageEOFException e) {
+ return -1;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 57ab5c6..46f1f30 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,14 +1,10 @@
package io.trygvis.async;
-import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
-import io.trygvis.queue.TaskDao;
import io.trygvis.queue.TaskEffect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
@@ -19,16 +15,12 @@ import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
public class JdbcAsyncService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
private final Map<String, QueueController> queues = new HashMap<>();
private final QueueSystem queueSystem;
- private final JdbcQueueService queueService;
public JdbcAsyncService(QueueSystem queueSystem) {
this.queueSystem = queueSystem;
- this.queueService = queueSystem.createQueueService();
}
public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
@@ -40,8 +32,6 @@ public class JdbcAsyncService {
queues.put(queue.queue.name, queueController);
- log.info("registerQueue: LEAVE");
-
return queueController;
}
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
index 8aed91f..a343d42 100644
--- a/src/main/java/io/trygvis/async/QueueController.java
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -1,7 +1,6 @@
package io.trygvis.async;
import io.trygvis.queue.QueueExecutor;
-import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.SqlEffect;
import io.trygvis.queue.SqlEffectExecutor;
@@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
@@ -47,6 +45,8 @@ public class QueueController {
private ScheduledThreadPoolExecutor executor;
+ private List<Runnable> stopListeners = new ArrayList<>();
+
public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
this.queueSystem = queueSystem;
this.req = req;
@@ -55,36 +55,50 @@ public class QueueController {
this.queue = queue;
}
- public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks,
- ScheduledThreadPoolExecutor executor) {
- ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor);
+ public void scheduleAll(final List<Task> tasks) throws InterruptedException {
+ ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor);
- List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size());
+ for (final Task task : tasks) {
+ service.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ }
+ public void invokeAll(final List<Task> tasks) throws InterruptedException {
+ List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size());
for (final Task task : tasks) {
- results.add(completionService.submit(new Callable<TaskExecutionResult>() {
+ callables.add(new Callable<TaskExecutionResult>() {
@Override
public TaskExecutionResult call() throws Exception {
- return queue.applyTask(effect, task);
+ return queue.applyTask(taskEffect, task);
}
- }));
+ });
}
- for (Future<TaskExecutionResult> result : results) {
- while(!result.isDone()) {
+ executor.invokeAll(callables);
+ }
+
+ public void hint(final long id) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ Task task = queueSystem.createTaskDao(c).findById(id);
+
try {
- result.get();
- break;
+ scheduleAll(Collections.singletonList(task));
} catch (InterruptedException e) {
- if(shouldRun) {
- continue;
- }
- } catch (ExecutionException e) {
- log.error("Unexpected exception.", e);
- break;
+ throw new SQLException(e);
}
}
- }
+ });
+ }
+
+ public synchronized void addOnStopListener(Runnable runnable) {
+ stopListeners.add(runnable);
}
private class QueueThread implements Runnable {
@@ -102,7 +116,7 @@ public class QueueController {
log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
if (tasks.size() > 0) {
- executeTasks(req, taskEffect, tasks, executor);
+ invokeAll(tasks);
}
} catch (Throwable e) {
if (shouldRun) {
@@ -111,7 +125,7 @@ public class QueueController {
}
// If we found exactly the same number of tasks that we asked for, there is most likely more to go.
- if (tasks != null && tasks.size() == req.chunkSize) {
+ if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) {
log.info("Got a full chunk, continuing directly.");
continue;
}
@@ -122,7 +136,7 @@ public class QueueController {
checkForNewTasks = false;
} else {
try {
- wait(queue.queue.interval);
+ wait(req.interval(queue.queue));
} catch (InterruptedException e) {
// ignore
}
@@ -160,6 +174,14 @@ public class QueueController {
log.info("Stopping thread for queue {}", queue.queue.name);
+ for (Runnable runnable : stopListeners) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Error while running stop listener " + runnable, e);
+ }
+ }
+
shouldRun = false;
// TODO: empty out the currently executing tasks.
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
index 3694c06..8edc720 100644
--- a/src/main/java/io/trygvis/async/QueueStats.java
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -4,12 +4,14 @@ public class QueueStats {
public final int totalMessageCount;
public final int okMessageCount;
public final int failedMessageCount;
+ public final int missedMessageCount;
public final int scheduledMessageCount;
- public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) {
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) {
this.totalMessageCount = totalMessageCount;
this.okMessageCount = okMessageCount;
this.failedMessageCount = failedMessageCount;
+ this.missedMessageCount = missedMessageCount;
this.scheduledMessageCount = scheduledMessageCount;
}
@@ -19,6 +21,7 @@ public class QueueStats {
"totalMessageCount=" + totalMessageCount +
", okMessageCount=" + okMessageCount +
", failedMessageCount=" + failedMessageCount +
+ ", missedMessageCount=" + missedMessageCount +
", scheduledMessageCount=" + scheduledMessageCount +
'}';
}
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
index 468059d..88e5b46 100644
--- a/src/main/java/io/trygvis/queue/QueueExecutor.java
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -40,9 +40,10 @@ public class QueueExecutor {
public int ok;
public int failed;
public int scheduled;
+ public int missed;
- public QueueStats toStats() {
- return new QueueStats(total, ok, failed, scheduled);
+ public synchronized QueueStats toStats() {
+ return new QueueStats(total, ok, failed, missed, scheduled);
}
}
@@ -94,6 +95,9 @@ public class QueueExecutor {
if (count == 0) {
log.warn("Missed task {}", task.id());
+ synchronized (stats) {
+ stats.missed++;
+ }
return MISSED;
}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index f4ce536..1c38f1f 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -12,15 +12,27 @@ public interface QueueService {
public static class TaskExecutionRequest {
public final long chunkSize;
public final boolean stopOnError;
+ public final Long interval;
+ public final boolean continueOnFullChunk;
// TODO: saveExceptions
public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
- if (chunkSize <= 0) {
- throw new IllegalArgumentException("chunkSize has to be bigger than zero.");
- }
+ this(chunkSize, stopOnError, null, true);
+ }
+ private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) {
this.chunkSize = chunkSize;
this.stopOnError = stopOnError;
+ this.interval = interval;
+ this.continueOnFullChunk = continueOnFullChunk;
+ }
+
+ public TaskExecutionRequest interval(long interval) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
+ }
+
+ public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) {
+ return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk);
}
@Override
@@ -30,5 +42,13 @@ public interface QueueService {
", stopOnError=" + stopOnError +
'}';
}
+
+ public long interval(Queue queue) {
+ if (interval != null) {
+ return interval;
+ }
+
+ return queue.interval;
+ }
}
}
diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java
index 025442e..33c2807 100644
--- a/src/test/java/io/trygvis/test/DbUtil.java
+++ b/src/test/java/io/trygvis/test/DbUtil.java
@@ -37,8 +37,8 @@ public class DbUtil {
ds.setIdleConnectionTestPeriodInSeconds(60);
ds.setIdleMaxAgeInSeconds(240);
ds.setMaxConnectionsPerPartition(4);
- ds.setMinConnectionsPerPartition(2);
- ds.setPartitionCount(2);
+ ds.setMinConnectionsPerPartition(1);
+ ds.setPartitionCount(4);
ds.setAcquireIncrement(1);
ds.setStatementsCacheSize(1000);
ds.setReleaseHelperThreads(1);
diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
new file mode 100644
index 0000000..bd86732
--- /dev/null
+++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java
@@ -0,0 +1,37 @@
+package io.trygvis.test.activemq;
+
+import io.trygvis.activemq.ActiveMqHinter;
+import io.trygvis.async.QueueController;
+import io.trygvis.queue.QueueService;
+import io.trygvis.test.jdbc.AsyncConsumerExample;
+import org.apache.activemq.ActiveMQConnectionFactory;
+
+import javax.jms.JMSException;
+
+public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable {
+ private final ActiveMqHinter activeMqHinter;
+
+ public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException {
+ activeMqHinter = new ActiveMqHinter(cf);
+ }
+
+ public static void main(String[] args) throws Exception {
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616");
+
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true).
+ interval(60 * 6000).
+ continueOnFullChunk(false);
+ try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) {
+ consumer.work(poolSize, req);
+ }
+ }
+
+ public void close() throws JMSException {
+ activeMqHinter.close();
+ }
+
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ activeMqHinter.createHinter(input);
+ }
+}
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index 14db21b..16640dd 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -3,12 +3,12 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.QueueController;
import io.trygvis.async.QueueStats;
-import io.trygvis.queue.SqlEffect;
-import io.trygvis.queue.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -43,6 +43,12 @@ public class AsyncConsumerExample {
};
public static void main(String[] args) throws Exception {
+ int poolSize = 4;
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+ new AsyncConsumerExample().work(poolSize, req);
+ }
+
+ public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception {
System.out.println("Starting consumer");
DataSource ds = createDataSource();
@@ -66,9 +72,8 @@ public class AsyncConsumerExample {
final QueueExecutor input = queues[0];
final QueueExecutor output = queues[1];
- QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
-
QueueController controller = asyncService.registerQueue(input, req, adder);
+ wrapInputQueue(controller);
Timer timer = new Timer();
timer.scheduleAtFixedRate(new TimerTask() {
@@ -80,7 +85,7 @@ public class AsyncConsumerExample {
}, 1000, 1000);
long start = currentTimeMillis();
- controller.start(new ScheduledThreadPoolExecutor(4));
+ controller.start(new ScheduledThreadPoolExecutor(poolSize));
Thread.sleep(60 * 1000);
controller.stop();
long end = currentTimeMillis();
@@ -96,4 +101,7 @@ public class AsyncConsumerExample {
double rate = 1000 * ((double) stats.totalMessageCount) / duration;
System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s");
}
+
+ protected void wrapInputQueue(QueueController input) throws Exception {
+ }
}
diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml
index f6e0f47..f964cd3 100755
--- a/src/test/resources/logback.xml
+++ b/src/test/resources/logback.xml
@@ -23,7 +23,7 @@
</encoder>
</appender>
- <root level="INFO">
+ <root level="WARN">
<appender-ref ref="STDOUT"/>
</root>
</configuration>