aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-22 09:39:49 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-22 10:16:37 +0200
commit29dc40a0f7fa765d6f66e7a1bdd31083f71286de (patch)
tree9270bc21dc8d7982a9b761b40261db9b7bd4a41c /src/main/java/io/trygvis/activemq/ActiveMqHinter.java
parent49c70dd5bdafe3461c03a4ce45ec7e78a1a479a5 (diff)
downloadquartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.tar.gz
quartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.tar.bz2
quartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.tar.xz
quartz-based-queue-29dc40a0f7fa765d6f66e7a1bdd31083f71286de.zip
o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer.HEADmaster
Diffstat (limited to 'src/main/java/io/trygvis/activemq/ActiveMqHinter.java')
-rw-r--r--src/main/java/io/trygvis/activemq/ActiveMqHinter.java152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
new file mode 100644
index 0000000..f2cfb6e
--- /dev/null
+++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
@@ -0,0 +1,152 @@
+package io.trygvis.activemq;
+
+import io.trygvis.async.QueueController;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+
+import static java.lang.Long.parseLong;
+import static java.lang.System.arraycopy;
+import static java.nio.charset.Charset.forName;
+import static javax.jms.Session.AUTO_ACKNOWLEDGE;
+
+public class ActiveMqHinter implements AutoCloseable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueConnection c;
+
+ private static final Charset utf8 = forName("utf-8");
+
+ public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException {
+ log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL());
+ c = connectionFactory.createQueueConnection();
+ c.start();
+ log.info("Connected, clientId = {}", c.getClientID());
+ }
+
+ public void createHinter(final QueueController controller) throws JMSException {
+ QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(controller.queue.queue.name);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ if ((message instanceof TextMessage)) {
+ String body;
+ try {
+ TextMessage textMessage = (TextMessage) message;
+ body = textMessage.getText();
+ } catch (JMSException e) {
+ log.warn("Exception while reading body.", e);
+ throw new RuntimeException("Exception while reading body.", e);
+ }
+
+ consumeString(new StringReader(body), controller);
+ } else if (message instanceof BytesMessage) {
+ final BytesMessage bytesMessage = (BytesMessage) message;
+ consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller);
+ } else {
+ throw new RuntimeException("Unsupported message type: " + message.getClass());
+ }
+ }
+ });
+
+ controller.addOnStopListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ log.error("Error while closing JMS consumer", e);
+ }
+ }
+ });
+ }
+
+ private void consumeString(Reader reader, QueueController controller) {
+ try {
+ BufferedReader r = new BufferedReader(reader);
+
+ String line = r.readLine();
+
+ while (line != null) {
+ for (String id : line.split(",")) {
+ controller.hint(parseLong(id.trim()));
+ }
+ line = r.readLine();
+ }
+ } catch (IOException | SQLException e) {
+ log.warn("Could not consume body.", e);
+ throw new RuntimeException("Could not consume body.", e);
+ } catch (NumberFormatException e) {
+ log.warn("Could not consume body.", e);
+ throw e;
+ }
+ }
+
+ public void close() throws JMSException {
+ c.close();
+ }
+
+ private static class ByteMessageInputStream extends InputStream {
+ private final BytesMessage bytesMessage;
+
+ public ByteMessageInputStream(BytesMessage bytesMessage) {
+ this.bytesMessage = bytesMessage;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return bytesMessage.readBytes(b);
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read(byte[] out, int off, int len) throws IOException {
+ byte[] b = new byte[len];
+ try {
+ int read = bytesMessage.readBytes(b);
+ if (read == -1) {
+ return -1;
+ }
+ arraycopy(b, 0, out, off, read);
+ return read;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return bytesMessage.readByte();
+ } catch (javax.jms.MessageEOFException e) {
+ return -1;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}