aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java41
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java79
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java5
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java2
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java (renamed from src/test/java/io/trygvis/test/PlainJavaExample2.java)57
-rw-r--r--src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java (renamed from src/test/java/io/trygvis/test/PlainJavaExample.java)2
6 files changed, 115 insertions, 71 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index 6baa56e..ddfa150 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -19,7 +19,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
-import static java.util.concurrent.TimeUnit.SECONDS;
public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -34,7 +33,7 @@ public class JdbcAsyncService {
this.queueService = queueSystem.createQueueService();
}
- public void registerQueue(Queue queue, TaskEffect processor) {
+ public synchronized void registerQueue(Queue queue, TaskEffect processor) {
final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
queues.put(queue.name, queueThread);
@@ -42,31 +41,22 @@ public class JdbcAsyncService {
log.info("registerQueue: LEAVE");
}
- public void startQueue(ScheduledThreadPoolExecutor executor, String name) {
- final QueueThread queueThread = queues.get(name);
+ public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) {
+ getQueueThread(queue.name).start(executor);
+ }
+
+ public synchronized void stopQueue(Queue queue) {
+ QueueThread queueThread = queues.remove(queue.name);
if (queueThread == null) {
- throw new RuntimeException("No such queue: " + name);
+ throw new RuntimeException("No such queue: '" + queue.name + "'.");
}
- long interval = queueThread.queue.interval;
- log.info("Starting thread for queue {} with poll interval = {}s", name, interval);
- executor.scheduleAtFixedRate(new Runnable() {
- public void run() {
- queueThread.ping();
- }
- }, 10, interval, SECONDS);
- Thread thread = new Thread(queueThread, name);
- thread.setDaemon(true);
- thread.start();
+ queueThread.stop();
}
public Queue getQueue(String name) {
- QueueThread queueThread = queues.get(name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + name + "'.");
- }
+ QueueThread queueThread = getQueueThread(name);
return queueThread.queue;
}
@@ -80,8 +70,6 @@ public class JdbcAsyncService {
}
private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
Date scheduled = new Date();
Task task = queueService.schedule(c, queue, parent, scheduled, args);
@@ -116,4 +104,13 @@ public class JdbcAsyncService {
return taskDao.findById(ref.id());
}
+
+ private QueueThread getQueueThread(String name) {
+ QueueThread queueThread = queues.get(name);
+
+ if (queueThread == null) {
+ throw new RuntimeException("No such queue: '" + name + "'.");
+ }
+ return queueThread;
+ }
}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index ea77911..61196b6 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
@@ -28,11 +29,15 @@ class QueueThread implements Runnable {
public final Queue queue;
- public boolean shouldRun = true;
+ private boolean shouldRun = true;
private boolean checkForNewTasks;
- private boolean busy;
+ private boolean available;
+
+ private boolean running;
+
+ private Thread thread;
QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
this.queueSystem = queueSystem;
@@ -44,9 +49,10 @@ class QueueThread implements Runnable {
public void ping() {
synchronized (this) {
- if (!busy) {
+ System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks);
+ if (available) {
log.info("Sending ping to " + queue);
- notify();
+ notifyAll();
} else {
checkForNewTasks = true;
}
@@ -70,27 +76,74 @@ class QueueThread implements Runnable {
if (tasks.size() > 0) {
queueService.executeTask(req, taskEffect, tasks);
}
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (tasks.size() == req.chunkSize) {
+ continue;
+ }
} catch (Throwable e) {
- log.warn("Error while executing tasks.", e);
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
}
synchronized (this) {
- busy = false;
+ available = true;
if (checkForNewTasks) {
log.info("Ping received!");
checkForNewTasks = false;
- continue;
+ } else {
+ try {
+ wait(queue.interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
- try {
- wait();
- } catch (InterruptedException e) {
- // ignore
- }
+ available = false;
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
- busy = true;
+ log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval);
+
+ running = true;
+
+ thread = new Thread(this, queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.name);
+
+ shouldRun = false;
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
}
+ thread.interrupt();
}
+ thread = null;
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
index 42c8fd8..3b0c018 100644
--- a/src/main/java/io/trygvis/queue/QueueSystem.java
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -1,5 +1,6 @@
package io.trygvis.queue;
+import io.trygvis.async.JdbcAsyncService;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import org.slf4j.Logger;
@@ -55,4 +56,8 @@ public class QueueSystem {
public TaskDao createTaskDao(Connection c) {
return new TaskDao(c);
}
+
+ public JdbcAsyncService createAsyncService() {
+ return new JdbcAsyncService(this);
+ }
}
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
index 96442e6..b27e94d 100644
--- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -46,7 +46,7 @@ public class SpringJdbcAsyncService implements AsyncService {
public void afterCompletion(int status) {
log.info("Transaction completed with status = {}", status);
if (status == TransactionSynchronization.STATUS_COMMITTED) {
- jdbcAsyncService.startQueue(executor, queue.name);
+ jdbcAsyncService.startQueue(queue, executor);
}
}
});
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample2.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index faeebb2..8d981f2 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample2.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -1,6 +1,7 @@
-package io.trygvis.test;
+package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
@@ -10,15 +11,16 @@ import io.trygvis.queue.TaskEffect;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import static io.trygvis.queue.Task.newTask;
import static io.trygvis.test.DbUtil.createDataSource;
-import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
-public class PlainJavaExample2 {
+public class AsyncConsumerExample {
private static String inputName = "my-input";
private static String outputName = "my-output";
@@ -27,7 +29,6 @@ public class PlainJavaExample2 {
private static final TaskEffect adder = new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
- System.out.println("PlainJavaExample$Consumer.consumeAll");
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
@@ -42,43 +43,31 @@ public class PlainJavaExample2 {
System.out.println("Starting consumer");
DataSource ds = createDataSource();
- Connection c = ds.getConnection();
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcQueueService queueService = queueSystem.queueService;
-
- final Queue input = queueService.lookupQueue(c, inputName, interval, true);
- final Queue output = queueService.lookupQueue(c, outputName, interval, true);
-
- JdbcAsyncService asyncService = new JdbcAsyncService(queueSystem);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.queueService;
+
+ Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
+ @Override
+ public Queue[] doInConnection(Connection c) throws SQLException {
+ return new Queue[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
+
+ final Queue input = queues[0];
+ final Queue output = queues[1];
asyncService.registerQueue(input, adder);
-// queueService.consumeAll(c, input, adder);
-
- c.commit();
- }
- }
-
- public static class Producer {
- public static void main(String[] args) throws Exception {
- System.out.println("Starting producer");
-
- DataSource ds = createDataSource();
- Connection c = ds.getConnection();
-
- SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
-
- QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcQueueService queueService = queueSystem.queueService;
-
- Queue queue = queueService.lookupQueue(c, inputName, interval, true);
-
- queueService.schedule(c, queue, new Date(), asList("10", "20"));
-
- c.commit();
+ asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2));
+ Thread.sleep(5 * 1000);
+ asyncService.stopQueue(input);
}
}
}
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
index cad8559..994c310 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
@@ -1,4 +1,4 @@
-package io.trygvis.test;
+package io.trygvis.test.jdbc;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;