From 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 23 Jun 2013 09:37:57 +0200 Subject: o Initial import of JDBC queue. --- src/main/java/io/trygvis/async/AsyncService.java | 30 +++ .../java/io/trygvis/async/JdbcAsyncService.java | 75 ++++++++ .../java/io/trygvis/async/QueueController.java | 201 +++++++++++++++++++++ src/main/java/io/trygvis/async/QueueStats.java | 28 +++ .../io/trygvis/async/TaskFailureException.java | 7 + 5 files changed, 341 insertions(+) create mode 100755 src/main/java/io/trygvis/async/AsyncService.java create mode 100644 src/main/java/io/trygvis/async/JdbcAsyncService.java create mode 100644 src/main/java/io/trygvis/async/QueueController.java create mode 100644 src/main/java/io/trygvis/async/QueueStats.java create mode 100644 src/main/java/io/trygvis/async/TaskFailureException.java (limited to 'src/main/java/io/trygvis/async') diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java new file mode 100755 index 0000000..9332596 --- /dev/null +++ b/src/main/java/io/trygvis/async/AsyncService.java @@ -0,0 +1,30 @@ +package io.trygvis.async; + +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.SQLException; +import java.util.Date; +import java.util.List; + +/** + * A simple framework for running tasks. + */ +public interface AsyncService { + + QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException; + + QueueExecutor getQueue(String name); + + Task schedule(Queue queue, Date scheduled, List args); + + Task schedule(Queue queue, long parent, Date scheduled, List args); + + /** + * Polls for a new state of the execution. + */ + Task update(Task ref); +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java new file mode 100644 index 0000000..46f1f30 --- /dev/null +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -0,0 +1,75 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.HashMap; +import java.util.Map; + +import static java.lang.System.currentTimeMillis; +import static java.lang.Thread.sleep; + +public class JdbcAsyncService { + private final Map queues = new HashMap<>(); + + private final QueueSystem queueSystem; + + public JdbcAsyncService(QueueSystem queueSystem) { + this.queueSystem = queueSystem; + } + + public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { + if (queues.containsKey(queue.queue.name)) { + throw new IllegalArgumentException("Queue already exist."); + } + + QueueController queueController = new QueueController(queueSystem, req, processor, queue); + + queues.put(queue.queue.name, queueController); + + return queueController; + } + + public QueueExecutor getQueue(String name) { + return getQueueThread(name).queue; + } + + public Task await(Connection c, Task task, long timeout) throws SQLException { + final long start = currentTimeMillis(); + final long end = start + timeout; + + while (currentTimeMillis() < end) { + task = update(c, task); + + if (task == null) { + throw new RuntimeException("The task went away."); + } + + try { + sleep(100); + } catch (InterruptedException e) { + // break + } + } + + return task; + } + + public Task update(Connection c, Task ref) throws SQLException { + return queueSystem.createTaskDao(c).findById(ref.id()); + } + + private synchronized QueueController getQueueThread(String name) { + QueueController queueController = queues.get(name); + + if (queueController == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueController; + } +} diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..a343d42 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,201 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final TaskEffect taskEffect; + + private final TaskExecutionRequest req; + + public final QueueExecutor queue; + + private boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean running; + + private Thread thread; + + private ScheduledThreadPoolExecutor executor; + + private List stopListeners = new ArrayList<>(); + + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { + this.queueSystem = queueSystem; + this.req = req; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.taskEffect = taskEffect; + this.queue = queue; + } + + public void scheduleAll(final List tasks) throws InterruptedException { + ExecutorCompletionService service = new ExecutorCompletionService<>(executor); + + for (final Task task : tasks) { + service.submit(new Callable() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + + public void invokeAll(final List tasks) throws InterruptedException { + List> callables = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + callables.add(new Callable() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + + try { + scheduleAll(Collections.singletonList(task)); + } catch (InterruptedException e) { + throw new SQLException(e); + } + } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); + } + + private class QueueThread implements Runnable { + public void run() { + while (shouldRun) { + List tasks = null; + + try { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + + if (tasks.size() > 0) { + invokeAll(tasks); + } + } catch (Throwable e) { + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } + } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); + continue; + } + + synchronized (this) { + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + } else { + try { + wait(req.interval(queue.queue)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + log.info("Thread for queue {} has stopped.", queue.queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } + + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); + + running = true; + this.executor = executor; + + thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.queue.name); + + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + + shouldRun = false; + + // TODO: empty out the currently executing tasks. + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue + } + thread.interrupt(); + } + thread = null; + executor.shutdownNow(); + } +} diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java new file mode 100644 index 0000000..8edc720 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -0,0 +1,28 @@ +package io.trygvis.async; + +public class QueueStats { + public final int totalMessageCount; + public final int okMessageCount; + public final int failedMessageCount; + public final int missedMessageCount; + public final int scheduledMessageCount; + + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { + this.totalMessageCount = totalMessageCount; + this.okMessageCount = okMessageCount; + this.failedMessageCount = failedMessageCount; + this.missedMessageCount = missedMessageCount; + this.scheduledMessageCount = scheduledMessageCount; + } + + @Override + public String toString() { + return "QueueStats{" + + "totalMessageCount=" + totalMessageCount + + ", okMessageCount=" + okMessageCount + + ", failedMessageCount=" + failedMessageCount + + ", missedMessageCount=" + missedMessageCount + + ", scheduledMessageCount=" + scheduledMessageCount + + '}'; + } +} diff --git a/src/main/java/io/trygvis/async/TaskFailureException.java b/src/main/java/io/trygvis/async/TaskFailureException.java new file mode 100644 index 0000000..7278e17 --- /dev/null +++ b/src/main/java/io/trygvis/async/TaskFailureException.java @@ -0,0 +1,7 @@ +package io.trygvis.async; + +class TaskFailureException extends RuntimeException { + public TaskFailureException(Exception e) { + super(e); + } +} -- cgit v1.2.3