aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/esper/testing/task/TaskManager.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/task/TaskManager.java')
-rw-r--r--src/main/java/io/trygvis/esper/testing/task/TaskManager.java224
1 files changed, 224 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskManager.java b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java
new file mode 100644
index 0000000..e4daba5
--- /dev/null
+++ b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java
@@ -0,0 +1,224 @@
+package io.trygvis.esper.testing.task;
+
+import com.jolbox.bonecp.*;
+import org.slf4j.*;
+import org.slf4j.helpers.*;
+
+import java.sql.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+public class TaskManager<T> {
+ public final String table;
+ public final TaskExecutorFactory taskExecutorFactory;
+ public final Executor executor;
+ public final BoneCP boneCP;
+
+ private final Set<String> inProgress = new HashSet<>();
+
+ public TaskManager(String table, TaskExecutorFactory taskExecutorFactory, Executor executor, BoneCP boneCP) {
+ this.table = table;
+ this.taskExecutorFactory = taskExecutorFactory;
+ this.executor = executor;
+ this.boneCP = boneCP;
+
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ TaskManager.this.run();
+ }
+ });
+ thread.start();
+ }
+
+ private void run() {
+ while (true) {
+ try {
+ try (Connection c = boneCP.getConnection()) {
+ singleRun(c);
+ }
+ } catch (SQLException e) {
+ e.printStackTrace(System.out);
+ }
+ }
+ }
+
+ private void singleRun(Connection c) throws SQLException {
+ TaskDao taskDao = new TaskDao(c, table);
+
+ List<String> ids = taskDao.findTasks();
+
+ System.out.println("Found " + ids.size() + " new tasks.");
+
+ synchronized (inProgress) {
+ System.out.println("Have " + inProgress.size() + " tasks in progress already");
+ ids.removeAll(inProgress);
+
+ List<Runnable> runnables = new ArrayList<>(ids.size());
+
+ for (final String id : ids) {
+ System.out.println("Scheduling " + id);
+
+ final TaskExecutor executor = taskExecutorFactory.create();
+ runnables.add(new Runnable() {
+ public void run() {
+ System.out.println("Executing " + id);
+ try {
+ try (Connection c2 = boneCP.getConnection()) {
+ SqlLogger logger = new SqlLogger();
+ executor.execute(id, c2, logger);
+ // TODO: insert log statements
+ System.out.println("Executing " + id);
+ c2.commit();
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ } finally {
+ synchronized (inProgress) {
+ inProgress.remove(id);
+ }
+ }
+ }
+ });
+ inProgress.add(id);
+ }
+ for (Runnable runnable : runnables) {
+ executor.execute(runnable);
+ }
+ }
+ }
+
+ /**
+ * See MessageFormatter
+ */
+ private static class SqlLogger extends MarkerIgnoringBase {
+
+ public boolean isTraceEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void trace(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isDebugEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void debug(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isInfoEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void info(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isWarnEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void warn(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public boolean isErrorEnabled() {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String msg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String format, Object arg) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String format, Object arg1, Object arg2) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String format, Object[] argArray) {
+ throw new RuntimeException("Not implemented");
+ }
+
+ public void error(String msg, Throwable t) {
+ throw new RuntimeException("Not implemented");
+ }
+ }
+}
+
+interface TaskExecutorFactory {
+ TaskExecutor create();
+}
+
+interface TaskExecutor {
+ void execute(String id, Connection c, Logger logger)
+ throws SQLException;
+}