aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/io/trygvis/test/jdbc
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/io/trygvis/test/jdbc')
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java52
-rw-r--r--src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java24
2 files changed, 39 insertions, 37 deletions
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index 8d981f2..dd478d7 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -1,10 +1,12 @@
package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -38,36 +40,36 @@ public class AsyncConsumerExample {
}
};
- public static class Consumer {
- public static void main(String[] args) throws Exception {
- System.out.println("Starting consumer");
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
- DataSource ds = createDataSource();
+ DataSource ds = createDataSource();
- SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
- QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcAsyncService asyncService = queueSystem.createAsyncService();
- final JdbcQueueService queueService = queueSystem.queueService;
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
- @Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- return new Queue[]{
- queueService.lookupQueue(c, inputName, interval, true),
- queueService.lookupQueue(c, outputName, interval, true)
- };
- }
- });
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ return new QueueExecutor[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
- asyncService.registerQueue(input, adder);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
- asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2));
- Thread.sleep(5 * 1000);
- asyncService.stopQueue(input);
- }
+ QueueController controller = asyncService.registerQueue(input, req, adder);
+
+ controller.start(new ScheduledThreadPoolExecutor(2));
+ Thread.sleep(60 * 1000);
+ controller.stop();
}
}
diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
index 994c310..0e11ab3 100644
--- a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
@@ -3,7 +3,7 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueStats;
import io.trygvis.queue.QueueSystem;
@@ -42,12 +42,12 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- final JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
@Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- Queue[] queues = {
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ QueueExecutor[] queues = {
queueService.lookupQueue(c, inputName, interval, true),
queueService.lookupQueue(c, outputName, interval, true)};
@@ -63,12 +63,12 @@ public class PlainJavaExample {
}
});
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false);
- queueService.consumeAll(input, req, new TaskEffect() {
+ input.consumeAll(req, new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
@@ -76,7 +76,7 @@ public class PlainJavaExample {
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
if (r.nextInt(3000) > 0) {
- return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
+ return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b)));
}
throw new RuntimeException("Simulated exception while processing task.");
@@ -98,9 +98,9 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- final JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- final Queue queue;
+ final QueueExecutor queue;
try (Connection c = ds.getConnection()) {
queue = queueService.lookupQueue(c, inputName, interval, true);
c.commit();
@@ -112,7 +112,7 @@ public class PlainJavaExample {
@Override
public void doInConnection(Connection c) throws SQLException {
for (int j = 0; j < chunk; j++) {
- queueService.schedule(c, queue, new Date(), asList("10", "20"));
+ queue.schedule(c, new Date(), asList("10", "20"));
}
}
});