aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-11-09 01:54:53 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2012-11-09 01:54:53 +0100
commit796336d8ce3593e10b44f40a05de56a6cc2ba4e7 (patch)
tree7c4095de83d4d3ae7c241ca668f0c48d65a91a99 /src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
parent2eae4836279646050e7e342752cde6e8f7c5b6cb (diff)
downloadesper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.tar.gz
esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.tar.bz2
esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.tar.xz
esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.zip
wip
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java')
-rw-r--r--src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java201
1 files changed, 115 insertions, 86 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
index ee89527..6264bc7 100644
--- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
+++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java
@@ -1,22 +1,29 @@
package io.trygvis.esper.testing.gitorious;
import com.jolbox.bonecp.*;
-import com.jolbox.bonecp.hooks.*;
+import fj.*;
import io.trygvis.esper.testing.*;
+import static java.lang.System.*;
import org.apache.abdera.*;
+import org.apache.abdera.model.*;
+import org.apache.abdera.parser.*;
import org.apache.abdera.protocol.client.*;
-import org.apache.abdera.protocol.client.cache.*;
+import org.codehaus.httpcache4j.*;
import org.codehaus.httpcache4j.cache.*;
import org.codehaus.httpcache4j.client.*;
+import java.io.*;
+import java.net.*;
import java.sql.*;
+import java.util.Date;
import java.util.*;
import java.util.concurrent.*;
public class GitoriousImporter {
-// private final AbderaClient abderaClient;
+ private final Parser parser;
private final BoneCP boneCp;
private final GitoriousClient gitoriousClient;
+ private final HTTPCache httpCache;
public static void main(String[] args) throws Exception {
Main.configureLog4j();
@@ -25,77 +32,86 @@ public class GitoriousImporter {
public GitoriousImporter() throws Exception {
Abdera abdera = new Abdera();
-// abderaClient = new AbderaClient(abdera, new LRUCache(abdera, 1000));
+ parser = abdera.getParser();
BoneCPConfig config = new BoneCPConfig();
config.setJdbcUrl(DbMain.JDBC_URL);
config.setUsername("esper");
config.setPassword("");
config.setDefaultAutoCommit(false);
- config.setMaxConnectionsPerPartition(1);
+ config.setMaxConnectionsPerPartition(10);
- config.setConnectionHook(new AbstractConnectionHook() {
- public void onAcquire(ConnectionHandle c) {
- try {
- c.setDebugHandle(new Daos(c));
- } catch (SQLException e) {
- throw new RuntimeException(e);
- }
- }
- });
+// config.setConnectionHook(new AbstractConnectionHook() {
+// public void onAcquire(ConnectionHandle c) {
+// try {
+// System.out.println("New SQL connection.");
+// c.setDebugHandle(new Daos(c));
+// } catch (SQLException e) {connections
+// throw new RuntimeException(e);
+// }
+// }
+// });
boneCp = new BoneCP(config);
- HTTPCache httpCache = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance());
+ httpCache = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance());
- gitoriousClient = new GitoriousClient(httpCache, "https://gitorious.org");
+ gitoriousClient = new GitoriousClient(httpCache, "http://gitorious.org");
final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1);
- int projectsUpdateInterval = 1000;
- final int projectUpdateInterval = 1000;
+ int projectsUpdateDelay = 0 * 1000;
+ int projectsUpdateInterval = 60 * 1000;
+ int repositoriesUpdateDelay = 0;
+ int repositoriesUpdateInterval = 60 * 1000;
-// service.scheduleAtFixedRate(new Runnable() {
-// public void run() {
-// try {
-// discoverProjects();
-// } catch (Exception e) {
-// e.printStackTrace(System.out);
-// }
-// }
-// }, projectsUpdateInterval, projectsUpdateInterval, TimeUnit.MILLISECONDS);
+ service.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ try {
+ discoverProjects();
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+ }, projectsUpdateDelay, projectsUpdateInterval, TimeUnit.MILLISECONDS);
- discoverProjects();
+ service.scheduleAtFixedRate(new Runnable() {
+ public void run() {
+ try {
+ updateRepositories();
+ } catch (Exception e) {
+ e.printStackTrace(System.out);
+ }
+ }
+ }, repositoriesUpdateDelay, repositoriesUpdateInterval, TimeUnit.MILLISECONDS);
}
private void discoverProjects() throws Exception {
Set<GitoriousProject> projects = gitoriousClient.findProjects();
- try (ConnectionHandle connection = (ConnectionHandle) boneCp.getConnection()) {
- Daos daos = (Daos) connection.getDebugHandle();
+ long start = currentTimeMillis();
+ try (Daos daos = Daos.lookup(boneCp)) {
GitoriousRepositoryDao repoDao = daos.gitoriousRepositoryDao;
GitoriousProjectDao projectDao = daos.gitoriousProjectDao;
- daos.begin();
System.out.println("Processing " + projects.size() + " projects.");
for (GitoriousProject project : projects) {
- if(projectDao.countProjects(project.slug) == 0) {
+ if (projectDao.countProjects(project.slug) == 0) {
System.out.println("New project: " + project.slug + ", has " + project.repositories.size() + " repositories.");
projectDao.insertProject(project);
for (GitoriousRepository repository : project.repositories) {
repoDao.insertRepository(repository);
}
- }
- else {
+ } else {
for (GitoriousRepository repository : project.repositories) {
- if(repoDao.countRepositories(repository) == 0) {
+ if (repoDao.countRepositories(repository) == 0) {
System.out.println("New repository for project " + repository.projectSlug + ": " + repository.name);
repoDao.insertRepository(repository);
}
}
for (GitoriousRepository repository : repoDao.selectForProject(project.slug)) {
- if(project.repositories.contains(repository)) {
+ if (project.repositories.contains(repository)) {
continue;
}
System.out.println("Gone repository for project " + repository.projectSlug + ": " + repository.name);
@@ -104,11 +120,11 @@ public class GitoriousImporter {
}
}
- for (String project : projectDao.selectAll()) {
+ for (String project : projectDao.selectSlugs()) {
boolean found = false;
for (Iterator<GitoriousProject> it = projects.iterator(); it.hasNext(); ) {
GitoriousProject p = it.next();
- if(p.slug.equals(project)) {
+ if (p.slug.equals(project)) {
found = true;
break;
}
@@ -123,72 +139,85 @@ public class GitoriousImporter {
projectDao.delete(project);
}
- connection.commit();
+ daos.commit();
}
+ long end = currentTimeMillis();
+ System.out.println("Processed in " + (end - start) + " ms");
}
- /*
- private void work() throws SQLException, InterruptedException {
- String url = "http://qt.gitorious.org/projects/show/qt.atom";
+ private void updateRepositories() throws SQLException, IOException {
+ try (Daos daos = Daos.lookup(boneCp)) {
+ List<P2<String, URI>> list = daos.gitoriousProjectDao.selectFeeds();
+ System.out.println("Updating " + list.size() + " feeds.");
+ for (P2<String, URI> pair : list) {
+ updateFeed(daos, pair._1(), pair._2());
+ daos.commit();
+ }
+ }
+ }
- while (true) {
- Timestamp lastUpdate = atomDao.getAtomFeed(url);
+ private void updateFeed(Daos daos, String slug, URI uri) throws SQLException {
+ AtomDao atomDao = daos.atomDao;
+ GitoriousEventDao eventDao = daos.gitoriousEventDao;
- System.out.println("Fetching " + url);
- RequestOptions options = new RequestOptions();
- if (lastUpdate != null) {
- options.setIfModifiedSince(lastUpdate);
- }
+ Timestamp lastUpdate = atomDao.getAtomFeed(uri);
- long start = System.currentTimeMillis();
- ClientResponse response = abderaClient.get(url, options);
- long end = System.currentTimeMillis();
- System.out.println("Fetched in " + (end - start) + "ms");
+ System.out.println("Fetching " + uri);
+ RequestOptions options = new RequestOptions();
+ if (lastUpdate != null) {
+ options.setIfModifiedSince(lastUpdate);
+ }
- // Use the server's timestamp
- Date responseDate = response.getDateHeader("Date");
+ long start = currentTimeMillis();
+ HTTPResponse response = httpCache.execute(new HTTPRequest(uri, HTTPMethod.GET));
+ long end = currentTimeMillis();
+ System.out.println("Fetched in " + (end - start) + "ms");
- System.out.println("responseDate = " + responseDate);
+ // Use the server's timestamp
+ Date responseDate = response.getDate().toDate();
- Document<Element> document = response.getDocument();
- Feed feed = (Feed) document.getRoot();
+ System.out.println("responseDate = " + responseDate);
- for (Entry entry : feed.getEntries()) {
- String entryId = entry.getId().toASCIIString();
- Date published = entry.getPublished();
- String title = entry.getTitle();
+ Document<Element> document = null;
+ try {
+ document = parser.parse(response.getPayload().getInputStream());
+ } catch (ParseException e) {
+ System.out.println("Error parsing " + uri);
+ e.printStackTrace(System.out);
+ return;
+ }
- // Validate element
- if (entryId == null || published == null || title == null) {
- continue;
- }
+ Feed feed = (Feed) document.getRoot();
- if (lastUpdate != null && lastUpdate.after(published)) {
- System.out.println("Old entry: " + url + ":" + entryId);
- continue;
- }
+ for (Entry entry : feed.getEntries()) {
+ String entryId = entry.getId().toASCIIString();
+ Date published = entry.getPublished();
+ String title = entry.getTitle();
- System.out.println("New entry: " + url + ":" + entryId);
- if (gitoriousDao.countEntryId(entryId) == 0) {
- gitoriousDao.insertChange(entryId, title);
- } else {
- System.out.println("Already imported entry: " + entryId);
- }
+ // Validate element
+ if (entryId == null || published == null || title == null) {
+ continue;
}
- if (lastUpdate == null) {
- System.out.println("New atom feed");
- atomDao.insertAtomFeed(url, new Timestamp(responseDate.getTime()));
- } else {
- System.out.println("Updating atom feed");
- atomDao.updateAtomFeed(url, lastUpdate);
+ if (lastUpdate != null && lastUpdate.after(published)) {
+ System.out.println("Old entry: " + uri + ":" + entryId);
+ continue;
}
- connection.commit();
+ System.out.println("New entry: " + uri + ":" + entryId);
+ if (eventDao.countEntryId(entryId) == 0) {
+ eventDao.insertChange(entryId, title);
+ } else {
+ System.out.println("Already imported entry: " + entryId);
+ }
+ }
- System.out.println("Sleeping");
- Thread.sleep(10 * 1000);
+ if (lastUpdate == null) {
+ System.out.println("New atom feed");
+ atomDao.insertAtomFeed(uri, new Timestamp(responseDate.getTime()));
+ } else {
+ System.out.println("Updating atom feed");
+ atomDao.updateAtomFeed(uri, lastUpdate);
}
}
- */
}