From cac8228f38136cfc41673458c58c25f168b1e1ff Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Tue, 20 Nov 2012 20:02:47 +0100 Subject: o Adding BoneCP init to Config. o Starting on an actor-like structure for the running jobs. o Loading Nexus servers and group ids to look for from the database. --- src/main/java/io/trygvis/esper/testing/Config.java | 55 ++++++++--- src/main/java/io/trygvis/esper/testing/Dao.java | 2 +- src/main/java/io/trygvis/esper/testing/Daos.java | 6 +- src/main/java/io/trygvis/esper/testing/Util.java | 48 +++++++++ .../java/io/trygvis/esper/testing/XmlUtil.java | 48 --------- .../esper/testing/gitorious/GitoriousImporter.java | 12 +-- .../esper/testing/jenkins/JenkinsClient.java | 12 +-- .../esper/testing/jenkins/JenkinsImporter.java | 20 ++-- .../esper/testing/jenkins/JenkinsServer.java | 1 - .../trygvis/esper/testing/nexus/NexusClient.java | 6 +- .../io/trygvis/esper/testing/nexus/NexusDao.java | 107 ++++++++++++++++++--- .../trygvis/esper/testing/nexus/NexusImporter.java | 76 +++++++++++++-- .../io/trygvis/esper/testing/object/ActorRef.java | 7 ++ .../trygvis/esper/testing/object/ObjectUtil.java | 69 +++++++++++++ .../esper/testing/object/TransactionalActor.java | 7 ++ 15 files changed, 353 insertions(+), 123 deletions(-) create mode 100644 src/main/java/io/trygvis/esper/testing/Util.java delete mode 100644 src/main/java/io/trygvis/esper/testing/XmlUtil.java create mode 100644 src/main/java/io/trygvis/esper/testing/object/ActorRef.java create mode 100644 src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java create mode 100644 src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java (limited to 'src/main/java/io/trygvis') diff --git a/src/main/java/io/trygvis/esper/testing/Config.java b/src/main/java/io/trygvis/esper/testing/Config.java index d99a18a..ca0e6ff 100755 --- a/src/main/java/io/trygvis/esper/testing/Config.java +++ b/src/main/java/io/trygvis/esper/testing/Config.java @@ -2,29 +2,31 @@ package io.trygvis.esper.testing; import ch.qos.logback.classic.*; import ch.qos.logback.core.util.*; +import com.jolbox.bonecp.*; import fj.data.*; -import java.io.*; -import java.util.*; -import org.apache.commons.httpclient.protocol.*; +import static fj.data.Option.*; +import static org.apache.commons.lang.StringUtils.*; import org.slf4j.*; -import static org.apache.commons.lang.StringUtils.*; +import java.io.*; +import java.sql.*; +import java.util.*; +import java.util.concurrent.atomic.*; public class Config { public final String gitoriousUrl; public final Option gitoriousSessionValue; - public final String nexusUrl; - public final String databaseDriver; + public final long nexusUpdateInterval; + public final String databaseUrl; public final String databaseUsername; public final String databasePassword; - public Config(String gitoriousUrl, Option gitoriousSessionValue, String nexusUrl, String databaseDriver, String databaseUrl, String databaseUsername, String databasePassword) { + public Config(String gitoriousUrl, Option gitoriousSessionValue, long nexusUpdateInterval, String databaseUrl, String databaseUsername, String databasePassword) { this.gitoriousUrl = gitoriousUrl; this.gitoriousSessionValue = gitoriousSessionValue; - this.nexusUrl = nexusUrl; - this.databaseDriver = databaseDriver; + this.nexusUpdateInterval = nexusUpdateInterval; this.databaseUrl = databaseUrl; this.databaseUsername = databaseUsername; this.databasePassword = databasePassword; @@ -38,14 +40,9 @@ public class Config { properties.load(inputStream); } - String driver = trimToNull(properties.getProperty("database.driver")); - -// Class.forName(driver); - return new Config(trimToNull(properties.getProperty("gitorious.url")), - Option.fromNull(trimToNull(properties.getProperty("gitorious.sessionValue"))), - trimToNull(properties.getProperty("nexus.url")), - driver, + fromNull(trimToNull(properties.getProperty("gitorious.sessionValue"))), + fromNull(trimToNull(properties.getProperty("nexus.updateInterval"))).bind(parseInt).some() * 1000, trimToNull(properties.getProperty("database.url")), trimToNull(properties.getProperty("database.username")), trimToNull(properties.getProperty("database.password"))); @@ -55,4 +52,30 @@ public class Config { LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); StatusPrinter.print(lc); } + + public BoneCPDataSource createBoneCp() throws SQLException { + return new BoneCPDataSource(new BoneCPConfig(){{ + setJdbcUrl(databaseUrl); + setUsername(databaseUsername); + setPassword(databasePassword); + setDefaultAutoCommit(false); + setMaxConnectionsPerPartition(10); + }}); + } + + public void addShutdownHook(final Thread t, final AtomicBoolean shouldRun) { + Runtime.getRuntime().addShutdownHook(new Thread() { + { + setName("Shutdown hook"); + } + + public void run() { + synchronized (shouldRun) { + shouldRun.set(false); + shouldRun.notifyAll(); + t.interrupt(); + } + } + }); + } } diff --git a/src/main/java/io/trygvis/esper/testing/Dao.java b/src/main/java/io/trygvis/esper/testing/Dao.java index 6c36264..e1a65fe 100755 --- a/src/main/java/io/trygvis/esper/testing/Dao.java +++ b/src/main/java/io/trygvis/esper/testing/Dao.java @@ -7,7 +7,7 @@ import java.sql.*; import java.util.*; public class Dao { - private final Connection c; + protected final Connection c; private final Map statements = new HashMap<>(); protected Dao(Connection c) { diff --git a/src/main/java/io/trygvis/esper/testing/Daos.java b/src/main/java/io/trygvis/esper/testing/Daos.java index 27f9df8..1d5322f 100644 --- a/src/main/java/io/trygvis/esper/testing/Daos.java +++ b/src/main/java/io/trygvis/esper/testing/Daos.java @@ -1,8 +1,8 @@ package io.trygvis.esper.testing; -import com.jolbox.bonecp.*; import io.trygvis.esper.testing.gitorious.*; +import javax.sql.*; import java.io.*; import java.sql.*; @@ -38,7 +38,7 @@ public class Daos implements Closeable { connection.commit(); } - public static Daos lookup(BoneCP boneCp) throws SQLException { - return new Daos(boneCp.getConnection()); + public static Daos lookup(DataSource dataSource) throws SQLException { + return new Daos(dataSource.getConnection()); } } diff --git a/src/main/java/io/trygvis/esper/testing/Util.java b/src/main/java/io/trygvis/esper/testing/Util.java new file mode 100644 index 0000000..49d6a37 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/Util.java @@ -0,0 +1,48 @@ +package io.trygvis.esper.testing; + +import fj.*; +import fj.data.*; +import static fj.data.Option.*; +import org.jdom2.*; + +import java.net.*; + +public class Util { + public static F> parseInt = new F>() { + public Option f(String s) { + try { + return some(Integer.parseInt(s)); + } catch (NumberFormatException e) { + return none(); + } + } + }; + + public static F> parseUri = new F>() { + public Option f(String s) { + try { + return some(URI.create(s)); + } catch (Throwable e) { + return none(); + } + } + }; + + public static F> parseBoolean = new F>() { + public Option f(String s) { + try { + return some(Boolean.parseBoolean(s)); + } catch (Throwable e) { + return none(); + } + } + }; + + public static Option childText(Element e, String childName) { + return fromNull(e.getChildText(childName)); + } + + public static Option child(Element e, String childName) { + return fromNull(e.getChild(childName)); + } +} diff --git a/src/main/java/io/trygvis/esper/testing/XmlUtil.java b/src/main/java/io/trygvis/esper/testing/XmlUtil.java deleted file mode 100644 index c2c571c..0000000 --- a/src/main/java/io/trygvis/esper/testing/XmlUtil.java +++ /dev/null @@ -1,48 +0,0 @@ -package io.trygvis.esper.testing; - -import fj.*; -import fj.data.*; -import static fj.data.Option.*; -import org.jdom2.*; - -import java.net.*; - -public class XmlUtil { - public static F> parseInt = new F>() { - public Option f(String s) { - try { - return some(Integer.parseInt(s)); - } catch (NumberFormatException e) { - return none(); - } - } - }; - - public static F> parseUri = new F>() { - public Option f(String s) { - try { - return some(URI.create(s)); - } catch (Throwable e) { - return none(); - } - } - }; - - public static F> parseBoolean = new F>() { - public Option f(String s) { - try { - return some(Boolean.parseBoolean(s)); - } catch (Throwable e) { - return none(); - } - } - }; - - public static Option childText(Element e, String childName) { - return fromNull(e.getChildText(childName)); - } - - public static Option child(Element e, String childName) { - return fromNull(e.getChild(childName)); - } -} diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java index 74e39ea..a0efda5 100755 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java @@ -16,7 +16,7 @@ import java.util.Set; import java.util.concurrent.*; public class GitoriousImporter { - private final BoneCP boneCp; + private final BoneCPDataSource boneCp; private final GitoriousClient gitoriousClient; public static void main(String[] args) throws Exception { @@ -25,15 +25,7 @@ public class GitoriousImporter { } public GitoriousImporter(final Config config) throws Exception { - BoneCPConfig boneCPConfig = new BoneCPConfig(){{ - setJdbcUrl(config.databaseUrl); - setUsername(config.databaseUsername); - setPassword(config.databasePassword); - setDefaultAutoCommit(false); - setMaxConnectionsPerPartition(10); - }}; - - boneCp = new BoneCP(boneCPConfig); + boneCp = config.createBoneCp(); gitoriousClient = new GitoriousClient(HttpClient.createHttpClient(config), config.gitoriousUrl); diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java index dac79d9..ed4cb59 100644 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java @@ -3,9 +3,9 @@ package io.trygvis.esper.testing.jenkins; import fj.*; import fj.data.*; import io.trygvis.esper.testing.*; -import static io.trygvis.esper.testing.XmlUtil.*; +import static io.trygvis.esper.testing.Util.*; import io.trygvis.esper.testing.jenkins.JenkinsJobXml.*; -import static java.lang.Integer.*; +import static java.lang.Integer.parseInt; import static org.apache.commons.lang.StringUtils.*; import org.codehaus.httpcache4j.*; import org.codehaus.httpcache4j.cache.*; @@ -183,8 +183,8 @@ class JenkinsJobXml { public final URI url; public static F> buildXml = new F>() { public Option f(Element element) { - Option number = childText(element, "number").bind(XmlUtil.parseInt); - Option url = childText(element, "url").bind(parseUri); + Option number = childText(element, "number").bind(Util.parseInt); + Option url = childText(element, "url").bind(Util.parseUri); if(number.isNone() || url.isNone()) { return Option.none(); @@ -205,9 +205,9 @@ class JenkinsJobXml { childText(root, "description"), childText(root, "displayName"), childText(root, "name"), - childText(root, "url").bind(parseUri).orSome(uri), + childText(root, "url").bind(Util.parseUri).orSome(uri), childText(root, "color"), - childText(root, "buildable").bind(parseBoolean).orSome(false), + childText(root, "buildable").bind(Util.parseBoolean).orSome(false), child(root, "lastBuild").bind(BuildXml.buildXml), child(root, "lastCompletedBuild").bind(BuildXml.buildXml), child(root, "lastFailedBuild").bind(BuildXml.buildXml), diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java index 6a132d8..b929576 100755 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java @@ -4,11 +4,13 @@ import fj.*; import fj.data.*; import io.trygvis.esper.testing.*; import io.trygvis.esper.testing.object.*; +import static java.lang.Thread.currentThread; import org.joda.time.*; import java.net.URI; import java.util.HashSet; import java.util.concurrent.*; +import java.util.concurrent.atomic.*; public class JenkinsImporter { public static void main(String[] args) throws Exception { @@ -28,19 +30,11 @@ public class JenkinsImporter { return new JenkinsServer(executorService, jenkinsClient, uri); } }); - final boolean[] shouldRun = new boolean[]{true}; - Runtime.getRuntime().addShutdownHook(new Thread() { - { - setName("Shutdown hoook"); - } + final AtomicBoolean shouldRun = new AtomicBoolean(true); + config.addShutdownHook(currentThread(), shouldRun); - public void run() { - shouldRun[0] = false; - } - }); - - while (shouldRun[0]) { + while (shouldRun.get()) { for (JenkinsServer server : serverManager.getObjects()) { Option> o = server.getJenkins(); @@ -52,7 +46,9 @@ public class JenkinsImporter { } } - Thread.sleep(1000); + synchronized (shouldRun) { + shouldRun.wait(1000); + } } serverManager.close(); diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java index 47bb005..49b8995 100644 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java @@ -12,7 +12,6 @@ import java.net.*; import java.util.*; import java.util.HashSet; import java.util.List; -import java.util.Set; import java.util.concurrent.*; public class JenkinsServer implements Closeable { diff --git a/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java b/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java index 6477a80..02b5e3a 100755 --- a/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java +++ b/src/main/java/io/trygvis/esper/testing/nexus/NexusClient.java @@ -16,9 +16,9 @@ import javax.xml.stream.*; public class NexusClient { private final HTTPCache http; - private final String nexusUrl; + private final URI nexusUrl; - public NexusClient(HTTPCache http, String nexusUrl) { + public NexusClient(HTTPCache http, URI nexusUrl) { this.http = http; this.nexusUrl = nexusUrl; } @@ -36,7 +36,7 @@ public class NexusClient { } public ArtifactSearchResult fetchIndexPage(String groupId, Option repositoryId, Option from) throws IOException { - URIBuilder uriBuilder = URIBuilder.fromURI(URI.create(nexusUrl)). + URIBuilder uriBuilder = URIBuilder.fromURI(nexusUrl). addRawPath("/service/local/lucene/search"). addParameter("g", groupId + ".*"); diff --git a/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java b/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java index 39d4233..00e15fe 100755 --- a/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java +++ b/src/main/java/io/trygvis/esper/testing/nexus/NexusDao.java @@ -5,21 +5,43 @@ import static fj.data.Option.*; import io.trygvis.esper.testing.*; import org.joda.time.*; +import java.net.*; +import java.sql.Array; import java.sql.*; +import java.util.*; +import java.util.List; public class NexusDao extends Dao { protected NexusDao(Connection c) { super(c); } - public void insertRepository(String repositoryId, LocalDateTime discoveryDate) throws SQLException { - PreparedStatement s = prepareStatement("INSERT INTO nexus_repository(id) VALUES(?)"); - s.setString(1, repositoryId); - s.executeUpdate(); + /* + public void insertRepository(String repositoryId, URI nexusUri, LocalDateTime discoveryDate) throws SQLException { + int i = 1; + try (PreparedStatement s = prepareStatement("INSERT INTO nexus_repository(id, uri, discovered_date) VALUES(?, ?, ?)")) { + s.setString(i++, repositoryId); + s.setString(i++, nexusUri.toASCIIString()); + s.setTimestamp(i, new Timestamp(discoveryDate.toDateTime().getMillis())); + s.executeUpdate(); + } + } + */ + + public List selectServer() throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT url FROM nexus_server")) { + ResultSet rs = s.executeQuery(); + + List servers = new ArrayList<>(); + while(rs.next()) { + servers.add(new NexusServerDto(uri(rs.getString(1)))); + } + return servers; + } } - public Option selectRepository(String repositoryId) throws SQLException { - PreparedStatement s = prepareStatement("SELECT id, discovery_date, last_update, last_successful_update FROM nexus_repository WHERE id=?"); + public Option selectRepository(String repositoryId) throws SQLException { + PreparedStatement s = prepareStatement("SELECT id, nexus_server_url, group_ids, discovery_date, last_update, last_successful_update FROM nexus_repository WHERE id=?"); s.setString(1, repositoryId); try (ResultSet rs = s.executeQuery()) { @@ -27,24 +49,81 @@ public class NexusDao extends Dao { return Option.none(); } - return some(new NexusRepository( - rs.getString(1), - fromNull(rs.getTimestamp(2)).map(timestampToLocalDateTime), - fromNull(rs.getTimestamp(3)).map(timestampToLocalDateTime), - fromNull(rs.getTimestamp(4)).map(timestampToLocalDateTime) - )); + return some(toRepository(rs)); + } + } + + public List findRepositories(URI nexusUrl) throws SQLException { + PreparedStatement s = prepareStatement("SELECT id, nexus_server_url, group_ids, created_date, last_update, last_successful_update FROM nexus_repository WHERE nexus_server_url=?"); + s.setString(1, nexusUrl.toASCIIString()); + + List list = new ArrayList<>(); + try (ResultSet rs = s.executeQuery()) { + while(rs.next()) { + list.add(toRepository(rs)); + } + } + return list; + } + + private NexusRepositoryDto toRepository(ResultSet rs) throws SQLException { + int i = 1; + + return new NexusRepositoryDto( + rs.getString(i++), + uri(rs.getString(i++)), + (String[]) rs.getArray(i++).getArray(), + fromNull(rs.getTimestamp(i++)).map(timestampToLocalDateTime), + fromNull(rs.getTimestamp(i++)).map(timestampToLocalDateTime), + fromNull(rs.getTimestamp(i)).map(timestampToLocalDateTime) + ); + } + + private URI uri(String s) { + try { + return URI.create(s); + } catch (Exception e) { + e.printStackTrace(System.out); + return null; } } } -class NexusRepository { +class NexusServerDto { + public final URI url; + + NexusServerDto(URI url) { + this.url = url; + } + + public boolean equals(Object o) { + if (this == o) return true; + if (!(o instanceof NexusServerDto)) return false; + + NexusServerDto that = (NexusServerDto) o; + + if (!url.equals(that.url)) return false; + + return true; + } + + public int hashCode() { + return url.hashCode(); + } +} + +class NexusRepositoryDto { public final String repositoryId; + public final URI nexusUrl; + public final String[] groupIds; public final Option discoveryDate; public final Option lastUpdate; public final Option lastSuccessfulUpdate; - NexusRepository(String repositoryId, Option discoveryDate, Option lastUpdate, Option lastSuccessfulUpdate) { + NexusRepositoryDto(String repositoryId, URI nexusUrl, String[] groupIds, Option discoveryDate, Option lastUpdate, Option lastSuccessfulUpdate) { this.repositoryId = repositoryId; + this.nexusUrl = nexusUrl; + this.groupIds = groupIds; this.discoveryDate = discoveryDate; this.lastUpdate = lastUpdate; this.lastSuccessfulUpdate = lastSuccessfulUpdate; diff --git a/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java b/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java index 896f0e2..12d8c9c 100755 --- a/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java +++ b/src/main/java/io/trygvis/esper/testing/nexus/NexusImporter.java @@ -1,24 +1,82 @@ package io.trygvis.esper.testing.nexus; import com.google.common.collect.*; +import com.jolbox.bonecp.*; import fj.data.*; import io.trygvis.esper.testing.*; +import io.trygvis.esper.testing.object.*; +import static java.lang.Thread.*; import org.apache.commons.lang.*; +import org.codehaus.httpcache4j.cache.*; -import java.io.*; +import java.sql.*; import java.util.*; +import java.util.concurrent.atomic.*; public class NexusImporter { - public static void main(String[] args) throws IOException { - Config config = Config.loadFromDisk(); + public static void main(String[] args) throws Exception { + final Config config = Config.loadFromDisk(); - NexusClient client = new NexusClient(HttpClient.createHttpClient(config), config.nexusUrl); + final HTTPCache http = HttpClient.createHttpClient(config); - ArtifactSearchResult result = client.fetchIndex("eu.nets", Option.none()); - ArrayList artifacts = Lists.newArrayList(result.artifacts); - Collections.sort(artifacts); - for (ArtifactXml artifact : artifacts) { - System.out.println("repo=" + StringUtils.join(artifact.repositories(), ", ") + ", artifact=" + artifact.getId()); + final BoneCPDataSource boneCp = config.createBoneCp(); + + ObjectManager> serverManager = new ObjectManager<>("Nexus server", Collections.emptySet(), new ObjectFactory>() { + public ActorRef create(NexusServerDto server) { + final NexusClient client = new NexusClient(http, server.url); + + return ObjectUtil.threadedActor(boneCp, config.nexusUpdateInterval, new NexusServer(client, server)); + } + }); + + final AtomicBoolean shouldRun = new AtomicBoolean(true); + config.addShutdownHook(currentThread(), shouldRun); + + while (shouldRun.get()) { + try { + try (Connection c = boneCp.getConnection()) { + serverManager.update(new NexusDao(c).selectServer()); + } + } catch (SQLException e) { + e.printStackTrace(System.out); + } + + synchronized (shouldRun) { + shouldRun.wait(1000); + } } + + serverManager.close(); + } +} + +class NexusServer implements TransactionalActor { + + public final NexusClient client; + public final NexusServerDto server; + + NexusServer(NexusClient client, NexusServerDto server) { + this.client = client; + this.server = server; + } + + public void act(Connection c) throws Exception { + NexusDao dao = new NexusDao(c); + + for (NexusRepositoryDto repository : dao.findRepositories(server.url)) { + System.out.println("Updating repository: " + repository.repositoryId); + for (String groupId : repository.groupIds) { + System.out.println("Updating groupId: " + groupId); + ArtifactSearchResult result = client.fetchIndex(groupId, Option.none()); + + ArrayList artifacts = Lists.newArrayList(result.artifacts); + Collections.sort(artifacts); + for (ArtifactXml artifact : artifacts) { + System.out.println("repo=" + StringUtils.join(artifact.repositories(), ", ") + ", artifact=" + artifact.getId()); + } + } + } + + c.commit(); } } diff --git a/src/main/java/io/trygvis/esper/testing/object/ActorRef.java b/src/main/java/io/trygvis/esper/testing/object/ActorRef.java new file mode 100644 index 0000000..bc64da3 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/object/ActorRef.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.object; + +import java.io.*; + +public interface ActorRef extends Closeable { + A underlying(); +} diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java new file mode 100644 index 0000000..6b6fe75 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java @@ -0,0 +1,69 @@ +package io.trygvis.esper.testing.object; + +import javax.sql.*; +import java.io.*; +import java.sql.*; + +public class ObjectUtil { + + public static ActorRef threadedActor(DataSource dataSource, long delay, A actor) { + return new ThreadedActor<>(dataSource, actor, delay); + } + + static class ThreadedActor implements ActorRef, Runnable, Closeable { + + private final DataSource dataSource; + private final A actor; + private final long delay; + private final Thread thread; + private boolean shouldRun = true; + + ThreadedActor(DataSource dataSource, A actor, long delay) { + this.dataSource = dataSource; + this.actor = actor; + this.delay = delay; + thread = new Thread(this); + thread.setDaemon(true); + thread.start(); + } + + public A underlying() { + return actor; + } + + @SuppressWarnings("ConstantConditions") + public void run() { + while (shouldRun) { + try { + try (Connection c = dataSource.getConnection()) { + try { + actor.act(c); + } finally { + c.rollback(); + } + } + } catch (Exception e) { + e.printStackTrace(System.out); + } + + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void close() throws IOException { + shouldRun = false; + thread.interrupt(); + while (thread.isAlive()) { + try { + thread.join(); + } catch (InterruptedException e) { + continue; + } + } + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java b/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java new file mode 100644 index 0000000..4d3cdce --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.object; + +import java.sql.*; + +public interface TransactionalActor { + void act(Connection c) throws Exception; +} -- cgit v1.2.3