package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; import static java.util.Collections.singletonList; public class AsyncConsumerExample { private static String inputName = "my-input"; private static String outputName = "my-output"; private static int interval = 10; private static final TaskEffect adder = new TaskEffect() { public List apply(Task task) throws Exception { Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); } }; public static void main(String[] args) throws Exception { System.out.println("Starting consumer"); DataSource ds = createDataSource(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); JdbcAsyncService asyncService = queueSystem.createAsyncService(); final JdbcQueueService queueService = queueSystem.createQueueService(); QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { @Override public QueueExecutor[] doInConnection(Connection c) throws SQLException { return new QueueExecutor[]{ queueService.lookupQueue(c, inputName, interval, true), queueService.lookupQueue(c, outputName, interval, true) }; } }); final QueueExecutor input = queues[0]; final QueueExecutor output = queues[1]; QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); QueueController controller = asyncService.registerQueue(input, req, adder); controller.start(new ScheduledThreadPoolExecutor(2)); Thread.sleep(60 * 1000); controller.stop(); } }