package io.trygvis.activemq;

import io.trygvis.async.QueueController;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.io.StringReader;
import java.nio.charset.Charset;
import java.sql.SQLException;

import static java.lang.Long.parseLong;
import static java.lang.System.arraycopy;
import static java.nio.charset.Charset.forName;
import static javax.jms.Session.AUTO_ACKNOWLEDGE;

/**
 * Listens on JMS queues and forwards the ids found in incoming messages to a
 * {@link QueueController} through {@code controller.hint(long)}.
 * <p>
 * A message body is text consisting of lines of comma-separated decimal ids. Both
 * {@link TextMessage} and {@link BytesMessage} (decoded as UTF-8) are accepted.
 * <p>
 * One JMS connection is opened per instance and closed by {@link #close()}.
 */
public class ActiveMqHinter implements AutoCloseable {
    private final Logger log = LoggerFactory.getLogger(getClass());

    private final QueueConnection c;

    /** Charset used to decode the payload of {@link BytesMessage}s. */
    private static final Charset UTF_8 = forName("utf-8");

    /**
     * Creates and starts a queue connection using the given factory.
     *
     * @param connectionFactory factory used to create the connection
     * @throws JMSException if the connection cannot be created or started; a connection that
     *                      was created but could not be started is closed before rethrowing
     */
    public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException {
        log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL());

        QueueConnection connection = connectionFactory.createQueueConnection();
        try {
            connection.start();
            log.info("Connected, clientId = {}", connection.getClientID());
        } catch (JMSException | RuntimeException e) {
            // Do not leak a half-initialised connection: nobody else holds a reference to it.
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        c = connection;
    }

    /**
     * Starts consuming the JMS queue named {@code controller.queue.queue.name} and passes every
     * id found in the received messages to {@code controller.hint}.
     * <p>
     * A stop listener is registered on the controller that closes the consumer and the session
     * created by this call.
     *
     * @param controller the controller to hint
     * @throws JMSException if the session or consumer cannot be set up; the session is closed
     *                      before rethrowing
     */
    public void createHinter(final QueueController controller) throws JMSException {
        final QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE);
        final MessageConsumer consumer;
        try {
            Queue queue = session.createQueue(controller.queue.queue.name);
            consumer = session.createConsumer(queue);
            consumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if (message instanceof TextMessage) {
                        String body;
                        try {
                            body = ((TextMessage) message).getText();
                        } catch (JMSException e) {
                            log.warn("Exception while reading body.", e);
                            throw new RuntimeException("Exception while reading body.", e);
                        }
                        if (body == null) {
                            // getText() may return null; there is nothing to hint in that case.
                            log.warn("Ignoring text message without a body.");
                            return;
                        }
                        consumeString(new StringReader(body), controller);
                    } else if (message instanceof BytesMessage) {
                        BytesMessage bytesMessage = (BytesMessage) message;
                        consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), UTF_8), controller);
                    } else {
                        throw new RuntimeException("Unsupported message type: " + message.getClass());
                    }
                }
            });
        } catch (JMSException | RuntimeException e) {
            // Setup failed half-way: release the session instead of leaking it.
            try {
                session.close();
            } catch (JMSException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }

        controller.addOnStopListener(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.close();
                } catch (JMSException e) {
                    log.error("Error while closing JMS consumer", e);
                }
                // The session was created solely for this consumer, so release it as well.
                try {
                    session.close();
                } catch (JMSException e) {
                    log.error("Error while closing JMS session", e);
                }
            }
        });
    }

    /**
     * Reads lines of comma-separated ids from {@code reader} and calls {@code controller.hint}
     * for each id. Tokens that are empty after trimming (blank lines, doubled commas) are
     * skipped. The reader is closed when this method returns.
     *
     * @throws RuntimeException      wrapping the {@link IOException} or {@link SQLException}
     *                               raised while reading or hinting
     * @throws NumberFormatException if a non-empty token is not a valid {@code long}
     */
    private void consumeString(Reader reader, QueueController controller) {
        try (BufferedReader lines = new BufferedReader(reader)) {
            String line;
            while ((line = lines.readLine()) != null) {
                for (String token : line.split(",")) {
                    String id = token.trim();
                    if (id.isEmpty()) {
                        continue;
                    }
                    controller.hint(parseLong(id));
                }
            }
        } catch (IOException | SQLException e) {
            log.warn("Could not consume body.", e);
            throw new RuntimeException("Could not consume body.", e);
        } catch (NumberFormatException e) {
            log.warn("Could not consume body.", e);
            throw e;
        }
    }

    /**
     * Closes the underlying JMS connection.
     *
     * @throws JMSException if the connection fails to close
     */
    @Override
    public void close() throws JMSException {
        c.close();
    }

    /**
     * Adapts the body of a {@link BytesMessage} to the {@link InputStream} API.
     * {@link JMSException}s are rethrown as {@link IOException}s.
     */
    private static class ByteMessageInputStream extends InputStream {
        private final BytesMessage bytesMessage;

        public ByteMessageInputStream(BytesMessage bytesMessage) {
            this.bytesMessage = bytesMessage;
        }

        @Override
        public int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /**
         * Reads up to {@code len} bytes of the message body into {@code out} starting at
         * {@code off}.
         *
         * @return the number of bytes read, {@code 0} if {@code len} is zero, or {@code -1}
         *         when the message reports end of stream
         */
        @Override
        public int read(byte[] out, int off, int len) throws IOException {
            if (out == null) {
                throw new NullPointerException("out");
            }
            if (off < 0 || len < 0 || len > out.length - off) {
                throw new IndexOutOfBoundsException("off=" + off + ", len=" + len + ", length=" + out.length);
            }
            if (len == 0) {
                return 0;
            }

            try {
                if (off == 0) {
                    // No offset: let the message fill the caller's buffer directly.
                    return bytesMessage.readBytes(out, len);
                }

                // BytesMessage has no offset-taking read, so go through a temporary buffer.
                byte[] chunk = new byte[len];
                int count = bytesMessage.readBytes(chunk);
                if (count > 0) {
                    arraycopy(chunk, 0, out, off, count);
                }
                return count;
            } catch (JMSException e) {
                throw new IOException(e);
            }
        }

        /**
         * Reads a single byte.
         *
         * @return the byte as a value in the range 0..255, or {@code -1} at end of stream
         */
        @Override
        public int read() throws IOException {
            try {
                // readByte() is signed; mask it so that 0xFF is not mistaken for end of stream.
                return bytesMessage.readByte() & 0xFF;
            } catch (javax.jms.MessageEOFException e) {
                return -1;
            } catch (JMSException e) {
                throw new IOException(e);
            }
        }
    }
}