From 1ec4fae12c5e5363591013e5a759590d913d6782 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 16 Jun 2013 12:07:43 +0200 Subject: wip --- .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 52 +++++++++++----------- 1 file changed, 27 insertions(+), 25 deletions(-) (limited to 'src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java') diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index 8d981f2..dd478d7 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -1,10 +1,12 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; -import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -38,36 +40,36 @@ public class AsyncConsumerExample { } }; - public static class Consumer { - public static void main(String[] args) throws Exception { - System.out.println("Starting consumer"); + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); - DataSource ds = createDataSource(); + DataSource ds = createDataSource(); - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcAsyncService asyncService = queueSystem.createAsyncService(); - final JdbcQueueService queueService = queueSystem.queueService; + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.createQueueService(); - Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { - @Override - public Queue[] doInConnection(Connection c) throws SQLException { - return new Queue[]{ - queueService.lookupQueue(c, inputName, interval, true), - queueService.lookupQueue(c, outputName, interval, true) - }; - } - }); + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + return new QueueExecutor[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); - final Queue input = queues[0]; - final Queue output = queues[1]; + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; - asyncService.registerQueue(input, adder); + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); - asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2)); - Thread.sleep(5 * 1000); - asyncService.stopQueue(input); - } + QueueController controller = asyncService.registerQueue(input, req, adder); + + controller.start(new ScheduledThreadPoolExecutor(2)); + Thread.sleep(60 * 1000); + controller.stop(); } } -- cgit v1.2.3