From c8c863ce36f57954369a0b4a15e6c5e720f03f87 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 22 Dec 2012 00:31:00 +0100 Subject: o Moving stuff to utils package. --- .../esper/testing/util/object/ActorRef.java | 7 ++ .../esper/testing/util/object/ObjectFactory.java | 7 ++ .../esper/testing/util/object/ObjectManager.java | 64 ++++++++++ .../esper/testing/util/object/ObjectUtil.java | 132 +++++++++++++++++++++ .../testing/util/object/TransactionalActor.java | 7 ++ .../trygvis/esper/testing/util/sql/ResultSetF.java | 18 +++ .../io/trygvis/esper/testing/util/sql/SqlF.java | 15 +++ .../trygvis/esper/testing/util/sql/SqlOption.java | 109 +++++++++++++++++ 8 files changed, 359 insertions(+) create mode 100644 src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java create mode 100644 src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java (limited to 'src/main/java/io/trygvis/esper/testing/util') diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java b/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java new file mode 100644 index 0000000..49986e6 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ActorRef.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.util.object; + +import java.io.*; + +public interface ActorRef extends Closeable { + A underlying(); +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java new file mode 100644 index 0000000..ea53a46 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectFactory.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.util.object; + +import java.io.*; + +public interface ObjectFactory { + V create(K k); +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java new file mode 100644 index 0000000..5b9d740 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectManager.java @@ -0,0 +1,64 @@ +package io.trygvis.esper.testing.util.object; + +import org.slf4j.*; + +import java.io.*; +import java.util.*; + +public class ObjectManager implements Closeable { + private static final Logger logger = LoggerFactory.getLogger(ObjectManager.class); + + private final String type; + private final ObjectFactory objectFactory; + private Map objects = new HashMap<>(); + private boolean closed = false; + + public ObjectManager(String type, Set initialKeys, ObjectFactory objectFactory) { + this.type = type; + this.objectFactory = objectFactory; + + update(new HashSet<>(initialKeys)); + } + + public synchronized void update(Collection newKeys) { + if (closed) { + throw new RuntimeException("This instance is closed: type=" + type); + } + Set found = new HashSet<>(newKeys); + found.removeAll(objects.keySet()); + + Set gone = new HashSet<>(objects.keySet()); + gone.removeAll(newKeys); + + for (K k : gone) { + try { + logger.debug("Removing " + type + " with id=" + k); + objects.remove(k).close(); + } catch (IOException e) { + e.printStackTrace(System.out); + } + } + + for (K k : found) { + logger.debug("Adding " + type + " with id=" + k); + objects.put(k, objectFactory.create(k)); + } + } + + public synchronized void close() throws IOException { + if (closed) { + logger.warn("Already closed: type=" + type); + return; + } + update(Collections.emptyList()); + closed = true; + } + + public synchronized Collection getObjects() { + return new ArrayList<>(objects.values()); + } + + public void setObjects(Map objects) { + this.objects = objects; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java new file mode 100644 index 0000000..143a181 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/ObjectUtil.java @@ -0,0 +1,132 @@ +package io.trygvis.esper.testing.util.object; + +import org.slf4j.*; + +import javax.sql.*; +import java.io.*; +import java.sql.*; +import java.util.concurrent.*; + +import static java.lang.System.currentTimeMillis; + +public class ObjectUtil { + + public static ActorRef threadedActor(String threadName, long delay, DataSource dataSource, String name, A actor) { + return new ThreadedActor<>(dataSource, threadName, name, actor, delay); + } + + public static ActorRef scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { + return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, name, actor); + } + + private static class TransactionalActorWrapper implements Runnable { + private static final Logger logger = LoggerFactory.getLogger(TransactionalActorWrapper.class); + + private final DataSource dataSource; + private final String name; + private final A actor; + + TransactionalActorWrapper(DataSource dataSource, String name, A actor) { + this.dataSource = dataSource; + this.name = name; + this.actor = actor; + } + + public void run() { + try { + Connection c = dataSource.getConnection(); + + try { + try (PreparedStatement s = c.prepareStatement("set application_name = 'Actor: " + name + "';")) { +// s.setString(1, "Actor: " + name); + s.executeUpdate(); + s.close(); + } + + actor.act(c); + long start = currentTimeMillis(); + c.commit(); + long end = currentTimeMillis(); + logger.debug("COMMIT performed in in " + (end - start) + "ms."); + } + catch(SQLException e) { + c.rollback(); + throw e; + } finally { + c.close(); + } + } catch (Throwable e) { + logger.warn("Exception in thread " + Thread.currentThread().getName()); + e.printStackTrace(System.out); + } + } + } + + static class ScheduledActor implements ActorRef, Runnable { + private final ScheduledFuture future; + + private final TransactionalActorWrapper actor; + + ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { + future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit); + this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); + } + + public A underlying() { + return actor.actor; + } + + public void close() throws IOException { + future.cancel(true); + } + + @Override + public void run() { + actor.run(); + } + } + + static class ThreadedActor implements ActorRef, Runnable, Closeable { + + private final TransactionalActorWrapper actor; + private final long delay; + private final Thread thread; + private boolean shouldRun = true; + + ThreadedActor(DataSource dataSource, String threadName, String name, A actor, long delay) { + this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); + this.delay = delay; + thread = new Thread(this, threadName); + thread.setDaemon(true); + thread.start(); + } + + public A underlying() { + return actor.actor; + } + + @SuppressWarnings("ConstantConditions") + public void run() { + while (shouldRun) { + actor.run(); + + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + // ignore + } + } + } + + public void close() throws IOException { + shouldRun = false; + thread.interrupt(); + while (thread.isAlive()) { + try { + thread.join(); + } catch (InterruptedException ignore) { + } + } + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java b/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java new file mode 100644 index 0000000..0799695 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/object/TransactionalActor.java @@ -0,0 +1,7 @@ +package io.trygvis.esper.testing.util.object; + +import java.sql.*; + +public interface TransactionalActor { + void act(Connection c) throws Exception; +} diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java b/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java new file mode 100644 index 0000000..9e42242 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/sql/ResultSetF.java @@ -0,0 +1,18 @@ +package io.trygvis.esper.testing.util.sql; + +import java.sql.*; + +public class ResultSetF { + public static final SqlF getInt = new SqlF() { + public Integer apply(ResultSet rs) throws SQLException { + return rs.getInt(1); + } + }; + + public static final SqlF getInteger = new SqlF() { + public Integer apply(ResultSet rs) throws SQLException { + int i = rs.getInt(1); + return rs.wasNull() ? null : i; + } + }; +} diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java b/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java new file mode 100644 index 0000000..e4e8197 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/sql/SqlF.java @@ -0,0 +1,15 @@ +package io.trygvis.esper.testing.util.sql; + +import java.sql.*; + +public abstract class SqlF { + public abstract B apply(A a) throws SQLException; + + public SqlF andThen(final SqlF f) { + return new SqlF() { + public C apply(A a) throws SQLException { + return f.apply(SqlF.this.apply(a)); + } + }; + } +} diff --git a/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java b/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java new file mode 100644 index 0000000..286a872 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/util/sql/SqlOption.java @@ -0,0 +1,109 @@ +package io.trygvis.esper.testing.util.sql; + +import java.sql.*; + +public abstract class SqlOption { + public static SqlOption none() { + return new None<>(); + } + + public static SqlOption some(A a) { + return new Some<>(a); + } + + public static SqlOption fromRs(ResultSet rs) throws SQLException { + if (!rs.next()) { + return none(); + } + + return some(rs); + } + + // ----------------------------------------------------------------------- + // + // ----------------------------------------------------------------------- + + public abstract SqlOption map(SqlF f) throws SQLException; + + public SqlOption flatMap(SqlF> f) throws SQLException { + SqlOption> x = map(f); + + if (x.isNone()) { + return none(); + } + + return x.get(); + } + + public abstract A get() throws SQLException; + + public abstract boolean isSome(); + + public boolean isNone() { + return !isSome(); + } + + public abstract A getOrElse(A a); + + public static SqlOption fromNull(A a) { + if (a != null) { + return some(a); + } else { + return none(); + } + } + + // ----------------------------------------------------------------------- + // + // ----------------------------------------------------------------------- + + private static class None extends SqlOption { + public SqlOption map(SqlF f) { + return none(); + } + + public A get() throws SQLException { + throw new SQLException("get() on None"); + } + + public boolean isSome() { + return false; + } + + public A getOrElse(A a) { + return a; + } + + public String toString() { + return "None"; + } + } + + private static class Some extends SqlOption { + private final A a; + + private Some(A a) { + this.a = a; + } + + public SqlOption map(SqlF f) throws SQLException { + return some(f.apply(a)); + } + + public A get() { + return a; + } + + public boolean isSome() { + return true; + } + + public A getOrElse(A a) { + return this.a; + } + + public String toString() { + return "Some(" + a + ")"; + } + } +} -- cgit v1.2.3