diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-11-09 01:54:53 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-11-09 01:54:53 +0100 |
commit | 796336d8ce3593e10b44f40a05de56a6cc2ba4e7 (patch) | |
tree | 7c4095de83d4d3ae7c241ca668f0c48d65a91a99 /src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java | |
parent | 2eae4836279646050e7e342752cde6e8f7c5b6cb (diff) | |
download | esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.tar.gz esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.tar.bz2 esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.tar.xz esper-testing-796336d8ce3593e10b44f40a05de56a6cc2ba4e7.zip |
wip
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java')
-rw-r--r-- | src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java | 201 |
1 files changed, 115 insertions, 86 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java index ee89527..6264bc7 100644 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java @@ -1,22 +1,29 @@ package io.trygvis.esper.testing.gitorious; import com.jolbox.bonecp.*; -import com.jolbox.bonecp.hooks.*; +import fj.*; import io.trygvis.esper.testing.*; +import static java.lang.System.*; import org.apache.abdera.*; +import org.apache.abdera.model.*; +import org.apache.abdera.parser.*; import org.apache.abdera.protocol.client.*; -import org.apache.abdera.protocol.client.cache.*; +import org.codehaus.httpcache4j.*; import org.codehaus.httpcache4j.cache.*; import org.codehaus.httpcache4j.client.*; +import java.io.*; +import java.net.*; import java.sql.*; +import java.util.Date; import java.util.*; import java.util.concurrent.*; public class GitoriousImporter { -// private final AbderaClient abderaClient; + private final Parser parser; private final BoneCP boneCp; private final GitoriousClient gitoriousClient; + private final HTTPCache httpCache; public static void main(String[] args) throws Exception { Main.configureLog4j(); @@ -25,77 +32,86 @@ public class GitoriousImporter { public GitoriousImporter() throws Exception { Abdera abdera = new Abdera(); -// abderaClient = new AbderaClient(abdera, new LRUCache(abdera, 1000)); + parser = abdera.getParser(); BoneCPConfig config = new BoneCPConfig(); config.setJdbcUrl(DbMain.JDBC_URL); config.setUsername("esper"); config.setPassword(""); config.setDefaultAutoCommit(false); - config.setMaxConnectionsPerPartition(1); + config.setMaxConnectionsPerPartition(10); - config.setConnectionHook(new AbstractConnectionHook() { - public void onAcquire(ConnectionHandle c) { - try { - c.setDebugHandle(new Daos(c)); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - }); +// config.setConnectionHook(new AbstractConnectionHook() { +// public void onAcquire(ConnectionHandle c) { +// try { +// System.out.println("New SQL connection."); +// c.setDebugHandle(new Daos(c)); +// } catch (SQLException e) {connections +// throw new RuntimeException(e); +// } +// } +// }); boneCp = new BoneCP(config); - HTTPCache httpCache = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance()); + httpCache = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance()); - gitoriousClient = new GitoriousClient(httpCache, "https://gitorious.org"); + gitoriousClient = new GitoriousClient(httpCache, "http://gitorious.org"); final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1); - int projectsUpdateInterval = 1000; - final int projectUpdateInterval = 1000; + int projectsUpdateDelay = 0 * 1000; + int projectsUpdateInterval = 60 * 1000; + int repositoriesUpdateDelay = 0; + int repositoriesUpdateInterval = 60 * 1000; -// service.scheduleAtFixedRate(new Runnable() { -// public void run() { -// try { -// discoverProjects(); -// } catch (Exception e) { -// e.printStackTrace(System.out); -// } -// } -// }, projectsUpdateInterval, projectsUpdateInterval, TimeUnit.MILLISECONDS); + service.scheduleAtFixedRate(new Runnable() { + public void run() { + try { + discoverProjects(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + } + }, projectsUpdateDelay, projectsUpdateInterval, TimeUnit.MILLISECONDS); - discoverProjects(); + service.scheduleAtFixedRate(new Runnable() { + public void run() { + try { + updateRepositories(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + } + }, repositoriesUpdateDelay, repositoriesUpdateInterval, TimeUnit.MILLISECONDS); } private void discoverProjects() throws Exception { Set<GitoriousProject> projects = gitoriousClient.findProjects(); - try (ConnectionHandle connection = (ConnectionHandle) boneCp.getConnection()) { - Daos daos = (Daos) connection.getDebugHandle(); + long start = currentTimeMillis(); + try (Daos daos = Daos.lookup(boneCp)) { GitoriousRepositoryDao repoDao = daos.gitoriousRepositoryDao; GitoriousProjectDao projectDao = daos.gitoriousProjectDao; - daos.begin(); System.out.println("Processing " + projects.size() + " projects."); for (GitoriousProject project : projects) { - if(projectDao.countProjects(project.slug) == 0) { + if (projectDao.countProjects(project.slug) == 0) { System.out.println("New project: " + project.slug + ", has " + project.repositories.size() + " repositories."); projectDao.insertProject(project); for (GitoriousRepository repository : project.repositories) { repoDao.insertRepository(repository); } - } - else { + } else { for (GitoriousRepository repository : project.repositories) { - if(repoDao.countRepositories(repository) == 0) { + if (repoDao.countRepositories(repository) == 0) { System.out.println("New repository for project " + repository.projectSlug + ": " + repository.name); repoDao.insertRepository(repository); } } for (GitoriousRepository repository : repoDao.selectForProject(project.slug)) { - if(project.repositories.contains(repository)) { + if (project.repositories.contains(repository)) { continue; } System.out.println("Gone repository for project " + repository.projectSlug + ": " + repository.name); @@ -104,11 +120,11 @@ public class GitoriousImporter { } } - for (String project : projectDao.selectAll()) { + for (String project : projectDao.selectSlugs()) { boolean found = false; for (Iterator<GitoriousProject> it = projects.iterator(); it.hasNext(); ) { GitoriousProject p = it.next(); - if(p.slug.equals(project)) { + if (p.slug.equals(project)) { found = true; break; } @@ -123,72 +139,85 @@ public class GitoriousImporter { projectDao.delete(project); } - connection.commit(); + daos.commit(); } + long end = currentTimeMillis(); + System.out.println("Processed in " + (end - start) + " ms"); } - /* - private void work() throws SQLException, InterruptedException { - String url = "http://qt.gitorious.org/projects/show/qt.atom"; + private void updateRepositories() throws SQLException, IOException { + try (Daos daos = Daos.lookup(boneCp)) { + List<P2<String, URI>> list = daos.gitoriousProjectDao.selectFeeds(); + System.out.println("Updating " + list.size() + " feeds."); + for (P2<String, URI> pair : list) { + updateFeed(daos, pair._1(), pair._2()); + daos.commit(); + } + } + } - while (true) { - Timestamp lastUpdate = atomDao.getAtomFeed(url); + private void updateFeed(Daos daos, String slug, URI uri) throws SQLException { + AtomDao atomDao = daos.atomDao; + GitoriousEventDao eventDao = daos.gitoriousEventDao; - System.out.println("Fetching " + url); - RequestOptions options = new RequestOptions(); - if (lastUpdate != null) { - options.setIfModifiedSince(lastUpdate); - } + Timestamp lastUpdate = atomDao.getAtomFeed(uri); - long start = System.currentTimeMillis(); - ClientResponse response = abderaClient.get(url, options); - long end = System.currentTimeMillis(); - System.out.println("Fetched in " + (end - start) + "ms"); + System.out.println("Fetching " + uri); + RequestOptions options = new RequestOptions(); + if (lastUpdate != null) { + options.setIfModifiedSince(lastUpdate); + } - // Use the server's timestamp - Date responseDate = response.getDateHeader("Date"); + long start = currentTimeMillis(); + HTTPResponse response = httpCache.execute(new HTTPRequest(uri, HTTPMethod.GET)); + long end = currentTimeMillis(); + System.out.println("Fetched in " + (end - start) + "ms"); - System.out.println("responseDate = " + responseDate); + // Use the server's timestamp + Date responseDate = response.getDate().toDate(); - Document<Element> document = response.getDocument(); - Feed feed = (Feed) document.getRoot(); + System.out.println("responseDate = " + responseDate); - for (Entry entry : feed.getEntries()) { - String entryId = entry.getId().toASCIIString(); - Date published = entry.getPublished(); - String title = entry.getTitle(); + Document<Element> document = null; + try { + document = parser.parse(response.getPayload().getInputStream()); + } catch (ParseException e) { + System.out.println("Error parsing " + uri); + e.printStackTrace(System.out); + return; + } - // Validate element - if (entryId == null || published == null || title == null) { - continue; - } + Feed feed = (Feed) document.getRoot(); - if (lastUpdate != null && lastUpdate.after(published)) { - System.out.println("Old entry: " + url + ":" + entryId); - continue; - } + for (Entry entry : feed.getEntries()) { + String entryId = entry.getId().toASCIIString(); + Date published = entry.getPublished(); + String title = entry.getTitle(); - System.out.println("New entry: " + url + ":" + entryId); - if (gitoriousDao.countEntryId(entryId) == 0) { - gitoriousDao.insertChange(entryId, title); - } else { - System.out.println("Already imported entry: " + entryId); - } + // Validate element + if (entryId == null || published == null || title == null) { + continue; } - if (lastUpdate == null) { - System.out.println("New atom feed"); - atomDao.insertAtomFeed(url, new Timestamp(responseDate.getTime())); - } else { - System.out.println("Updating atom feed"); - atomDao.updateAtomFeed(url, lastUpdate); + if (lastUpdate != null && lastUpdate.after(published)) { + System.out.println("Old entry: " + uri + ":" + entryId); + continue; } - connection.commit(); + System.out.println("New entry: " + uri + ":" + entryId); + if (eventDao.countEntryId(entryId) == 0) { + eventDao.insertChange(entryId, title); + } else { + System.out.println("Already imported entry: " + entryId); + } + } - System.out.println("Sleeping"); - Thread.sleep(10 * 1000); + if (lastUpdate == null) { + System.out.println("New atom feed"); + atomDao.insertAtomFeed(uri, new Timestamp(responseDate.getTime())); + } else { + System.out.println("Updating atom feed"); + atomDao.updateAtomFeed(uri, lastUpdate); } } - */ } |