From 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 23 Jun 2013 09:37:57 +0200 Subject: o Initial import of JDBC queue. --- .../java/io/trygvis/async/QueueController.java | 201 +++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 src/main/java/io/trygvis/async/QueueController.java (limited to 'src/main/java/io/trygvis/async/QueueController.java') diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java new file mode 100644 index 0000000..a343d42 --- /dev/null +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -0,0 +1,201 @@ +package io.trygvis.async; + +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; +import static io.trygvis.queue.QueueService.TaskExecutionRequest; +import static io.trygvis.queue.Task.TaskState.NEW; + +public class QueueController { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueSystem queueSystem; + + private final SqlEffectExecutor sqlEffectExecutor; + + private final TaskEffect taskEffect; + + private final TaskExecutionRequest req; + + public final QueueExecutor queue; + + private boolean shouldRun = true; + + private boolean checkForNewTasks; + + private boolean running; + + private Thread thread; + + private ScheduledThreadPoolExecutor executor; + + private List stopListeners = new ArrayList<>(); + + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { + this.queueSystem = queueSystem; + this.req = req; + this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; + this.taskEffect = taskEffect; + this.queue = queue; + } + + public void scheduleAll(final List tasks) throws InterruptedException { + ExecutorCompletionService service = new ExecutorCompletionService<>(executor); + + for (final Task task : tasks) { + service.submit(new Callable() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + + public void invokeAll(final List tasks) throws InterruptedException { + List> callables = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + callables.add(new Callable() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + + try { + scheduleAll(Collections.singletonList(task)); + } catch (InterruptedException e) { + throw new SQLException(e); + } + } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); + } + + private class QueueThread implements Runnable { + public void run() { + while (shouldRun) { + List tasks = null; + + try { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize); + } + }); + + log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); + + if (tasks.size() > 0) { + invokeAll(tasks); + } + } catch (Throwable e) { + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } + } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); + continue; + } + + synchronized (this) { + if (checkForNewTasks) { + log.info("Ping received!"); + checkForNewTasks = false; + } else { + try { + wait(req.interval(queue.queue)); + } catch (InterruptedException e) { + // ignore + } + } + } + } + + log.info("Thread for queue {} has stopped.", queue.queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } + + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); + + running = true; + this.executor = executor; + + thread = new Thread(new QueueThread(), "queue: " + queue.queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.queue.name); + + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + + shouldRun = false; + + // TODO: empty out the currently executing tasks. + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue + } + thread.interrupt(); + } + thread = null; + executor.shutdownNow(); + } +} -- cgit v1.2.3