diff options
Diffstat (limited to 'src/main/java/io/trygvis')
-rw-r--r-- | src/main/java/io/trygvis/async/QueueThread.java | 6 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/JdbcQueueService.java | 24 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueService.java | 12 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/QueueStats.java | 20 | ||||
-rw-r--r-- | src/main/java/io/trygvis/queue/TaskDao.java | 28 |
5 files changed, 70 insertions, 20 deletions
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 558e769..ea77911 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -56,17 +56,19 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { + TaskExecutionRequest req = new TaskExecutionRequest(100, true); + List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { @Override public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); + queueService.executeTask(req, taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index edd6c80..cb7af4b 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -12,7 +12,6 @@ import java.util.List; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; -import static io.trygvis.queue.Task.TaskState.PROCESSING; public class JdbcQueueService { @@ -27,15 +26,22 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { - @Override - public List<Task> doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); - } - }); + public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List<Task> tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() { + @Override + public List<Task> doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); - applyTasks(req, effect, tasks); + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); } public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException { diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index ee74bf4..d97eaf0 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -12,11 +12,21 @@ public interface QueueService { void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException; public static class TaskExecutionRequest { + public final long chunkSize; public final boolean stopOnError; // TODO: saveExceptions - public TaskExecutionRequest(boolean stopOnError) { + public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + this.chunkSize = chunkSize; this.stopOnError = stopOnError; } + + @Override + public String toString() { + return "TaskExecutionRequest{" + + "chunkSize=" + chunkSize + + ", stopOnError=" + stopOnError + + '}'; + } } } diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { + public final String name; + public final long totalTaskCount; + public final EnumMap<Task.TaskState, Long> states; + + public QueueStats(String name, EnumMap<Task.TaskState, Long> states) { + this.name = name; + this.states = states; + + long c = 0; + for (Long l : states.values()) { + c += l; + } + this.totalTaskCount = c; + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 9adec8f..5d77a41 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -9,12 +9,10 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.EnumMap; import java.util.List; -import static io.trygvis.queue.Task.TaskState; -import static io.trygvis.queue.Task.TaskState.valueOf; -import static io.trygvis.queue.Task.argumentsToString; -import static io.trygvis.queue.Task.stringToArguments; +import static io.trygvis.queue.Task.*; public class TaskDao { @@ -57,11 +55,12 @@ public class TaskDao { } } - public List<Task> findByQueueAndState(String queue, TaskState state) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) { + public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { int i = 1; stmt.setString(i++, queue); - stmt.setString(i, state.name()); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); ResultSet rs = stmt.executeQuery(); List<Task> list = new ArrayList<>(); while (rs.next()) { @@ -71,6 +70,19 @@ public class TaskDao { } } + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + public int update(Task task) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; @@ -108,7 +120,7 @@ public class TaskDao { rs.getLong(i++), rs.getLong(i++), rs.getString(i++), - TaskState.valueOf(rs.getString(i++).trim()), + TaskState.valueOf(rs.getString(i++)), rs.getTimestamp(i++), rs.getTimestamp(i++), rs.getInt(i++), |