aboutsummaryrefslogtreecommitdiff
path: root/src/main/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java')
-rw-r--r--src/main/java/io/trygvis/esper/testing/Main.java26
-rw-r--r--src/main/java/io/trygvis/esper/testing/XmlUtil.java38
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java203
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java70
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java83
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java100
-rw-r--r--src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java7
-rw-r--r--src/main/java/io/trygvis/esper/testing/object/ObjectManager.java60
8 files changed, 549 insertions, 38 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/Main.java b/src/main/java/io/trygvis/esper/testing/Main.java
index ad7c71a..683b950 100644
--- a/src/main/java/io/trygvis/esper/testing/Main.java
+++ b/src/main/java/io/trygvis/esper/testing/Main.java
@@ -1,7 +1,10 @@
package io.trygvis.esper.testing;
+import ch.qos.logback.classic.*;
+import ch.qos.logback.core.util.*;
import com.espertech.esper.client.*;
import org.apache.log4j.*;
+import org.slf4j.*;
import java.util.*;
@@ -16,17 +19,22 @@ public class Main {
}
public static void configureLog4j() {
- Properties properties = new Properties();
- properties.setProperty("log4j.rootLogger", "DEBUG, A1");
- properties.setProperty("log4j.logger.httpclient.wire.content", "INFO");
- properties.setProperty("log4j.logger.httpclient.wire.header", "INFO");
- properties.setProperty("log4j.logger.org.apache.commons.httpclient", "INFO");
- properties.setProperty("log4j.appender.A1", "org.apache.log4j.ConsoleAppender");
- properties.setProperty("log4j.appender.A1.layout", "org.apache.log4j.PatternLayout");
- properties.setProperty("log4j.appender.A1.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
- PropertyConfigurator.configure(properties);
+ LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
+ StatusPrinter.print(lc);
}
+// public static void configureLog4j() {
+// Properties properties = new Properties();
+// properties.setProperty("log4j.rootLogger", "DEBUG, A1");
+// properties.setProperty("log4j.logger.httpclient.wire.content", "INFO");
+// properties.setProperty("log4j.logger.httpclient.wire.header", "INFO");
+// properties.setProperty("log4j.logger.org.apache.commons.httpclient", "INFO");
+// properties.setProperty("log4j.appender.A1", "org.apache.log4j.ConsoleAppender");
+// properties.setProperty("log4j.appender.A1.layout", "org.apache.log4j.PatternLayout");
+// properties.setProperty("log4j.appender.A1.layout.ConversionPattern", "%-4r [%t] %-5p %c %x - %m%n");
+// PropertyConfigurator.configure(properties);
+// }
+
private void work() throws Exception {
Configuration config = new Configuration();
diff --git a/src/main/java/io/trygvis/esper/testing/XmlUtil.java b/src/main/java/io/trygvis/esper/testing/XmlUtil.java
new file mode 100644
index 0000000..612667f
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/XmlUtil.java
@@ -0,0 +1,38 @@
+package io.trygvis.esper.testing;
+
+import fj.*;
+import fj.data.*;
+import static fj.data.Option.*;
+import org.jdom2.*;
+
+import java.net.*;
+
+public class XmlUtil {
+ public static F<String, Option<Integer>> parseInt = new F<String, Option<Integer>>() {
+ public Option<Integer> f(String s) {
+ try {
+ return some(Integer.parseInt(s));
+ } catch (NumberFormatException e) {
+ return none();
+ }
+ }
+ };
+
+ public static F<String, Option<URI>> parseUri = new F<String, Option<URI>>() {
+ public Option<URI> f(String s) {
+ try {
+ return some(URI.create(s));
+ } catch (Throwable e) {
+ return none();
+ }
+ }
+ };
+
+ public static Option<String> childText(Element e, String childName) {
+ return fromNull(e.getChildText(childName));
+ }
+
+ public static Option<Element> child(Element e, String childName) {
+ return fromNull(e.getChild(childName));
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
index ddf040c..4ecb4fb 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
@@ -1,37 +1,45 @@
package io.trygvis.esper.testing.jenkins;
+import fj.*;
+import fj.data.*;
+import io.trygvis.esper.testing.*;
+import static io.trygvis.esper.testing.XmlUtil.*;
+import static java.lang.Integer.*;
import static org.apache.commons.lang.StringUtils.*;
import org.codehaus.httpcache4j.*;
import org.codehaus.httpcache4j.cache.*;
-import org.codehaus.httpcache4j.util.*;
+import org.h2.util.*;
import org.jdom2.*;
import org.jdom2.input.*;
import javax.xml.stream.*;
+import java.io.*;
import java.net.*;
import java.util.*;
+import java.util.List;
public class JenkinsClient {
private static final XMLInputFactory xmlReader = XMLInputFactory.newFactory();
private static final StAXStreamBuilder streamBuilder = new StAXStreamBuilder();
private final HTTPCache http;
- private final URI apiXmlUri;
- public JenkinsClient(HTTPCache http, URI jenkinsUri) {
+ private boolean debugXml;
+
+ public JenkinsClient(HTTPCache http) {
this.http = http;
- this.apiXmlUri = URIBuilder.fromURI(jenkinsUri).addRawPath("api/xml").toURI();
+ this.debugXml = false;
}
- public List<JenkinsJobXml> fetchJobs() throws XMLStreamException, JDOMException {
- HTTPResponse response = http.execute(new HTTPRequest(apiXmlUri));
+ public void setDebugXml(boolean debugXml) {
+ this.debugXml = debugXml;
+ }
- if (response.getStatus().getCode() != 200) {
- throw new RuntimeException("Did not get 200 back, got " + response.getStatus().getCode());
- }
+ public JenkinsXml fetchJobs(URI uri) throws XMLStreamException, JDOMException, IOException {
+ InputStream stream = fetchXml(uri);
- Element doc = streamBuilder.build(xmlReader.createXMLStreamReader(response.getPayload().getInputStream())).getRootElement();
+ Element doc = parseDocument(stream).getRootElement();
- List<JenkinsJobXml> jobs = new ArrayList<>();
+ List<JenkinsJobEntryXml> jobs = new ArrayList<>();
for (Element job : doc.getChildren("job")) {
String name = trimToNull(job.getChildText("name"));
String url = trimToNull(job.getChildText("url"));
@@ -41,21 +49,184 @@ public class JenkinsClient {
continue;
}
- jobs.add(new JenkinsJobXml(name, url, color));
+ jobs.add(new JenkinsJobEntryXml(name, url, color));
+ }
+
+ return new JenkinsXml(
+ Option.fromNull(doc.getChildText("nodeName")),
+ Option.fromNull(doc.getChildText("nodeDescription")),
+ Option.fromNull(doc.getChildText("description")), jobs);
+ }
+
+ public JenkinsJobXml fetchJob(URI uri) throws IOException, JDOMException, XMLStreamException {
+ InputStream stream = fetchXml(uri);
+
+ Element root = parseDocument(stream).getRootElement();
+
+ switch (root.getName()) {
+ case "freeStyleProject":
+ return FreeStyleProjectXml.parse(root);
+ case "mavenModuleSet":
+ return MavenModuleSetXml.parse(root);
+ default:
+ throw new IOException("Unknown project type: " + root.getName());
+ }
+ }
+
+ private Document parseDocument(InputStream stream) throws JDOMException, XMLStreamException {
+ return streamBuilder.build(xmlReader.createXMLStreamReader(stream));
+ }
+
+ private InputStream fetchXml(URI uri) throws IOException {
+ HTTPResponse response;
+
+ try {
+ response = http.execute(new HTTPRequest(uri));
+ } catch (HTTPException e) {
+ throw new IOException(e);
+ }
+
+ if (response.getStatus().getCode() != 200) {
+ throw new IOException("Did not get 200 back, got " + response.getStatus().getCode());
+ }
+
+ InputStream stream = response.getPayload().getInputStream();
+
+ if (!debugXml) {
+ return stream;
+ }
+
+ int size;
+ try {
+ size = parseInt(response.getHeaders().getFirstHeader("Content-Length").getValue());
+ } catch (Throwable e) {
+ size = 10 * 1024;
}
- return jobs;
+ // TODO: Pretty print
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream(size);
+ IOUtils.copy(stream, buffer);
+ byte[] bytes = buffer.toByteArray();
+ System.out.println("------------------------------------------------");
+ System.out.write(bytes);
+ System.out.println();
+ System.out.println("------------------------------------------------");
+ stream = new ByteArrayInputStream(bytes);
+ return stream;
+ }
+}
+
+class JenkinsXml {
+ public final Option<String> nodeName;
+ public final Option<String> nodeDescription;
+ public final Option<String> description;
+ public final List<JenkinsJobEntryXml> jobs;
+
+ JenkinsXml(Option<String> nodeName, Option<String> nodeDescription, Option<String> description, List<JenkinsJobEntryXml> jobs) {
+ this.nodeName = nodeName;
+ this.nodeDescription = nodeDescription;
+ this.description = description;
+ this.jobs = jobs;
}
}
-class JenkinsJobXml {
+class JenkinsJobEntryXml {
public final String name;
public final String url;
public final String color;
- JenkinsJobXml(String name, String url, String color) {
+ JenkinsJobEntryXml(String name, String url, String color) {
+ this.name = name;
+ this.url = url;
+ this.color = color;
+ }
+}
+
+abstract class JenkinsJobXml {
+ public final Option<String> description;
+ public final Option<String> displayName;
+ public final Option<String> name;
+ public final Option<String> url;
+ public final Option<String> color;
+ public final Option<BuildXml> lastBuild;
+ public final Option<BuildXml> lastCompletedBuild;
+ public final Option<BuildXml> lastFailedBuild;
+ public final Option<BuildXml> lastSuccessfulBuild;
+ public final Option<BuildXml> lastUnsuccessfulBuild;
+
+ protected JenkinsJobXml(Option<String> description, Option<String> displayName, Option<String> name, Option<String> url, Option<String> color, Option<BuildXml> lastBuild, Option<BuildXml> lastCompletedBuild, Option<BuildXml> lastFailedBuild, Option<BuildXml> lastSuccessfulBuild, Option<BuildXml> lastUnsuccessfulBuild) {
+ this.description = description;
+ this.displayName = displayName;
this.name = name;
this.url = url;
this.color = color;
+ this.lastBuild = lastBuild;
+ this.lastCompletedBuild = lastCompletedBuild;
+ this.lastFailedBuild = lastFailedBuild;
+ this.lastSuccessfulBuild = lastSuccessfulBuild;
+ this.lastUnsuccessfulBuild = lastUnsuccessfulBuild;
+ }
+}
+
+class BuildXml {
+ public final int number;
+ public final URI url;
+ public static F<Element, Option<BuildXml>> buildXml = new F<Element, Option<BuildXml>>() {
+ public Option<BuildXml> f(Element element) {
+ Option<Integer> number = childText(element, "number").bind(XmlUtil.parseInt);
+ Option<URI> url = childText(element, "url").bind(parseUri);
+
+ if(number.isNone() || url.isNone()) {
+ return Option.none();
+ }
+
+ return Option.some(new BuildXml(number.some(), url.some()));
+ }
+ };
+
+ BuildXml(int number, URI url) {
+ this.number = number;
+ this.url = url;
+ }
+}
+
+class FreeStyleProjectXml extends JenkinsJobXml {
+ FreeStyleProjectXml(Option<String> description, Option<String> displayName, Option<String> name, Option<String> url, Option<String> color, Option<BuildXml> lastBuild, Option<BuildXml> lastCompletedBuild, Option<BuildXml> lastFailedBuild, Option<BuildXml> lastSuccessfulBuild, Option<BuildXml> lastUnsuccessfulBuild) {
+ super(description, displayName, name, url, color, lastBuild, lastCompletedBuild, lastFailedBuild, lastSuccessfulBuild, lastUnsuccessfulBuild);
+ }
+
+ public static JenkinsJobXml parse(Element root) {
+ return new FreeStyleProjectXml(
+ childText(root, "description"),
+ childText(root, "displayName"),
+ childText(root, "name"),
+ childText(root, "url"),
+ childText(root, "color"),
+ child(root, "lastBuild").bind(BuildXml.buildXml),
+ child(root, "lastCompletedBuild").bind(BuildXml.buildXml),
+ child(root, "lastFailedBuild").bind(BuildXml.buildXml),
+ child(root, "lastSuccessfulBuild").bind(BuildXml.buildXml),
+ child(root, "lastUnsuccessfulBuild").bind(BuildXml.buildXml));
+ }
+}
+
+class MavenModuleSetXml extends JenkinsJobXml {
+ MavenModuleSetXml(Option<String> description, Option<String> displayName, Option<String> name, Option<String> url, Option<String> color, Option<BuildXml> lastBuild, Option<BuildXml> lastCompletedBuild, Option<BuildXml> lastFailedBuild, Option<BuildXml> lastSuccessfulBuild, Option<BuildXml> lastUnsuccessfulBuild) {
+ super(description, displayName, name, url, color, lastBuild, lastCompletedBuild, lastFailedBuild, lastSuccessfulBuild, lastUnsuccessfulBuild);
}
-} \ No newline at end of file
+
+ public static JenkinsJobXml parse(Element root) {
+ return new MavenModuleSetXml(
+ childText(root, "description"),
+ childText(root, "displayName"),
+ childText(root, "name"),
+ childText(root, "url"),
+ childText(root, "color"),
+ child(root, "lastBuild").bind(BuildXml.buildXml),
+ child(root, "lastCompletedBuild").bind(BuildXml.buildXml),
+ child(root, "lastFailedBuild").bind(BuildXml.buildXml),
+ child(root, "lastSuccessfulBuild").bind(BuildXml.buildXml),
+ child(root, "lastUnsuccessfulBuild").bind(BuildXml.buildXml));
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
index 942aa15..b639108 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
@@ -1,30 +1,74 @@
package io.trygvis.esper.testing.jenkins;
+import fj.*;
+import fj.data.*;
import io.trygvis.esper.testing.*;
+import io.trygvis.esper.testing.object.*;
+import org.apache.commons.httpclient.*;
+import org.apache.commons.httpclient.params.*;
import org.codehaus.httpcache4j.cache.*;
import org.codehaus.httpcache4j.client.*;
+import org.joda.time.*;
import java.net.*;
-import java.util.*;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.concurrent.*;
public class JenkinsImporter {
- private final JenkinsClient jenkinsClient;
-
public static void main(String[] args) throws Exception {
Main.configureLog4j();
- new JenkinsImporter().work();
- }
- public JenkinsImporter() {
- HTTPCache http = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance());
- jenkinsClient = new JenkinsClient(http, URI.create("https://builds.apache.org"));
- }
+// HTTPClientResponseResolver resolver = HTTPClientResponseResolver.createMultithreadedInstance();
+// HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(new HttpClient(new MultiThreadedHttpConnectionManager()));
+ HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(new HttpClient(new SimpleHttpConnectionManager()));
+ HttpClientParams params = new HttpClientParams();
+// params.setConnectionManagerTimeout(1000);
+ params.setSoTimeout(1000);
+ resolver.getClient().setParams(params);
+ HTTPCache http = new HTTPCache(new MemoryCacheStorage(), resolver);
+ final JenkinsClient jenkinsClient = new JenkinsClient(http);
+
+ jenkinsClient.setDebugXml(true);
+
+ HashSet<URI> servers = new HashSet<>();
+ servers.add(URI.create("https://builds.apache.org"));
+
+ final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1);
- private void work() throws Exception {
- List<JenkinsJobXml> jobs = jenkinsClient.fetchJobs();
+ ObjectManager<URI, JenkinsServer> serverManager = new ObjectManager<>("JenkinsServer", servers, new ObjectFactory<URI, JenkinsServer>() {
+ public JenkinsServer create(URI uri) {
+ return new JenkinsServer(executorService, jenkinsClient, uri);
+ }
+ });
+ final boolean[] shouldRun = new boolean[]{true};
- for (JenkinsJobXml job : jobs) {
- System.out.println("job.name = " + job.name);
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ {
+ setName("Shutdown hoook");
+ }
+
+ public void run() {
+ shouldRun[0] = false;
+ }
+ });
+
+ while (shouldRun[0]) {
+ for (JenkinsServer server : serverManager.getObjects()) {
+ Option<P2<JenkinsXml, LocalDateTime>> o = server.getJenkins();
+
+ if (o.isSome()) {
+ P2<JenkinsXml, LocalDateTime> p = o.some();
+ System.out.println("Last update: " + p._2() + ", jobs=" + p._1().jobs.size());
+ } else {
+ System.out.println("Never updated: url=" + server.uri);
+ }
+ }
+
+ Thread.sleep(1000);
}
+
+ serverManager.close();
+ executorService.shutdownNow();
}
}
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java
new file mode 100644
index 0000000..6596dfa
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java
@@ -0,0 +1,83 @@
+package io.trygvis.esper.testing.jenkins;
+
+import org.codehaus.httpcache4j.util.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class JenkinsJob implements Closeable {
+
+ private final JenkinsClient client;
+ private final URI uri;
+
+ private JenkinsJobXml latestStatus;
+ // private boolean shouldRun = true;
+ // private final Thread thread;
+ private final ScheduledFuture<?> future;
+
+ public JenkinsJob(ScheduledExecutorService executorService, JenkinsClient client, URI uri) {
+ this.client = client;
+ this.uri = URIBuilder.fromURI(uri).addRawPath("api/xml").toURI();
+
+ long initialDelay = (long) Math.random() + 1;
+ long period = (long) (Math.random() * 10d) + 1;
+ future = executorService.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ JenkinsJob.this.doWork();
+ }
+ }, initialDelay, period, TimeUnit.SECONDS);
+
+// thread = new Thread(new Runnable() {
+// public void run() {
+// JenkinsJob.this.run();
+// }
+// });
+// thread.setDaemon(true);
+// thread.start();
+ }
+
+ public JenkinsJobXml getLatestStatus() {
+ return latestStatus;
+ }
+
+ /*
+ public void close() throws IOException {
+ shouldRun = false;
+ thread.interrupt();
+ while (thread.isAlive()) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+ }
+
+ private void run() {
+ Random r = new Random();
+ while (shouldRun) {
+ doWork();
+
+ try {
+ Thread.sleep(1000 + r.nextInt(10) * 1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ */
+
+ public void close() throws IOException {
+ future.cancel(true);
+ }
+
+ private void doWork() {
+ try {
+ latestStatus = client.fetchJob(uri);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
new file mode 100644
index 0000000..707a69a
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
@@ -0,0 +1,100 @@
+package io.trygvis.esper.testing.jenkins;
+
+import fj.*;
+import fj.data.*;
+import static fj.data.Option.*;
+import io.trygvis.esper.testing.object.*;
+import org.codehaus.httpcache4j.util.*;
+import org.joda.time.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.*;
+
+public class JenkinsServer implements Closeable {
+
+ private final JenkinsClient client;
+ public final URI uri;
+ private final ObjectManager<URI, JenkinsJob> jobManager;
+
+ private boolean shouldRun = true;
+ private final Thread thread;
+
+ private Option<P2<JenkinsXml, LocalDateTime>> jenkins = none();
+
+ public JenkinsServer(final ScheduledExecutorService executorService, final JenkinsClient client, URI uri) {
+ this.client = client;
+ this.uri = URIBuilder.fromURI(uri).addRawPath("api/xml").toURI();
+
+ jobManager = new ObjectManager<>("JenkinsJob", Collections.<URI>emptySet(), new ObjectFactory<URI, JenkinsJob>() {
+ public JenkinsJob create(URI uri) {
+ return new JenkinsJob(executorService, client, uri);
+ }
+ });
+
+ thread = new Thread(new Runnable() {
+ public void run() {
+ JenkinsServer.this.run();
+ }
+ });
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public void close() throws IOException {
+ shouldRun = false;
+ thread.interrupt();
+ while (thread.isAlive()) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+ }
+
+ private void run() {
+ while (shouldRun) {
+ try {
+ doWork();
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+
+ try {
+ Thread.sleep(10 * 1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public Option<P2<JenkinsXml, LocalDateTime>> getJenkins() {
+ return jenkins;
+ }
+
+ public Collection<JenkinsJob> getJobs() {
+ return jobManager.getObjects();
+ }
+
+ private void doWork() {
+ try {
+ JenkinsXml xml = client.fetchJobs(uri);
+
+ List<URI> jobUris = new ArrayList<>(xml.jobs.size());
+ for (JenkinsJobEntryXml job : xml.jobs) {
+ jobUris.add(URI.create(job.url));
+ }
+
+ this.jenkins = some(P.p(xml, new LocalDateTime()));
+
+ jobManager.update(new HashSet<>(jobUris.subList(0, 10)));
+ } catch (Throwable e) {
+ e.printStackTrace(System.out);
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java b/src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java
new file mode 100644
index 0000000..8e7d4b0
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java
@@ -0,0 +1,7 @@
+package io.trygvis.esper.testing.object;
+
+import java.io.*;
+
+public interface ObjectFactory<K, V extends Closeable> {
+ V create(K k);
+}
diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectManager.java b/src/main/java/io/trygvis/esper/testing/object/ObjectManager.java
new file mode 100644
index 0000000..cd0dd1e
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/object/ObjectManager.java
@@ -0,0 +1,60 @@
+package io.trygvis.esper.testing.object;
+
+import java.io.*;
+import java.util.*;
+
+public class ObjectManager<K, V extends Closeable> implements Closeable {
+ private final String type;
+ private final ObjectFactory<K, V> objectFactory;
+ private Map<K, V> objects = new HashMap<>();
+ private boolean closed = false;
+
+ public ObjectManager(String type, Set<K> initialKeys, ObjectFactory<K, V> objectFactory) {
+ this.type = type;
+ this.objectFactory = objectFactory;
+
+ update(initialKeys);
+ }
+
+ public synchronized void update(Collection<K> newKeys) {
+ if (closed) {
+ throw new RuntimeException("This instance is closed: type=" + type);
+ }
+ Set<K> found = new HashSet<>(newKeys);
+ found.removeAll(objects.keySet());
+
+ Set<K> gone = new HashSet<>(objects.keySet());
+ gone.removeAll(newKeys);
+
+ for (K k : gone) {
+ try {
+ System.out.println("Removing " + type + " with id=" + k);
+ objects.remove(k).close();
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ for (K k : found) {
+ System.out.println("Adding " + type + " with id=" + k);
+ objects.put(k, objectFactory.create(k));
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (closed) {
+ System.out.println("Already closed: type=" + type);
+ return;
+ }
+ update(Collections.<K>emptyList());
+ closed = true;
+ }
+
+ public synchronized Collection<V> getObjects() {
+ return new ArrayList<>(objects.values());
+ }
+
+ public void setObjects(Map<K, V> objects) {
+ this.objects = objects;
+ }
+}