From 29dc40a0f7fa765d6f66e7a1bdd31083f71286de Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 22 Jun 2013 09:39:49 +0200 Subject: o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer. --- .../java/io/trygvis/activemq/ActiveMqHinter.java | 152 +++++++++++++++++++++ 1 file changed, 152 insertions(+) create mode 100644 src/main/java/io/trygvis/activemq/ActiveMqHinter.java (limited to 'src/main/java/io/trygvis/activemq/ActiveMqHinter.java') diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java new file mode 100644 index 0000000..f2cfb6e --- /dev/null +++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java @@ -0,0 +1,152 @@ +package io.trygvis.activemq; + +import io.trygvis.async.QueueController; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.sql.SQLException; + +import static java.lang.Long.parseLong; +import static java.lang.System.arraycopy; +import static java.nio.charset.Charset.forName; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; + +public class ActiveMqHinter implements AutoCloseable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueConnection c; + + private static final Charset utf8 = forName("utf-8"); + + public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException { + log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL()); + c = connectionFactory.createQueueConnection(); + c.start(); + log.info("Connected, clientId = {}", c.getClientID()); + } + + public void createHinter(final QueueController controller) throws JMSException { + QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(controller.queue.queue.name); + final MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + if ((message instanceof TextMessage)) { + String body; + try { + TextMessage textMessage = (TextMessage) message; + body = textMessage.getText(); + } catch (JMSException e) { + log.warn("Exception while reading body.", e); + throw new RuntimeException("Exception while reading body.", e); + } + + consumeString(new StringReader(body), controller); + } else if (message instanceof BytesMessage) { + final BytesMessage bytesMessage = (BytesMessage) message; + consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller); + } else { + throw new RuntimeException("Unsupported message type: " + message.getClass()); + } + } + }); + + controller.addOnStopListener(new Runnable() { + @Override + public void run() { + try { + consumer.close(); + } catch (JMSException e) { + log.error("Error while closing JMS consumer", e); + } + } + }); + } + + private void consumeString(Reader reader, QueueController controller) { + try { + BufferedReader r = new BufferedReader(reader); + + String line = r.readLine(); + + while (line != null) { + for (String id : line.split(",")) { + controller.hint(parseLong(id.trim())); + } + line = r.readLine(); + } + } catch (IOException | SQLException e) { + log.warn("Could not consume body.", e); + throw new RuntimeException("Could not consume body.", e); + } catch (NumberFormatException e) { + log.warn("Could not consume body.", e); + throw e; + } + } + + public void close() throws JMSException { + c.close(); + } + + private static class ByteMessageInputStream extends InputStream { + private final BytesMessage bytesMessage; + + public ByteMessageInputStream(BytesMessage bytesMessage) { + this.bytesMessage = bytesMessage; + } + + @Override + public int read(byte[] b) throws IOException { + try { + return bytesMessage.readBytes(b); + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + byte[] b = new byte[len]; + try { + int read = bytesMessage.readBytes(b); + if (read == -1) { + return -1; + } + arraycopy(b, 0, out, off, read); + return read; + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read() throws IOException { + try { + return bytesMessage.readByte(); + } catch (javax.jms.MessageEOFException e) { + return -1; + } catch (JMSException e) { + throw new IOException(e); + } + } + } +} -- cgit v1.2.3