aboutsummaryrefslogtreecommitdiff
path: root/src/test/java/io/trygvis/test/PlainJavaExample.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/test/java/io/trygvis/test/PlainJavaExample.java')
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample.java51
1 files changed, 36 insertions, 15 deletions
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java
index 338abad..b09d3e9 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/PlainJavaExample.java
@@ -1,22 +1,26 @@
package io.trygvis.test;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
import javax.sql.DataSource;
import java.sql.Connection;
+import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.Random;
-import static io.trygvis.queue.JdbcQueueService.createQueueService;
-import static io.trygvis.queue.Task.newTask;
import static io.trygvis.test.DbUtil.createDataSource;
import static java.util.Arrays.asList;
import static java.util.Collections.singletonList;
public class PlainJavaExample {
+ private static final Random r = new Random();
private static String inputName = "my-input";
private static String outputName = "my-output";
@@ -28,26 +32,40 @@ public class PlainJavaExample {
System.out.println("Starting consumer");
DataSource ds = createDataSource();
- Connection c = ds.getConnection();
- JdbcQueueService queueService = createQueueService(c);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final JdbcQueueService queueService = queueSystem.queueService;
+
+ Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
+ @Override
+ public Queue[] doInConnection(Connection c) throws SQLException {
+ return new Queue[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)};
+ }
+ });
- final Queue input = queueService.getQueue(c, inputName, interval, true);
- final Queue output = queueService.getQueue(c, outputName, interval, true);
+ final Queue input = queues[0];
+ final Queue output = queues[1];
- queueService.consume(c, input, new QueueService.TaskEffect() {
- public List<Task> consume(Task task) throws Exception {
- System.out.println("PlainJavaExample$Consumer.consume");
+ queueService.consumeAll(input, new TaskEffect() {
+ public List<Task> apply(Task task) throws Exception {
+ System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments);
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
- return singletonList(newTask(output.name, new Date(), Long.toString(a + b)));
+ if(r.nextInt(3) == 0) {
+ return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
+ }
+
+ throw new RuntimeException("Simulated exception while processing task.");
}
});
-
- c.commit();
+ System.out.println("Done");
}
}
@@ -58,9 +76,12 @@ public class PlainJavaExample {
DataSource ds = createDataSource();
Connection c = ds.getConnection();
- JdbcQueueService queueService = createQueueService(c);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcQueueService queueService = queueSystem.queueService;
- Queue queue = queueService.getQueue(c, inputName, interval, true);
+ Queue queue = queueService.lookupQueue(c, inputName, interval, true);
queueService.schedule(c, queue, new Date(), asList("10", "20"));