aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis')
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/Config.java55
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/Dao.java2
-rw-r--r--src/main/java/io/trygvis/esper/testing/Daos.java6
-rw-r--r--src/main/java/io/trygvis/esper/testing/Util.java (renamed from src/main/java/io/trygvis/esper/testing/XmlUtil.java)2
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java12
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java12
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java20
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java1
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/nexus/NexusClient.java6
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/nexus/NexusDao.java107
-rwxr-xr-xsrc/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java76
-rw-r--r--src/main/java/io/trygvis/esper/testing/object/ActorRef.java7
-rw-r--r--src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java69
-rw-r--r--src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java7
14 files changed, 306 insertions, 76 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/Config.java b/src/main/java/io/trygvis/esper/testing/Config.java
index d99a18a..ca0e6ff 100755
--- a/src/main/java/io/trygvis/esper/testing/Config.java
+++ b/src/main/java/io/trygvis/esper/testing/Config.java
@@ -2,29 +2,31 @@ package io.trygvis.esper.testing;
import ch.qos.logback.classic.*;
import ch.qos.logback.core.util.*;
+import com.jolbox.bonecp.*;
import fj.data.*;
-import java.io.*;
-import java.util.*;
-import org.apache.commons.httpclient.protocol.*;
+import static fj.data.Option.*;
+import static org.apache.commons.lang.StringUtils.*;
import org.slf4j.*;
-import static org.apache.commons.lang.StringUtils.*;
+import java.io.*;
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.atomic.*;
public class Config {
public final String gitoriousUrl;
public final Option<String> gitoriousSessionValue;
- public final String nexusUrl;
- public final String databaseDriver;
+ public final long nexusUpdateInterval;
+
public final String databaseUrl;
public final String databaseUsername;
public final String databasePassword;
- public Config(String gitoriousUrl, Option<String> gitoriousSessionValue, String nexusUrl, String databaseDriver, String databaseUrl, String databaseUsername, String databasePassword) {
+ public Config(String gitoriousUrl, Option<String> gitoriousSessionValue, long nexusUpdateInterval, String databaseUrl, String databaseUsername, String databasePassword) {
this.gitoriousUrl = gitoriousUrl;
this.gitoriousSessionValue = gitoriousSessionValue;
- this.nexusUrl = nexusUrl;
- this.databaseDriver = databaseDriver;
+ this.nexusUpdateInterval = nexusUpdateInterval;
this.databaseUrl = databaseUrl;
this.databaseUsername = databaseUsername;
this.databasePassword = databasePassword;
@@ -38,14 +40,9 @@ public class Config {
properties.load(inputStream);
}
- String driver = trimToNull(properties.getProperty("database.driver"));
-
-// Class.forName(driver);
-
return new Config(trimToNull(properties.getProperty("gitorious.url")),
- Option.fromNull(trimToNull(properties.getProperty("gitorious.sessionValue"))),
- trimToNull(properties.getProperty("nexus.url")),
- driver,
+ fromNull(trimToNull(properties.getProperty("gitorious.sessionValue"))),
+ fromNull(trimToNull(properties.getProperty("nexus.updateInterval"))).bind(parseInt).some() * 1000,
trimToNull(properties.getProperty("database.url")),
trimToNull(properties.getProperty("database.username")),
trimToNull(properties.getProperty("database.password")));
@@ -55,4 +52,30 @@ public class Config {
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
StatusPrinter.print(lc);
}
+
+ public BoneCPDataSource createBoneCp() throws SQLException {
+ return new BoneCPDataSource(new BoneCPConfig(){{
+ setJdbcUrl(databaseUrl);
+ setUsername(databaseUsername);
+ setPassword(databasePassword);
+ setDefaultAutoCommit(false);
+ setMaxConnectionsPerPartition(10);
+ }});
+ }
+
+ public void addShutdownHook(final Thread t, final AtomicBoolean shouldRun) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ {
+ setName("Shutdown hook");
+ }
+
+ public void run() {
+ synchronized (shouldRun) {
+ shouldRun.set(false);
+ shouldRun.notifyAll();
+ t.interrupt();
+ }
+ }
+ });
+ }
}
diff --git a/src/main/java/io/trygvis/esper/testing/Dao.java b/src/main/java/io/trygvis/esper/testing/Dao.java
index 6c36264..e1a65fe 100755
--- a/src/main/java/io/trygvis/esper/testing/Dao.java
+++ b/src/main/java/io/trygvis/esper/testing/Dao.java
@@ -7,7 +7,7 @@ import java.sql.*;
import java.util.*;
public class Dao {
- private final Connection c;
+ protected final Connection c;
private final Map<String, PreparedStatement> statements = new HashMap<>();
protected Dao(Connection c) {
diff --git a/src/main/java/io/trygvis/esper/testing/Daos.java b/src/main/java/io/trygvis/esper/testing/Daos.java
index 27f9df8..1d5322f 100644
--- a/src/main/java/io/trygvis/esper/testing/Daos.java
+++ b/src/main/java/io/trygvis/esper/testing/Daos.java
@@ -1,8 +1,8 @@
package io.trygvis.esper.testing;
-import com.jolbox.bonecp.*;
import io.trygvis.esper.testing.gitorious.*;
+import javax.sql.*;
import java.io.*;
import java.sql.*;
@@ -38,7 +38,7 @@ public class Daos implements Closeable {
connection.commit();
}
- public static Daos lookup(BoneCP boneCp) throws SQLException {
- return new Daos(boneCp.getConnection());
+ public static Daos lookup(DataSource dataSource) throws SQLException {
+ return new Daos(dataSource.getConnection());
}
}
diff --git a/src/main/java/io/trygvis/esper/testing/XmlUtil.java b/src/main/java/io/trygvis/esper/testing/Util.java
index c2c571c..49d6a37 100644
--- a/src/main/java/io/trygvis/esper/testing/XmlUtil.java
+++ b/src/main/java/io/trygvis/esper/testing/Util.java
@@ -7,7 +7,7 @@ import org.jdom2.*;
import java.net.*;
-public class XmlUtil {
+public class Util {
public static F<String, Option<Integer>> parseInt = new F<String, Option<Integer>>() {
public Option<Integer> f(String s) {
try {
diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
index 74e39ea..a0efda5 100755
--- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
@@ -16,7 +16,7 @@ import java.util.Set;
import java.util.concurrent.*;
public class GitoriousImporter {
- private final BoneCP boneCp;
+ private final BoneCPDataSource boneCp;
private final GitoriousClient gitoriousClient;
public static void main(String[] args) throws Exception {
@@ -25,15 +25,7 @@ public class GitoriousImporter {
}
public GitoriousImporter(final Config config) throws Exception {
- BoneCPConfig boneCPConfig = new BoneCPConfig(){{
- setJdbcUrl(config.databaseUrl);
- setUsername(config.databaseUsername);
- setPassword(config.databasePassword);
- setDefaultAutoCommit(false);
- setMaxConnectionsPerPartition(10);
- }};
-
- boneCp = new BoneCP(boneCPConfig);
+ boneCp = config.createBoneCp();
gitoriousClient = new GitoriousClient(HttpClient.createHttpClient(config), config.gitoriousUrl);
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
index dac79d9..ed4cb59 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
@@ -3,9 +3,9 @@ package io.trygvis.esper.testing.jenkins;
import fj.*;
import fj.data.*;
import io.trygvis.esper.testing.*;
-import static io.trygvis.esper.testing.XmlUtil.*;
+import static io.trygvis.esper.testing.Util.*;
import io.trygvis.esper.testing.jenkins.JenkinsJobXml.*;
-import static java.lang.Integer.*;
+import static java.lang.Integer.parseInt;
import static org.apache.commons.lang.StringUtils.*;
import org.codehaus.httpcache4j.*;
import org.codehaus.httpcache4j.cache.*;
@@ -183,8 +183,8 @@ class JenkinsJobXml {
public final URI url;
public static F<Element, Option<BuildXml>> buildXml = new F<Element, Option<BuildXml>>() {
public Option<BuildXml> f(Element element) {
- Option<Integer> number = childText(element, "number").bind(XmlUtil.parseInt);
- Option<URI> url = childText(element, "url").bind(parseUri);
+ Option<Integer> number = childText(element, "number").bind(Util.parseInt);
+ Option<URI> url = childText(element, "url").bind(Util.parseUri);
if(number.isNone() || url.isNone()) {
return Option.none();
@@ -205,9 +205,9 @@ class JenkinsJobXml {
childText(root, "description"),
childText(root, "displayName"),
childText(root, "name"),
- childText(root, "url").bind(parseUri).orSome(uri),
+ childText(root, "url").bind(Util.parseUri).orSome(uri),
childText(root, "color"),
- childText(root, "buildable").bind(parseBoolean).orSome(false),
+ childText(root, "buildable").bind(Util.parseBoolean).orSome(false),
child(root, "lastBuild").bind(BuildXml.buildXml),
child(root, "lastCompletedBuild").bind(BuildXml.buildXml),
child(root, "lastFailedBuild").bind(BuildXml.buildXml),
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
index 6a132d8..b929576 100755
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
@@ -4,11 +4,13 @@ import fj.*;
import fj.data.*;
import io.trygvis.esper.testing.*;
import io.trygvis.esper.testing.object.*;
+import static java.lang.Thread.currentThread;
import org.joda.time.*;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
public class JenkinsImporter {
public static void main(String[] args) throws Exception {
@@ -28,19 +30,11 @@ public class JenkinsImporter {
return new JenkinsServer(executorService, jenkinsClient, uri);
}
});
- final boolean[] shouldRun = new boolean[]{true};
- Runtime.getRuntime().addShutdownHook(new Thread() {
- {
- setName("Shutdown hoook");
- }
+ final AtomicBoolean shouldRun = new AtomicBoolean(true);
+ config.addShutdownHook(currentThread(), shouldRun);
- public void run() {
- shouldRun[0] = false;
- }
- });
-
- while (shouldRun[0]) {
+ while (shouldRun.get()) {
for (JenkinsServer server : serverManager.getObjects()) {
Option<P2<JenkinsXml, LocalDateTime>> o = server.getJenkins();
@@ -52,7 +46,9 @@ public class JenkinsImporter {
}
}
- Thread.sleep(1000);
+ synchronized (shouldRun) {
+ shouldRun.wait(1000);
+ }
}
serverManager.close();
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
index 47bb005..49b8995 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
@@ -12,7 +12,6 @@ import java.net.*;
import java.util.*;
import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.*;
public class JenkinsServer implements Closeable {
diff --git a/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java b/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java
index 6477a80..02b5e3a 100755
--- a/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java
+++ b/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java
@@ -16,9 +16,9 @@ import javax.xml.stream.*;
public class NexusClient {
private final HTTPCache http;
- private final String nexusUrl;
+ private final URI nexusUrl;
- public NexusClient(HTTPCache http, String nexusUrl) {
+ public NexusClient(HTTPCache http, URI nexusUrl) {
this.http = http;
this.nexusUrl = nexusUrl;
}
@@ -36,7 +36,7 @@ public class NexusClient {
}
public ArtifactSearchResult fetchIndexPage(String groupId, Option<String> repositoryId, Option<Integer> from) throws IOException {
- URIBuilder uriBuilder = URIBuilder.fromURI(URI.create(nexusUrl)).
+ URIBuilder uriBuilder = URIBuilder.fromURI(nexusUrl).
addRawPath("/service/local/lucene/search").
addParameter("g", groupId + ".*");
diff --git a/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java b/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java
index 39d4233..00e15fe 100755
--- a/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java
+++ b/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java
@@ -5,21 +5,43 @@ import static fj.data.Option.*;
import io.trygvis.esper.testing.*;
import org.joda.time.*;
+import java.net.*;
+import java.sql.Array;
import java.sql.*;
+import java.util.*;
+import java.util.List;
public class NexusDao extends Dao {
protected NexusDao(Connection c) {
super(c);
}
- public void insertRepository(String repositoryId, LocalDateTime discoveryDate) throws SQLException {
- PreparedStatement s = prepareStatement("INSERT INTO nexus_repository(id) VALUES(?)");
- s.setString(1, repositoryId);
- s.executeUpdate();
+ /*
+ public void insertRepository(String repositoryId, URI nexusUri, LocalDateTime discoveryDate) throws SQLException {
+ int i = 1;
+ try (PreparedStatement s = prepareStatement("INSERT INTO nexus_repository(id, uri, discovered_date) VALUES(?, ?, ?)")) {
+ s.setString(i++, repositoryId);
+ s.setString(i++, nexusUri.toASCIIString());
+ s.setTimestamp(i, new Timestamp(discoveryDate.toDateTime().getMillis()));
+ s.executeUpdate();
+ }
+ }
+ */
+
+ public List<NexusServerDto> selectServer() throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT url FROM nexus_server")) {
+ ResultSet rs = s.executeQuery();
+
+ List<NexusServerDto> servers = new ArrayList<>();
+ while(rs.next()) {
+ servers.add(new NexusServerDto(uri(rs.getString(1))));
+ }
+ return servers;
+ }
}
- public Option<NexusRepository> selectRepository(String repositoryId) throws SQLException {
- PreparedStatement s = prepareStatement("SELECT id, discovery_date, last_update, last_successful_update FROM nexus_repository WHERE id=?");
+ public Option<NexusRepositoryDto> selectRepository(String repositoryId) throws SQLException {
+ PreparedStatement s = prepareStatement("SELECT id, nexus_server_url, group_ids, discovery_date, last_update, last_successful_update FROM nexus_repository WHERE id=?");
s.setString(1, repositoryId);
try (ResultSet rs = s.executeQuery()) {
@@ -27,24 +49,81 @@ public class NexusDao extends Dao {
return Option.none();
}
- return some(new NexusRepository(
- rs.getString(1),
- fromNull(rs.getTimestamp(2)).map(timestampToLocalDateTime),
- fromNull(rs.getTimestamp(3)).map(timestampToLocalDateTime),
- fromNull(rs.getTimestamp(4)).map(timestampToLocalDateTime)
- ));
+ return some(toRepository(rs));
+ }
+ }
+
+ public List<NexusRepositoryDto> findRepositories(URI nexusUrl) throws SQLException {
+ PreparedStatement s = prepareStatement("SELECT id, nexus_server_url, group_ids, created_date, last_update, last_successful_update FROM nexus_repository WHERE nexus_server_url=?");
+ s.setString(1, nexusUrl.toASCIIString());
+
+ List<NexusRepositoryDto> list = new ArrayList<>();
+ try (ResultSet rs = s.executeQuery()) {
+ while(rs.next()) {
+ list.add(toRepository(rs));
+ }
+ }
+ return list;
+ }
+
+ private NexusRepositoryDto toRepository(ResultSet rs) throws SQLException {
+ int i = 1;
+
+ return new NexusRepositoryDto(
+ rs.getString(i++),
+ uri(rs.getString(i++)),
+ (String[]) rs.getArray(i++).getArray(),
+ fromNull(rs.getTimestamp(i++)).map(timestampToLocalDateTime),
+ fromNull(rs.getTimestamp(i++)).map(timestampToLocalDateTime),
+ fromNull(rs.getTimestamp(i)).map(timestampToLocalDateTime)
+ );
+ }
+
+ private URI uri(String s) {
+ try {
+ return URI.create(s);
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ return null;
}
}
}
-class NexusRepository {
+class NexusServerDto {
+ public final URI url;
+
+ NexusServerDto(URI url) {
+ this.url = url;
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (!(o instanceof NexusServerDto)) return false;
+
+ NexusServerDto that = (NexusServerDto) o;
+
+ if (!url.equals(that.url)) return false;
+
+ return true;
+ }
+
+ public int hashCode() {
+ return url.hashCode();
+ }
+}
+
+class NexusRepositoryDto {
public final String repositoryId;
+ public final URI nexusUrl;
+ public final String[] groupIds;
public final Option<LocalDateTime> discoveryDate;
public final Option<LocalDateTime> lastUpdate;
public final Option<LocalDateTime> lastSuccessfulUpdate;
- NexusRepository(String repositoryId, Option<LocalDateTime> discoveryDate, Option<LocalDateTime> lastUpdate, Option<LocalDateTime> lastSuccessfulUpdate) {
+ NexusRepositoryDto(String repositoryId, URI nexusUrl, String[] groupIds, Option<LocalDateTime> discoveryDate, Option<LocalDateTime> lastUpdate, Option<LocalDateTime> lastSuccessfulUpdate) {
this.repositoryId = repositoryId;
+ this.nexusUrl = nexusUrl;
+ this.groupIds = groupIds;
this.discoveryDate = discoveryDate;
this.lastUpdate = lastUpdate;
this.lastSuccessfulUpdate = lastSuccessfulUpdate;
diff --git a/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java b/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java
index 896f0e2..12d8c9c 100755
--- a/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java
@@ -1,24 +1,82 @@
package io.trygvis.esper.testing.nexus;
import com.google.common.collect.*;
+import com.jolbox.bonecp.*;
import fj.data.*;
import io.trygvis.esper.testing.*;
+import io.trygvis.esper.testing.object.*;
+import static java.lang.Thread.*;
import org.apache.commons.lang.*;
+import org.codehaus.httpcache4j.cache.*;
-import java.io.*;
+import java.sql.*;
import java.util.*;
+import java.util.concurrent.atomic.*;
public class NexusImporter {
- public static void main(String[] args) throws IOException {
- Config config = Config.loadFromDisk();
+ public static void main(String[] args) throws Exception {
+ final Config config = Config.loadFromDisk();
- NexusClient client = new NexusClient(HttpClient.createHttpClient(config), config.nexusUrl);
+ final HTTPCache http = HttpClient.createHttpClient(config);
- ArtifactSearchResult result = client.fetchIndex("eu.nets", Option.<String>none());
- ArrayList<ArtifactXml> artifacts = Lists.newArrayList(result.artifacts);
- Collections.sort(artifacts);
- for (ArtifactXml artifact : artifacts) {
- System.out.println("repo=" + StringUtils.join(artifact.repositories(), ", ") + ", artifact=" + artifact.getId());
+ final BoneCPDataSource boneCp = config.createBoneCp();
+
+ ObjectManager<NexusServerDto, ActorRef<NexusServer>> serverManager = new ObjectManager<>("Nexus server", Collections.<NexusServerDto>emptySet(), new ObjectFactory<NexusServerDto, ActorRef<NexusServer>>() {
+ public ActorRef<NexusServer> create(NexusServerDto server) {
+ final NexusClient client = new NexusClient(http, server.url);
+
+ return ObjectUtil.threadedActor(boneCp, config.nexusUpdateInterval, new NexusServer(client, server));
+ }
+ });
+
+ final AtomicBoolean shouldRun = new AtomicBoolean(true);
+ config.addShutdownHook(currentThread(), shouldRun);
+
+ while (shouldRun.get()) {
+ try {
+ try (Connection c = boneCp.getConnection()) {
+ serverManager.update(new NexusDao(c).selectServer());
+ }
+ } catch (SQLException e) {
+ e.printStackTrace(System.out);
+ }
+
+ synchronized (shouldRun) {
+ shouldRun.wait(1000);
+ }
}
+
+ serverManager.close();
+ }
+}
+
+class NexusServer implements TransactionalActor {
+
+ public final NexusClient client;
+ public final NexusServerDto server;
+
+ NexusServer(NexusClient client, NexusServerDto server) {
+ this.client = client;
+ this.server = server;
+ }
+
+ public void act(Connection c) throws Exception {
+ NexusDao dao = new NexusDao(c);
+
+ for (NexusRepositoryDto repository : dao.findRepositories(server.url)) {
+ System.out.println("Updating repository: " + repository.repositoryId);
+ for (String groupId : repository.groupIds) {
+ System.out.println("Updating groupId: " + groupId);
+ ArtifactSearchResult result = client.fetchIndex(groupId, Option.<String>none());
+
+ ArrayList<ArtifactXml> artifacts = Lists.newArrayList(result.artifacts);
+ Collections.sort(artifacts);
+ for (ArtifactXml artifact : artifacts) {
+ System.out.println("repo=" + StringUtils.join(artifact.repositories(), ", ") + ", artifact=" + artifact.getId());
+ }
+ }
+ }
+
+ c.commit();
}
}
diff --git a/src/main/java/io/trygvis/esper/testing/object/ActorRef.java b/src/main/java/io/trygvis/esper/testing/object/ActorRef.java
new file mode 100644
index 0000000..bc64da3
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/object/ActorRef.java
@@ -0,0 +1,7 @@
+package io.trygvis.esper.testing.object;
+
+import java.io.*;
+
+public interface ActorRef<A> extends Closeable {
+ A underlying();
+}
diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java
new file mode 100644
index 0000000..6b6fe75
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java
@@ -0,0 +1,69 @@
+package io.trygvis.esper.testing.object;
+
+import javax.sql.*;
+import java.io.*;
+import java.sql.*;
+
+public class ObjectUtil {
+
+ public static <A extends TransactionalActor> ActorRef<A> threadedActor(DataSource dataSource, long delay, A actor) {
+ return new ThreadedActor<>(dataSource, actor, delay);
+ }
+
+ static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable {
+
+ private final DataSource dataSource;
+ private final A actor;
+ private final long delay;
+ private final Thread thread;
+ private boolean shouldRun = true;
+
+ ThreadedActor(DataSource dataSource, A actor, long delay) {
+ this.dataSource = dataSource;
+ this.actor = actor;
+ this.delay = delay;
+ thread = new Thread(this);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public A underlying() {
+ return actor;
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ public void run() {
+ while (shouldRun) {
+ try {
+ try (Connection c = dataSource.getConnection()) {
+ try {
+ actor.act(c);
+ } finally {
+ c.rollback();
+ }
+ }
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ shouldRun = false;
+ thread.interrupt();
+ while (thread.isAlive()) {
+ try {
+ thread.join();
+ } catch (InterruptedException e) {
+ continue;
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java b/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java
new file mode 100644
index 0000000..4d3cdce
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java
@@ -0,0 +1,7 @@
+package io.trygvis.esper.testing.object;
+
+import java.sql.*;
+
+public interface TransactionalActor {
+ void act(Connection c) throws Exception;
+}