diff options
Diffstat (limited to 'src/main/java/io/trygvis/esper/testing/task')
-rw-r--r-- | src/main/java/io/trygvis/esper/testing/task/TaskDao.java | 26 | ||||
-rw-r--r-- | src/main/java/io/trygvis/esper/testing/task/TaskManager.java | 224 |
2 files changed, 250 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskDao.java b/src/main/java/io/trygvis/esper/testing/task/TaskDao.java new file mode 100644 index 0000000..242eb2a --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/task/TaskDao.java @@ -0,0 +1,26 @@ +package io.trygvis.esper.testing.task; + +import java.sql.*; +import java.util.*; + +public class TaskDao { + private final Connection c; + private final String table; + + public TaskDao(Connection c, String table) { + this.c = c; + this.table = table; + } + + public List<String> findTasks() throws SQLException { + try (PreparedStatement s = c.prepareStatement("SELECT task_id FROM ? FOR UPDATE")) { + s.setString(1, table); + ResultSet rs = s.executeQuery(); + List<String> list = new ArrayList<>(); + while(rs.next()) { + list.add(rs.getString(1)); + } + return list; + } + } +} diff --git a/src/main/java/io/trygvis/esper/testing/task/TaskManager.java b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java new file mode 100644 index 0000000..e4daba5 --- /dev/null +++ b/src/main/java/io/trygvis/esper/testing/task/TaskManager.java @@ -0,0 +1,224 @@ +package io.trygvis.esper.testing.task; + +import com.jolbox.bonecp.*; +import org.slf4j.*; +import org.slf4j.helpers.*; + +import java.sql.*; +import java.util.*; +import java.util.concurrent.*; + +public class TaskManager<T> { + public final String table; + public final TaskExecutorFactory taskExecutorFactory; + public final Executor executor; + public final BoneCP boneCP; + + private final Set<String> inProgress = new HashSet<>(); + + public TaskManager(String table, TaskExecutorFactory taskExecutorFactory, Executor executor, BoneCP boneCP) { + this.table = table; + this.taskExecutorFactory = taskExecutorFactory; + this.executor = executor; + this.boneCP = boneCP; + + Thread thread = new Thread(new Runnable() { + public void run() { + TaskManager.this.run(); + } + }); + thread.start(); + } + + private void run() { + while (true) { + try { + try (Connection c = boneCP.getConnection()) { + singleRun(c); + } + } catch (SQLException e) { + e.printStackTrace(System.out); + } + } + } + + private void singleRun(Connection c) throws SQLException { + TaskDao taskDao = new TaskDao(c, table); + + List<String> ids = taskDao.findTasks(); + + System.out.println("Found " + ids.size() + " new tasks."); + + synchronized (inProgress) { + System.out.println("Have " + inProgress.size() + " tasks in progress already"); + ids.removeAll(inProgress); + + List<Runnable> runnables = new ArrayList<>(ids.size()); + + for (final String id : ids) { + System.out.println("Scheduling " + id); + + final TaskExecutor executor = taskExecutorFactory.create(); + runnables.add(new Runnable() { + public void run() { + System.out.println("Executing " + id); + try { + try (Connection c2 = boneCP.getConnection()) { + SqlLogger logger = new SqlLogger(); + executor.execute(id, c2, logger); + // TODO: insert log statements + System.out.println("Executing " + id); + c2.commit(); + } + } catch (SQLException e) { + e.printStackTrace(); + } finally { + synchronized (inProgress) { + inProgress.remove(id); + } + } + } + }); + inProgress.add(id); + } + for (Runnable runnable : runnables) { + executor.execute(runnable); + } + } + } + + /** + * See MessageFormatter + */ + private static class SqlLogger extends MarkerIgnoringBase { + + public boolean isTraceEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void trace(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void trace(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isDebugEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void debug(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void debug(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isInfoEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void info(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void info(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void info(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void info(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void info(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isWarnEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void warn(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void warn(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + + public boolean isErrorEnabled() { + throw new RuntimeException("Not implemented"); + } + + public void error(String msg) { + throw new RuntimeException("Not implemented"); + } + + public void error(String format, Object arg) { + throw new RuntimeException("Not implemented"); + } + + public void error(String format, Object arg1, Object arg2) { + throw new RuntimeException("Not implemented"); + } + + public void error(String format, Object[] argArray) { + throw new RuntimeException("Not implemented"); + } + + public void error(String msg, Throwable t) { + throw new RuntimeException("Not implemented"); + } + } +} + +interface TaskExecutorFactory { + TaskExecutor create(); +} + +interface TaskExecutor { + void execute(String id, Connection c, Logger logger) + throws SQLException; +} |