diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-22 00:31:00 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-22 00:32:28 +0100 |
commit | c8c863ce36f57954369a0b4a15e6c5e720f03f87 (patch) | |
tree | 2e49e11db5be949571642ceca947bb7b2178c777 /src/main/java/io/trygvis/esper/testing/object | |
parent | 012b0864e95e120ea57433ab0e719cc6011c7647 (diff) | |
download | esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.gz esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.bz2 esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.tar.xz esper-testing-c8c863ce36f57954369a0b4a15e6c5e720f03f87.zip |
o Moving stuff to utils package.
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/object')
5 files changed, 0 insertions, 217 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/object/ActorRef.java b/src/main/java/io/trygvis/esper/testing/object/ActorRef.java deleted file mode 100644 index bc64da3..0000000 --- a/src/main/java/io/trygvis/esper/testing/object/ActorRef.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.trygvis.esper.testing.object; - -import java.io.*; - -public interface ActorRef<A> extends Closeable { - A underlying(); -} diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java b/src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java deleted file mode 100644 index 8e7d4b0..0000000 --- a/src/main/java/io/trygvis/esper/testing/object/ObjectFactory.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.trygvis.esper.testing.object; - -import java.io.*; - -public interface ObjectFactory<K, V extends Closeable> { - V create(K k); -} diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectManager.java b/src/main/java/io/trygvis/esper/testing/object/ObjectManager.java deleted file mode 100644 index a4fbc60..0000000 --- a/src/main/java/io/trygvis/esper/testing/object/ObjectManager.java +++ /dev/null @@ -1,64 +0,0 @@ -package io.trygvis.esper.testing.object; - -import org.slf4j.*; - -import java.io.*; -import java.util.*; - -public class ObjectManager<K, V extends Closeable> implements Closeable { - private static final Logger logger = LoggerFactory.getLogger(ObjectManager.class); - - private final String type; - private final ObjectFactory<K, V> objectFactory; - private Map<K, V> objects = new HashMap<>(); - private boolean closed = false; - - public ObjectManager(String type, Set<K> initialKeys, ObjectFactory<K, V> objectFactory) { - this.type = type; - this.objectFactory = objectFactory; - - update(new HashSet<>(initialKeys)); - } - - public synchronized void update(Collection<K> newKeys) { - if (closed) { - throw new RuntimeException("This instance is closed: type=" + type); - } - Set<K> found = new HashSet<>(newKeys); - found.removeAll(objects.keySet()); - - Set<K> gone = new HashSet<>(objects.keySet()); - gone.removeAll(newKeys); - - for (K k : gone) { - try { - logger.debug("Removing " + type + " with id=" + k); - objects.remove(k).close(); - } catch (IOException e) { - e.printStackTrace(System.out); - } - } - - for (K k : found) { - logger.debug("Adding " + type + " with id=" + k); - objects.put(k, objectFactory.create(k)); - } - } - - public synchronized void close() throws IOException { - if (closed) { - logger.warn("Already closed: type=" + type); - return; - } - update(Collections.<K>emptyList()); - closed = true; - } - - public synchronized Collection<V> getObjects() { - return new ArrayList<>(objects.values()); - } - - public void setObjects(Map<K, V> objects) { - this.objects = objects; - } -} diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java deleted file mode 100644 index 0e9912b..0000000 --- a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java +++ /dev/null @@ -1,132 +0,0 @@ -package io.trygvis.esper.testing.object; - -import org.slf4j.*; - -import javax.sql.*; -import java.io.*; -import java.sql.*; -import java.util.concurrent.*; - -import static java.lang.System.currentTimeMillis; - -public class ObjectUtil { - - public static <A extends TransactionalActor> ActorRef<A> threadedActor(String threadName, long delay, DataSource dataSource, String name, A actor) { - return new ThreadedActor<>(dataSource, threadName, name, actor, delay); - } - - public static <A extends TransactionalActor> ActorRef<A> scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { - return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, name, actor); - } - - private static class TransactionalActorWrapper<A extends TransactionalActor> implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(TransactionalActorWrapper.class); - - private final DataSource dataSource; - private final String name; - private final A actor; - - TransactionalActorWrapper(DataSource dataSource, String name, A actor) { - this.dataSource = dataSource; - this.name = name; - this.actor = actor; - } - - public void run() { - try { - Connection c = dataSource.getConnection(); - - try { - try (PreparedStatement s = c.prepareStatement("set application_name = 'Actor: " + name + "';")) { -// s.setString(1, "Actor: " + name); - s.executeUpdate(); - s.close(); - } - - actor.act(c); - long start = currentTimeMillis(); - c.commit(); - long end = currentTimeMillis(); - logger.debug("COMMIT performed in in " + (end - start) + "ms."); - } - catch(SQLException e) { - c.rollback(); - throw e; - } finally { - c.close(); - } - } catch (Throwable e) { - logger.warn("Exception in thread " + Thread.currentThread().getName()); - e.printStackTrace(System.out); - } - } - } - - static class ScheduledActor<A extends TransactionalActor> implements ActorRef<A>, Runnable { - private final ScheduledFuture<?> future; - - private final TransactionalActorWrapper<A> actor; - - ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { - future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit); - this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); - } - - public A underlying() { - return actor.actor; - } - - public void close() throws IOException { - future.cancel(true); - } - - @Override - public void run() { - actor.run(); - } - } - - static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable { - - private final TransactionalActorWrapper<A> actor; - private final long delay; - private final Thread thread; - private boolean shouldRun = true; - - ThreadedActor(DataSource dataSource, String threadName, String name, A actor, long delay) { - this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); - this.delay = delay; - thread = new Thread(this, threadName); - thread.setDaemon(true); - thread.start(); - } - - public A underlying() { - return actor.actor; - } - - @SuppressWarnings("ConstantConditions") - public void run() { - while (shouldRun) { - actor.run(); - - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - // ignore - } - } - } - - public void close() throws IOException { - shouldRun = false; - thread.interrupt(); - while (thread.isAlive()) { - try { - thread.join(); - } catch (InterruptedException ignore) { - } - } - } - } -} diff --git a/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java b/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java deleted file mode 100644 index 4d3cdce..0000000 --- a/src/main/java/io/trygvis/esper/testing/object/TransactionalActor.java +++ /dev/null @@ -1,7 +0,0 @@ -package io.trygvis.esper.testing.object; - -import java.sql.*; - -public interface TransactionalActor { - void act(Connection c) throws Exception; -} |