aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java')
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java52
1 files changed, 27 insertions, 25 deletions
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index 8d981f2..dd478d7 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -1,10 +1,12 @@
package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -38,36 +40,36 @@ public class AsyncConsumerExample {
}
};
- public static class Consumer {
- public static void main(String[] args) throws Exception {
- System.out.println("Starting consumer");
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
- DataSource ds = createDataSource();
+ DataSource ds = createDataSource();
- SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
- QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcAsyncService asyncService = queueSystem.createAsyncService();
- final JdbcQueueService queueService = queueSystem.queueService;
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
- @Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- return new Queue[]{
- queueService.lookupQueue(c, inputName, interval, true),
- queueService.lookupQueue(c, outputName, interval, true)
- };
- }
- });
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ return new QueueExecutor[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
- asyncService.registerQueue(input, adder);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
- asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2));
- Thread.sleep(5 * 1000);
- asyncService.stopQueue(input);
- }
+ QueueController controller = asyncService.registerQueue(input, req, adder);
+
+ controller.start(new ScheduledThreadPoolExecutor(2));
+ Thread.sleep(60 * 1000);
+ controller.stop();
}
}