From 54d7b2ce520e57cc0ffb9582546b80a32fa00682 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 12 Jun 2013 22:55:18 +0200 Subject: wip --- src/main/java/io/trygvis/async/QueueThread.java | 79 +++++++++++++++++++++---- 1 file changed, 66 insertions(+), 13 deletions(-) (limited to 'src/main/java/io/trygvis/async/QueueThread.java') diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index ea77911..61196b6 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; @@ -28,11 +29,15 @@ class QueueThread implements Runnable { public final Queue queue; - public boolean shouldRun = true; + private boolean shouldRun = true; private boolean checkForNewTasks; - private boolean busy; + private boolean available; + + private boolean running; + + private Thread thread; QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { this.queueSystem = queueSystem; @@ -44,9 +49,10 @@ class QueueThread implements Runnable { public void ping() { synchronized (this) { - if (!busy) { + System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks); + if (available) { log.info("Sending ping to " + queue); - notify(); + notifyAll(); } else { checkForNewTasks = true; } @@ -70,27 +76,74 @@ class QueueThread implements Runnable { if (tasks.size() > 0) { queueService.executeTask(req, taskEffect, tasks); } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (tasks.size() == req.chunkSize) { + continue; + } } catch (Throwable e) { - log.warn("Error while executing tasks.", e); + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } } synchronized (this) { - busy = false; + available = true; if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; - continue; + } else { + try { + wait(queue.interval); + } catch (InterruptedException e) { + // ignore + } } - try { - wait(); - } catch (InterruptedException e) { - // ignore - } + available = false; + } + } + + log.info("Thread for queue {} has stopped.", queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } - busy = true; + log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval); + + running = true; + + thread = new Thread(this, queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.name); + + shouldRun = false; + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue } + thread.interrupt(); } + thread = null; } } -- cgit v1.2.3