package io.trygvis.test; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; import java.sql.Connection; import java.util.Date; import java.util.List; import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; public class PlainJavaExample2 { private static String inputName = "my-input"; private static String outputName = "my-output"; private static int interval = 10; private static final TaskEffect adder = new TaskEffect() { public List apply(Task task) throws Exception { System.out.println("PlainJavaExample$Consumer.consumeAll"); Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); } }; public static class Consumer { public static void main(String[] args) throws Exception { System.out.println("Starting consumer"); DataSource ds = createDataSource(); Connection c = ds.getConnection(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); JdbcQueueService queueService = queueSystem.queueService; final Queue input = queueService.lookupQueue(c, inputName, interval, true); final Queue output = queueService.lookupQueue(c, outputName, interval, true); JdbcAsyncService asyncService = new JdbcAsyncService(queueSystem); asyncService.registerQueue(input, adder); // queueService.consumeAll(c, input, adder); c.commit(); } } public static class Producer { public static void main(String[] args) throws Exception { System.out.println("Starting producer"); DataSource ds = createDataSource(); Connection c = ds.getConnection(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); JdbcQueueService queueService = queueSystem.queueService; Queue queue = queueService.lookupQueue(c, inputName, interval, true); queueService.schedule(c, queue, new Date(), asList("10", "20")); c.commit(); } } }