From c8c863ce36f57954369a0b4a15e6c5e720f03f87 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 22 Dec 2012 00:31:00 +0100 Subject: o Moving stuff to utils package. --- .../trygvis/esper/testing/object/ObjectUtil.java | 132 --------------------- 1 file changed, 132 deletions(-) delete mode 100644 src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java (limited to 'src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java') diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java deleted file mode 100644 index 0e9912b..0000000 --- a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java +++ /dev/null @@ -1,132 +0,0 @@ -package io.trygvis.esper.testing.object; - -import org.slf4j.*; - -import javax.sql.*; -import java.io.*; -import java.sql.*; -import java.util.concurrent.*; - -import static java.lang.System.currentTimeMillis; - -public class ObjectUtil { - - public static ActorRef threadedActor(String threadName, long delay, DataSource dataSource, String name, A actor) { - return new ThreadedActor<>(dataSource, threadName, name, actor, delay); - } - - public static ActorRef scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { - return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, name, actor); - } - - private static class TransactionalActorWrapper implements Runnable { - private static final Logger logger = LoggerFactory.getLogger(TransactionalActorWrapper.class); - - private final DataSource dataSource; - private final String name; - private final A actor; - - TransactionalActorWrapper(DataSource dataSource, String name, A actor) { - this.dataSource = dataSource; - this.name = name; - this.actor = actor; - } - - public void run() { - try { - Connection c = dataSource.getConnection(); - - try { - try (PreparedStatement s = c.prepareStatement("set application_name = 'Actor: " + name + "';")) { -// s.setString(1, "Actor: " + name); - s.executeUpdate(); - s.close(); - } - - actor.act(c); - long start = currentTimeMillis(); - c.commit(); - long end = currentTimeMillis(); - logger.debug("COMMIT performed in in " + (end - start) + "ms."); - } - catch(SQLException e) { - c.rollback(); - throw e; - } finally { - c.close(); - } - } catch (Throwable e) { - logger.warn("Exception in thread " + Thread.currentThread().getName()); - e.printStackTrace(System.out); - } - } - } - - static class ScheduledActor implements ActorRef, Runnable { - private final ScheduledFuture future; - - private final TransactionalActorWrapper actor; - - ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, String name, A actor) { - future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit); - this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); - } - - public A underlying() { - return actor.actor; - } - - public void close() throws IOException { - future.cancel(true); - } - - @Override - public void run() { - actor.run(); - } - } - - static class ThreadedActor implements ActorRef, Runnable, Closeable { - - private final TransactionalActorWrapper actor; - private final long delay; - private final Thread thread; - private boolean shouldRun = true; - - ThreadedActor(DataSource dataSource, String threadName, String name, A actor, long delay) { - this.actor = new TransactionalActorWrapper<>(dataSource, name, actor); - this.delay = delay; - thread = new Thread(this, threadName); - thread.setDaemon(true); - thread.start(); - } - - public A underlying() { - return actor.actor; - } - - @SuppressWarnings("ConstantConditions") - public void run() { - while (shouldRun) { - actor.run(); - - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - // ignore - } - } - } - - public void close() throws IOException { - shouldRun = false; - thread.interrupt(); - while (thread.isAlive()) { - try { - thread.join(); - } catch (InterruptedException ignore) { - } - } - } - } -} -- cgit v1.2.3