diff options
Diffstat (limited to 'src/test/java/io/trygvis/test/PlainJavaExample.java')
-rw-r--r-- | src/test/java/io/trygvis/test/PlainJavaExample.java | 51 |
1 files changed, 36 insertions, 15 deletions
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java index 338abad..b09d3e9 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -1,22 +1,26 @@ package io.trygvis.test; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; -import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; import java.sql.Connection; +import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.Random; -import static io.trygvis.queue.JdbcQueueService.createQueueService; -import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; public class PlainJavaExample { + private static final Random r = new Random(); private static String inputName = "my-input"; private static String outputName = "my-output"; @@ -28,26 +32,40 @@ public class PlainJavaExample { System.out.println("Starting consumer"); DataSource ds = createDataSource(); - Connection c = ds.getConnection(); - JdbcQueueService queueService = createQueueService(c); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.queueService; + + Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() { + @Override + public Queue[] doInConnection(Connection c) throws SQLException { + return new Queue[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true)}; + } + }); - final Queue input = queueService.getQueue(c, inputName, interval, true); - final Queue output = queueService.getQueue(c, outputName, interval, true); + final Queue input = queues[0]; + final Queue output = queues[1]; - queueService.consume(c, input, new QueueService.TaskEffect() { - public List<Task> consume(Task task) throws Exception { - System.out.println("PlainJavaExample$Consumer.consume"); + queueService.consumeAll(input, new TaskEffect() { + public List<Task> apply(Task task) throws Exception { + System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments); Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); - return singletonList(newTask(output.name, new Date(), Long.toString(a + b))); + if(r.nextInt(3) == 0) { + return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); + } + + throw new RuntimeException("Simulated exception while processing task."); } }); - - c.commit(); + System.out.println("Done"); } } @@ -58,9 +76,12 @@ public class PlainJavaExample { DataSource ds = createDataSource(); Connection c = ds.getConnection(); - JdbcQueueService queueService = createQueueService(c); + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcQueueService queueService = queueSystem.queueService; - Queue queue = queueService.getQueue(c, inputName, interval, true); + Queue queue = queueService.lookupQueue(c, inputName, interval, true); queueService.schedule(c, queue, new Date(), asList("10", "20")); |