From 4b0bab9e722cf77ca0049c54515e8c93acefa355 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 10 Jun 2013 22:23:13 +0200 Subject: wip --- src/main/java/io/trygvis/async/QueueThread.java | 6 +++-- .../java/io/trygvis/queue/JdbcQueueService.java | 24 ++++++++++++------- src/main/java/io/trygvis/queue/QueueService.java | 12 +++++++++- src/main/java/io/trygvis/queue/QueueStats.java | 20 ++++++++++++++++ src/main/java/io/trygvis/queue/TaskDao.java | 28 +++++++++++++++------- 5 files changed, 70 insertions(+), 20 deletions(-) create mode 100644 src/main/java/io/trygvis/queue/QueueStats.java (limited to 'src/main/java/io') diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 558e769..ea77911 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -56,17 +56,19 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { + TaskExecutionRequest req = new TaskExecutionRequest(100, true); + List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); + queueService.executeTask(req, taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index edd6c80..cb7af4b 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -12,7 +12,6 @@ import java.util.List; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; -import static io.trygvis.queue.Task.TaskState.PROCESSING; public class JdbcQueueService { @@ -27,15 +26,22 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - final List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { - @Override - public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); - } - }); + public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + @Override + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); - applyTasks(req, effect, tasks); + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); } public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List tasks) throws SQLException { diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index ee74bf4..d97eaf0 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -12,11 +12,21 @@ public interface QueueService { void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; public static class TaskExecutionRequest { + public final long chunkSize; public final boolean stopOnError; // TODO: saveExceptions - public TaskExecutionRequest(boolean stopOnError) { + public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + this.chunkSize = chunkSize; this.stopOnError = stopOnError; } + + @Override + public String toString() { + return "TaskExecutionRequest{" + + "chunkSize=" + chunkSize + + ", stopOnError=" + stopOnError + + '}'; + } } } diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { + public final String name; + public final long totalTaskCount; + public final EnumMap states; + + public QueueStats(String name, EnumMap states) { + this.name = name; + this.states = states; + + long c = 0; + for (Long l : states.values()) { + c += l; + } + this.totalTaskCount = c; + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 9adec8f..5d77a41 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -9,12 +9,10 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.EnumMap; import java.util.List; -import static io.trygvis.queue.Task.TaskState; -import static io.trygvis.queue.Task.TaskState.valueOf; -import static io.trygvis.queue.Task.argumentsToString; -import static io.trygvis.queue.Task.stringToArguments; +import static io.trygvis.queue.Task.*; public class TaskDao { @@ -57,11 +55,12 @@ public class TaskDao { } } - public List findByQueueAndState(String queue, TaskState state) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) { + public List findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { int i = 1; stmt.setString(i++, queue); - stmt.setString(i, state.name()); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); while (rs.next()) { @@ -71,6 +70,19 @@ public class TaskDao { } } + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + public int update(Task task) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; @@ -108,7 +120,7 @@ public class TaskDao { rs.getLong(i++), rs.getLong(i++), rs.getString(i++), - TaskState.valueOf(rs.getString(i++).trim()), + TaskState.valueOf(rs.getString(i++)), rs.getTimestamp(i++), rs.getTimestamp(i++), rs.getInt(i++), -- cgit v1.2.3