diff options
Diffstat (limited to 'src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java')
-rw-r--r-- | src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java new file mode 100644 index 0000000..8d981f2 --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -0,0 +1,73 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.SqlEffect; +import io.trygvis.async.SqlEffectExecutor; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.Queue; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.util.Collections.singletonList; + +public class AsyncConsumerExample { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + private static final TaskEffect adder = new TaskEffect() { + public List<Task> apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); + } + }; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.queueService; + + Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() { + @Override + public Queue[] doInConnection(Connection c) throws SQLException { + return new Queue[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); + + final Queue input = queues[0]; + final Queue output = queues[1]; + + asyncService.registerQueue(input, adder); + + asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2)); + Thread.sleep(5 * 1000); + asyncService.stopQueue(input); + } + } +} |