From 7caa5b1f1e08f99cfe4465f091f47e2966d78aa7 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sun, 23 Jun 2013 09:37:57 +0200 Subject: o Initial import of JDBC queue. --- .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 107 +++++++++++++++++ .../io/trygvis/test/jdbc/PlainJavaExample.java | 126 +++++++++++++++++++++ 2 files changed, 233 insertions(+) create mode 100644 src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java create mode 100644 src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java (limited to 'src/test/java/io/trygvis/test/jdbc') diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java new file mode 100644 index 0000000..16640dd --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -0,0 +1,107 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.async.JdbcAsyncService; +import io.trygvis.async.QueueController; +import io.trygvis.async.QueueStats; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.Timer; +import java.util.TimerTask; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import static io.trygvis.queue.Task.newTask; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; +import static java.util.Collections.singletonList; + +public class AsyncConsumerExample { + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 1000; + + private static final TaskEffect adder = new TaskEffect() { + public List apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + return singletonList(newTask(outputName, new Date(), Long.toString(a + b))); + } + }; + + public static void main(String[] args) throws Exception { + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + new AsyncConsumerExample().work(poolSize, req); + } + + public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + JdbcAsyncService asyncService = queueSystem.createAsyncService(); + final JdbcQueueService queueService = queueSystem.createQueueService(); + + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + return new QueueExecutor[]{ + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true) + }; + } + }); + + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; + + QueueController controller = asyncService.registerQueue(input, req, adder); + wrapInputQueue(controller); + + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + System.out.println(input.getStats()); + System.out.println(output.getStats()); + } + }, 1000, 1000); + + long start = currentTimeMillis(); + controller.start(new ScheduledThreadPoolExecutor(poolSize)); + Thread.sleep(60 * 1000); + controller.stop(); + long end = currentTimeMillis(); + timer.cancel(); + + QueueStats stats = input.getStats(); + + System.out.println("Summary:"); + System.out.println(stats.toString()); + System.out.println(output.getStats().toString()); + + long duration = end - start; + double rate = 1000 * ((double) stats.totalMessageCount) / duration; + System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); + } + + protected void wrapInputQueue(QueueController input) throws Exception { + } +} diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java new file mode 100644 index 0000000..0b7ba50 --- /dev/null +++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java @@ -0,0 +1,126 @@ +package io.trygvis.test.jdbc; + +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; +import io.trygvis.queue.JdbcQueueService; +import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueStats; +import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; +import io.trygvis.queue.TaskEffect; + +import javax.sql.DataSource; +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Date; +import java.util.List; +import java.util.Map; +import java.util.Random; + +import static io.trygvis.queue.Task.TaskState; +import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; + +public class PlainJavaExample { + private static final Random r = new Random(); + + private static String inputName = "my-input"; + private static String outputName = "my-output"; + + private static int interval = 10; + + public static class Consumer { + public static void main(String[] args) throws Exception { + System.out.println("Starting consumer"); + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.createQueueService(); + + QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect() { + @Override + public QueueExecutor[] doInConnection(Connection c) throws SQLException { + QueueExecutor[] queues = { + queueService.lookupQueue(c, inputName, interval, true), + queueService.lookupQueue(c, outputName, interval, true)}; + + TaskDao taskDao = queueSystem.createTaskDao(c); + + QueueStats stats = taskDao.findQueueStatsByName(inputName); + System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount); + for (Map.Entry entry : stats.states.entrySet()) { + System.out.println(entry.getKey() + " = " + entry.getValue()); + } + + return queues; + } + }); + + final QueueExecutor input = queues[0]; + final QueueExecutor output = queues[1]; + + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); + + input.consumeAll(req, new TaskEffect() { + public List apply(Task task) throws Exception { + Long a = Long.valueOf(task.arguments.get(0)); + Long b = Long.valueOf(task.arguments.get(1)); + + System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); + + if (r.nextInt(3000) > 0) { + return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b))); + } + + throw new RuntimeException("Simulated exception while processing task."); + } + }); + System.out.println("Done"); + } + } + + public static class Producer { + public static void main(String[] args) throws Exception { + System.out.println("Starting producer"); + + int chunks = 100; + final int chunk = 10000; + + DataSource ds = createDataSource(); + + SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); + + QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final JdbcQueueService queueService = queueSystem.createQueueService(); + + final QueueExecutor queue; + try (Connection c = ds.getConnection()) { + queue = queueService.lookupQueue(c, inputName, interval, true); + c.commit(); + } + + for (int i = 0; i < chunks; i++) { + long start = currentTimeMillis(); + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + for (int j = 0; j < chunk; j++) { + queue.schedule(c, new Date(), asList("10", "20")); + } + } + }); + long end = currentTimeMillis(); + + long time = end - start; + System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second"); + } + } + } +} -- cgit v1.2.3