From 29dc40a0f7fa765d6f66e7a1bdd31083f71286de Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Sat, 22 Jun 2013 09:39:49 +0200 Subject: o Adding an ActiveMQ connection to messages can be sent as a hint to the consumer. --- .../java/io/trygvis/async/JdbcAsyncService.java | 10 ---- .../java/io/trygvis/async/QueueController.java | 70 ++++++++++++++-------- src/main/java/io/trygvis/async/QueueStats.java | 5 +- 3 files changed, 50 insertions(+), 35 deletions(-) (limited to 'src/main/java/io/trygvis/async') diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index 57ab5c6..46f1f30 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -1,14 +1,10 @@ package io.trygvis.async; -import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.QueueExecutor; import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; -import io.trygvis.queue.TaskDao; import io.trygvis.queue.TaskEffect; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; @@ -19,16 +15,12 @@ import static java.lang.System.currentTimeMillis; import static java.lang.Thread.sleep; public class JdbcAsyncService { - private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map queues = new HashMap<>(); private final QueueSystem queueSystem; - private final JdbcQueueService queueService; public JdbcAsyncService(QueueSystem queueSystem) { this.queueSystem = queueSystem; - this.queueService = queueSystem.createQueueService(); } public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) { @@ -40,8 +32,6 @@ public class JdbcAsyncService { queues.put(queue.queue.name, queueController); - log.info("registerQueue: LEAVE"); - return queueController; } diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java index 8aed91f..a343d42 100644 --- a/src/main/java/io/trygvis/async/QueueController.java +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -1,7 +1,6 @@ package io.trygvis.async; import io.trygvis.queue.QueueExecutor; -import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.SqlEffect; import io.trygvis.queue.SqlEffectExecutor; @@ -13,11 +12,10 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; @@ -47,6 +45,8 @@ public class QueueController { private ScheduledThreadPoolExecutor executor; + private List stopListeners = new ArrayList<>(); + public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) { this.queueSystem = queueSystem; this.req = req; @@ -55,36 +55,50 @@ public class QueueController { this.queue = queue; } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List tasks, - ScheduledThreadPoolExecutor executor) { - ExecutorCompletionService completionService = new ExecutorCompletionService<>(executor); + public void scheduleAll(final List tasks) throws InterruptedException { + ExecutorCompletionService service = new ExecutorCompletionService<>(executor); - List> results = new ArrayList<>(tasks.size()); + for (final Task task : tasks) { + service.submit(new Callable() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(taskEffect, task); + } + }); + } + } + public void invokeAll(final List tasks) throws InterruptedException { + List> callables = new ArrayList<>(tasks.size()); for (final Task task : tasks) { - results.add(completionService.submit(new Callable() { + callables.add(new Callable() { @Override public TaskExecutionResult call() throws Exception { - return queue.applyTask(effect, task); + return queue.applyTask(taskEffect, task); } - })); + }); } - for (Future result : results) { - while(!result.isDone()) { + executor.invokeAll(callables); + } + + public void hint(final long id) throws SQLException { + sqlEffectExecutor.transaction(new SqlEffect.Void() { + @Override + public void doInConnection(Connection c) throws SQLException { + Task task = queueSystem.createTaskDao(c).findById(id); + try { - result.get(); - break; + scheduleAll(Collections.singletonList(task)); } catch (InterruptedException e) { - if(shouldRun) { - continue; - } - } catch (ExecutionException e) { - log.error("Unexpected exception.", e); - break; + throw new SQLException(e); } } - } + }); + } + + public synchronized void addOnStopListener(Runnable runnable) { + stopListeners.add(runnable); } private class QueueThread implements Runnable { @@ -102,7 +116,7 @@ public class QueueController { log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { - executeTasks(req, taskEffect, tasks, executor); + invokeAll(tasks); } } catch (Throwable e) { if (shouldRun) { @@ -111,7 +125,7 @@ public class QueueController { } // If we found exactly the same number of tasks that we asked for, there is most likely more to go. - if (tasks != null && tasks.size() == req.chunkSize) { + if (req.continueOnFullChunk && tasks != null && tasks.size() == req.chunkSize) { log.info("Got a full chunk, continuing directly."); continue; } @@ -122,7 +136,7 @@ public class QueueController { checkForNewTasks = false; } else { try { - wait(queue.queue.interval); + wait(req.interval(queue.queue)); } catch (InterruptedException e) { // ignore } @@ -160,6 +174,14 @@ public class QueueController { log.info("Stopping thread for queue {}", queue.queue.name); + for (Runnable runnable : stopListeners) { + try { + runnable.run(); + } catch (Throwable e) { + log.error("Error while running stop listener " + runnable, e); + } + } + shouldRun = false; // TODO: empty out the currently executing tasks. diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java index 3694c06..8edc720 100644 --- a/src/main/java/io/trygvis/async/QueueStats.java +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -4,12 +4,14 @@ public class QueueStats { public final int totalMessageCount; public final int okMessageCount; public final int failedMessageCount; + public final int missedMessageCount; public final int scheduledMessageCount; - public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) { + public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int missedMessageCount, int scheduledMessageCount) { this.totalMessageCount = totalMessageCount; this.okMessageCount = okMessageCount; this.failedMessageCount = failedMessageCount; + this.missedMessageCount = missedMessageCount; this.scheduledMessageCount = scheduledMessageCount; } @@ -19,6 +21,7 @@ public class QueueStats { "totalMessageCount=" + totalMessageCount + ", okMessageCount=" + okMessageCount + ", failedMessageCount=" + failedMessageCount + + ", missedMessageCount=" + missedMessageCount + ", scheduledMessageCount=" + scheduledMessageCount + '}'; } -- cgit v1.2.3