From 54d7b2ce520e57cc0ffb9582546b80a32fa00682 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Wed, 12 Jun 2013 22:55:18 +0200 Subject: wip --- .../java/io/trygvis/async/JdbcAsyncService.java | 41 ++++--- src/main/java/io/trygvis/async/QueueThread.java | 79 ++++++++++--- src/main/java/io/trygvis/queue/QueueSystem.java | 5 + .../io/trygvis/spring/SpringJdbcAsyncService.java | 2 +- .../java/io/trygvis/test/PlainJavaExample.java | 126 --------------------- .../java/io/trygvis/test/PlainJavaExample2.java | 84 -------------- .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 73 ++++++++++++ .../io/trygvis/test/jdbc/PlainJavaExample.java | 126 +++++++++++++++++++++ 8 files changed, 290 insertions(+), 246 deletions(-) delete mode 100644 src/test/java/io/trygvis/test/PlainJavaExample.java delete mode 100644 src/test/java/io/trygvis/test/PlainJavaExample2.java create mode 100644 src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java create mode 100644 src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 6baa56e..ddfa150 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -19,7 +19,6 @@ import java.util.concurrent.ScheduledThreadPoolExecutor; import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; -import static java.util.concurrent.TimeUnit.SECONDS; public class JdbcAsyncService { private final Logger log = LoggerFactory.getLogger(getClass()); @@ -34,7 +33,7 @@ public class JdbcAsyncService { this.queueService = queueSystem.createQueueService(); } - public void registerQueue(Queue queue, TaskEffect processor) { + public synchronized void registerQueue(Queue queue, TaskEffect processor) { final QueueThread queueThread = new QueueThread(queueSystem, processor, queue); queues.put(queue.name, queueThread); @@ -42,31 +41,22 @@ public class JdbcAsyncService { log.info("registerQueue: LEAVE"); } - public void startQueue(ScheduledThreadPoolExecutor executor, String name) { - final QueueThread queueThread = queues.get(name); + public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) { + getQueueThread(queue.name).start(executor); + } + + public synchronized void stopQueue(Queue queue) { + QueueThread queueThread = queues.remove(queue.name); if (queueThread == null) { - throw new RuntimeException("No such queue: " + name); + throw new RuntimeException("No such queue: '" + queue.name + "'."); } - long interval = queueThread.queue.interval; - log.info("Starting thread for queue {} with poll interval = {}s", name, interval); - executor.scheduleAtFixedRate(new Runnable() { - public void run() { - queueThread.ping(); - } - }, 10, interval, SECONDS); - Thread thread = new Thread(queueThread, name); - thread.setDaemon(true); - thread.start(); + queueThread.stop(); } public Queue getQueue(String name) { - QueueThread queueThread = queues.get(name); - - if (queueThread == null) { - throw new RuntimeException("No such queue: '" + name + "'."); - } + QueueThread queueThread = getQueueThread(name); return queueThread.queue; } @@ -80,8 +70,6 @@ public class JdbcAsyncService { } private Task scheduleInner(Connection c, Long parent, final Queue queue, List args) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - Date scheduled = new Date(); Task task = queueService.schedule(c, queue, parent, scheduled, args); @@ -116,4 +104,13 @@ public class JdbcAsyncService { return taskDao.findById(ref.id()); } + + private QueueThread getQueueThread(String name) { + QueueThread queueThread = queues.get(name); + + if (queueThread == null) { + throw new RuntimeException("No such queue: '" + name + "'."); + } + return queueThread; + } } diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index ea77911..61196b6 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -11,6 +11,7 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; @@ -28,11 +29,15 @@ class QueueThread implements Runnable { public final Queue queue; - public boolean shouldRun = true; + private boolean shouldRun = true; private boolean checkForNewTasks; - private boolean busy; + private boolean available; + + private boolean running; + + private Thread thread; QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) { this.queueSystem = queueSystem; @@ -44,9 +49,10 @@ class QueueThread implements Runnable { public void ping() { synchronized (this) { - if (!busy) { + System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks); + if (available) { log.info("Sending ping to " + queue); - notify(); + notifyAll(); } else { checkForNewTasks = true; } @@ -70,27 +76,74 @@ class QueueThread implements Runnable { if (tasks.size() > 0) { queueService.executeTask(req, taskEffect, tasks); } + + // If we found exactly the same number of tasks that we asked for, there is most likely more to go. + if (tasks.size() == req.chunkSize) { + continue; + } } catch (Throwable e) { - log.warn("Error while executing tasks.", e); + if (shouldRun) { + log.warn("Error while executing tasks.", e); + } } synchronized (this) { - busy = false; + available = true; if (checkForNewTasks) { log.info("Ping received!"); checkForNewTasks = false; - continue; + } else { + try { + wait(queue.interval); + } catch (InterruptedException e) { + // ignore + } } - try { - wait(); - } catch (InterruptedException e) { - // ignore - } + available = false; + } + } + + log.info("Thread for queue {} has stopped.", queue.name); + running = false; + synchronized (this) { + this.notifyAll(); + } + } + + public synchronized void start(ScheduledThreadPoolExecutor executor) { + if (running) { + throw new IllegalStateException("Already running"); + } - busy = true; + log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval); + + running = true; + + thread = new Thread(this, queue.name); + thread.setDaemon(true); + thread.start(); + } + + public synchronized void stop() { + if (!running) { + return; + } + + log.info("Stopping thread for queue {}", queue.name); + + shouldRun = false; + + thread.interrupt(); + while (running) { + try { + wait(1000); + } catch (InterruptedException e) { + // continue } + thread.interrupt(); } + thread = null; } } diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java index 42c8fd8..3b0c018 100644 --- a/src/main/java/io/trygvis/queue/QueueSystem.java +++ b/src/main/java/io/trygvis/queue/QueueSystem.java @@ -1,5 +1,6 @@ package io.trygvis.queue; +import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import org.slf4j.Logger; @@ -55,4 +56,8 @@ public class QueueSystem { public TaskDao createTaskDao(Connection c) { return new TaskDao(c); } + + public JdbcAsyncService createAsyncService() { + return new JdbcAsyncService(this); + } } diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java index 96442e6..b27e94d 100644 --- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java +++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java @@ -46,7 +46,7 @@ public class SpringJdbcAsyncService implements AsyncService { public void afterCompletion(int status) { log.info("Transaction completed with status = {}", status); if (status == TransactionSynchronization.STATUS_COMMITTED) { - jdbcAsyncService.startQueue(executor, queue.name); + jdbcAsyncService.startQueue(queue, executor); } } }); diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java deleted file mode 100644 index cad8559..0000000 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ /dev/null @@ -1,126 +0,0 @@ -package io.trygvis.test; - -import io.trygvis.async.SqlEffect; -import io.trygvis.async.SqlEffectExecutor; -import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueService; -import io.trygvis.queue.QueueStats; -import io.trygvis.queue.QueueSystem; -import io.trygvis.queue.Task; -import io.trygvis.queue.TaskDao; -import io.trygvis.queue.TaskEffect; - -import javax.sql.DataSource; -import java.sql.Connection; -import java.sql.SQLException; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.Random; - -import static io.trygvis.queue.Task.TaskState; -import static io.trygvis.test.DbUtil.createDataSource; -import static java.lang.System.currentTimeMillis; -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; - -public class PlainJavaExample { - private static final Random r = new Random(); - - private static String inputName = "my-input"; - private static String outputName = "my-output"; - - private static int interval = 10; - - public static class Consumer { - public static void main(String[] args) throws Exception { - System.out.println("Starting consumer"); - - DataSource ds = createDataSource(); - - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - - final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - final JdbcQueueService queueService = queueSystem.queueService; - - Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { - @Override - public Queue[] doInConnection(Connection c) throws SQLException { - Queue[] queues = { - queueService.lookupQueue(c, inputName, interval, true), - queueService.lookupQueue(c, outputName, interval, true)}; - - TaskDao taskDao = queueSystem.createTaskDao(c); - - QueueStats stats = taskDao.findQueueStatsByName(inputName); - System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount); - for (Map.Entry entry : stats.states.entrySet()) { - System.out.println(entry.getKey() + " = " + entry.getValue()); - } - - return queues; - } - }); - - final Queue input = queues[0]; - final Queue output = queues[1]; - - QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); - - queueService.consumeAll(input, req, new TaskEffect() { - public List apply(Task task) throws Exception { - Long a = Long.valueOf(task.arguments.get(0)); - Long b = Long.valueOf(task.arguments.get(1)); - - System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); - - if (r.nextInt(3000) > 0) { - return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); - } - - throw new RuntimeException("Simulated exception while processing task."); - } - }); - System.out.println("Done"); - } - } - - public static class Producer { - public static void main(String[] args) throws Exception { - System.out.println("Starting producer"); - - int chunks = 100; - final int chunk = 10000; - - DataSource ds = createDataSource(); - - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - final JdbcQueueService queueService = queueSystem.queueService; - - final Queue queue; - try (Connection c = ds.getConnection()) { - queue = queueService.lookupQueue(c, inputName, interval, true); - c.commit(); - } - - for (int i = 0; i < chunks; i++) { - long start = currentTimeMillis(); - sqlEffectExecutor.transaction(new SqlEffect.Void() { - @Override - public void doInConnection(Connection c) throws SQLException { - for (int j = 0; j < chunk; j++) { - queueService.schedule(c, queue, new Date(), asList("10", "20")); - } - } - }); - long end = currentTimeMillis(); - - long time = end - start; - System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second"); - } - } - } -} diff --git a/src/test/java/io/trygvis/test/PlainJavaExample2.java b/src/test/java/io/trygvis/test/PlainJavaExample2.java deleted file mode 100644 index faeebb2..0000000 --- a/src/test/java/io/trygvis/test/PlainJavaExample2.java +++ /dev/null @@ -1,84 +0,0 @@ -package io.trygvis.test; - -import io.trygvis.async.JdbcAsyncService; -import io.trygvis.async.SqlEffectExecutor; -import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueSystem; -import io.trygvis.queue.Task; -import io.trygvis.queue.TaskEffect; - -import javax.sql.DataSource; -import java.sql.Connection; -import java.util.Date; -import java.util.List; - -import static io.trygvis.queue.Task.newTask; -import static io.trygvis.test.DbUtil.createDataSource; -import static java.util.Arrays.asList; -import static java.util.Collections.singletonList; - -public class PlainJavaExample2 { - - private static String inputName = "my-input"; - private static String outputName = "my-output"; - - private static int interval = 10; - - private static final TaskEffect adder = new TaskEffect() { - public List apply(Task task) throws Exception { - System.out.println("PlainJavaExample$Consumer.consumeAll"); - Long a = Long.valueOf(task.arguments.get(0)); - Long b = Long.valueOf(task.arguments.get(1)); - - System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); - - return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); - } - }; - - public static class Consumer { - public static void main(String[] args) throws Exception { - System.out.println("Starting consumer"); - - DataSource ds = createDataSource(); - Connection c = ds.getConnection(); - - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcQueueService queueService = queueSystem.queueService; - - final Queue input = queueService.lookupQueue(c, inputName, interval, true); - final Queue output = queueService.lookupQueue(c, outputName, interval, true); - - JdbcAsyncService asyncService = new JdbcAsyncService(queueSystem); - - asyncService.registerQueue(input, adder); - -// queueService.consumeAll(c, input, adder); - - c.commit(); - } - } - - public static class Producer { - public static void main(String[] args) throws Exception { - System.out.println("Starting producer"); - - DataSource ds = createDataSource(); - Connection c = ds.getConnection(); - - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcQueueService queueService = queueSystem.queueService; - - Queue queue = queueService.lookupQueue(c, inputName, interval, true); - - queueService.schedule(c, queue, new Date(), asList("10", "20")); - - c.commit(); - } - } -} diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java new file mode 100644 index 0000000..8d981f2 --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -0,0 +1,73 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.util.Collections.singletonList; + +public class AsyncConsumerExample { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + private static final TaskEffect adder = new TaskEffect() { + public List apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); + } + }; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.queueService; + + Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public Queue[] doInConnection(Connection c) throws SQLException { + return new Queue[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); + + final Queue input = queues[0]; + final Queue output = queues[1]; + + asyncService.registerQueue(input, adder); + + asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2)); + Thread.sleep(5 * 1000); + asyncService.stopQueue(input); + } + } +} diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java new file mode 100644 index 0000000..994c310 --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java @@ -0,0 +1,126 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueStats; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static io.trygvis.queue.Task.TaskState; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class PlainJavaExample { + private static final Random r = new Random(); + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.queueService; + + Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public Queue[] doInConnection(Connection c) throws SQLException { + Queue[] queues = { + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true)}; + + TaskDao taskDao = queueSystem.createTaskDao(c); + + QueueStats stats = taskDao.findQueueStatsByName(inputName); + System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount); + for (Map.Entry entry : stats.states.entrySet()) { + System.out.println(entry.getKey() + " = " + entry.getValue()); + } + + return queues; + } + }); + + final Queue input = queues[0]; + final Queue output = queues[1]; + + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); + + queueService.consumeAll(input, req, new TaskEffect() { + public List apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + if (r.nextInt(3000) > 0) { + return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); + } + + throw new RuntimeException("Simulated exception while processing task."); + } + }); + System.out.println("Done"); + } + } + + public static class Producer { + public static void main(String[] args) throws Exception { + System.out.println("Starting producer"); + + int chunks = 100; + final int chunk = 10000; + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.queueService; + + final Queue queue; + try (Connection c = ds.getConnection()) { + queue = queueService.lookupQueue(c, inputName, interval, true); + c.commit(); + } + + for (int i = 0; i < chunks; i++) { + long start = currentTimeMillis(); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (int j = 0; j < chunk; j++) { + queueService.schedule(c, queue, new Date(), asList("10", "20")); + } + } + }); + long end = currentTimeMillis(); + + long time = end - start; + System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second"); + } + } + } +} -- cgit v1.2.3