package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; import io.trygvis.async.QueueStats; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; import static java.lang.System.currentTimeMillis; import static java.util.Collections.singletonList; public class AsyncConsumerExample { private static String inputName = "my-input"; private static String outputName = "my-output"; private static int interval = 1000; private static final TaskEffect adder = new TaskEffect() { public List apply(Task task) throws Exception { Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); } }; public static void main(String[] args) throws Exception { int poolSize = 4; QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); new AsyncConsumerExample().work(poolSize, req); } public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception { System.out.println("Starting consumer"); DataSource ds = createDataSource(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); JdbcAsyncService asyncService = queueSystem.createAsyncService(); final JdbcQueueService queueService = queueSystem.createQueueService(); QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { @Override public QueueExecutor[] doInConnection(Connection c) throws SQLException { return new QueueExecutor[]{ queueService.lookupQueue(c, inputName, interval, true), queueService.lookupQueue(c, outputName, interval, true) }; } }); final QueueExecutor input = queues[0]; final QueueExecutor output = queues[1]; QueueController controller = asyncService.registerQueue(input, req, adder); wrapInputQueue(controller); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { System.out.println(input.getStats()); System.out.println(output.getStats()); } }, 1000, 1000); long start = currentTimeMillis(); controller.start(new ScheduledThreadPoolExecutor(poolSize)); Thread.sleep(60 * 1000); controller.stop(); long end = currentTimeMillis(); timer.cancel(); QueueStats stats = input.getStats(); System.out.println("Summary:"); System.out.println(stats.toString()); System.out.println(output.getStats().toString()); long duration = end - start; double rate = 1000 * ((double) stats.totalMessageCount) / duration; System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); } protected void wrapInputQueue(QueueController input) throws Exception { } }