diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/main/java/io/trygvis/async/JdbcAsyncService.java | 6 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueController.java | 46 | ||||
-rw-r--r-- | src/main/java/io/trygvis/async/QueueStats.java | 10 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueExecutor.java | 27 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 4 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 17 | ||||
-rw-r--r-- | src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java | 32 |
7 files changed, 113 insertions, 29 deletions
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java index fd4b38b..57ab5c6 100644 --- a/src/main/java/io/trygvis/async/JdbcAsyncService.java +++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java @@ -71,12 +71,10 @@ public class JdbcAsyncService { } public Task update(Connection c, Task ref) throws SQLException { - TaskDao taskDao = queueSystem.createTaskDao(c); - - return taskDao.findById(ref.id()); + return queueSystem.createTaskDao(c).findById(ref.id()); } - private QueueController getQueueThread(String name) { + private synchronized QueueController getQueueThread(String name) { QueueController queueController = queues.get(name); if (queueController == null) { diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java index ea1c42d..863d6a5 100644 --- a/src/main/java/io/trygvis/async/QueueController.java +++ b/src/main/java/io/trygvis/async/QueueController.java @@ -1,6 +1,7 @@ package io.trygvis.async; import io.trygvis.queue.QueueExecutor; +import io.trygvis.queue.QueueService; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; import io.trygvis.queue.TaskEffect; @@ -9,9 +10,15 @@ import org.slf4j.LoggerFactory; import java.sql.Connection; import java.sql.SQLException; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledThreadPoolExecutor; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; @@ -46,6 +53,38 @@ public class QueueController { this.queue = queue; } + public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect effect, final List<Task> tasks, + ScheduledThreadPoolExecutor executor) { + ExecutorCompletionService<TaskExecutionResult> completionService = new ExecutorCompletionService<>(executor); + + List<Future<TaskExecutionResult>> results = new ArrayList<>(tasks.size()); + + for (final Task task : tasks) { + results.add(completionService.submit(new Callable<TaskExecutionResult>() { + @Override + public TaskExecutionResult call() throws Exception { + return queue.applyTask(effect, task); + } + })); + } + + for (Future<TaskExecutionResult> result : results) { + while(!result.isDone()) { + try { + result.get(); + break; + } catch (InterruptedException e) { + if(shouldRun) { + continue; + } + } catch (ExecutionException e) { + log.error("Unexpected exception.", e); + break; + } + } + } + } + private class QueueThread implements Runnable { public void run() { while (shouldRun) { @@ -61,7 +100,7 @@ public class QueueController { log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name); if (tasks.size() > 0) { - queue.executeTasks(req, taskEffect, tasks, executor); + executeTasks(req, taskEffect, tasks, executor); } } catch (Throwable e) { if (shouldRun) { @@ -71,6 +110,7 @@ public class QueueController { // If we found exactly the same number of tasks that we asked for, there is most likely more to go. if (tasks != null && tasks.size() == req.chunkSize) { + log.info("Got a full chunk, continuing directly."); continue; } @@ -101,7 +141,7 @@ public class QueueController { throw new IllegalStateException("Already running"); } - log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval); + log.info("Starting thread for queue {} with poll interval = {}ms", queue.queue.name, queue.queue.interval); running = true; this.executor = executor; @@ -120,6 +160,8 @@ public class QueueController { shouldRun = false; + // TODO: empty out the currently executing tasks. + thread.interrupt(); while (running) { try { diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java index 7c75149..3694c06 100644 --- a/src/main/java/io/trygvis/async/QueueStats.java +++ b/src/main/java/io/trygvis/async/QueueStats.java @@ -12,4 +12,14 @@ public class QueueStats { this.failedMessageCount = failedMessageCount; this.scheduledMessageCount = scheduledMessageCount; } + + @Override + public String toString() { + return "QueueStats{" + + "totalMessageCount=" + totalMessageCount + + ", okMessageCount=" + okMessageCount + + ", failedMessageCount=" + failedMessageCount + + ", scheduledMessageCount=" + scheduledMessageCount + + '}'; + } } diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java index a1eb3b7..3739532 100644 --- a/src/main/java/io/trygvis/queue/QueueExecutor.java +++ b/src/main/java/io/trygvis/queue/QueueExecutor.java @@ -10,11 +10,8 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED; -import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK; +import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.*; import static io.trygvis.queue.Task.TaskState.NEW; public class QueueExecutor { @@ -51,6 +48,10 @@ public class QueueExecutor { } } + public QueueStats getStats() { + return stats.toStats(); + } + public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException { log.info("Consuming tasks: request={}", req); @@ -69,17 +70,7 @@ public class QueueExecutor { } while (tasks.size() > 0); } - public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks, - ScheduledThreadPoolExecutor executor) { - executor.execute(new Runnable() { - @Override - public void run() { - applyTasks(req, taskEffect, tasks); - } - }); - } - - private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { + public void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) { for (Task task : tasks) { TaskExecutionResult result = applyTask(effect, task); @@ -94,17 +85,17 @@ public class QueueExecutor { * <p/> * If the task fails, the status is set to error in a separate transaction. */ - private TaskExecutionResult applyTask(TaskEffect effect, final Task task) { + public TaskExecutionResult applyTask(TaskEffect effect, final Task task) { try { Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() { @Override public Integer doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).update(task.markProcessing()); + return queueSystem.createTaskDao(c).update(task.markProcessing(), NEW); } }); if (count == 0) { - log.trace("Missed task {}", task.id()); + log.warn("Missed task {}", task.id()); return MISSED; } diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index eee14ed..f4ce536 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -15,6 +15,10 @@ public interface QueueService { // TODO: saveExceptions public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + if (chunkSize <= 0) { + throw new IllegalArgumentException("chunkSize has to be bigger than zero."); + } + this.chunkSize = chunkSize; this.stopOnError = stopOnError; } diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 5d77a41..365b44b 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -84,14 +84,27 @@ public class TaskDao { } public int update(Task task) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { + return update(task, null); + } + + public int update(Task task, TaskState state) throws SQLException { + String sql = "UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?"; + + if (state != null) { + sql += " AND state=?"; + } + + try (PreparedStatement stmt = c.prepareStatement(sql)) { int i = 1; stmt.setString(i++, task.state.name()); stmt.setTimestamp(i++, new Timestamp(task.scheduled.getTime())); setTimestamp(stmt, i++, task.lastRun); stmt.setInt(i++, task.runCount); setTimestamp(stmt, i++, task.completed); - stmt.setLong(i, task.id()); + stmt.setLong(i++, task.id()); + if (state != null) { + stmt.setString(i, state.name()); + } return stmt.executeUpdate(); } } diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java index dd478d7..361c2c5 100644 --- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java +++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java @@ -2,6 +2,7 @@ package io.trygvis.test.jdbc; import io.trygvis.async.JdbcAsyncService; import io.trygvis.async.QueueController; +import io.trygvis.async.QueueStats; import io.trygvis.async.SqlEffect; import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; @@ -16,10 +17,13 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.ScheduledThreadPoolExecutor; import static io.trygvis.queue.Task.newTask; import static io.trygvis.test.DbUtil.createDataSource; +import static java.lang.System.currentTimeMillis; import static java.util.Collections.singletonList; public class AsyncConsumerExample { @@ -27,7 +31,7 @@ public class AsyncConsumerExample { private static String inputName = "my-input"; private static String outputName = "my-output"; - private static int interval = 10; + private static int interval = 1000; private static final TaskEffect adder = new TaskEffect() { public List<Task> apply(Task task) throws Exception { @@ -68,8 +72,30 @@ public class AsyncConsumerExample { QueueController controller = asyncService.registerQueue(input, req, adder); - controller.start(new ScheduledThreadPoolExecutor(2)); - Thread.sleep(60 * 1000); + Timer timer = new Timer(); + timer.scheduleAtFixedRate(new TimerTask() { + @Override + public void run() { + System.out.println(input.getStats()); + System.out.println(output.getStats()); + } + }, 1000, 1000); + + long start = currentTimeMillis(); + controller.start(new ScheduledThreadPoolExecutor(1)); + Thread.sleep(5 * 1000); controller.stop(); + long end = currentTimeMillis(); + timer.cancel(); + + QueueStats stats = input.getStats(); + + System.out.println("Summary:"); + System.out.println(stats.toString()); + System.out.println(output.getStats().toString()); + + long duration = end - start; + double rate = 1000 * ((double) stats.totalMessageCount) / duration; + System.out.println("Consumed " + stats.totalMessageCount + " messages in " + duration + "ms at " + rate + " msg/s"); } } |