diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-22 00:31:00 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-22 00:32:28 +0100 |
commit | c8c863ce36f57954369a0b4a15e6c5e720f03f87 (patch) | |
tree | 2e49e11db5be949571642ceca947bb7b2178c777 /src/main/java/io/trygvis/esper/testing/util | |
parent | 012b0864e95e120ea57433ab0e719cc6011c7647 (diff) | |
download | esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.gz esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.bz2 esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.xz esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.zip |
o Moving stuff to utils package.
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/util')
8 files changed, 359 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java b/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java new file mode 100644 index 0000000..49986e6 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.util.object; + +import java.io.*; + +public interface ActorRef<A> extends Closeable { + A underlying(); +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java new file mode 100644 index 0000000..ea53a46 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.util.object; + +import java.io.*; + +public interface ObjectFactory<K, V extends Closeable> { + V create(K k); +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java new file mode 100644 index 0000000..5b9d740 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java @@ -0,0 +1,64 @@ +package io.trygvis.esper.testing.util.object; + +import org.slf4j.*; + +import java.io.*; +import java.util.*; + +public class ObjectManager<K, V extends Closeable> implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(ObjectManager.class); + + private final String type; + private final ObjectFactory<K, V> objectFactory; + private Map<K, V> objects = new HashMap<>(); + private boolean closed = false; + + public ObjectManager(String type, Set<K> initialKeys, ObjectFactory<K, V> objectFactory) { + this.type = type; + this.objectFactory = objectFactory; + + update(new HashSet<>(initialKeys)); + } + + public synchronized void update(Collection<K> newKeys) { + if (closed) { + throw new RuntimeException("This instance is closed: type=" + type); + } + Set<K> found = new HashSet<>(newKeys); + found.removeAll(objects.keySet()); + + Set<K> gone = new HashSet<>(objects.keySet()); + gone.removeAll(newKeys); + + for (K k : gone) { + try { + logger.debug("Removing " + type + " with id=" + k); + objects.remove(k).close(); + } catch (IOException e) { + e.printStackTrace(System.out); + } + } + + for (K k : found) { + logger.debug("Adding " + type + " with id=" + k); + objects.put(k, objectFactory.create(k)); + } + } + + public synchronized void close() throws IOException { + if (closed) { + logger.warn("Already closed: type=" + type); + return; + } + update(Collections.<K>emptyList()); + closed = true; + } + + public synchronized Collection<V> getObjects() { + return new ArrayList<>(objects.values()); + } + + public void setObjects(Map<K, V> objects) { + this.objects = objects; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java new file mode 100644 index 0000000..143a181 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java @@ -0,0 +1,132 @@ +package io.trygvis.esper.testing.util.object; + +import org.slf4j.*; + +import javax.sql.*; +import java.io.*; +import java.sql.*; +import java.util.concurrent.*; + +import static java.lang.System.currentTimeMillis; + +public class ObjectUtil { + + public static <A extends TransactionalActor> ActorRef<A> threadedActor(String threadName, long delay, DataSource dataSource, String name, A actor) { + return new ThreadedActor<>(dataSource, threadName, name, actor, delay); + } + + public static <A extends TransactionalActor> ActorRef<A> scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { + return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, name, actor); + } + + private static class TransactionalActorWrapper<A extends TransactionalActor> implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(TransactionalActorWrapper.class); + + private final DataSource dataSource; + private final String name; + private final A actor; + + TransactionalActorWrapper(DataSource dataSource, String name, A actor) { + this.dataSource = dataSource; + this.name = name; + this.actor = actor; + } + + public void run() { + try { + Connection c = dataSource.getConnection(); + + try { + try (PreparedStatement s = c.prepareStatement("set application_name = 'Actor: " + name + "';")) { +// s.setString(1, "Actor: " + name); + s.executeUpdate(); + s.close(); + } + + actor.act(c); + long start = currentTimeMillis(); + c.commit(); + long end = currentTimeMillis(); + logger.debug("COMMIT performed in in " + (end - start) + "ms."); + } + catch(SQLException e) { + c.rollback(); + throw e; + } finally { + c.close(); + } + } catch (Throwable e) { + logger.warn("Exception in thread " + Thread.currentThread().getName()); + e.printStackTrace(System.out); + } + } + } + + static class ScheduledActor<A extends TransactionalActor> implements ActorRef<A>, Runnable { + private final ScheduledFuture<?> future; + + private final TransactionalActorWrapper<A> actor; + + ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { + future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit); + this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); + } + + public A underlying() { + return actor.actor; + } + + public void close() throws IOException { + future.cancel(true); + } + + @Override + public void run() { + actor.run(); + } + } + + static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable { + + private final TransactionalActorWrapper<A> actor; + private final long delay; + private final Thread thread; + private boolean shouldRun = true; + + ThreadedActor(DataSource dataSource, String threadName, String name, A actor, long delay) { + this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); + this.delay = delay; + thread = new Thread(this, threadName); + thread.setDaemon(true); + thread.start(); + } + + public A underlying() { + return actor.actor; + } + + @SuppressWarnings("ConstantConditions") + public void run() { + while (shouldRun) { + actor.run(); + + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void close() throws IOException { + shouldRun = false; + thread.interrupt(); + while (thread.isAlive()) { + try { + thread.join(); + } catch (InterruptedException ignore) { + } + } + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java b/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java new file mode 100644 index 0000000..0799695 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.util.object; + +import java.sql.*; + +public interface TransactionalActor { + void act(Connection c) throws Exception; +} diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java new file mode 100644 index 0000000..9e42242 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java @@ -0,0 +1,18 @@ +package io.trygvis.esper.testing.util.sql; + +import java.sql.*; + +public class ResultSetF { + public static final SqlF<ResultSet, Integer> getInt = new SqlF<ResultSet, Integer>() { + public Integer apply(ResultSet rs) throws SQLException { + return rs.getInt(1); + } + }; + + public static final SqlF<ResultSet, Integer> getInteger = new SqlF<ResultSet, Integer>() { + public Integer apply(ResultSet rs) throws SQLException { + int i = rs.getInt(1); + return rs.wasNull() ? null : i; + } + }; +} diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java b/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java new file mode 100644 index 0000000..e4e8197 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java @@ -0,0 +1,15 @@ +package io.trygvis.esper.testing.util.sql; + +import java.sql.*; + +public abstract class SqlF<A, B> { + public abstract B apply(A a) throws SQLException; + + public <C> SqlF<A, C> andThen(final SqlF<B, C> f) { + return new SqlF<A, C>() { + public C apply(A a) throws SQLException { + return f.apply(SqlF.this.apply(a)); + } + }; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java new file mode 100644 index 0000000..286a872 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java @@ -0,0 +1,109 @@ +package io.trygvis.esper.testing.util.sql; + +import java.sql.*; + +public abstract class SqlOption<A> { + public static <A> SqlOption<A> none() { + return new None<>(); + } + + public static <A> SqlOption<A> some(A a) { + return new Some<>(a); + } + + public static SqlOption<ResultSet> fromRs(ResultSet rs) throws SQLException { + if (!rs.next()) { + return none(); + } + + return some(rs); + } + + // ----------------------------------------------------------------------- + // + // ----------------------------------------------------------------------- + + public abstract <B> SqlOption<B> map(SqlF<A, B> f) throws SQLException; + + public <B> SqlOption<B> flatMap(SqlF<A, SqlOption<B>> f) throws SQLException { + SqlOption<SqlOption<B>> x = map(f); + + if (x.isNone()) { + return none(); + } + + return x.get(); + } + + public abstract A get() throws SQLException; + + public abstract boolean isSome(); + + public boolean isNone() { + return !isSome(); + } + + public abstract A getOrElse(A a); + + public static <A> SqlOption<A> fromNull(A a) { + if (a != null) { + return some(a); + } else { + return none(); + } + } + + // ----------------------------------------------------------------------- + // + // ----------------------------------------------------------------------- + + private static class None<A> extends SqlOption<A> { + public <B> SqlOption<B> map(SqlF<A, B> f) { + return none(); + } + + public A get() throws SQLException { + throw new SQLException("get() on None"); + } + + public boolean isSome() { + return false; + } + + public A getOrElse(A a) { + return a; + } + + public String toString() { + return "None"; + } + } + + private static class Some<A> extends SqlOption<A> { + private final A a; + + private Some(A a) { + this.a = a; + } + + public <B> SqlOption<B> map(SqlF<A, B> f) throws SQLException { + return some(f.apply(a)); + } + + public A get() { + return a; + } + + public boolean isSome() { + return true; + } + + public A getOrElse(A a) { + return this.a; + } + + public String toString() { + return "Some(" + a + ")"; + } + } +} |