package org.jenkinsci.plugins.activemq; import org.apache.activemq.ActiveMQConnectionFactory; import org.slf4j.Logger; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import java.io.CharArrayWriter; import java.io.IOException; import java.io.StringReader; import java.net.URI; import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; import java.util.Properties; import static java.lang.String.valueOf; import static java.util.Collections.unmodifiableMap; import static javax.jms.Session.AUTO_ACKNOWLEDGE; import static org.slf4j.LoggerFactory.getLogger; public class ActiveMqClient implements MessageListener { public static class BuildRequest { public final Map parameters; public BuildRequest(Map parameters) { this.parameters = unmodifiableMap(new HashMap(parameters)); } } public static interface BuildRequestListener { void onBuildRequest(BuildRequest req); } private final Logger log = getLogger(getClass()); public final String brokerUrl; private final BuildRequestListener buildRequestListener; private final Connection connection; private final Session session; private final MessageProducer buildResultProducer; public ActiveMqClient(String brokerUrl, String topicName, BuildRequestListener buildRequestListener) throws JMSException, URISyntaxException { this.brokerUrl = brokerUrl; this.buildRequestListener = buildRequestListener; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(new URI("failover:(" + brokerUrl + ")")); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(false, AUTO_ACKNOWLEDGE); Destination topic = session.createTopic(topicName); buildResultProducer = session.createProducer(topic); MessageConsumer buildRequestConsumer = session.createConsumer(topic); buildRequestConsumer.setMessageListener(this); } public synchronized void sendMessage(String jobName, int buildNumber, String result) { try { buildResultProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Properties properties = new Properties(); properties.setProperty("eventType", "jenkins build"); properties.setProperty("jobName", jobName); properties.setProperty("buildNumber", valueOf(buildNumber)); properties.setProperty("result", result); CharArrayWriter buf = new CharArrayWriter(); properties.store(buf, null); TextMessage message = session.createTextMessage(buf.toString()); buildResultProducer.send(message); } catch (JMSException e) { log.warn("Could not send message", e); } catch (IOException e) { log.warn("Could not send message", e); } } public synchronized void close() { try { connection.close(); } catch (JMSException e) { log.info("Got exception while closing connection.", e); } } @Override public void onMessage(Message message) { try { if (message instanceof TextMessage) { TextMessage m = (TextMessage) message; Properties p = new Properties(); p.load(new StringReader(m.getText())); HashMap parameters = new HashMap(); for (Map.Entry e : p.entrySet()) { parameters.put(e.getKey().toString(), e.getValue().toString()); } buildRequestListener.onBuildRequest(new BuildRequest(parameters)); } } catch (IOException e) { log.info("Exception while handling message", e); } catch (JMSException e) { log.info("Exception while handling message", e); } } }