aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/activemq/ActiveMqHinter.java')
-rw-r--r--src/main/java/io/trygvis/activemq/ActiveMqHinter.java152
1 files changed, 152 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
new file mode 100644
index 0000000..f2cfb6e
--- /dev/null
+++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java
@@ -0,0 +1,152 @@
+package io.trygvis.activemq;
+
+import io.trygvis.async.QueueController;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.TextMessage;
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.io.StringReader;
+import java.nio.charset.Charset;
+import java.sql.SQLException;
+
+import static java.lang.Long.parseLong;
+import static java.lang.System.arraycopy;
+import static java.nio.charset.Charset.forName;
+import static javax.jms.Session.AUTO_ACKNOWLEDGE;
+
+public class ActiveMqHinter implements AutoCloseable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueConnection c;
+
+ private static final Charset utf8 = forName("utf-8");
+
+ public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException {
+ log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL());
+ c = connectionFactory.createQueueConnection();
+ c.start();
+ log.info("Connected, clientId = {}", c.getClientID());
+ }
+
+ public void createHinter(final QueueController controller) throws JMSException {
+ QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE);
+ Queue queue = session.createQueue(controller.queue.queue.name);
+ final MessageConsumer consumer = session.createConsumer(queue);
+
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ if ((message instanceof TextMessage)) {
+ String body;
+ try {
+ TextMessage textMessage = (TextMessage) message;
+ body = textMessage.getText();
+ } catch (JMSException e) {
+ log.warn("Exception while reading body.", e);
+ throw new RuntimeException("Exception while reading body.", e);
+ }
+
+ consumeString(new StringReader(body), controller);
+ } else if (message instanceof BytesMessage) {
+ final BytesMessage bytesMessage = (BytesMessage) message;
+ consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller);
+ } else {
+ throw new RuntimeException("Unsupported message type: " + message.getClass());
+ }
+ }
+ });
+
+ controller.addOnStopListener(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ log.error("Error while closing JMS consumer", e);
+ }
+ }
+ });
+ }
+
+ private void consumeString(Reader reader, QueueController controller) {
+ try {
+ BufferedReader r = new BufferedReader(reader);
+
+ String line = r.readLine();
+
+ while (line != null) {
+ for (String id : line.split(",")) {
+ controller.hint(parseLong(id.trim()));
+ }
+ line = r.readLine();
+ }
+ } catch (IOException | SQLException e) {
+ log.warn("Could not consume body.", e);
+ throw new RuntimeException("Could not consume body.", e);
+ } catch (NumberFormatException e) {
+ log.warn("Could not consume body.", e);
+ throw e;
+ }
+ }
+
+ public void close() throws JMSException {
+ c.close();
+ }
+
+ private static class ByteMessageInputStream extends InputStream {
+ private final BytesMessage bytesMessage;
+
+ public ByteMessageInputStream(BytesMessage bytesMessage) {
+ this.bytesMessage = bytesMessage;
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ try {
+ return bytesMessage.readBytes(b);
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read(byte[] out, int off, int len) throws IOException {
+ byte[] b = new byte[len];
+ try {
+ int read = bytesMessage.readBytes(b);
+ if (read == -1) {
+ return -1;
+ }
+ arraycopy(b, 0, out, off, read);
+ return read;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ try {
+ return bytesMessage.readByte();
+ } catch (javax.jms.MessageEOFException e) {
+ return -1;
+ } catch (JMSException e) {
+ throw new IOException(e);
+ }
+ }
+ }
+}