diff options
Diffstat (limited to 'src/test')
-rw-r--r-- | src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java (renamed from src/test/java/io/trygvis/test/PlainJavaExample2.java) | 57 | ||||
-rw-r--r-- | src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java (renamed from src/test/java/io/trygvis/test/PlainJavaExample.java) | 2 |
2 files changed, 24 insertions, 35 deletions
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample2.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index faeebb2..8d981f2 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample2.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -1,6 +1,7 @@ -package io.trygvis.test; +package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; @@ -10,15 +11,16 @@ import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; -import static java.util.Arrays.asList; import static java.util.Collections.singletonList; -public class PlainJavaExample2 { +public class AsyncConsumerExample { private static String inputName = "my-input"; private static String outputName = "my-output"; @@ -27,7 +29,6 @@ public class PlainJavaExample2 { private static final TaskEffect adder = new TaskEffect() { public List<Task> apply(Task task) throws Exception { - System.out.println("PlainJavaExample$Consumer.consumeAll"); Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); @@ -42,43 +43,31 @@ public class PlainJavaExample2 { System.out.println("Starting consumer"); DataSource ds = createDataSource(); - Connection c = ds.getConnection(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcQueueService queueService = queueSystem.queueService; - - final Queue input = queueService.lookupQueue(c, inputName, interval, true); - final Queue output = queueService.lookupQueue(c, outputName, interval, true); - - JdbcAsyncService asyncService = new JdbcAsyncService(queueSystem); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.queueService; + + Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() { + @Override + public Queue[] doInConnection(Connection c) throws SQLException { + return new Queue[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); + + final Queue input = queues[0]; + final Queue output = queues[1]; asyncService.registerQueue(input, adder); -// queueService.consumeAll(c, input, adder); - - c.commit(); - } - } - - public static class Producer { - public static void main(String[] args) throws Exception { - System.out.println("Starting producer"); - - DataSource ds = createDataSource(); - Connection c = ds.getConnection(); - - SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); - JdbcQueueService queueService = queueSystem.queueService; - - Queue queue = queueService.lookupQueue(c, inputName, interval, true); - - queueService.schedule(c, queue, new Date(), asList("10", "20")); - - c.commit(); + asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2)); + Thread.sleep(5 * 1000); + asyncService.stopQueue(input); } } } diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java index cad8559..994c310 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java @@ -1,4 +1,4 @@ -package io.trygvis.test; +package io.trygvis.test.jdbc; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; |