aboutsummaryrefslogtreecommitdiff
path: root/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java')
-rwxr-xr-xsrc/test/java/io/trygvis/test/Main.java4
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java52
-rw-r--r--src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java24
-rw-r--r--src/test/java/io/trygvis/test/spring/PlainSpringTest.java15
4 files changed, 52 insertions, 43 deletions
diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java
index 0721ec9..f03d6fa 100755
--- a/src/test/java/io/trygvis/test/Main.java
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -74,7 +74,9 @@ public class Main {
final Queue q = null; // queueService.lookupQueue(c, "create-article", 1);
- asyncService.registerQueue(q, createArticleCallable);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+
+ asyncService.registerQueue(q, req, createArticleCallable);
// log.info("queue registered: ref = {}", q);
// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index 8d981f2..dd478d7 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -1,10 +1,12 @@
package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -38,36 +40,36 @@ public class AsyncConsumerExample {
}
};
- public static class Consumer {
- public static void main(String[] args) throws Exception {
- System.out.println("Starting consumer");
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
- DataSource ds = createDataSource();
+ DataSource ds = createDataSource();
- SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
- QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcAsyncService asyncService = queueSystem.createAsyncService();
- final JdbcQueueService queueService = queueSystem.queueService;
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
- @Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- return new Queue[]{
- queueService.lookupQueue(c, inputName, interval, true),
- queueService.lookupQueue(c, outputName, interval, true)
- };
- }
- });
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ return new QueueExecutor[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
- asyncService.registerQueue(input, adder);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
- asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2));
- Thread.sleep(5 * 1000);
- asyncService.stopQueue(input);
- }
+ QueueController controller = asyncService.registerQueue(input, req, adder);
+
+ controller.start(new ScheduledThreadPoolExecutor(2));
+ Thread.sleep(60 * 1000);
+ controller.stop();
}
}
diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
index 994c310..0e11ab3 100644
--- a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
@@ -3,7 +3,7 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueStats;
import io.trygvis.queue.QueueSystem;
@@ -42,12 +42,12 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- final JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
@Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- Queue[] queues = {
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ QueueExecutor[] queues = {
queueService.lookupQueue(c, inputName, interval, true),
queueService.lookupQueue(c, outputName, interval, true)};
@@ -63,12 +63,12 @@ public class PlainJavaExample {
}
});
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false);
- queueService.consumeAll(input, req, new TaskEffect() {
+ input.consumeAll(req, new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
@@ -76,7 +76,7 @@ public class PlainJavaExample {
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
if (r.nextInt(3000) > 0) {
- return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
+ return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b)));
}
throw new RuntimeException("Simulated exception while processing task.");
@@ -98,9 +98,9 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- final JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- final Queue queue;
+ final QueueExecutor queue;
try (Connection c = ds.getConnection()) {
queue = queueService.lookupQueue(c, inputName, interval, true);
c.commit();
@@ -112,7 +112,7 @@ public class PlainJavaExample {
@Override
public void doInConnection(Connection c) throws SQLException {
for (int j = 0; j < chunk; j++) {
- queueService.schedule(c, queue, new Date(), asList("10", "20"));
+ queue.schedule(c, new Date(), asList("10", "20"));
}
}
});
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
index d06d8d6..38d3361 100644
--- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -1,7 +1,7 @@
package io.trygvis.test.spring;
import io.trygvis.async.AsyncService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,6 +34,8 @@ public class PlainSpringTest {
@Autowired
private QueueService queueService;
+ private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+
static {
String username = getProperty("user.name");
setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
@@ -42,9 +45,9 @@ public class PlainSpringTest {
@Test
public void testBasic() throws SQLException, InterruptedException {
- Queue test = queueService.getQueue("test", 10, true);
+ QueueExecutor test = queueService.getQueue("test", 10, true);
final AtomicReference<List<String>> ref = new AtomicReference<>();
- asyncService.registerQueue(test, new TaskEffect() {
+ asyncService.registerQueue(test.queue, req, new TaskEffect() {
@Override
public List<Task> apply(Task task) throws Exception {
System.out.println("PlainSpringTest.run");
@@ -58,12 +61,14 @@ public class PlainSpringTest {
synchronized (ref) {
System.out.println("Scheduling task");
- asyncService.schedule(test, asList("hello", "world"));
- System.out.println("Waiting");
+ queueService.schedule(test.queue, new Date(), asList("hello", "world"));
+ System.out.println("Task scheduled, waiting");
ref.wait(1000);
+ System.out.println("Back!");
}
List<String> args = ref.get();
+ System.out.println("args = " + args);
assertNotNull(args);
assertThat(args).containsExactly("hello", "world");
}