aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/esper/testing/util
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2012-12-22 00:31:00 +0100
committerTrygve Laugstøl <trygvis@inamo.no>2012-12-22 00:32:28 +0100
commitc8c863ce36f57954369a0b4a15e6c5e720f03f87 (patch)
tree2e49e11db5be949571642ceca947bb7b2178c777 /src/main/java/io/trygvis/esper/testing/util
parent012b0864e95e120ea57433ab0e719cc6011c7647 (diff)
downloadesper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.gz
esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.bz2
esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.xz
esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.zip
o Moving stuff to utils package.
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/util')
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java7
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java7
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java64
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java132
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java7
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java18
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java15
-rw-r--r--src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java109
8 files changed, 359 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java b/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java
new file mode 100644
index 0000000..49986e6
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java
@@ -0,0 +1,7 @@
+package io.trygvis.esper.testing.util.object;
+
+import java.io.*;
+
+public interface ActorRef<A> extends Closeable {
+ A underlying();
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java
new file mode 100644
index 0000000..ea53a46
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java
@@ -0,0 +1,7 @@
+package io.trygvis.esper.testing.util.object;
+
+import java.io.*;
+
+public interface ObjectFactory<K, V extends Closeable> {
+ V create(K k);
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java
new file mode 100644
index 0000000..5b9d740
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java
@@ -0,0 +1,64 @@
+package io.trygvis.esper.testing.util.object;
+
+import org.slf4j.*;
+
+import java.io.*;
+import java.util.*;
+
+public class ObjectManager<K, V extends Closeable> implements Closeable {
+ private static final Logger logger = LoggerFactory.getLogger(ObjectManager.class);
+
+ private final String type;
+ private final ObjectFactory<K, V> objectFactory;
+ private Map<K, V> objects = new HashMap<>();
+ private boolean closed = false;
+
+ public ObjectManager(String type, Set<K> initialKeys, ObjectFactory<K, V> objectFactory) {
+ this.type = type;
+ this.objectFactory = objectFactory;
+
+ update(new HashSet<>(initialKeys));
+ }
+
+ public synchronized void update(Collection<K> newKeys) {
+ if (closed) {
+ throw new RuntimeException("This instance is closed: type=" + type);
+ }
+ Set<K> found = new HashSet<>(newKeys);
+ found.removeAll(objects.keySet());
+
+ Set<K> gone = new HashSet<>(objects.keySet());
+ gone.removeAll(newKeys);
+
+ for (K k : gone) {
+ try {
+ logger.debug("Removing " + type + " with id=" + k);
+ objects.remove(k).close();
+ } catch (IOException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+
+ for (K k : found) {
+ logger.debug("Adding " + type + " with id=" + k);
+ objects.put(k, objectFactory.create(k));
+ }
+ }
+
+ public synchronized void close() throws IOException {
+ if (closed) {
+ logger.warn("Already closed: type=" + type);
+ return;
+ }
+ update(Collections.<K>emptyList());
+ closed = true;
+ }
+
+ public synchronized Collection<V> getObjects() {
+ return new ArrayList<>(objects.values());
+ }
+
+ public void setObjects(Map<K, V> objects) {
+ this.objects = objects;
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java
new file mode 100644
index 0000000..143a181
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java
@@ -0,0 +1,132 @@
+package io.trygvis.esper.testing.util.object;
+
+import org.slf4j.*;
+
+import javax.sql.*;
+import java.io.*;
+import java.sql.*;
+import java.util.concurrent.*;
+
+import static java.lang.System.currentTimeMillis;
+
+public class ObjectUtil {
+
+ public static <A extends TransactionalActor> ActorRef<A> threadedActor(String threadName, long delay, DataSource dataSource, String name, A actor) {
+ return new ThreadedActor<>(dataSource, threadName, name, actor, delay);
+ }
+
+ public static <A extends TransactionalActor> ActorRef<A> scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) {
+ return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, name, actor);
+ }
+
+ private static class TransactionalActorWrapper<A extends TransactionalActor> implements Runnable {
+ private static final Logger logger = LoggerFactory.getLogger(TransactionalActorWrapper.class);
+
+ private final DataSource dataSource;
+ private final String name;
+ private final A actor;
+
+ TransactionalActorWrapper(DataSource dataSource, String name, A actor) {
+ this.dataSource = dataSource;
+ this.name = name;
+ this.actor = actor;
+ }
+
+ public void run() {
+ try {
+ Connection c = dataSource.getConnection();
+
+ try {
+ try (PreparedStatement s = c.prepareStatement("set application_name = 'Actor: " + name + "';")) {
+// s.setString(1, "Actor: " + name);
+ s.executeUpdate();
+ s.close();
+ }
+
+ actor.act(c);
+ long start = currentTimeMillis();
+ c.commit();
+ long end = currentTimeMillis();
+ logger.debug("COMMIT performed in in " + (end - start) + "ms.");
+ }
+ catch(SQLException e) {
+ c.rollback();
+ throw e;
+ } finally {
+ c.close();
+ }
+ } catch (Throwable e) {
+ logger.warn("Exception in thread " + Thread.currentThread().getName());
+ e.printStackTrace(System.out);
+ }
+ }
+ }
+
+ static class ScheduledActor<A extends TransactionalActor> implements ActorRef<A>, Runnable {
+ private final ScheduledFuture<?> future;
+
+ private final TransactionalActorWrapper<A> actor;
+
+ ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) {
+ future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit);
+ this.actor = new TransactionalActorWrapper<>(dataSource, name, actor);
+ }
+
+ public A underlying() {
+ return actor.actor;
+ }
+
+ public void close() throws IOException {
+ future.cancel(true);
+ }
+
+ @Override
+ public void run() {
+ actor.run();
+ }
+ }
+
+ static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable {
+
+ private final TransactionalActorWrapper<A> actor;
+ private final long delay;
+ private final Thread thread;
+ private boolean shouldRun = true;
+
+ ThreadedActor(DataSource dataSource, String threadName, String name, A actor, long delay) {
+ this.actor = new TransactionalActorWrapper<>(dataSource, name, actor);
+ this.delay = delay;
+ thread = new Thread(this, threadName);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public A underlying() {
+ return actor.actor;
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ public void run() {
+ while (shouldRun) {
+ actor.run();
+
+ try {
+ Thread.sleep(delay);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ public void close() throws IOException {
+ shouldRun = false;
+ thread.interrupt();
+ while (thread.isAlive()) {
+ try {
+ thread.join();
+ } catch (InterruptedException ignore) {
+ }
+ }
+ }
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java b/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java
new file mode 100644
index 0000000..0799695
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java
@@ -0,0 +1,7 @@
+package io.trygvis.esper.testing.util.object;
+
+import java.sql.*;
+
+public interface TransactionalActor {
+ void act(Connection c) throws Exception;
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java
new file mode 100644
index 0000000..9e42242
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java
@@ -0,0 +1,18 @@
+package io.trygvis.esper.testing.util.sql;
+
+import java.sql.*;
+
+public class ResultSetF {
+ public static final SqlF<ResultSet, Integer> getInt = new SqlF<ResultSet, Integer>() {
+ public Integer apply(ResultSet rs) throws SQLException {
+ return rs.getInt(1);
+ }
+ };
+
+ public static final SqlF<ResultSet, Integer> getInteger = new SqlF<ResultSet, Integer>() {
+ public Integer apply(ResultSet rs) throws SQLException {
+ int i = rs.getInt(1);
+ return rs.wasNull() ? null : i;
+ }
+ };
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java b/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java
new file mode 100644
index 0000000..e4e8197
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java
@@ -0,0 +1,15 @@
+package io.trygvis.esper.testing.util.sql;
+
+import java.sql.*;
+
+public abstract class SqlF<A, B> {
+ public abstract B apply(A a) throws SQLException;
+
+ public <C> SqlF<A, C> andThen(final SqlF<B, C> f) {
+ return new SqlF<A, C>() {
+ public C apply(A a) throws SQLException {
+ return f.apply(SqlF.this.apply(a));
+ }
+ };
+ }
+}
diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java
new file mode 100644
index 0000000..286a872
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java
@@ -0,0 +1,109 @@
+package io.trygvis.esper.testing.util.sql;
+
+import java.sql.*;
+
+public abstract class SqlOption<A> {
+ public static <A> SqlOption<A> none() {
+ return new None<>();
+ }
+
+ public static <A> SqlOption<A> some(A a) {
+ return new Some<>(a);
+ }
+
+ public static SqlOption<ResultSet> fromRs(ResultSet rs) throws SQLException {
+ if (!rs.next()) {
+ return none();
+ }
+
+ return some(rs);
+ }
+
+ // -----------------------------------------------------------------------
+ //
+ // -----------------------------------------------------------------------
+
+ public abstract <B> SqlOption<B> map(SqlF<A, B> f) throws SQLException;
+
+ public <B> SqlOption<B> flatMap(SqlF<A, SqlOption<B>> f) throws SQLException {
+ SqlOption<SqlOption<B>> x = map(f);
+
+ if (x.isNone()) {
+ return none();
+ }
+
+ return x.get();
+ }
+
+ public abstract A get() throws SQLException;
+
+ public abstract boolean isSome();
+
+ public boolean isNone() {
+ return !isSome();
+ }
+
+ public abstract A getOrElse(A a);
+
+ public static <A> SqlOption<A> fromNull(A a) {
+ if (a != null) {
+ return some(a);
+ } else {
+ return none();
+ }
+ }
+
+ // -----------------------------------------------------------------------
+ //
+ // -----------------------------------------------------------------------
+
+ private static class None<A> extends SqlOption<A> {
+ public <B> SqlOption<B> map(SqlF<A, B> f) {
+ return none();
+ }
+
+ public A get() throws SQLException {
+ throw new SQLException("get() on None");
+ }
+
+ public boolean isSome() {
+ return false;
+ }
+
+ public A getOrElse(A a) {
+ return a;
+ }
+
+ public String toString() {
+ return "None";
+ }
+ }
+
+ private static class Some<A> extends SqlOption<A> {
+ private final A a;
+
+ private Some(A a) {
+ this.a = a;
+ }
+
+ public <B> SqlOption<B> map(SqlF<A, B> f) throws SQLException {
+ return some(f.apply(a));
+ }
+
+ public A get() {
+ return a;
+ }
+
+ public boolean isSome() {
+ return true;
+ }
+
+ public A getOrElse(A a) {
+ return this.a;
+ }
+
+ public String toString() {
+ return "Some(" + a + ")";
+ }
+ }
+}