aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-11-18 13:42:21 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2012-11-18 13:42:21 +0100
commit4f22d5bcc676612e271620726ed4248fc357a2d6 (patch)
tree905a496fbcba1a6c9674c6d9c8cec9d597e493a7 /src/main/java/io/trygvis
parentc266c3e36007d968d3126b4f8475d3e29e219a19 (diff)
downloadesper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.tar.gz
esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.tar.bz2
esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.tar.xz
esper-testing-4f22d5bcc676612e271620726ed4248fc357a2d6.zip
o A version that's more useful because as it is closing its connections.
Diffstat (limited to 'src/main/java/io/trygvis')
-rw-r--r--src/main/java/io/trygvis/esper/testing/Http.java26
-rw-r--r--src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java2
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java83
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java18
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java62
-rw-r--r--src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java2
-rw-r--r--src/main/java/io/trygvis/esper/testing/task/TaskDao.java26
-rw-r--r--src/main/java/io/trygvis/esper/testing/task/TaskManager.java224
8 files changed, 339 insertions, 104 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/Http.java b/src/main/java/io/trygvis/esper/testing/Http.java
new file mode 100644
index 0000000..55f4714
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/Http.java
@@ -0,0 +1,26 @@
+package io.trygvis.esper.testing;
+
+import org.apache.http.conn.scheme.*;
+import org.apache.http.conn.ssl.*;
+import org.apache.http.impl.client.*;
+import org.apache.http.impl.conn.tsccm.*;
+import org.apache.http.params.*;
+import org.codehaus.httpcache4j.cache.*;
+import org.codehaus.httpcache4j.resolver.*;
+
+public class Http {
+
+ public static final HTTPCache http;
+
+ static {
+ SchemeRegistry schemeRegistry = new SchemeRegistry();
+ schemeRegistry.register(new Scheme("http", PlainSocketFactory.getSocketFactory(), 80));
+ schemeRegistry.register(new Scheme("https", SSLSocketFactory.getSocketFactory(), 443));
+
+ BasicHttpParams params = new BasicHttpParams();
+ ThreadSafeClientConnManager cm = new ThreadSafeClientConnManager(params, schemeRegistry);
+ DefaultHttpClient httpClient = new DefaultHttpClient(cm, params);
+ HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(httpClient);
+ http = new HTTPCache(new MemoryCacheStorage(), resolver);
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
index 4ee6322..b4bc683 100644
--- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
@@ -8,7 +8,7 @@ import static java.lang.System.*;
import org.apache.abdera.parser.*;
import org.codehaus.httpcache4j.*;
import org.codehaus.httpcache4j.cache.*;
-import org.codehaus.httpcache4j.client.*;
+import org.codehaus.httpcache4j.resolver.*;
import java.io.*;
import java.net.*;
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
index 4ecb4fb..f3d2941 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsClient.java
@@ -35,12 +35,10 @@ public class JenkinsClient {
}
public JenkinsXml fetchJobs(URI uri) throws XMLStreamException, JDOMException, IOException {
- InputStream stream = fetchXml(uri);
-
- Element doc = parseDocument(stream).getRootElement();
+ Element root = fetchXml(uri).getRootElement();
List<JenkinsJobEntryXml> jobs = new ArrayList<>();
- for (Element job : doc.getChildren("job")) {
+ for (Element job : root.getChildren("job")) {
String name = trimToNull(job.getChildText("name"));
String url = trimToNull(job.getChildText("url"));
String color = trimToNull(job.getChildText("color"));
@@ -53,15 +51,13 @@ public class JenkinsClient {
}
return new JenkinsXml(
- Option.fromNull(doc.getChildText("nodeName")),
- Option.fromNull(doc.getChildText("nodeDescription")),
- Option.fromNull(doc.getChildText("description")), jobs);
+ Option.fromNull(root.getChildText("nodeName")),
+ Option.fromNull(root.getChildText("nodeDescription")),
+ Option.fromNull(root.getChildText("description")), jobs);
}
public JenkinsJobXml fetchJob(URI uri) throws IOException, JDOMException, XMLStreamException {
- InputStream stream = fetchXml(uri);
-
- Element root = parseDocument(stream).getRootElement();
+ Element root = fetchXml(uri).getRootElement();
switch (root.getName()) {
case "freeStyleProject":
@@ -73,47 +69,46 @@ public class JenkinsClient {
}
}
- private Document parseDocument(InputStream stream) throws JDOMException, XMLStreamException {
- return streamBuilder.build(xmlReader.createXMLStreamReader(stream));
- }
-
- private InputStream fetchXml(URI uri) throws IOException {
- HTTPResponse response;
+ private Document fetchXml(URI uri) throws IOException, XMLStreamException, JDOMException {
+ HTTPResponse response = null;
try {
response = http.execute(new HTTPRequest(uri));
- } catch (HTTPException e) {
- throw new IOException(e);
- }
-
- if (response.getStatus().getCode() != 200) {
- throw new IOException("Did not get 200 back, got " + response.getStatus().getCode());
- }
- InputStream stream = response.getPayload().getInputStream();
+ if (response.getStatus().getCode() != 200) {
+ throw new IOException("Did not get 200 back, got " + response.getStatus().getCode());
+ }
- if (!debugXml) {
- return stream;
- }
+ InputStream stream = response.getPayload().getInputStream();
+
+ if (debugXml) {
+ int size;
+ try {
+ size = parseInt(response.getHeaders().getFirstHeader("Content-Length").getValue());
+ } catch (Throwable e) {
+ size = 10 * 1024;
+ }
+
+ // TODO: Pretty print
+
+ ByteArrayOutputStream buffer = new ByteArrayOutputStream(size);
+ IOUtils.copy(stream, buffer);
+ byte[] bytes = buffer.toByteArray();
+ System.out.println("------------------------------------------------");
+ System.out.write(bytes);
+ System.out.println();
+ System.out.println("------------------------------------------------");
+ stream = new ByteArrayInputStream(bytes);
+ }
- int size;
- try {
- size = parseInt(response.getHeaders().getFirstHeader("Content-Length").getValue());
- } catch (Throwable e) {
- size = 10 * 1024;
+ return streamBuilder.build(xmlReader.createXMLStreamReader(stream));
+ } catch (HTTPException e) {
+ throw new IOException(e);
+ } finally {
+ if (response != null) {
+ response.consume();
+ }
}
-
- // TODO: Pretty print
-
- ByteArrayOutputStream buffer = new ByteArrayOutputStream(size);
- IOUtils.copy(stream, buffer);
- byte[] bytes = buffer.toByteArray();
- System.out.println("------------------------------------------------");
- System.out.write(bytes);
- System.out.println();
- System.out.println("------------------------------------------------");
- stream = new ByteArrayInputStream(bytes);
- return stream;
}
}
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
index b639108..b7d88dc 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsImporter.java
@@ -3,14 +3,10 @@ package io.trygvis.esper.testing.jenkins;
import fj.*;
import fj.data.*;
import io.trygvis.esper.testing.*;
+import static io.trygvis.esper.testing.Http.http;
import io.trygvis.esper.testing.object.*;
-import org.apache.commons.httpclient.*;
-import org.apache.commons.httpclient.params.*;
-import org.codehaus.httpcache4j.cache.*;
-import org.codehaus.httpcache4j.client.*;
import org.joda.time.*;
-import java.net.*;
import java.net.URI;
import java.util.HashSet;
import java.util.concurrent.*;
@@ -19,22 +15,14 @@ public class JenkinsImporter {
public static void main(String[] args) throws Exception {
Main.configureLog4j();
-// HTTPClientResponseResolver resolver = HTTPClientResponseResolver.createMultithreadedInstance();
-// HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(new HttpClient(new MultiThreadedHttpConnectionManager()));
- HTTPClientResponseResolver resolver = new HTTPClientResponseResolver(new HttpClient(new SimpleHttpConnectionManager()));
- HttpClientParams params = new HttpClientParams();
-// params.setConnectionManagerTimeout(1000);
- params.setSoTimeout(1000);
- resolver.getClient().setParams(params);
- HTTPCache http = new HTTPCache(new MemoryCacheStorage(), resolver);
final JenkinsClient jenkinsClient = new JenkinsClient(http);
- jenkinsClient.setDebugXml(true);
+ jenkinsClient.setDebugXml(false);
HashSet<URI> servers = new HashSet<>();
servers.add(URI.create("https://builds.apache.org"));
- final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(1);
+ final ScheduledThreadPoolExecutor executorService = new ScheduledThreadPoolExecutor(5);
ObjectManager<URI, JenkinsServer> serverManager = new ObjectManager<>("JenkinsServer", servers, new ObjectFactory<URI, JenkinsServer>() {
public JenkinsServer create(URI uri) {
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java
index 6596dfa..9aad891 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsJob.java
@@ -1,6 +1,10 @@
package io.trygvis.esper.testing.jenkins;
+import fj.data.*;
+import static fj.data.Option.*;
+import static java.lang.System.currentTimeMillis;
import org.codehaus.httpcache4j.util.*;
+import org.slf4j.*;
import java.io.*;
import java.net.*;
@@ -9,12 +13,11 @@ import java.util.concurrent.*;
public class JenkinsJob implements Closeable {
+ private final Logger logger = LoggerFactory.getLogger("jenkins.job");
private final JenkinsClient client;
private final URI uri;
- private JenkinsJobXml latestStatus;
- // private boolean shouldRun = true;
- // private final Thread thread;
+ private Option<JenkinsJobXml> latestStatus = none();
private final ScheduledFuture<?> future;
public JenkinsJob(ScheduledExecutorService executorService, JenkinsClient client, URI uri) {
@@ -22,62 +25,35 @@ public class JenkinsJob implements Closeable {
this.uri = URIBuilder.fromURI(uri).addRawPath("api/xml").toURI();
long initialDelay = (long) Math.random() + 1;
- long period = (long) (Math.random() * 10d) + 1;
+ long period = (long) (Math.random() * 100d) + 1;
future = executorService.scheduleAtFixedRate(new Runnable() {
public void run() {
JenkinsJob.this.doWork();
}
}, initialDelay, period, TimeUnit.SECONDS);
-
-// thread = new Thread(new Runnable() {
-// public void run() {
-// JenkinsJob.this.run();
-// }
-// });
-// thread.setDaemon(true);
-// thread.start();
}
- public JenkinsJobXml getLatestStatus() {
+ public Option<JenkinsJobXml> getStatus() {
return latestStatus;
}
- /*
- public void close() throws IOException {
- shouldRun = false;
- thread.interrupt();
- while (thread.isAlive()) {
- try {
- thread.join();
- } catch (InterruptedException e) {
- continue;
- }
- }
- }
-
- private void run() {
- Random r = new Random();
- while (shouldRun) {
- doWork();
-
- try {
- Thread.sleep(1000 + r.nextInt(10) * 1000);
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
- */
-
public void close() throws IOException {
future.cancel(true);
}
private void doWork() {
+
+ String name = latestStatus.isSome() && latestStatus.some().name.isSome() ?
+ latestStatus.some().name.some() : uri.toASCIIString();
+
try {
- latestStatus = client.fetchJob(uri);
- } catch (Exception e) {
- e.printStackTrace(System.out);
+ logger.info("Updating " + name);
+ long start = currentTimeMillis();
+ latestStatus = some(client.fetchJob(uri));
+ long end = currentTimeMillis();
+ logger.info("Updated " + name + " in " + (end - start) + "ms");
+ } catch (Throwable e) {
+ logger.warn("Error updating " + name, e);
}
}
}
diff --git a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
index 707a69a..47bb005 100644
--- a/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
+++ b/src/main/java/io/trygvis/esper/testing/jenkins/JenkinsServer.java
@@ -92,7 +92,7 @@ public class JenkinsServer implements Closeable {
this.jenkins = some(P.p(xml, new LocalDateTime()));
- jobManager.update(new HashSet<>(jobUris.subList(0, 10)));
+ jobManager.update(new HashSet<>(jobUris));
} catch (Throwable e) {
e.printStackTrace(System.out);
}
diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskDao.java b/src/main/java/io/trygvis/esper/testing/task/TaskDao.java
new file mode 100644
index 0000000..242eb2a
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/task/TaskDao.java
@@ -0,0 +1,26 @@
+package io.trygvis.esper.testing.task;
+
+import java.sql.*;
+import java.util.*;
+
+public class TaskDao {
+ private final Connection c;
+ private final String table;
+
+ public TaskDao(Connection c, String table) {
+ this.c = c;
+ this.table = table;
+ }
+
+ public List<String> findTasks() throws SQLException {
+ try (PreparedStatement s = c.prepareStatement("SELECT task_id FROM ? FOR UPDATE")) {
+ s.setString(1, table);
+ ResultSet rs = s.executeQuery();
+ List<String> list = new ArrayList<>();
+ while(rs.next()) {
+ list.add(rs.getString(1));
+ }
+ return list;
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskManager.java b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java
new file mode 100644
index 0000000..e4daba5
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java
@@ -0,0 +1,224 @@
+package io.trygvis.esper.testing.task;
+
+import com.jolbox.bonecp.*;
+import org.slf4j.*;
+import org.slf4j.helpers.*;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class TaskManager<T> {
+ public final String table;
+ public final TaskExecutorFactory taskExecutorFactory;
+ public final Executor executor;
+ public final BoneCP boneCP;
+
+ private final Set<String> inProgress = new HashSet<>();
+
+ public TaskManager(String table, TaskExecutorFactory taskExecutorFactory, Executor executor, BoneCP boneCP) {
+ this.table = table;
+ this.taskExecutorFactory = taskExecutorFactory;
+ this.executor = executor;
+ this.boneCP = boneCP;
+
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ TaskManager.this.run();
+ }
+ });
+ thread.start();
+ }
+
+ private void run() {
+ while (true) {
+ try {
+ try (Connection c = boneCP.getConnection()) {
+ singleRun(c);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+ }
+
+ private void singleRun(Connection c) throws SQLException {
+ TaskDao taskDao = new TaskDao(c, table);
+
+ List<String> ids = taskDao.findTasks();
+
+ System.out.println("Found " + ids.size() + " new tasks.");
+
+ synchronized (inProgress) {
+ System.out.println("Have " + inProgress.size() + " tasks in progress already");
+ ids.removeAll(inProgress);
+
+ List<Runnable> runnables = new ArrayList<>(ids.size());
+
+ for (final String id : ids) {
+ System.out.println("Scheduling " + id);
+
+ final TaskExecutor executor = taskExecutorFactory.create();
+ runnables.add(new Runnable() {
+ public void run() {
+ System.out.println("Executing " + id);
+ try {
+ try (Connection c2 = boneCP.getConnection()) {
+ SqlLogger logger = new SqlLogger();
+ executor.execute(id, c2, logger);
+ // TODO: insert log statements
+ System.out.println("Executing " + id);
+ c2.commit();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } finally {
+ synchronized (inProgress) {
+ inProgress.remove(id);
+ }
+ }
+ }
+ });
+ inProgress.add(id);
+ }
+ for (Runnable runnable : runnables) {
+ executor.execute(runnable);
+ }
+ }
+ }
+
+ /**
+ * See MessageFormatter
+ */
+ private static class SqlLogger extends MarkerIgnoringBase {
+
+ public boolean isTraceEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isDebugEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isInfoEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isWarnEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isErrorEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+ }
+}
+
+interface TaskExecutorFactory {
+ TaskExecutor create();
+}
+
+interface TaskExecutor {
+ void execute(String id, Connection c, Logger logger)
+ throws SQLException;
+}