aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis/async
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java30
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java75
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java201
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java28
-rw-r--r--src/main/java/io/trygvis/async/TaskFailureException.java7
5 files changed, 341 insertions, 0 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
new file mode 100755
index 0000000..9332596
--- /dev/null
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -0,0 +1,30 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+
+/**
+ * A simple framework for running tasks.
+ */
+public interface AsyncService {
+
+ QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException;
+
+ QueueExecutor getQueue(String name);
+
+ Task schedule(Queue queue, Date scheduled, List<String> args);
+
+ Task schedule(Queue queue, long parent, Date scheduled, List<String> args);
+
+ /**
+ * Polls for a new state of the execution.
+ */
+ Task update(Task ref);
+}
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
new file mode 100644
index 0000000..46f1f30
--- /dev/null
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -0,0 +1,75 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.lang.System.currentTimeMillis;
+import static java.lang.Thread.sleep;
+
+public class JdbcAsyncService {
+ private final Map<String, QueueController> queues = new HashMap<>();
+
+ private final QueueSystem queueSystem;
+
+ public JdbcAsyncService(QueueSystem queueSystem) {
+ this.queueSystem = queueSystem;
+ }
+
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
+ }
+
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
+
+ queues.put(queue.queue.name, queueController);
+
+ return queueController;
+ }
+
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
+ }
+
+ public Task await(Connection c, Task task, long timeout) throws SQLException {
+ final long start = currentTimeMillis();
+ final long end = start + timeout;
+
+ while (currentTimeMillis() < end) {
+ task = update(c, task);
+
+ if (task == null) {
+ throw new RuntimeException("The task went away.");
+ }
+
+ try {
+ sleep(100);
+ } catch (InterruptedException e) {
+ // break
+ }
+ }
+
+ return task;
+ }
+
+ public Task update(Connection c, Task ref) throws SQLException {
+ return queueSystem.createTaskDao(c).findById(ref.id());
+ }
+
+ private synchronized QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
+
+ if (queueController == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueController;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
new file mode 100644
index 0000000..a343d42
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -0,0 +1,201 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.SqlEffect;
+import io.trygvis.queue.SqlEffectExecutor;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult;
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueController {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final TaskEffect taskEffect;
+
+ private final TaskExecutionRequest req;
+
+ public final QueueExecutor queue;
+
+ private boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean running;
+
+ private Thread thread;
+
+ private ScheduledThreadPoolExecutor executor;
+
+ private List<Runnable> stopListeners = new ArrayList<>();
+
+ public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
+ this.queueSystem = queueSystem;
+ this.req = req;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.taskEffect = taskEffect;
+ this.queue = queue;
+ }
+
+ public void scheduleAll(final List<Task> tasks) throws InterruptedException {
+ ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor);
+
+ for (final Task task : tasks) {
+ service.submit(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+ }
+
+ public void invokeAll(final List<Task> tasks) throws InterruptedException {
+ List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size());
+ for (final Task task : tasks) {
+ callables.add(new Callable<TaskExecutionResult>() {
+ @Override
+ public TaskExecutionResult call() throws Exception {
+ return queue.applyTask(taskEffect, task);
+ }
+ });
+ }
+
+ executor.invokeAll(callables);
+ }
+
+ public void hint(final long id) throws SQLException {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ Task task = queueSystem.createTaskDao(c).findById(id);
+
+ try {
+ scheduleAll(Collections.singletonList(task));
+ } catch (InterruptedException e) {
+ throw new SQLException(e);
+ }
+ }
+ });
+ }
+
+ public synchronized void addOnStopListener(Runnable runnable) {
+ stopListeners.add(runnable);
+ }
+
+ private class QueueThread implements Runnable {
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = null;
+
+ try {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
+
+ if (tasks.size() > 0) {
+ invokeAll(tasks);
+ }
+ } catch (Throwable e) {
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) {
+ log.info("Got a full chunk, continuing directly.");
+ continue;
+ }
+
+ synchronized (this) {
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ } else {
+ try {
+ wait(req.interval(queue.queue));
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
+
+ log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval);
+
+ running = true;
+ this.executor = executor;
+
+ thread = new Thread(new QueueThread(), "queue: " + queue.queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.queue.name);
+
+ for (Runnable runnable : stopListeners) {
+ try {
+ runnable.run();
+ } catch (Throwable e) {
+ log.error("Error while running stop listener " + runnable, e);
+ }
+ }
+
+ shouldRun = false;
+
+ // TODO: empty out the currently executing tasks.
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ thread.interrupt();
+ }
+ thread = null;
+ executor.shutdownNow();
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
new file mode 100644
index 0000000..8edc720
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -0,0 +1,28 @@
+package io.trygvis.async;
+
+public class QueueStats {
+ public final int totalMessageCount;
+ public final int okMessageCount;
+ public final int failedMessageCount;
+ public final int missedMessageCount;
+ public final int scheduledMessageCount;
+
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ this.okMessageCount = okMessageCount;
+ this.failedMessageCount = failedMessageCount;
+ this.missedMessageCount = missedMessageCount;
+ this.scheduledMessageCount = scheduledMessageCount;
+ }
+
+ @Override
+ public String toString() {
+ return "QueueStats{" +
+ "totalMessageCount=" + totalMessageCount +
+ ", okMessageCount=" + okMessageCount +
+ ", failedMessageCount=" + failedMessageCount +
+ ", missedMessageCount=" + missedMessageCount +
+ ", scheduledMessageCount=" + scheduledMessageCount +
+ '}';
+ }
+}
diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java
new file mode 100644
index 0000000..7278e17
--- /dev/null
+++ b/src/main/java/io/trygvis/async/TaskFailureException.java
@@ -0,0 +1,7 @@
+package io.trygvis.async;
+
+class TaskFailureException extends RuntimeException {
+ public TaskFailureException(Exception e) {
+ super(e);
+ }
+}