From 7e2d2c074c3f09266dda31c772f4bec61e8d5742 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 6 May 2014 22:03:39 +0200 Subject: o Support for listening on messages too. o Configurable topic name. --- .../jenkinsci/plugins/activemq/ActiveMqClient.java | 100 ++++++++++++++++----- .../plugins/activemq/ActiveMqGlobalConfig.java | 82 ++++++++++------- .../jenkinsci/plugins/activemq/ActiveMqPlugin.java | 18 +++- .../activemq/ActiveMqGlobalConfig/config.groovy | 5 ++ .../plugins/activemq/MessageListenerMain.java | 2 +- 5 files changed, 146 insertions(+), 61 deletions(-) diff --git a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java index ffb4313..59ccffe 100644 --- a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java +++ b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqClient.java @@ -1,46 +1,78 @@ package org.jenkinsci.plugins.activemq; -import org.apache.activemq.pool.PooledConnectionFactory; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; import javax.jms.Connection; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import java.io.CharArrayWriter; import java.io.IOException; +import java.io.StringReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import static java.lang.String.valueOf; +import static java.util.Collections.unmodifiableMap; import static javax.jms.Session.AUTO_ACKNOWLEDGE; +import static org.slf4j.LoggerFactory.getLogger; -public class ActiveMqClient { +public class ActiveMqClient implements MessageListener { - public static final String TOPIC_NAME = "jenkins.build-result"; + public static class BuildRequest { + public final Map parameters; + + public BuildRequest(Map parameters) { + this.parameters = unmodifiableMap(new HashMap(parameters)); + } + } + + public static interface BuildRequestListener { + void onBuildRequest(BuildRequest req); + } + + private final Logger log = getLogger(getClass()); public final String brokerUrl; - private final PooledConnectionFactory connectionFactory; + private final BuildRequestListener buildRequestListener; + private final Connection connection; + private final Session session; + private final MessageProducer buildResultProducer; - public ActiveMqClient(String brokerUrl) { + public ActiveMqClient(String brokerUrl, String topicName, BuildRequestListener buildRequestListener) throws JMSException, URISyntaxException { this.brokerUrl = brokerUrl; - this.connectionFactory = new PooledConnectionFactory(brokerUrl); - } + this.buildRequestListener = buildRequestListener; - public synchronized void sendMessage(String jobName, int buildNumber, String result) { - try { - Connection connection = connectionFactory.createConnection(); - connection.start(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(new URI("failover:(" + brokerUrl + ")")); - Session session = connection.createSession(false, AUTO_ACKNOWLEDGE); + connection = connectionFactory.createConnection(); + connection.start(); - Destination destination = session.createTopic(TOPIC_NAME); + session = connection.createSession(false, AUTO_ACKNOWLEDGE); + Destination topic = session.createTopic(topicName); + buildResultProducer = session.createProducer(topic); - MessageProducer producer = session.createProducer(destination); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + MessageConsumer buildRequestConsumer = session.createConsumer(topic); + + buildRequestConsumer.setMessageListener(this); + } + + public synchronized void sendMessage(String jobName, int buildNumber, String result) { + try { + buildResultProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); Properties properties = new Properties(); + properties.setProperty("eventType", "jenkins build"); properties.setProperty("jobName", jobName); properties.setProperty("buildNumber", valueOf(buildNumber)); properties.setProperty("result", result); @@ -48,20 +80,40 @@ public class ActiveMqClient { CharArrayWriter buf = new CharArrayWriter(); properties.store(buf, null); TextMessage message = session.createTextMessage(buf.toString()); - System.out.println("message.getJMSMessageID() = " + message.getJMSMessageID()); - producer.send(message); - System.out.println("message.getJMSMessageID() = " + message.getJMSMessageID()); - - session.close(); - connection.close(); + buildResultProducer.send(message); } catch (JMSException e) { - e.printStackTrace(); + log.warn("Could not send message", e); } catch (IOException e) { - e.printStackTrace(); + log.warn("Could not send message", e); } } public synchronized void close() { - connectionFactory.clear(); + try { + connection.close(); + } catch (JMSException e) { + log.info("Got exception while closing connection.", e); + } + } + + @Override + public void onMessage(Message message) { + try { + if (message instanceof TextMessage) { + TextMessage m = (TextMessage) message; + + Properties p = new Properties(); + p.load(new StringReader(m.getText())); + HashMap parameters = new HashMap(); + for (Map.Entry e : p.entrySet()) { + parameters.put(e.getKey().toString(), e.getValue().toString()); + } + buildRequestListener.onBuildRequest(new BuildRequest(parameters)); + } + } catch (IOException e) { + log.info("Exception while handling message", e); + } catch (JMSException e) { + log.info("Exception while handling message", e); + } } } diff --git a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig.java b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig.java index 4bc8066..008613b 100644 --- a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig.java +++ b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig.java @@ -5,24 +5,27 @@ import hudson.util.FormValidation; import jenkins.model.GlobalConfiguration; import jenkins.model.Jenkins; import net.sf.json.JSONObject; -import org.apache.activemq.ActiveMQConnectionFactory; import org.kohsuke.stapler.QueryParameter; import org.kohsuke.stapler.StaplerRequest; import org.slf4j.Logger; -import javax.jms.Connection; import javax.jms.JMSException; -import java.net.URI; import java.net.URISyntaxException; +import static org.apache.commons.lang.StringUtils.trimToNull; +import static org.apache.commons.lang.builder.ToStringBuilder.reflectionToString; import static org.slf4j.LoggerFactory.getLogger; +@SuppressWarnings("UnusedDeclaration") @Extension public class ActiveMqGlobalConfig extends GlobalConfiguration { + public static final String DEFAULT_TOPIC_NAME = "jenkins"; + private static final Logger log = getLogger(ActiveMqPlugin.class); private String brokerUrl; private boolean enable; + private String topicName; public ActiveMqGlobalConfig() { log.info("ActiveMqGlobalConfig.ActiveMqGlobalConfig"); @@ -30,21 +33,9 @@ public class ActiveMqGlobalConfig extends GlobalConfiguration { log.info("this = " + this); } - public String getUrl() { - log.info("ActiveMqGlobalConfig.getUrl"); - return brokerUrl; - } - - public void setUrl(String url) { - log.info("ActiveMqGlobalConfig.setUrl"); - log.info("url = " + url); - this.brokerUrl = url; - } - - public FormValidation doCheckUrl(@QueryParameter String value) { - log.info("ActiveMqGlobalConfig.doCheckUrl"); - return doCheckBrokerUrl(value); - } + // ----------------------------------------------------------------------- + // Broker url + // ----------------------------------------------------------------------- public String getBrokerUrl() { log.info("ActiveMqGlobalConfig.getBrokerUrl"); @@ -67,30 +58,25 @@ public class ActiveMqGlobalConfig extends GlobalConfiguration { return FormValidation.ok(); } - ActiveMQConnectionFactory connectionFactory; + ActiveMqClient client; try { - URI uri = new URI(value); - connectionFactory = new ActiveMQConnectionFactory(uri); + client = new ActiveMqClient(brokerUrl, DEFAULT_TOPIC_NAME, null); } catch (URISyntaxException e) { return FormValidation.error("Invalid URI: " + e.getMessage()); - } - - Connection connection; - try { - connection = connectionFactory.createConnection(); } catch (JMSException e) { return FormValidation.warning("Unable to connect to broker"); } - try { - connection.close(); - } catch (JMSException ignore) { - } + client.close(); return FormValidation.ok("Successfully connected to broker"); } + // ----------------------------------------------------------------------- + // Enable + // ----------------------------------------------------------------------- + public boolean isEnable() { log.info("ActiveMqGlobalConfig.isEnable"); return enable; @@ -102,6 +88,37 @@ public class ActiveMqGlobalConfig extends GlobalConfiguration { save(); } + // ----------------------------------------------------------------------- + // Topic Name + // ----------------------------------------------------------------------- + + public String getTopicName() { + if (topicName == null || doCheckBrokerUrl(topicName).kind != FormValidation.Kind.OK) { + topicName = DEFAULT_TOPIC_NAME; + } + + return topicName; + } + + public void setTopicName(String topicName) { + this.topicName = topicName; + save(); + } + + public FormValidation doCheckTopicName(@QueryParameter String value) { + value = trimToNull(value); + + if (value == null) { + return FormValidation.ok(); + } + + if (value.matches("^[a-zA-Z]+")) { + return FormValidation.ok(); + } + + return FormValidation.error("Invalid topic name."); + } + @Override public boolean configure(StaplerRequest req, JSONObject json) throws FormException { log.info("ActiveMqGlobalConfig.configure"); @@ -116,9 +133,6 @@ public class ActiveMqGlobalConfig extends GlobalConfiguration { @Override public String toString() { - return "ActiveMqGlobalConfig{" + - ", brokerUrl='" + brokerUrl + '\'' + - ", enable=" + enable + - "} " + super.toString(); + return reflectionToString(this); } } diff --git a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqPlugin.java b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqPlugin.java index 6ec65c0..c9c277c 100644 --- a/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqPlugin.java +++ b/src/main/java/org/jenkinsci/plugins/activemq/ActiveMqPlugin.java @@ -8,12 +8,14 @@ import jenkins.model.GlobalConfiguration; import org.slf4j.Logger; import javax.annotation.Nonnull; +import javax.jms.JMSException; +import java.net.URISyntaxException; import java.util.Map; import static org.slf4j.LoggerFactory.getLogger; @Extension -public class ActiveMqPlugin extends Plugin /*implements ReconfigurableDescribable*/ { +public class ActiveMqPlugin extends Plugin implements ActiveMqClient.BuildRequestListener /*implements ReconfigurableDescribable*/ { // public static final String DISPLAY_NAME = "ActiveMQ Plugin Display Name"; @@ -66,7 +68,13 @@ public class ActiveMqPlugin extends Plugin /*implements ReconfigurableDescribabl } log.info("Creating client of broker {}", brokerUrl); - client = new ActiveMqClient(brokerUrl); + try { + client = new ActiveMqClient(brokerUrl, config.getTopicName(), this); + } catch (JMSException e) { + log.warn("Unable to connect to queue"); + } catch (URISyntaxException e) { + log.warn("Unable to connect to queue"); + } } else { if (client != null) { log.info("Disposing current JMS client."); @@ -75,4 +83,10 @@ public class ActiveMqPlugin extends Plugin /*implements ReconfigurableDescribabl } } } + + @Override + public void onBuildRequest(ActiveMqClient.BuildRequest req) { + log.info("ActiveMqPlugin.onBuildRequest"); + log.info(req.parameters.toString()); + } } diff --git a/src/main/resources/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig/config.groovy b/src/main/resources/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig/config.groovy index 7c5cae3..13ef396 100644 --- a/src/main/resources/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig/config.groovy +++ b/src/main/resources/org/jenkinsci/plugins/activemq/ActiveMqGlobalConfig/config.groovy @@ -1,5 +1,7 @@ package org.jenkinsci.plugins.activemq.ActiveMqGlobalConfig +import org.jenkinsci.plugins.activemq.ActiveMqGlobalConfig; + def f=namespace(lib.FormTagLib) f.section(title:_("ActiveMQ Configuration")) { @@ -9,6 +11,9 @@ f.section(title:_("ActiveMQ Configuration")) { f.entry(title:_("Broker URL"), field:"brokerUrl") { f.textbox() } + f.entry(title:_("Topic Name"), field:"topicName") { + f.textbox(default: ActiveMqGlobalConfig.DEFAULT_TOPIC_NAME) + } // f.entry(title:_("URL"), field:"url") { // f.textbox(default: "my url") // } diff --git a/src/test/java/org/jenkinsci/plugins/activemq/MessageListenerMain.java b/src/test/java/org/jenkinsci/plugins/activemq/MessageListenerMain.java index ed7def3..a2bd76f 100644 --- a/src/test/java/org/jenkinsci/plugins/activemq/MessageListenerMain.java +++ b/src/test/java/org/jenkinsci/plugins/activemq/MessageListenerMain.java @@ -21,7 +21,7 @@ public class MessageListenerMain { System.out.println("Connected"); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Topic topic = session.createTopic(ActiveMqClient.TOPIC_NAME); + Topic topic = session.createTopic("pipeline"); MessageConsumer consumer = session.createConsumer(topic); while (true) { -- cgit v1.2.3