From 29dc40a0f7fa765d6f66e7a1bdd31083f71286de Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 22 Jun 2013 09:39:49 +0200 Subject: o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer. --- pom.xml | 9 ++ .../java/io/trygvis/activemq/ActiveMqHinter.java | 152 +++++++++++++++++++++ .../java/io/trygvis/async/JdbcAsyncService.java | 10 -- .../java/io/trygvis/async/QueueController.java | 70 ++++++---- src/main/java/io/trygvis/async/QueueStats.java | 5 +- src/main/java/io/trygvis/queue/QueueExecutor.java | 8 +- src/main/java/io/trygvis/queue/QueueService.java | 26 +++- src/test/java/io/trygvis/test/DbUtil.java | 4 +- .../test/activemq/AsyncConsumerWithActiveMq.java | 37 +++++ .../io/trygvis/test/jdbc/AsyncConsumerExample.java | 18 ++- src/test/resources/logback.xml | 2 +- 11 files changed, 293 insertions(+), 48 deletions(-) create mode 100644 src/main/java/io/trygvis/activemq/ActiveMqHinter.java create mode 100644 src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java diff --git a/pom.xml b/pom.xml index a86ce8d..5c5fb02 100755 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,15 @@ test + + + org.apache.activemq + activemq-client + 5.8.0 + provided + + + com.jolbox bonecp diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java new file mode 100644 index 0000000..f2cfb6e --- /dev/null +++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java @@ -0,0 +1,152 @@ +package io.trygvis.activemq; + +import io.trygvis.async.QueueController; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.sql.SQLException; + +import static java.lang.Long.parseLong; +import static java.lang.System.arraycopy; +import static java.nio.charset.Charset.forName; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; + +public class ActiveMqHinter implements AutoCloseable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueConnection c; + + private static final Charset utf8 = forName("utf-8"); + + public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException { + log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL()); + c = connectionFactory.createQueueConnection(); + c.start(); + log.info("Connected, clientId = {}", c.getClientID()); + } + + public void createHinter(final QueueController controller) throws JMSException { + QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(controller.queue.queue.name); + final MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + if ((message instanceof TextMessage)) { + String body; + try { + TextMessage textMessage = (TextMessage) message; + body = textMessage.getText(); + } catch (JMSException e) { + log.warn("Exception while reading body.", e); + throw new RuntimeException("Exception while reading body.", e); + } + + consumeString(new StringReader(body), controller); + } else if (message instanceof BytesMessage) { + final BytesMessage bytesMessage = (BytesMessage) message; + consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller); + } else { + throw new RuntimeException("Unsupported message type: " + message.getClass()); + } + } + }); + + controller.addOnStopListener(new Runnable() { + @Override + public void run() { + try { + consumer.close(); + } catch (JMSException e) { + log.error("Error while closing JMS consumer", e); + } + } + }); + } + + private void consumeString(Reader reader, QueueController controller) { + try { + BufferedReader r = new BufferedReader(reader); + + String line = r.readLine(); + + while (line != null) { + for (String id : line.split(",")) { + controller.hint(parseLong(id.trim())); + } + line = r.readLine(); + } + } catch (IOException | SQLException e) { + log.warn("Could not consume body.", e); + throw new RuntimeException("Could not consume body.", e); + } catch (NumberFormatException e) { + log.warn("Could not consume body.", e); + throw e; + } + } + + public void close() throws JMSException { + c.close(); + } + + private static class ByteMessageInputStream extends InputStream { + private final BytesMessage bytesMessage; + + public ByteMessageInputStream(BytesMessage bytesMessage) { + this.bytesMessage = bytesMessage; + } + + @Override + public int read(byte[] b) throws IOException { + try { + return bytesMessage.readBytes(b); + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + byte[] b = new byte[len]; + try { + int read = bytesMessage.readBytes(b); + if (read == -1) { + return -1; + } + arraycopy(b, 0, out, off, read); + return read; + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read() throws IOException { + try { + return bytesMessage.readByte(); + } catch (javax.jms.MessageEOFException e) { + return -1; + } catch (JMSException e) { + throw new IOException(e); + } + } + } +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 57ab5c6..46f1f30 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,14 +1,10 @@ package io.trygvis.async; -import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; -import io.trygvis.queue.TaskDao; import io.trygvis.queue.TaskEffect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; @@ -19,16 +15,12 @@ import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; public class JdbcAsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; - private final JdbcQueueService queueService; public JdbcAsyncService(QueueSystem queueSystem) { this.queueSystem = queueSystem; - this.queueService = queueSystem.createQueueService(); } public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { @@ -40,8 +32,6 @@ public class JdbcAsyncService { queues.put(queue.queue.name, queueController); - log.info("registerQueue: LEAVE"); - return queueController; } diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java index 8aed91f..a343d42 100644 --- a/src/main/java/io/trygvis/async/QueueController.java +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -1,7 +1,6 @@ package io.trygvis.async; import io.trygvis.queue.QueueExecutor; -import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.SqlEffectExecutor; @@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; @@ -47,6 +45,8 @@ public class QueueController { private ScheduledThreadPoolExecutor executor; + private List stopListeners = new ArrayList<>(); + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { this.queueSystem = queueSystem; this.req = req; @@ -55,36 +55,50 @@ public class QueueController { this.queue = queue; } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List tasks, - ScheduledThreadPoolExecutor executor) { - ExecutorCompletionService completionService = new ExecutorCompletionService<>(executor); + public void scheduleAll(final List tasks) throws InterruptedException { + ExecutorCompletionService service = new ExecutorCompletionService<>(executor); - List> results = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + service.submit(new Callable() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + public void invokeAll(final List tasks) throws InterruptedException { + List> callables = new ArrayList<>(tasks.size()); for (final Task task : tasks) { - results.add(completionService.submit(new Callable() { + callables.add(new Callable() { @Override public TaskExecutionResult call() throws Exception { - return queue.applyTask(effect, task); + return queue.applyTask(taskEffect, task); } - })); + }); } - for (Future result : results) { - while(!result.isDone()) { + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + try { - result.get(); - break; + scheduleAll(Collections.singletonList(task)); } catch (InterruptedException e) { - if(shouldRun) { - continue; - } - } catch (ExecutionException e) { - log.error("Unexpected exception.", e); - break; + throw new SQLException(e); } } - } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); } private class QueueThread implements Runnable { @@ -102,7 +116,7 @@ public class QueueController { log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { - executeTasks(req, taskEffect, tasks, executor); + invokeAll(tasks); } } catch (Throwable e) { if (shouldRun) { @@ -111,7 +125,7 @@ public class QueueController { } // If we found exactly the same number of tasks that we asked for, there is most likely more to go. - if (tasks != null && tasks.size() == req.chunkSize) { + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { log.info("Got a full chunk, continuing directly."); continue; } @@ -122,7 +136,7 @@ public class QueueController { checkForNewTasks = false; } else { try { - wait(queue.queue.interval); + wait(req.interval(queue.queue)); } catch (InterruptedException e) { // ignore } @@ -160,6 +174,14 @@ public class QueueController { log.info("Stopping thread for queue {}", queue.queue.name); + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + shouldRun = false; // TODO: empty out the currently executing tasks. diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java index 3694c06..8edc720 100644 --- a/src/main/java/io/trygvis/async/QueueStats.java +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -4,12 +4,14 @@ public class QueueStats { public final int totalMessageCount; public final int okMessageCount; public final int failedMessageCount; + public final int missedMessageCount; public final int scheduledMessageCount; - public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) { + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { this.totalMessageCount = totalMessageCount; this.okMessageCount = okMessageCount; this.failedMessageCount = failedMessageCount; + this.missedMessageCount = missedMessageCount; this.scheduledMessageCount = scheduledMessageCount; } @@ -19,6 +21,7 @@ public class QueueStats { "totalMessageCount=" + totalMessageCount + ", okMessageCount=" + okMessageCount + ", failedMessageCount=" + failedMessageCount + + ", missedMessageCount=" + missedMessageCount + ", scheduledMessageCount=" + scheduledMessageCount + '}'; } diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java index 468059d..88e5b46 100644 --- a/src/main/java/io/trygvis/queue/QueueExecutor.java +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -40,9 +40,10 @@ public class QueueExecutor { public int ok; public int failed; public int scheduled; + public int missed; - public QueueStats toStats() { - return new QueueStats(total, ok, failed, scheduled); + public synchronized QueueStats toStats() { + return new QueueStats(total, ok, failed, missed, scheduled); } } @@ -94,6 +95,9 @@ public class QueueExecutor { if (count == 0) { log.warn("Missed task {}", task.id()); + synchronized (stats) { + stats.missed++; + } return MISSED; } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index f4ce536..1c38f1f 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -12,15 +12,27 @@ public interface QueueService { public static class TaskExecutionRequest { public final long chunkSize; public final boolean stopOnError; + public final Long interval; + public final boolean continueOnFullChunk; // TODO: saveExceptions public TaskExecutionRequest(long chunkSize, boolean stopOnError) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("chunkSize has to be bigger than zero."); - } + this(chunkSize, stopOnError, null, true); + } + private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) { this.chunkSize = chunkSize; this.stopOnError = stopOnError; + this.interval = interval; + this.continueOnFullChunk = continueOnFullChunk; + } + + public TaskExecutionRequest interval(long interval) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); } @Override @@ -30,5 +42,13 @@ public interface QueueService { ", stopOnError=" + stopOnError + '}'; } + + public long interval(Queue queue) { + if (interval != null) { + return interval; + } + + return queue.interval; + } } } diff --git a/src/test/java/io/trygvis/test/DbUtil.java b/src/test/java/io/trygvis/test/DbUtil.java index 025442e..33c2807 100644 --- a/src/test/java/io/trygvis/test/DbUtil.java +++ b/src/test/java/io/trygvis/test/DbUtil.java @@ -37,8 +37,8 @@ public class DbUtil { ds.setIdleConnectionTestPeriodInSeconds(60); ds.setIdleMaxAgeInSeconds(240); ds.setMaxConnectionsPerPartition(4); - ds.setMinConnectionsPerPartition(2); - ds.setPartitionCount(2); + ds.setMinConnectionsPerPartition(1); + ds.setPartitionCount(4); ds.setAcquireIncrement(1); ds.setStatementsCacheSize(1000); ds.setReleaseHelperThreads(1); diff --git a/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java new file mode 100644 index 0000000..bd86732 --- /dev/null +++ b/src/test/java/io/trygvis/test/activemq/AsyncConsumerWithActiveMq.java @@ -0,0 +1,37 @@ +package io.trygvis.test.activemq; + +import io.trygvis.activemq.ActiveMqHinter; +import io.trygvis.async.QueueController; +import io.trygvis.queue.QueueService; +import io.trygvis.test.jdbc.AsyncConsumerExample; +import org.apache.activemq.ActiveMQConnectionFactory; + +import javax.jms.JMSException; + +public class AsyncConsumerWithActiveMq extends AsyncConsumerExample implements AutoCloseable { + private final ActiveMqHinter activeMqHinter; + + public AsyncConsumerWithActiveMq(ActiveMQConnectionFactory cf) throws JMSException { + activeMqHinter = new ActiveMqHinter(cf); + } + + public static void main(String[] args) throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:tcp://localhost:61616"); + + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1, true). + interval(60 * 6000). + continueOnFullChunk(false); + try (AsyncConsumerWithActiveMq consumer = new AsyncConsumerWithActiveMq(cf)) { + consumer.work(poolSize, req); + } + } + + public void close() throws JMSException { + activeMqHinter.close(); + } + + protected void wrapInputQueue(QueueController input) throws Exception { + activeMqHinter.createHinter(input); + } +} diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index 14db21b..16640dd 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -3,12 +3,12 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; import io.trygvis.async.QueueStats; -import io.trygvis.queue.SqlEffect; -import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; +import io.trygvis.queue.SqlEffect; +import io.trygvis.queue.SqlEffectExecutor; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -43,6 +43,12 @@ public class AsyncConsumerExample { }; public static void main(String[] args) throws Exception { + int poolSize = 4; + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); + new AsyncConsumerExample().work(poolSize, req); + } + + public void work(int poolSize, QueueService.TaskExecutionRequest req) throws Exception { System.out.println("Starting consumer"); DataSource ds = createDataSource(); @@ -66,9 +72,8 @@ public class AsyncConsumerExample { final QueueExecutor input = queues[0]; final QueueExecutor output = queues[1]; - QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true); - QueueController controller = asyncService.registerQueue(input, req, adder); + wrapInputQueue(controller); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { @@ -80,7 +85,7 @@ public class AsyncConsumerExample { }, 1000, 1000); long start = currentTimeMillis(); - controller.start(new ScheduledThreadPoolExecutor(4)); + controller.start(new ScheduledThreadPoolExecutor(poolSize)); Thread.sleep(60 * 1000); controller.stop(); long end = currentTimeMillis(); @@ -96,4 +101,7 @@ public class AsyncConsumerExample { double rate = 1000 * ((double) stats.totalMessageCount) / duration; System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); } + + protected void wrapInputQueue(QueueController input) throws Exception { + } } diff --git a/src/test/resources/logback.xml b/src/test/resources/logback.xml index f6e0f47..f964cd3 100755 --- a/src/test/resources/logback.xml +++ b/src/test/resources/logback.xml @@ -23,7 +23,7 @@ - + -- cgit v1.2.3