diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-22 09:39:49 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-22 10:16:37 +0200 |
commit | 29dc40a0f7fa765d6f66e7a1bdd31083f71286de (patch) | |
tree | 9270bc21dc8d7982a9b761b40261db9b7bd4a41c /src/main | |
parent | 49c70dd5bdafe3461c03a4ce45ec7e78a1a479a5 (diff) | |
download | quartz-based-queue-master.tar.gz quartz-based-queue-master.tar.bz2 quartz-based-queue-master.tar.xz quartz-based-queue-master.zip |
Diffstat (limited to 'src/main')
-rw-r--r-- | src/main/java/io/trygvis/activemq/ActiveMqHinter.java | 152 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 10 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueController.java | 70 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueStats.java | 5 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueExecutor.java | 8 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 26 |
6 files changed, 231 insertions, 40 deletions
diff --git a/src/main/java/io/trygvis/activemq/ActiveMqHinter.java b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java new file mode 100644 index 0000000..f2cfb6e --- /dev/null +++ b/src/main/java/io/trygvis/activemq/ActiveMqHinter.java @@ -0,0 +1,152 @@ +package io.trygvis.activemq; + +import io.trygvis.async.QueueController; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.Reader; +import java.io.StringReader; +import java.nio.charset.Charset; +import java.sql.SQLException; + +import static java.lang.Long.parseLong; +import static java.lang.System.arraycopy; +import static java.nio.charset.Charset.forName; +import static javax.jms.Session.AUTO_ACKNOWLEDGE; + +public class ActiveMqHinter implements AutoCloseable { + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final QueueConnection c; + + private static final Charset utf8 = forName("utf-8"); + + public ActiveMqHinter(ActiveMQConnectionFactory connectionFactory) throws JMSException { + log.info("Connecting to ActiveMQ Broker at {}", connectionFactory.getBrokerURL()); + c = connectionFactory.createQueueConnection(); + c.start(); + log.info("Connected, clientId = {}", c.getClientID()); + } + + public void createHinter(final QueueController controller) throws JMSException { + QueueSession session = c.createQueueSession(false, AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(controller.queue.queue.name); + final MessageConsumer consumer = session.createConsumer(queue); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + if ((message instanceof TextMessage)) { + String body; + try { + TextMessage textMessage = (TextMessage) message; + body = textMessage.getText(); + } catch (JMSException e) { + log.warn("Exception while reading body.", e); + throw new RuntimeException("Exception while reading body.", e); + } + + consumeString(new StringReader(body), controller); + } else if (message instanceof BytesMessage) { + final BytesMessage bytesMessage = (BytesMessage) message; + consumeString(new InputStreamReader(new ByteMessageInputStream(bytesMessage), utf8), controller); + } else { + throw new RuntimeException("Unsupported message type: " + message.getClass()); + } + } + }); + + controller.addOnStopListener(new Runnable() { + @Override + public void run() { + try { + consumer.close(); + } catch (JMSException e) { + log.error("Error while closing JMS consumer", e); + } + } + }); + } + + private void consumeString(Reader reader, QueueController controller) { + try { + BufferedReader r = new BufferedReader(reader); + + String line = r.readLine(); + + while (line != null) { + for (String id : line.split(",")) { + controller.hint(parseLong(id.trim())); + } + line = r.readLine(); + } + } catch (IOException | SQLException e) { + log.warn("Could not consume body.", e); + throw new RuntimeException("Could not consume body.", e); + } catch (NumberFormatException e) { + log.warn("Could not consume body.", e); + throw e; + } + } + + public void close() throws JMSException { + c.close(); + } + + private static class ByteMessageInputStream extends InputStream { + private final BytesMessage bytesMessage; + + public ByteMessageInputStream(BytesMessage bytesMessage) { + this.bytesMessage = bytesMessage; + } + + @Override + public int read(byte[] b) throws IOException { + try { + return bytesMessage.readBytes(b); + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read(byte[] out, int off, int len) throws IOException { + byte[] b = new byte[len]; + try { + int read = bytesMessage.readBytes(b); + if (read == -1) { + return -1; + } + arraycopy(b, 0, out, off, read); + return read; + } catch (JMSException e) { + throw new IOException(e); + } + } + + @Override + public int read() throws IOException { + try { + return bytesMessage.readByte(); + } catch (javax.jms.MessageEOFException e) { + return -1; + } catch (JMSException e) { + throw new IOException(e); + } + } + } +} diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 57ab5c6..46f1f30 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,14 +1,10 @@ package io.trygvis.async; -import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; -import io.trygvis.queue.TaskDao; import io.trygvis.queue.TaskEffect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; @@ -19,16 +15,12 @@ import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; public class JdbcAsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map<String, QueueController> queues = new HashMap<>(); private final QueueSystem queueSystem; - private final JdbcQueueService queueService; public JdbcAsyncService(QueueSystem queueSystem) { this.queueSystem = queueSystem; - this.queueService = queueSystem.createQueueService(); } public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { @@ -40,8 +32,6 @@ public class JdbcAsyncService { queues.put(queue.queue.name, queueController); - log.info("registerQueue: LEAVE"); - return queueController; } diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java index 8aed91f..a343d42 100644 --- a/src/main/java/io/trygvis/async/QueueController.java +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -1,7 +1,6 @@ package io.trygvis.async; import io.trygvis.queue.QueueExecutor; -import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.SqlEffectExecutor; @@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; @@ -47,6 +45,8 @@ public class QueueController { private ScheduledThreadPoolExecutor executor; + private List<Runnable> stopListeners = new ArrayList<>(); + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { this.queueSystem = queueSystem; this.req = req; @@ -55,36 +55,50 @@ public class QueueController { this.queue = queue; } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks, - ScheduledThreadPoolExecutor executor) { - ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor); + public void scheduleAll(final List<Task> tasks) throws InterruptedException { + ExecutorCompletionService<TaskExecutionResult> service = new ExecutorCompletionService<>(executor); - List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + service.submit(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + public void invokeAll(final List<Task> tasks) throws InterruptedException { + List<Callable<TaskExecutionResult>> callables = new ArrayList<>(tasks.size()); for (final Task task : tasks) { - results.add(completionService.submit(new Callable<TaskExecutionResult>() { + callables.add(new Callable<TaskExecutionResult>() { @Override public TaskExecutionResult call() throws Exception { - return queue.applyTask(effect, task); + return queue.applyTask(taskEffect, task); } - })); + }); } - for (Future<TaskExecutionResult> result : results) { - while(!result.isDone()) { + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + try { - result.get(); - break; + scheduleAll(Collections.singletonList(task)); } catch (InterruptedException e) { - if(shouldRun) { - continue; - } - } catch (ExecutionException e) { - log.error("Unexpected exception.", e); - break; + throw new SQLException(e); } } - } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); } private class QueueThread implements Runnable { @@ -102,7 +116,7 @@ public class QueueController { log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { - executeTasks(req, taskEffect, tasks, executor); + invokeAll(tasks); } } catch (Throwable e) { if (shouldRun) { @@ -111,7 +125,7 @@ public class QueueController { } // If we found exactly the same number of tasks that we asked for, there is most likely more to go. - if (tasks != null && tasks.size() == req.chunkSize) { + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { log.info("Got a full chunk, continuing directly."); continue; } @@ -122,7 +136,7 @@ public class QueueController { checkForNewTasks = false; } else { try { - wait(queue.queue.interval); + wait(req.interval(queue.queue)); } catch (InterruptedException e) { // ignore } @@ -160,6 +174,14 @@ public class QueueController { log.info("Stopping thread for queue {}", queue.queue.name); + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + shouldRun = false; // TODO: empty out the currently executing tasks. diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java index 3694c06..8edc720 100644 --- a/src/main/java/io/trygvis/async/QueueStats.java +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -4,12 +4,14 @@ public class QueueStats { public final int totalMessageCount; public final int okMessageCount; public final int failedMessageCount; + public final int missedMessageCount; public final int scheduledMessageCount; - public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) { + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { this.totalMessageCount = totalMessageCount; this.okMessageCount = okMessageCount; this.failedMessageCount = failedMessageCount; + this.missedMessageCount = missedMessageCount; this.scheduledMessageCount = scheduledMessageCount; } @@ -19,6 +21,7 @@ public class QueueStats { "totalMessageCount=" + totalMessageCount + ", okMessageCount=" + okMessageCount + ", failedMessageCount=" + failedMessageCount + + ", missedMessageCount=" + missedMessageCount + ", scheduledMessageCount=" + scheduledMessageCount + '}'; } diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java index 468059d..88e5b46 100644 --- a/src/main/java/io/trygvis/queue/QueueExecutor.java +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -40,9 +40,10 @@ public class QueueExecutor { public int ok; public int failed; public int scheduled; + public int missed; - public QueueStats toStats() { - return new QueueStats(total, ok, failed, scheduled); + public synchronized QueueStats toStats() { + return new QueueStats(total, ok, failed, missed, scheduled); } } @@ -94,6 +95,9 @@ public class QueueExecutor { if (count == 0) { log.warn("Missed task {}", task.id()); + synchronized (stats) { + stats.missed++; + } return MISSED; } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index f4ce536..1c38f1f 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -12,15 +12,27 @@ public interface QueueService { public static class TaskExecutionRequest { public final long chunkSize; public final boolean stopOnError; + public final Long interval; + public final boolean continueOnFullChunk; // TODO: saveExceptions public TaskExecutionRequest(long chunkSize, boolean stopOnError) { - if (chunkSize <= 0) { - throw new IllegalArgumentException("chunkSize has to be bigger than zero."); - } + this(chunkSize, stopOnError, null, true); + } + private TaskExecutionRequest(long chunkSize, boolean stopOnError, Long interval, boolean continueOnFullChunk) { this.chunkSize = chunkSize; this.stopOnError = stopOnError; + this.interval = interval; + this.continueOnFullChunk = continueOnFullChunk; + } + + public TaskExecutionRequest interval(long interval) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); + } + + public TaskExecutionRequest continueOnFullChunk(boolean continueOnFullChunk) { + return new TaskExecutionRequest(chunkSize, stopOnError, interval, continueOnFullChunk); } @Override @@ -30,5 +42,13 @@ public interface QueueService { ", stopOnError=" + stopOnError + '}'; } + + public long interval(Queue queue) { + if (interval != null) { + return interval; + } + + return queue.interval; + } } } |