diff options
author | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-16 21:26:17 +0200 |
---|---|---|
committer | Trygve Laugstøl <trygvis@inamo.no> | 2013-06-16 21:26:17 +0200 |
commit | e57f313713ee0fc52d7dd1247c51914d1462dfc2 (patch) | |
tree | f39a49b967f6f18d0637e61f2dd4d8081615283b /src/main/java/io/trygvis/async | |
parent | 1ec4fae12c5e5363591013e5a759590d913d6782 (diff) | |
download | quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.tar.gz quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.tar.bz2 quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.tar.xz quartz-based-queue-e57f313713ee0fc52d7dd1247c51914d1462dfc2.zip |
wip
Diffstat (limited to 'src/main/java/io/trygvis/async')
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 6 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueController.java | 46 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueStats.java | 10 |
3 files changed, 56 insertions, 6 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index fd4b38b..57ab5c6 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -71,12 +71,10 @@ public class JdbcAsyncService { } public Task update(Connection c, Task ref) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - return taskDao.findById(ref.id()); + return queueSystem.createTaskDao(c).findById(ref.id()); } - private QueueController getQueueThread(String name) { + private synchronized QueueController getQueueThread(String name) { QueueController queueController = queues.get(name); if (queueController == null) { diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java index ea1c42d..863d6a5 100644 --- a/src/main/java/io/trygvis/async/QueueController.java +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -1,6 +1,7 @@ package io.trygvis.async; import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -9,9 +10,15 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; @@ -46,6 +53,38 @@ public class QueueController { this.queue = queue; } + public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks, + ScheduledThreadPoolExecutor executor) { + ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor); + + List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size()); + + for (final Task task : tasks) { + results.add(completionService.submit(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(effect, task); + } + })); + } + + for (Future<TaskExecutionResult> result : results) { + while(!result.isDone()) { + try { + result.get(); + break; + } catch (InterruptedException e) { + if(shouldRun) { + continue; + } + } catch (ExecutionException e) { + log.error("Unexpected exception.", e); + break; + } + } + } + } + private class QueueThread implements Runnable { public void run() { while (shouldRun) { @@ -61,7 +100,7 @@ public class QueueController { log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { - queue.executeTasks(req, taskEffect, tasks, executor); + executeTasks(req, taskEffect, tasks, executor); } } catch (Throwable e) { if (shouldRun) { @@ -71,6 +110,7 @@ public class QueueController { // If we found exactly the same number of tasks that we asked for, there is most likely more to go. if (tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); continue; } @@ -101,7 +141,7 @@ public class QueueController { throw new IllegalStateException("Already running"); } - log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval); + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); running = true; this.executor = executor; @@ -120,6 +160,8 @@ public class QueueController { shouldRun = false; + // TODO: empty out the currently executing tasks. + thread.interrupt(); while (running) { try { diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java index 7c75149..3694c06 100644 --- a/src/main/java/io/trygvis/async/QueueStats.java +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -12,4 +12,14 @@ public class QueueStats { this.failedMessageCount = failedMessageCount; this.scheduledMessageCount = scheduledMessageCount; } + + @Override + public String toString() { + return "QueueStats{" + + "totalMessageCount=" + totalMessageCount + + ", okMessageCount=" + okMessageCount + + ", failedMessageCount=" + failedMessageCount + + ", scheduledMessageCount=" + scheduledMessageCount + + '}'; + } } |