aboutsummaryrefslogtreecommitdiff
path: root/src/main/java/io/trygvis
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-12 22:55:18 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-12 22:55:18 +0200
commit54d7b2ce520e57cc0ffb9582546b80a32fa00682 (patch)
treead68043a4f05780c6b87a494f7498297ff978953 /src/main/java/io/trygvis
parent4b0bab9e722cf77ca0049c54515e8c93acefa355 (diff)
downloadquartz-based-queue-54d7b2ce520e57cc0ffb9582546b80a32fa00682.tar.gz
quartz-based-queue-54d7b2ce520e57cc0ffb9582546b80a32fa00682.tar.bz2
quartz-based-queue-54d7b2ce520e57cc0ffb9582546b80a32fa00682.tar.xz
quartz-based-queue-54d7b2ce520e57cc0ffb9582546b80a32fa00682.zip
wip
Diffstat (limited to 'src/main/java/io/trygvis')
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java41
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java79
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java5
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java2
4 files changed, 91 insertions, 36 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 6baa56e..ddfa150 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -19,7 +19,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static java.util.concurrent.TimeUnit.SECONDS;
public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -34,7 +33,7 @@ public class JdbcAsyncService {
this.queueService = queueSystem.createQueueService();
}
- public void registerQueue(Queue queue, TaskEffect processor) {
+ public synchronized void registerQueue(Queue queue, TaskEffect processor) {
final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
queues.put(queue.name, queueThread);
@@ -42,31 +41,22 @@ public class JdbcAsyncService {
log.info("registerQueue: LEAVE");
}
- public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
- final QueueThread queueThread = queues.get(name);
+ public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) {
+ getQueueThread(queue.name).start(executor);
+ }
+
+ public synchronized void stopQueue(Queue queue) {
+ QueueThread queueThread = queues.remove(queue.name);
if (queueThread == null) {
- throw new RuntimeException("No such queue: " + name);
+ throw new RuntimeException("No such queue: '" + queue.name + "'.");
}
- long interval = queueThread.queue.interval;
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
+ queueThread.stop();
}
public Queue getQueue(String name) {
- QueueThread queueThread = queues.get(name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + name + "'.");
- }
+ QueueThread queueThread = getQueueThread(name);
return queueThread.queue;
}
@@ -80,8 +70,6 @@ public class JdbcAsyncService {
}
private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
Date scheduled = new Date();
Task task = queueService.schedule(c, queue, parent, scheduled, args);
@@ -116,4 +104,13 @@ public class JdbcAsyncService {
return taskDao.findById(ref.id());
}
+
+ private QueueThread getQueueThread(String name) {
+ QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueThread;
+ }
}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index ea77911..61196b6 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
@@ -28,11 +29,15 @@ class QueueThread implements Runnable {
public final Queue queue;
- public boolean shouldRun = true;
+ private boolean shouldRun = true;
private boolean checkForNewTasks;
- private boolean busy;
+ private boolean available;
+
+ private boolean running;
+
+ private Thread thread;
QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
this.queueSystem = queueSystem;
@@ -44,9 +49,10 @@ class QueueThread implements Runnable {
public void ping() {
synchronized (this) {
- if (!busy) {
+ System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks);
+ if (available) {
log.info("Sending ping to " + queue);
- notify();
+ notifyAll();
} else {
checkForNewTasks = true;
}
@@ -70,27 +76,74 @@ class QueueThread implements Runnable {
if (tasks.size() > 0) {
queueService.executeTask(req, taskEffect, tasks);
}
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (tasks.size() == req.chunkSize) {
+ continue;
+ }
} catch (Throwable e) {
- log.warn("Error while executing tasks.", e);
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
}
synchronized (this) {
- busy = false;
+ available = true;
if (checkForNewTasks) {
log.info("Ping received!");
checkForNewTasks = false;
- continue;
+ } else {
+ try {
+ wait(queue.interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- try {
- wait();
- } catch (InterruptedException e) {
- // ignore
- }
+ available = false;
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
- busy = true;
+ log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval);
+
+ running = true;
+
+ thread = new Thread(this, queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.name);
+
+ shouldRun = false;
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
}
+ thread.interrupt();
}
+ thread = null;
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
index 42c8fd8..3b0c018 100644
--- a/src/main/java/io/trygvis/queue/QueueSystem.java
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -1,5 +1,6 @@
package io.trygvis.queue;
+import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import org.slf4j.Logger;
@@ -55,4 +56,8 @@ public class QueueSystem {
public TaskDao createTaskDao(Connection c) {
return new TaskDao(c);
}
+
+ public JdbcAsyncService createAsyncService() {
+ return new JdbcAsyncService(this);
+ }
}
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
index 96442e6..b27e94d 100644
--- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -46,7 +46,7 @@ public class SpringJdbcAsyncService implements AsyncService {
public void afterCompletion(int status) {
log.info("Transaction completed with status = {}", status);
if (status == TransactionSynchronization.STATUS_COMMITTED) {
- jdbcAsyncService.startQueue(executor, queue.name);
+ jdbcAsyncService.startQueue(queue, executor);
}
}
});