diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-11-18 13:42:21 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-11-18 13:42:21 +0100 |
commit | 4f22d5bcc676612e271620726ed4248fc357a2d6 (patch) | |
tree | 905a496fbcba1a6c9674c6d9c8cec9d597e493a7 /src/main/java/io/trygvis/esper/testing | |
parent | c266c3e36007d968d3126b4f8475d3e29e219a19 (diff) | |
download | esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.tar.gz esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.tar.bz2 esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.tar.xz esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.zip |
o A version that's more useful because as it is closing its connections.
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing')
8 files changed, 339 insertions, 104 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/Http.java b/src/main/java/io/trygvis/esper/testing/Http.java new file mode 100644 index 0000000..55f4714 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/Http.java @@ -0,0 +1,26 @@ +package io.trygvis.esper.testing; + +import org.apache.http.conn.scheme.*; +import org.apache.http.conn.ssl.*; +import org.apache.http.impl.client.*; +import org.apache.http.impl.conn.tsccm.*; +import org.apache.http.params.*; +import org.codehaus.httpcache4j.cache.*; +import org.codehaus.httpcache4j.resolver.*; + +public class Http { + + public static final HTTPCache http; + + static { + SchemeRegistry schemeRegistry = new SchemeRegistry(); + schemeRegistry.register(new Scheme("http", PlainSocketFactory.getSocketFactory(), 80)); + schemeRegistry.register(new Scheme("https", SSLSocketFactory.getSocketFactory(), 443)); + + BasicHttpParams params = new BasicHttpParams(); + ThreadSafeClientConnManager cm = new ThreadSafeClientConnManager(params, schemeRegistry); + DefaultHttpClient httpClient = new DefaultHttpClient(cm, params); + HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(httpClient); + http = new HTTPCache(new MemoryCacheStorage(), resolver); + } +} diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java index 4ee6322..b4bc683 100644 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java @@ -8,7 +8,7 @@ import static java.lang.System.*; import org.apache.abdera.parser.*; import org.codehaus.httpcache4j.*; import org.codehaus.httpcache4j.cache.*; -import org.codehaus.httpcache4j.client.*; +import org.codehaus.httpcache4j.resolver.*; import java.io.*; import java.net.*; diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java index 4ecb4fb..f3d2941 100644 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java @@ -35,12 +35,10 @@ public class JenkinsClient { } public JenkinsXml fetchJobs(URI uri) throws XMLStreamException, JDOMException, IOException { - InputStream stream = fetchXml(uri); - - Element doc = parseDocument(stream).getRootElement(); + Element root = fetchXml(uri).getRootElement(); List<JenkinsJobEntryXml> jobs = new ArrayList<>(); - for (Element job : doc.getChildren("job")) { + for (Element job : root.getChildren("job")) { String name = trimToNull(job.getChildText("name")); String url = trimToNull(job.getChildText("url")); String color = trimToNull(job.getChildText("color")); @@ -53,15 +51,13 @@ public class JenkinsClient { } return new JenkinsXml( - Option.fromNull(doc.getChildText("nodeName")), - Option.fromNull(doc.getChildText("nodeDescription")), - Option.fromNull(doc.getChildText("description")), jobs); + Option.fromNull(root.getChildText("nodeName")), + Option.fromNull(root.getChildText("nodeDescription")), + Option.fromNull(root.getChildText("description")), jobs); } public JenkinsJobXml fetchJob(URI uri) throws IOException, JDOMException, XMLStreamException { - InputStream stream = fetchXml(uri); - - Element root = parseDocument(stream).getRootElement(); + Element root = fetchXml(uri).getRootElement(); switch (root.getName()) { case "freeStyleProject": @@ -73,47 +69,46 @@ public class JenkinsClient { } } - private Document parseDocument(InputStream stream) throws JDOMException, XMLStreamException { - return streamBuilder.build(xmlReader.createXMLStreamReader(stream)); - } - - private InputStream fetchXml(URI uri) throws IOException { - HTTPResponse response; + private Document fetchXml(URI uri) throws IOException, XMLStreamException, JDOMException { + HTTPResponse response = null; try { response = http.execute(new HTTPRequest(uri)); - } catch (HTTPException e) { - throw new IOException(e); - } - - if (response.getStatus().getCode() != 200) { - throw new IOException("Did not get 200 back, got " + response.getStatus().getCode()); - } - InputStream stream = response.getPayload().getInputStream(); + if (response.getStatus().getCode() != 200) { + throw new IOException("Did not get 200 back, got " + response.getStatus().getCode()); + } - if (!debugXml) { - return stream; - } + InputStream stream = response.getPayload().getInputStream(); + + if (debugXml) { + int size; + try { + size = parseInt(response.getHeaders().getFirstHeader("Content-Length").getValue()); + } catch (Throwable e) { + size = 10 * 1024; + } + + // TODO: Pretty print + + ByteArrayOutputStream buffer = new ByteArrayOutputStream(size); + IOUtils.copy(stream, buffer); + byte[] bytes = buffer.toByteArray(); + System.out.println("------------------------------------------------"); + System.out.write(bytes); + System.out.println(); + System.out.println("------------------------------------------------"); + stream = new ByteArrayInputStream(bytes); + } - int size; - try { - size = parseInt(response.getHeaders().getFirstHeader("Content-Length").getValue()); - } catch (Throwable e) { - size = 10 * 1024; + return streamBuilder.build(xmlReader.createXMLStreamReader(stream)); + } catch (HTTPException e) { + throw new IOException(e); + } finally { + if (response != null) { + response.consume(); + } } - - // TODO: Pretty print - - ByteArrayOutputStream buffer = new ByteArrayOutputStream(size); - IOUtils.copy(stream, buffer); - byte[] bytes = buffer.toByteArray(); - System.out.println("------------------------------------------------"); - System.out.write(bytes); - System.out.println(); - System.out.println("------------------------------------------------"); - stream = new ByteArrayInputStream(bytes); - return stream; } } diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java index b639108..b7d88dc 100644 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java @@ -3,14 +3,10 @@ package io.trygvis.esper.testing.jenkins; import fj.*; import fj.data.*; import io.trygvis.esper.testing.*; +import static io.trygvis.esper.testing.Http.http; import io.trygvis.esper.testing.object.*; -import org.apache.commons.httpclient.*; -import org.apache.commons.httpclient.params.*; -import org.codehaus.httpcache4j.cache.*; -import org.codehaus.httpcache4j.client.*; import org.joda.time.*; -import java.net.*; import java.net.URI; import java.util.HashSet; import java.util.concurrent.*; @@ -19,22 +15,14 @@ public class JenkinsImporter { public static void main(String[] args) throws Exception { Main.configureLog4j(); -// HTTPClientResponseResolver resolver = HTTPClientResponseResolver.createMultithreadedInstance(); -// HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(new HttpClient(new MultiThreadedHttpConnectionManager())); - HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(new HttpClient(new SimpleHttpConnectionManager())); - HttpClientParams params = new HttpClientParams(); -// params.setConnectionManagerTimeout(1000); - params.setSoTimeout(1000); - resolver.getClient().setParams(params); - HTTPCache http = new HTTPCache(new MemoryCacheStorage(), resolver); final JenkinsClient jenkinsClient = new JenkinsClient(http); - jenkinsClient.setDebugXml(true); + jenkinsClient.setDebugXml(false); HashSet<URI> servers = new HashSet<>(); servers.add(URI.create("https://builds.apache.org")); - final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1); + final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(5); ObjectManager<URI, JenkinsServer> serverManager = new ObjectManager<>("JenkinsServer", servers, new ObjectFactory<URI, JenkinsServer>() { public JenkinsServer create(URI uri) { diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java index 6596dfa..9aad891 100644 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java @@ -1,6 +1,10 @@ package io.trygvis.esper.testing.jenkins; +import fj.data.*; +import static fj.data.Option.*; +import static java.lang.System.currentTimeMillis; import org.codehaus.httpcache4j.util.*; +import org.slf4j.*; import java.io.*; import java.net.*; @@ -9,12 +13,11 @@ import java.util.concurrent.*; public class JenkinsJob implements Closeable { + private final Logger logger = LoggerFactory.getLogger("jenkins.job"); private final JenkinsClient client; private final URI uri; - private JenkinsJobXml latestStatus; - // private boolean shouldRun = true; - // private final Thread thread; + private Option<JenkinsJobXml> latestStatus = none(); private final ScheduledFuture<?> future; public JenkinsJob(ScheduledExecutorService executorService, JenkinsClient client, URI uri) { @@ -22,62 +25,35 @@ public class JenkinsJob implements Closeable { this.uri = URIBuilder.fromURI(uri).addRawPath("api/xml").toURI(); long initialDelay = (long) Math.random() + 1; - long period = (long) (Math.random() * 10d) + 1; + long period = (long) (Math.random() * 100d) + 1; future = executorService.scheduleAtFixedRate(new Runnable() { public void run() { JenkinsJob.this.doWork(); } }, initialDelay, period, TimeUnit.SECONDS); - -// thread = new Thread(new Runnable() { -// public void run() { -// JenkinsJob.this.run(); -// } -// }); -// thread.setDaemon(true); -// thread.start(); } - public JenkinsJobXml getLatestStatus() { + public Option<JenkinsJobXml> getStatus() { return latestStatus; } - /* - public void close() throws IOException { - shouldRun = false; - thread.interrupt(); - while (thread.isAlive()) { - try { - thread.join(); - } catch (InterruptedException e) { - continue; - } - } - } - - private void run() { - Random r = new Random(); - while (shouldRun) { - doWork(); - - try { - Thread.sleep(1000 + r.nextInt(10) * 1000); - } catch (InterruptedException e) { - // ignore - } - } - } - */ - public void close() throws IOException { future.cancel(true); } private void doWork() { + + String name = latestStatus.isSome() && latestStatus.some().name.isSome() ? + latestStatus.some().name.some() : uri.toASCIIString(); + try { - latestStatus = client.fetchJob(uri); - } catch (Exception e) { - e.printStackTrace(System.out); + logger.info("Updating " + name); + long start = currentTimeMillis(); + latestStatus = some(client.fetchJob(uri)); + long end = currentTimeMillis(); + logger.info("Updated " + name + " in " + (end - start) + "ms"); + } catch (Throwable e) { + logger.warn("Error updating " + name, e); } } } diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java index 707a69a..47bb005 100644 --- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java +++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java @@ -92,7 +92,7 @@ public class JenkinsServer implements Closeable { this.jenkins = some(P.p(xml, new LocalDateTime())); - jobManager.update(new HashSet<>(jobUris.subList(0, 10))); + jobManager.update(new HashSet<>(jobUris)); } catch (Throwable e) { e.printStackTrace(System.out); } diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskDao.java b/src/main/java/io/trygvis/esper/testing/task/TaskDao.java new file mode 100644 index 0000000..242eb2a --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/task/TaskDao.java @@ -0,0 +1,26 @@ +package io.trygvis.esper.testing.task; + +import java.sql.*; +import java.util.*; + +public class TaskDao { + private final Connection c; + private final String table; + + public TaskDao(Connection c, String table) { + this.c = c; + this.table = table; + } + + public List<String> findTasks() throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT task_id FROM ? FOR UPDATE")) { + s.setString(1, table); + ResultSet rs = s.executeQuery(); + List<String> list = new ArrayList<>(); + while(rs.next()) { + list.add(rs.getString(1)); + } + return list; + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskManager.java b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java new file mode 100644 index 0000000..e4daba5 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java @@ -0,0 +1,224 @@ +package io.trygvis.esper.testing.task; + +import com.jolbox.bonecp.*; +import org.slf4j.*; +import org.slf4j.helpers.*; + +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; + +public class TaskManager<T> { + public final String table; + public final TaskExecutorFactory taskExecutorFactory; + public final Executor executor; + public final BoneCP boneCP; + + private final Set<String> inProgress = new HashSet<>(); + + public TaskManager(String table, TaskExecutorFactory taskExecutorFactory, Executor executor, BoneCP boneCP) { + this.table = table; + this.taskExecutorFactory = taskExecutorFactory; + this.executor = executor; + this.boneCP = boneCP; + + Thread thread = new Thread(new Runnable() { + public void run() { + TaskManager.this.run(); + } + }); + thread.start(); + } + + private void run() { + while (true) { + try { + try (Connection c = boneCP.getConnection()) { + singleRun(c); + } + } catch (SQLException e) { + e.printStackTrace(System.out); + } + } + } + + private void singleRun(Connection c) throws SQLException { + TaskDao taskDao = new TaskDao(c, table); + + List<String> ids = taskDao.findTasks(); + + System.out.println("Found " + ids.size() + " new tasks."); + + synchronized (inProgress) { + System.out.println("Have " + inProgress.size() + " tasks in progress already"); + ids.removeAll(inProgress); + + List<Runnable> runnables = new ArrayList<>(ids.size()); + + for (final String id : ids) { + System.out.println("Scheduling " + id); + + final TaskExecutor executor = taskExecutorFactory.create(); + runnables.add(new Runnable() { + public void run() { + System.out.println("Executing " + id); + try { + try (Connection c2 = boneCP.getConnection()) { + SqlLogger logger = new SqlLogger(); + executor.execute(id, c2, logger); + // TODO: insert log statements + System.out.println("Executing " + id); + c2.commit(); + } + } catch (SQLException e) { + e.printStackTrace(); + } finally { + synchronized (inProgress) { + inProgress.remove(id); + } + } + } + }); + inProgress.add(id); + } + for (Runnable runnable : runnables) { + executor.execute(runnable); + } + } + } + + /** + * See MessageFormatter + */ + private static class SqlLogger extends MarkerIgnoringBase { + + public boolean isTraceEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void trace(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isDebugEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void debug(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isInfoEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void info(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void info(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void info(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void info(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void info(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isWarnEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void warn(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isErrorEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void error(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void error(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void error(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void error(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void error(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + } +} + +interface TaskExecutorFactory { + TaskExecutor create(); +} + +interface TaskExecutor { + void execute(String id, Connection c, Logger logger) + throws SQLException; +} |