diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2012-11-27 16:24:01 +0100 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2012-12-07 20:06:42 +0100 |
commit | 041bab815c5c554169835993735b8e5c35d436ed (patch) | |
tree | 473774956540013316dc10bd8428350c15f20933 /src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java | |
parent | e243a6fd6c444b451398ceb659ea4963a19122d0 (diff) | |
download | esper-testing-041bab815c5c554169835993735b8e5c35d436ed.tar.gz esper-testing-041bab815c5c554169835993735b8e5c35d436ed.tar.bz2 esper-testing-041bab815c5c554169835993735b8e5c35d436ed.tar.xz esper-testing-041bab815c5c554169835993735b8e5c35d436ed.zip |
o Switching the gitorious code to the actor structure.
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java')
-rwxr-xr-x[-rw-r--r--] | src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java | 78 |
1 files changed, 61 insertions, 17 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java index 2d23822..ecfaa34 100644..100755 --- a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java +++ b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java @@ -3,24 +3,79 @@ package io.trygvis.esper.testing.object; import javax.sql.*; import java.io.*; import java.sql.*; +import java.util.concurrent.*; public class ObjectUtil { - public static <A extends TransactionalActor> ActorRef<A> threadedActor(DataSource dataSource, String threadName, long delay, A actor) { + public static <A extends TransactionalActor> ActorRef<A> threadedActor(String threadName, long delay, DataSource dataSource, A actor) { return new ThreadedActor<>(dataSource, threadName, actor, delay); } - static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable { + public static <A extends TransactionalActor> ActorRef<A> scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, A actor) { + return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, actor); + } + private static class TransactionalActorWrapper<A extends TransactionalActor> implements Runnable { private final DataSource dataSource; private final A actor; + + TransactionalActorWrapper(DataSource dataSource, A actor) { + this.dataSource = dataSource; + this.actor = actor; + } + + public void run() { + try { + Connection c = dataSource.getConnection(); + try { + actor.act(c); + c.commit(); + } + catch(SQLException e) { + c.rollback(); + } finally { + c.close(); + } + } catch (Throwable e) { + System.out.println("Exception in thread " + Thread.currentThread().getName()); + e.printStackTrace(System.out); + } + } + } + + static class ScheduledActor<A extends TransactionalActor> implements ActorRef<A>, Runnable { + private final ScheduledFuture<?> future; + + private final TransactionalActorWrapper<A> actor; + + ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, A actor) { + future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit); + this.actor = new TransactionalActorWrapper<>(dataSource, actor); + } + + public A underlying() { + return actor.actor; + } + + public void close() throws IOException { + future.cancel(true); + } + + @Override + public void run() { + actor.run(); + } + } + + static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable { + + private final TransactionalActorWrapper<A> actor; private final long delay; private final Thread thread; private boolean shouldRun = true; ThreadedActor(DataSource dataSource, String threadName, A actor, long delay) { - this.dataSource = dataSource; - this.actor = actor; + this.actor = new TransactionalActorWrapper<A>(dataSource, actor); this.delay = delay; thread = new Thread(this, threadName); thread.setDaemon(true); @@ -28,24 +83,13 @@ public class ObjectUtil { } public A underlying() { - return actor; + return actor.actor; } @SuppressWarnings("ConstantConditions") public void run() { while (shouldRun) { - try { - try (Connection c = dataSource.getConnection()) { - try { - actor.act(c); - } finally { - c.rollback(); - } - } - } catch (Exception e) { - System.out.println("Exception in thread " + Thread.currentThread().getName()); - e.printStackTrace(System.out); - } + actor.run(); try { Thread.sleep(delay); |