diff options
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing')
6 files changed, 199 insertions, 125 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/AtomDao.java b/src/main/java/io/trygvis/esper/testing/AtomDao.java index 0215545..1441dd5 100644 --- a/src/main/java/io/trygvis/esper/testing/AtomDao.java +++ b/src/main/java/io/trygvis/esper/testing/AtomDao.java @@ -1,5 +1,6 @@ package io.trygvis.esper.testing; +import java.net.*; import java.sql.*; public class AtomDao { @@ -13,8 +14,8 @@ public class AtomDao { updateAtomFeed = c.prepareStatement("UPDATE atom_feed SET last_update=? WHERE url=?"); } - public Timestamp getAtomFeed(String url) throws SQLException { - selectLastUpdate.setString(1, url); + public Timestamp getAtomFeed(URI uri) throws SQLException { + selectLastUpdate.setString(1, uri.toASCIIString()); ResultSet rs = selectLastUpdate.executeQuery(); if (!rs.next()) { return null; @@ -23,15 +24,15 @@ public class AtomDao { return rs.getTimestamp(1); } - public void insertAtomFeed(String url, Timestamp lastUpdate) throws SQLException { - insertAtomFeed.setString(1, url); + public void insertAtomFeed(URI uri, Timestamp lastUpdate) throws SQLException { + insertAtomFeed.setString(1, uri.toASCIIString()); insertAtomFeed.setTimestamp(2, lastUpdate); insertAtomFeed.executeUpdate(); } - public void updateAtomFeed(String url, Timestamp lastUpdate) throws SQLException { + public void updateAtomFeed(URI uri, Timestamp lastUpdate) throws SQLException { updateAtomFeed.setTimestamp(1, lastUpdate); - updateAtomFeed.setString(2, url); + updateAtomFeed.setString(2, uri.toASCIIString()); updateAtomFeed.executeUpdate(); } } diff --git a/src/main/java/io/trygvis/esper/testing/Daos.java b/src/main/java/io/trygvis/esper/testing/Daos.java index db97f9d..c8a3e4d 100644 --- a/src/main/java/io/trygvis/esper/testing/Daos.java +++ b/src/main/java/io/trygvis/esper/testing/Daos.java @@ -1,25 +1,47 @@ package io.trygvis.esper.testing; +import com.jolbox.bonecp.*; import io.trygvis.esper.testing.gitorious.*; +import java.io.*; import java.sql.*; -public class Daos { +public class Daos implements Closeable { + private final Connection connection; public final AtomDao atomDao; public final GitoriousEventDao gitoriousEventDao; public final GitoriousProjectDao gitoriousProjectDao; public final GitoriousRepositoryDao gitoriousRepositoryDao; - public final PreparedStatement begin; - - public Daos(Connection c) throws SQLException { - atomDao = new AtomDao(c); - gitoriousEventDao = new GitoriousEventDao(c); - gitoriousProjectDao = new GitoriousProjectDao(c); - gitoriousRepositoryDao = new GitoriousRepositoryDao(c); - begin = c.prepareStatement("BEGIN"); + public final int seq; + public static int counter = 1; + + public Daos(Connection connection) throws SQLException { + this.connection = connection; + this.seq = counter++; + atomDao = new AtomDao(connection); + gitoriousEventDao = new GitoriousEventDao(connection); + gitoriousProjectDao = new GitoriousProjectDao(connection); + gitoriousRepositoryDao = new GitoriousRepositoryDao(connection); + + System.out.println("Opened connection " + seq); + } + + public void close() throws IOException { + System.out.println("Closing connection " + seq); + try { + connection.rollback(); + connection.close(); + } catch (SQLException e) { + throw new IOException(e); + } + } + + public void commit() throws SQLException { + connection.commit(); } - public void begin() throws SQLException { - begin.executeUpdate(); + public static Daos lookup(BoneCP boneCp) throws SQLException { + return new Daos(boneCp.getConnection()); +// return (Daos) ((ConnectionHandle) boneCp.getConnection()).getDebugHandle(); } } diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java index ee89527..6264bc7 100644 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousImporter.java @@ -1,22 +1,29 @@ package io.trygvis.esper.testing.gitorious; import com.jolbox.bonecp.*; -import com.jolbox.bonecp.hooks.*; +import fj.*; import io.trygvis.esper.testing.*; +import static java.lang.System.*; import org.apache.abdera.*; +import org.apache.abdera.model.*; +import org.apache.abdera.parser.*; import org.apache.abdera.protocol.client.*; -import org.apache.abdera.protocol.client.cache.*; +import org.codehaus.httpcache4j.*; import org.codehaus.httpcache4j.cache.*; import org.codehaus.httpcache4j.client.*; +import java.io.*; +import java.net.*; import java.sql.*; +import java.util.Date; import java.util.*; import java.util.concurrent.*; public class GitoriousImporter { -// private final AbderaClient abderaClient; + private final Parser parser; private final BoneCP boneCp; private final GitoriousClient gitoriousClient; + private final HTTPCache httpCache; public static void main(String[] args) throws Exception { Main.configureLog4j(); @@ -25,77 +32,86 @@ public class GitoriousImporter { public GitoriousImporter() throws Exception { Abdera abdera = new Abdera(); -// abderaClient = new AbderaClient(abdera, new LRUCache(abdera, 1000)); + parser = abdera.getParser(); BoneCPConfig config = new BoneCPConfig(); config.setJdbcUrl(DbMain.JDBC_URL); config.setUsername("esper"); config.setPassword(""); config.setDefaultAutoCommit(false); - config.setMaxConnectionsPerPartition(1); + config.setMaxConnectionsPerPartition(10); - config.setConnectionHook(new AbstractConnectionHook() { - public void onAcquire(ConnectionHandle c) { - try { - c.setDebugHandle(new Daos(c)); - } catch (SQLException e) { - throw new RuntimeException(e); - } - } - }); +// config.setConnectionHook(new AbstractConnectionHook() { +// public void onAcquire(ConnectionHandle c) { +// try { +// System.out.println("New SQL connection."); +// c.setDebugHandle(new Daos(c)); +// } catch (SQLException e) {connections +// throw new RuntimeException(e); +// } +// } +// }); boneCp = new BoneCP(config); - HTTPCache httpCache = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance()); + httpCache = new HTTPCache(new MemoryCacheStorage(), HTTPClientResponseResolver.createMultithreadedInstance()); - gitoriousClient = new GitoriousClient(httpCache, "https://gitorious.org"); + gitoriousClient = new GitoriousClient(httpCache, "http://gitorious.org"); final ScheduledThreadPoolExecutor service = new ScheduledThreadPoolExecutor(1); - int projectsUpdateInterval = 1000; - final int projectUpdateInterval = 1000; + int projectsUpdateDelay = 0 * 1000; + int projectsUpdateInterval = 60 * 1000; + int repositoriesUpdateDelay = 0; + int repositoriesUpdateInterval = 60 * 1000; -// service.scheduleAtFixedRate(new Runnable() { -// public void run() { -// try { -// discoverProjects(); -// } catch (Exception e) { -// e.printStackTrace(System.out); -// } -// } -// }, projectsUpdateInterval, projectsUpdateInterval, TimeUnit.MILLISECONDS); + service.scheduleAtFixedRate(new Runnable() { + public void run() { + try { + discoverProjects(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + } + }, projectsUpdateDelay, projectsUpdateInterval, TimeUnit.MILLISECONDS); - discoverProjects(); + service.scheduleAtFixedRate(new Runnable() { + public void run() { + try { + updateRepositories(); + } catch (Exception e) { + e.printStackTrace(System.out); + } + } + }, repositoriesUpdateDelay, repositoriesUpdateInterval, TimeUnit.MILLISECONDS); } private void discoverProjects() throws Exception { Set<GitoriousProject> projects = gitoriousClient.findProjects(); - try (ConnectionHandle connection = (ConnectionHandle) boneCp.getConnection()) { - Daos daos = (Daos) connection.getDebugHandle(); + long start = currentTimeMillis(); + try (Daos daos = Daos.lookup(boneCp)) { GitoriousRepositoryDao repoDao = daos.gitoriousRepositoryDao; GitoriousProjectDao projectDao = daos.gitoriousProjectDao; - daos.begin(); System.out.println("Processing " + projects.size() + " projects."); for (GitoriousProject project : projects) { - if(projectDao.countProjects(project.slug) == 0) { + if (projectDao.countProjects(project.slug) == 0) { System.out.println("New project: " + project.slug + ", has " + project.repositories.size() + " repositories."); projectDao.insertProject(project); for (GitoriousRepository repository : project.repositories) { repoDao.insertRepository(repository); } - } - else { + } else { for (GitoriousRepository repository : project.repositories) { - if(repoDao.countRepositories(repository) == 0) { + if (repoDao.countRepositories(repository) == 0) { System.out.println("New repository for project " + repository.projectSlug + ": " + repository.name); repoDao.insertRepository(repository); } } for (GitoriousRepository repository : repoDao.selectForProject(project.slug)) { - if(project.repositories.contains(repository)) { + if (project.repositories.contains(repository)) { continue; } System.out.println("Gone repository for project " + repository.projectSlug + ": " + repository.name); @@ -104,11 +120,11 @@ public class GitoriousImporter { } } - for (String project : projectDao.selectAll()) { + for (String project : projectDao.selectSlugs()) { boolean found = false; for (Iterator<GitoriousProject> it = projects.iterator(); it.hasNext(); ) { GitoriousProject p = it.next(); - if(p.slug.equals(project)) { + if (p.slug.equals(project)) { found = true; break; } @@ -123,72 +139,85 @@ public class GitoriousImporter { projectDao.delete(project); } - connection.commit(); + daos.commit(); } + long end = currentTimeMillis(); + System.out.println("Processed in " + (end - start) + " ms"); } - /* - private void work() throws SQLException, InterruptedException { - String url = "http://qt.gitorious.org/projects/show/qt.atom"; + private void updateRepositories() throws SQLException, IOException { + try (Daos daos = Daos.lookup(boneCp)) { + List<P2<String, URI>> list = daos.gitoriousProjectDao.selectFeeds(); + System.out.println("Updating " + list.size() + " feeds."); + for (P2<String, URI> pair : list) { + updateFeed(daos, pair._1(), pair._2()); + daos.commit(); + } + } + } - while (true) { - Timestamp lastUpdate = atomDao.getAtomFeed(url); + private void updateFeed(Daos daos, String slug, URI uri) throws SQLException { + AtomDao atomDao = daos.atomDao; + GitoriousEventDao eventDao = daos.gitoriousEventDao; - System.out.println("Fetching " + url); - RequestOptions options = new RequestOptions(); - if (lastUpdate != null) { - options.setIfModifiedSince(lastUpdate); - } + Timestamp lastUpdate = atomDao.getAtomFeed(uri); - long start = System.currentTimeMillis(); - ClientResponse response = abderaClient.get(url, options); - long end = System.currentTimeMillis(); - System.out.println("Fetched in " + (end - start) + "ms"); + System.out.println("Fetching " + uri); + RequestOptions options = new RequestOptions(); + if (lastUpdate != null) { + options.setIfModifiedSince(lastUpdate); + } - // Use the server's timestamp - Date responseDate = response.getDateHeader("Date"); + long start = currentTimeMillis(); + HTTPResponse response = httpCache.execute(new HTTPRequest(uri, HTTPMethod.GET)); + long end = currentTimeMillis(); + System.out.println("Fetched in " + (end - start) + "ms"); - System.out.println("responseDate = " + responseDate); + // Use the server's timestamp + Date responseDate = response.getDate().toDate(); - Document<Element> document = response.getDocument(); - Feed feed = (Feed) document.getRoot(); + System.out.println("responseDate = " + responseDate); - for (Entry entry : feed.getEntries()) { - String entryId = entry.getId().toASCIIString(); - Date published = entry.getPublished(); - String title = entry.getTitle(); + Document<Element> document = null; + try { + document = parser.parse(response.getPayload().getInputStream()); + } catch (ParseException e) { + System.out.println("Error parsing " + uri); + e.printStackTrace(System.out); + return; + } - // Validate element - if (entryId == null || published == null || title == null) { - continue; - } + Feed feed = (Feed) document.getRoot(); - if (lastUpdate != null && lastUpdate.after(published)) { - System.out.println("Old entry: " + url + ":" + entryId); - continue; - } + for (Entry entry : feed.getEntries()) { + String entryId = entry.getId().toASCIIString(); + Date published = entry.getPublished(); + String title = entry.getTitle(); - System.out.println("New entry: " + url + ":" + entryId); - if (gitoriousDao.countEntryId(entryId) == 0) { - gitoriousDao.insertChange(entryId, title); - } else { - System.out.println("Already imported entry: " + entryId); - } + // Validate element + if (entryId == null || published == null || title == null) { + continue; } - if (lastUpdate == null) { - System.out.println("New atom feed"); - atomDao.insertAtomFeed(url, new Timestamp(responseDate.getTime())); - } else { - System.out.println("Updating atom feed"); - atomDao.updateAtomFeed(url, lastUpdate); + if (lastUpdate != null && lastUpdate.after(published)) { + System.out.println("Old entry: " + uri + ":" + entryId); + continue; } - connection.commit(); + System.out.println("New entry: " + uri + ":" + entryId); + if (eventDao.countEntryId(entryId) == 0) { + eventDao.insertChange(entryId, title); + } else { + System.out.println("Already imported entry: " + entryId); + } + } - System.out.println("Sleeping"); - Thread.sleep(10 * 1000); + if (lastUpdate == null) { + System.out.println("New atom feed"); + atomDao.insertAtomFeed(uri, new Timestamp(responseDate.getTime())); + } else { + System.out.println("Updating atom feed"); + atomDao.updateAtomFeed(uri, lastUpdate); } } - */ } diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProject.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProject.java index 6947512..1ccfac3 100644 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProject.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProject.java @@ -8,10 +8,12 @@ import java.util.*; public class GitoriousProject implements Comparable<GitoriousProject> { public final String slug; + public final URI atomFeed; public final List<GitoriousRepository> repositories; - public GitoriousProject(String slug, List<GitoriousRepository> repositories) { + public GitoriousProject(String slug, URI atomFeed, List<GitoriousRepository> repositories) { this.slug = slug; + this.atomFeed = atomFeed; this.repositories = repositories; } @@ -47,7 +49,7 @@ public class GitoriousProject implements Comparable<GitoriousProject> { repositoryList.add(r); } - return new GitoriousProject(slug, repositoryList); + return new GitoriousProject(slug, URI.create(gitoriousUrl + "/" + slug + ".atom"), repositoryList); } public static List<GitoriousProject> projectsFromXml(String gitoriousUrl, Element root) throws URISyntaxException { @@ -91,12 +93,10 @@ public class GitoriousProject implements Comparable<GitoriousProject> { class GitoriousRepository implements Comparable<GitoriousRepository> { public final String projectSlug; public final String name; - public final URI atom; - GitoriousRepository(String projectSlug, String name, URI atom) { + GitoriousRepository(String projectSlug, String name) { this.projectSlug = projectSlug; this.name = name; - this.atom = atom; } public static GitoriousRepository fromXml(String gitoriousUrl, String project, Element element) throws URISyntaxException { @@ -106,7 +106,7 @@ class GitoriousRepository implements Comparable<GitoriousRepository> { return null; } - return new GitoriousRepository(project, name, new URI(gitoriousUrl + "/" + project + "/" + name + ".atom")); + return new GitoriousRepository(project, name); } public int compareTo(GitoriousRepository o) { @@ -125,7 +125,6 @@ class GitoriousRepository implements Comparable<GitoriousRepository> { GitoriousRepository that = (GitoriousRepository) o; - if (!atom.equals(that.atom)) return false; if (!name.equals(that.name)) return false; if (!projectSlug.equals(that.projectSlug)) return false; @@ -135,7 +134,6 @@ class GitoriousRepository implements Comparable<GitoriousRepository> { public int hashCode() { int result = projectSlug.hashCode(); result = 31 * result + name.hashCode(); - result = 31 * result + atom.hashCode(); return result; } } diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProjectDao.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProjectDao.java index f47b126..432d154 100644 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProjectDao.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousProjectDao.java @@ -1,5 +1,8 @@ package io.trygvis.esper.testing.gitorious; +import fj.*; + +import java.net.*; import java.sql.*; import java.util.*; @@ -9,32 +12,49 @@ public class GitoriousProjectDao extends Dao { } private final PreparedStatement countProjects = prepareStatement("SELECT count(*) FROM gitorious_project WHERE slug=?"); + public int countProjects(String slug) throws SQLException { countProjects.setString(1, slug); - try(ResultSet rs = countProjects.executeQuery()) { + try (ResultSet rs = countProjects.executeQuery()) { rs.next(); return rs.getInt(1); } } - private final PreparedStatement insertProject = prepareStatement("INSERT INTO gitorious_project(slug) VALUES(?)"); + private final PreparedStatement insertProject = prepareStatement("INSERT INTO gitorious_project(slug, atom_feed) VALUES(?, ?)"); + public void insertProject(GitoriousProject project) throws SQLException { insertProject.setString(1, project.slug); + insertProject.setString(2, project.atomFeed.toASCIIString()); insertProject.executeUpdate(); } private final PreparedStatement selectAll = prepareStatement("SELECT slug FROM gitorious_project"); - public List<String> selectAll() throws SQLException { + + public List<String> selectSlugs() throws SQLException { try (ResultSet rs = selectAll.executeQuery()) { List<String> list = new ArrayList<>(); - while(rs.next()) { + while (rs.next()) { list.add(rs.getString(1)); } return list; } } + private final PreparedStatement selectFeeds = prepareStatement("SELECT slug, atom_feed FROM gitorious_project"); + + public List<P2<String, URI>> selectFeeds() throws SQLException { + try (ResultSet rs = selectFeeds.executeQuery()) { + List<P2<String, URI>> list = new ArrayList<>(); + while (rs.next()) { + list.add(P.p(rs.getString(1), URI.create(rs.getString(2)))); + } + return list; + } + } + private final PreparedStatement delete = prepareStatement("DELETE FROM gitorious_project WHERE slug=?"); + public void delete(String slug) throws SQLException { delete.setString(1, slug); delete.executeUpdate(); diff --git a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousRepositoryDao.java b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousRepositoryDao.java index a88f122..5f57441 100644 --- a/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousRepositoryDao.java +++ b/src/main/java/io/trygvis/esper/testing/gitorious/GitoriousRepositoryDao.java @@ -10,6 +10,7 @@ public class GitoriousRepositoryDao extends Dao { } private final PreparedStatement countRepositories = prepareStatement("SELECT count(*) FROM gitorious_repository WHERE project_slug=? and name=?"); + public int countRepositories(GitoriousRepository repository) throws SQLException { countRepositories.setString(1, repository.projectSlug); countRepositories.setString(2, repository.name); @@ -19,26 +20,29 @@ public class GitoriousRepositoryDao extends Dao { } } - private final PreparedStatement selectForProject = prepareStatement("SELECT project_slug, name, atom_feed FROM gitorious_repository WHERE project_slug=?"); - public List<GitoriousRepository> selectForProject(String projectSlug) throws Exception { + private final PreparedStatement selectForProject = prepareStatement("SELECT project_slug, name FROM gitorious_repository WHERE project_slug=?"); + + public List<GitoriousRepository> selectForProject(String projectSlug) throws SQLException { selectForProject.setString(1, projectSlug); return executeQuery(selectForProject); } - private final PreparedStatement selectAll = prepareStatement("SELECT project_slug, name, atom_feed FROM gitorious_repository"); - public List<GitoriousRepository> selectAll() throws Exception { + private final PreparedStatement selectAll = prepareStatement("SELECT project_slug, name FROM gitorious_repository"); + + public List<GitoriousRepository> selectAll() throws SQLException { return executeQuery(selectAll); } - private final PreparedStatement insertRepository = prepareStatement("INSERT INTO gitorious_repository(project_slug, name, atom_feed) VALUES(?, ?, ?)"); + private final PreparedStatement insertRepository = prepareStatement("INSERT INTO gitorious_repository(project_slug, name) VALUES(?, ?)"); + public void insertRepository(GitoriousRepository repository) throws SQLException { insertRepository.setString(1, repository.projectSlug); insertRepository.setString(2, repository.name); - insertRepository.setString(3, repository.atom.toASCIIString()); insertRepository.executeUpdate(); } private final PreparedStatement delete = prepareStatement("DELETE FROM gitorious_repository WHERE project_slug=? and name=?"); + public void delete(GitoriousRepository repository) throws SQLException { delete.setString(1, repository.projectSlug); delete.setString(2, repository.name); @@ -46,20 +50,20 @@ public class GitoriousRepositoryDao extends Dao { } private final PreparedStatement deleteForProject = prepareStatement("DELETE FROM gitorious_repository WHERE project_slug=?"); + public void deleteForProject(String project) throws SQLException { deleteForProject.setString(1, project); deleteForProject.executeUpdate(); } - private List<GitoriousRepository> executeQuery(PreparedStatement statement) throws SQLException, URISyntaxException { + private List<GitoriousRepository> executeQuery(PreparedStatement statement) throws SQLException { try (ResultSet rs = statement.executeQuery()) { List<GitoriousRepository> list = new ArrayList<>(); - while(rs.next()) { + while (rs.next()) { list.add(new GitoriousRepository( rs.getString(1), - rs.getString(2), - new URI(rs.getString(3)) + rs.getString(2) )); } |