diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-21 12:16:29 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-21 12:16:29 +0100 |
commit | e7b1958ce5e93ead2d7d3c74eabe00a4186a048a (patch) | |
tree | a73bfce74bc740f76f11e5376a233eed026b93f5 /src/main/java/io/trygvis/esper/testing/core | |
parent | d25d523d2a7f7e4c3446d81740e09e487ad807d0 (diff) | |
download | esper-testing-e7b1958ce5e93ead2d7d3c74eabe00a4186a048a.tar.gz esper-testing-e7b1958ce5e93ead2d7d3c74eabe00a4186a048a.tar.bz2 esper-testing-e7b1958ce5e93ead2d7d3c74eabe00a4186a048a.tar.xz esper-testing-e7b1958ce5e93ead2d7d3c74eabe00a4186a048a.zip |
o Adding a 'core' domain module.
o Adding a table scanner and a job that converts jenkins builds to builds.
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/core')
6 files changed, 306 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/core/BuildDto.java b/src/main/java/io/trygvis/esper/testing/core/BuildDto.java new file mode 100644 index 0000000..37de1b3 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/core/BuildDto.java @@ -0,0 +1,21 @@ +package io.trygvis.esper.testing.core; + +import io.trygvis.esper.testing.*; +import org.joda.time.*; + +import java.util.*; + +class BuildDto extends AbstractEntity { + public final DateTime timestamp; + public final boolean success; + public final UUID referenceUuid; + public final String referenceType; + + BuildDto(UUID uuid, DateTime createdDate, DateTime timestamp, boolean success, UUID referenceUuid, String referenceType) { + super(uuid, createdDate); + this.timestamp = timestamp; + this.success = success; + this.referenceUuid = referenceUuid; + this.referenceType = referenceType; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/core/CoreDao.java b/src/main/java/io/trygvis/esper/testing/core/CoreDao.java new file mode 100644 index 0000000..a010dad --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/core/CoreDao.java @@ -0,0 +1,73 @@ +package io.trygvis.esper.testing.core; + +import io.trygvis.esper.testing.*; +import io.trygvis.esper.testing.sql.*; +import org.joda.time.*; + +import java.sql.*; +import java.util.*; + +import static io.trygvis.esper.testing.sql.SqlOption.*; +import static java.lang.System.*; + +public class CoreDao { + private final Connection c; + + public static final String PERSON = "uuid, created_date, name"; + + public static final SqlF<ResultSet, PersonDto> person = new SqlF<ResultSet, PersonDto>() { + public PersonDto apply(ResultSet rs) throws SQLException { + int i = 1; + return new PersonDto( + UUID.fromString(rs.getString(i++)), + new DateTime(rs.getTimestamp(i++).getTime()), + rs.getString(i)); + } + }; + + public static final String BUILD = "uuid, created_date, timestamp, success, reference_type, reference_uuid"; + + public CoreDao(Connection c) { + this.c = c; + } + + public SqlOption<PersonDto> selectPerson(String id) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT " + PERSON + " FROM person WHERE id=?")) { + int i = 1; + s.setString(i, id); + return fromRs(s.executeQuery()).map(person); + } + } + + public SqlOption<PersonDto> selectPersonByJenkinsUuid(UUID jenkinsUser) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT " + PERSON + " FROM person WHERE uuid=(SELECT person FROM person_jenkins_user WHERE jenkins_user=?)")) { + int i = 1; + s.setString(i, jenkinsUser.toString()); + return fromRs(s.executeQuery()).map(person); + } + } + + public UUID insertBuild(DateTime timestamp, boolean success, EntityRef ref) throws SQLException { + try (PreparedStatement s = c.prepareStatement("INSERT INTO build(" + BUILD + ") VALUES(?, ?, ?, ?, ?, ?)")) { + UUID uuid = UUID.randomUUID(); + int i = 1; + s.setString(i++, uuid.toString()); + s.setTimestamp(i++, new Timestamp(currentTimeMillis())); + s.setTimestamp(i++, new Timestamp(timestamp.getMillis())); + s.setBoolean(i++, success); + s.setString(i++, ref.type); + s.setString(i, ref.uuid.toString()); + s.executeUpdate(); + return uuid; + } + } + + public void insertBuildParticipant(UUID build, UUID person) throws SQLException { + try (PreparedStatement s = c.prepareStatement("INSERT INTO build_participant(build, person) VALUES(?, ?)")) { + int i = 1; + s.setString(i++, build.toString()); + s.setString(i, person.toString()); + s.executeUpdate(); + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/core/JenkinsBuildPoller.java b/src/main/java/io/trygvis/esper/testing/core/JenkinsBuildPoller.java new file mode 100644 index 0000000..cb9157c --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/core/JenkinsBuildPoller.java @@ -0,0 +1,53 @@ +package io.trygvis.esper.testing.core; + +import com.jolbox.bonecp.*; +import io.trygvis.esper.testing.*; +import io.trygvis.esper.testing.jenkins.*; +import io.trygvis.esper.testing.sql.*; +import org.slf4j.*; + +import java.sql.*; +import java.util.*; + +import static fj.data.Option.some; +import static io.trygvis.esper.testing.Config.*; +import static io.trygvis.esper.testing.EntityRef.jenkinsRef; + +public class JenkinsBuildPoller implements TablePoller.NewRowCallback<JenkinsBuildDto> { + Logger logger = LoggerFactory.getLogger(getClass()); + + public static void main(String[] args) throws Exception { + String pollerName = "jenkins_build"; + String tableName = "jenkins_build"; + String columnNames = JenkinsDao.JENKINS_BUILD; + SqlF<ResultSet, JenkinsBuildDto> f = JenkinsDao.jenkinsBuild; + TablePoller.NewRowCallback<JenkinsBuildDto> callback = new JenkinsBuildPoller(); + + Config config = loadFromDisk(); + + BoneCPDataSource dataSource = config.createBoneCp(); + + new TablePoller<>(pollerName, tableName, columnNames, some("array_length(users, 1) > 0"), f, callback).work(dataSource); + } + + public void process(Connection c, JenkinsBuildDto jenkinsBuild) throws SQLException { + Daos daos = new Daos(c); + CoreDao coreDao = daos.coreDao; + + UUID uuid = coreDao.insertBuild(jenkinsBuild.timestamp, "SUCCESS".equals(jenkinsBuild.result), jenkinsRef(jenkinsBuild.uuid)); + logger.info("Created build uuid={}", uuid); + + for (UUID user : jenkinsBuild.users) { + SqlOption<PersonDto> personO = coreDao.selectPersonByJenkinsUuid(user); + + // This happens if no one has claimed the user id. + if(personO.isNone()) { + continue; + } + + UUID person = personO.get().uuid; + logger.info("Created build participant, person={}", person); + coreDao.insertBuildParticipant(uuid, person); + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/core/PersonDto.java b/src/main/java/io/trygvis/esper/testing/core/PersonDto.java new file mode 100644 index 0000000..33355a3 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/core/PersonDto.java @@ -0,0 +1,15 @@ +package io.trygvis.esper.testing.core; + +import io.trygvis.esper.testing.*; +import org.joda.time.*; + +import java.util.*; + +public class PersonDto extends AbstractEntity { + public final String name; + + public PersonDto(UUID uuid, DateTime createdDate, String name) { + super(uuid, createdDate); + this.name = name; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/core/TablePoller.java b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java new file mode 100644 index 0000000..58a3e75 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/core/TablePoller.java @@ -0,0 +1,140 @@ +package io.trygvis.esper.testing.core; + +import fj.data.*; +import io.trygvis.esper.testing.sql.*; +import org.joda.time.format.*; +import org.slf4j.*; + +import javax.sql.*; +import java.sql.*; +import java.util.*; +import java.util.List; + +import static fj.data.Option.*; +import static java.lang.System.*; + +public class TablePoller<A> { + + private static final DateTimeFormatter formatter = ISODateTimeFormat.dateTime(); + + private final Logger logger = LoggerFactory.getLogger(getClass()); + private final String pollerName; + private final String tableName; + private final String columnNames; + private final String filter; + private final SqlF<ResultSet, A> f; + private final NewRowCallback<A> callback; + + public TablePoller(String pollerName, String tableName, String columnNames, Option<String> filter, SqlF<ResultSet, A> f, NewRowCallback<A> callback) { + this.pollerName = pollerName; + this.tableName = tableName; + this.columnNames = columnNames; + this.filter = filter.orSome("true"); + this.f = f; + this.callback = callback; + } + + public void work(DataSource dataSource) throws Exception { + while (true) { + try (Connection c = dataSource.getConnection()) { + long start = currentTimeMillis(); + TablePollerDao dao = new TablePollerDao(c); + + Option<Timestamp> o = dao.getLastCreatedDateForPoller(); + + if (o.isNone()) { + logger.info("First run of poller '{}'", pollerName); + } else { + logger.info("Running poller '{}', last run was {}", pollerName, formatter.print(o.some().getTime())); + } + + Timestamp lastCreatedDate = o.orSome(new Timestamp(0)); + + Option<Timestamp> o2 = dao.getOldestCreatedDateAfter(lastCreatedDate); + + if (o2.isSome()) { + Timestamp oldestCreatedDate = o2.some(); + + List<A> rows = dao.getRowsCreatedAt(oldestCreatedDate); + + logger.info("Processing {} rows created at {}", rows.size(), formatter.print(oldestCreatedDate.getTime())); + + for (A row : rows) { + callback.process(c, row); + } + } else { + logger.debug("No new rows."); + + Thread.sleep(1000); + } + + dao.insertOrUpdate(o.isNone(), o2.orSome(new Timestamp(start)), new Timestamp(start), currentTimeMillis() - start, null); + + c.commit(); + } + } + } + + public static interface NewRowCallback<A> { + void process(Connection c, A A) throws SQLException; + } + + private class TablePollerDao { + private final Connection c; + + private TablePollerDao(Connection c) { + this.c = c; + } + + public Option<Timestamp> getLastCreatedDateForPoller() throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT last_created_date FROM table_poller_status WHERE poller_name=?")) { + s.setString(1, pollerName); + ResultSet rs = s.executeQuery(); + if (!rs.next()) { + return none(); + } + + return some(rs.getTimestamp(1)); + } + } + + public Option<Timestamp> getOldestCreatedDateAfter(Timestamp timestamp) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT min(created_date) FROM " + tableName + " WHERE created_date > ? AND " + filter)) { + s.setTimestamp(1, timestamp); + ResultSet rs = s.executeQuery(); + rs.next(); + return fromNull(rs.getTimestamp(1)); + } + } + + public List<A> getRowsCreatedAt(Timestamp timestamp) throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT " + columnNames + " FROM " + tableName + " WHERE created_date = ? AND " + filter)) { + s.setTimestamp(1, timestamp); + + ResultSet rs = s.executeQuery(); + + List<A> list = new ArrayList<>(); + while (rs.next()) { + list.add(f.apply(rs)); + } + return list; + } + } + + public void insertOrUpdate(boolean insert, Timestamp lastCreatedDate, Timestamp now, long duration, String status) throws SQLException { + String insertSql = "INSERT INTO table_poller_status(last_created_date, last_run, duration, status, poller_name) VALUES(?, ?, ?, ?, ?)"; + + String updateSql = "UPDATE table_poller_status SET last_created_date=?, last_run=?, duration=?, status=? WHERE poller_name=?"; + + try (PreparedStatement s = c.prepareStatement(insert ? insertSql : updateSql)) { + int i = 1; + s.setTimestamp(i++, lastCreatedDate); + s.setTimestamp(i++, now); + s.setLong(i++, duration); + s.setString(i++, status); + s.setString(i, pollerName); + s.executeUpdate(); + } + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/core/Unbreakable.java b/src/main/java/io/trygvis/esper/testing/core/Unbreakable.java new file mode 100644 index 0000000..1200516 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/core/Unbreakable.java @@ -0,0 +1,4 @@ +package io.trygvis.esper.testing.core; + +public class Unbreakable { +} |