diff options
Diffstat (limited to 'src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java')
-rw-r--r-- | src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java | 100 |
1 files changed, 76 insertions, 24 deletions
diff --git a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java index ffb4313..59ccffe 100644 --- a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java +++ b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java @@ -1,46 +1,78 @@ package org.jenkinsci.plugins.activemq; -import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import java.io.CharArrayWriter; import java.io.IOException; +import java.io.StringReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static java.lang.String.valueOf; +import static java.util.Collections.unmodifiableMap; import static javax.jms.Session.AUTO_ACKNOWLEDGE; +import static org.slf4j.LoggerFactory.getLogger; -public class ActiveMqClient { +public class ActiveMqClient implements MessageListener { - public static final String TOPIC_NAME = "jenkins.build-result"; + public static class BuildRequest { + public final Map<String, String> parameters; + + public BuildRequest(Map<String, String> parameters) { + this.parameters = unmodifiableMap(new HashMap<String, String>(parameters)); + } + } + + public static interface BuildRequestListener { + void onBuildRequest(BuildRequest req); + } + + private final Logger log = getLogger(getClass()); public final String brokerUrl; - private final PooledConnectionFactory connectionFactory; + private final BuildRequestListener buildRequestListener; + private final Connection connection; + private final Session session; + private final MessageProducer buildResultProducer; - public ActiveMqClient(String brokerUrl) { + public ActiveMqClient(String brokerUrl, String topicName, BuildRequestListener buildRequestListener) throws JMSException, URISyntaxException { this.brokerUrl = brokerUrl; - this.connectionFactory = new PooledConnectionFactory(brokerUrl); - } + this.buildRequestListener = buildRequestListener; - public synchronized void sendMessage(String jobName, int buildNumber, String result) { - try { - Connection connection = connectionFactory.createConnection(); - connection.start(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(new URI("failover:(" + brokerUrl + ")")); - Session session = connection.createSession(false, AUTO_ACKNOWLEDGE); + connection = connectionFactory.createConnection(); + connection.start(); - Destination destination = session.createTopic(TOPIC_NAME); + session = connection.createSession(false, AUTO_ACKNOWLEDGE); + Destination topic = session.createTopic(topicName); + buildResultProducer = session.createProducer(topic); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + MessageConsumer buildRequestConsumer = session.createConsumer(topic); + + buildRequestConsumer.setMessageListener(this); + } + + public synchronized void sendMessage(String jobName, int buildNumber, String result) { + try { + buildResultProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Properties properties = new Properties(); + properties.setProperty("eventType", "jenkins build"); properties.setProperty("jobName", jobName); properties.setProperty("buildNumber", valueOf(buildNumber)); properties.setProperty("result", result); @@ -48,20 +80,40 @@ public class ActiveMqClient { CharArrayWriter buf = new CharArrayWriter(); properties.store(buf, null); TextMessage message = session.createTextMessage(buf.toString()); - System.out.println("message.getJMSMessageID() = " + message.getJMSMessageID()); - producer.send(message); - System.out.println("message.getJMSMessageID() = " + message.getJMSMessageID()); - - session.close(); - connection.close(); + buildResultProducer.send(message); } catch (JMSException e) { - e.printStackTrace(); + log.warn("Could not send message", e); } catch (IOException e) { - e.printStackTrace(); + log.warn("Could not send message", e); } } public synchronized void close() { - connectionFactory.clear(); + try { + connection.close(); + } catch (JMSException e) { + log.info("Got exception while closing connection.", e); + } + } + + @Override + public void onMessage(Message message) { + try { + if (message instanceof TextMessage) { + TextMessage m = (TextMessage) message; + + Properties p = new Properties(); + p.load(new StringReader(m.getText())); + HashMap<String, String> parameters = new HashMap<String, String>(); + for (Map.Entry<Object, Object> e : p.entrySet()) { + parameters.put(e.getKey().toString(), e.getValue().toString()); + } + buildRequestListener.onBuildRequest(new BuildRequest(parameters)); + } + } catch (IOException e) { + log.info("Exception while handling message", e); + } catch (JMSException e) { + log.info("Exception while handling message", e); + } } } |