aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java')
-rwxr-xr-x[-rw-r--r--]src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java78
1 files changed, 61 insertions, 17 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java
index 2d23822..ecfaa34 100644..100755
--- a/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java
+++ b/src/main/java/io/trygvis/esper/testing/object/ObjectUtil.java
@@ -3,24 +3,79 @@ package io.trygvis.esper.testing.object;
import javax.sql.*;
import java.io.*;
import java.sql.*;
+import java.util.concurrent.*;
public class ObjectUtil {
- public static <A extends TransactionalActor> ActorRef<A> threadedActor(DataSource dataSource, String threadName, long delay, A actor) {
+ public static <A extends TransactionalActor> ActorRef<A> threadedActor(String threadName, long delay, DataSource dataSource, A actor) {
return new ThreadedActor<>(dataSource, threadName, actor, delay);
}
- static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable {
+ public static <A extends TransactionalActor> ActorRef<A> scheduledActorWithFixedDelay(ScheduledExecutorService scheduledExecutorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, A actor) {
+ return new ScheduledActor<>(scheduledExecutorService, initialDelay, delay, unit, dataSource, actor);
+ }
+ private static class TransactionalActorWrapper<A extends TransactionalActor> implements Runnable {
private final DataSource dataSource;
private final A actor;
+
+ TransactionalActorWrapper(DataSource dataSource, A actor) {
+ this.dataSource = dataSource;
+ this.actor = actor;
+ }
+
+ public void run() {
+ try {
+ Connection c = dataSource.getConnection();
+ try {
+ actor.act(c);
+ c.commit();
+ }
+ catch(SQLException e) {
+ c.rollback();
+ } finally {
+ c.close();
+ }
+ } catch (Throwable e) {
+ System.out.println("Exception in thread " + Thread.currentThread().getName());
+ e.printStackTrace(System.out);
+ }
+ }
+ }
+
+ static class ScheduledActor<A extends TransactionalActor> implements ActorRef<A>, Runnable {
+ private final ScheduledFuture<?> future;
+
+ private final TransactionalActorWrapper<A> actor;
+
+ ScheduledActor(ScheduledExecutorService executorService, long initialDelay, long delay, TimeUnit unit, DataSource dataSource, A actor) {
+ future = executorService.scheduleWithFixedDelay(this, initialDelay, delay, unit);
+ this.actor = new TransactionalActorWrapper<>(dataSource, actor);
+ }
+
+ public A underlying() {
+ return actor.actor;
+ }
+
+ public void close() throws IOException {
+ future.cancel(true);
+ }
+
+ @Override
+ public void run() {
+ actor.run();
+ }
+ }
+
+ static class ThreadedActor<A extends TransactionalActor> implements ActorRef<A>, Runnable, Closeable {
+
+ private final TransactionalActorWrapper<A> actor;
private final long delay;
private final Thread thread;
private boolean shouldRun = true;
ThreadedActor(DataSource dataSource, String threadName, A actor, long delay) {
- this.dataSource = dataSource;
- this.actor = actor;
+ this.actor = new TransactionalActorWrapper<A>(dataSource, actor);
this.delay = delay;
thread = new Thread(this, threadName);
thread.setDaemon(true);
@@ -28,24 +83,13 @@ public class ObjectUtil {
}
public A underlying() {
- return actor;
+ return actor.actor;
}
@SuppressWarnings("ConstantConditions")
public void run() {
while (shouldRun) {
- try {
- try (Connection c = dataSource.getConnection()) {
- try {
- actor.act(c);
- } finally {
- c.rollback();
- }
- }
- } catch (Exception e) {
- System.out.println("Exception in thread " + Thread.currentThread().getName());
- e.printStackTrace(System.out);
- }
+ actor.run();
try {
Thread.sleep(delay);