From 4b0bab9e722cf77ca0049c54515e8c93acefa355 Mon Sep 17 00:00:00 2001 From: Trygve Laugstøl Date: Mon, 10 Jun 2013 22:23:13 +0200 Subject: wip --- src/main/java/io/trygvis/async/QueueThread.java | 6 ++-- .../java/io/trygvis/queue/JdbcQueueService.java | 24 +++++++++------ src/main/java/io/trygvis/queue/QueueService.java | 12 +++++++- src/main/java/io/trygvis/queue/QueueStats.java | 20 ++++++++++++ src/main/java/io/trygvis/queue/TaskDao.java | 28 ++++++++++++----- src/main/resources/create-postgresql.sql | 2 +- .../java/io/trygvis/test/PlainJavaExample.java | 36 +++++++++++++++------- 7 files changed, 96 insertions(+), 32 deletions(-) create mode 100644 src/main/java/io/trygvis/queue/QueueStats.java (limited to 'src') diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java index 558e769..ea77911 100644 --- a/src/main/java/io/trygvis/async/QueueThread.java +++ b/src/main/java/io/trygvis/async/QueueThread.java @@ -56,17 +56,19 @@ class QueueThread implements Runnable { public void run() { while (shouldRun) { try { + TaskExecutionRequest req = new TaskExecutionRequest(100, true); + List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { @Override public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100); } }); log.info("Found {} tasks on queue {}", tasks.size(), queue.name); if (tasks.size() > 0) { - queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks); + queueService.executeTask(req, taskEffect, tasks); } } catch (Throwable e) { log.warn("Error while executing tasks.", e); diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java index edd6c80..cb7af4b 100644 --- a/src/main/java/io/trygvis/queue/JdbcQueueService.java +++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java @@ -12,7 +12,6 @@ import java.util.List; import static io.trygvis.queue.QueueService.TaskExecutionRequest; import static io.trygvis.queue.Task.TaskState.NEW; -import static io.trygvis.queue.Task.TaskState.PROCESSING; public class JdbcQueueService { @@ -27,15 +26,22 @@ public class JdbcQueueService { this.sqlEffectExecutor = queueSystem.sqlEffectExecutor; } - public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException { - final List tasks = sqlEffectExecutor.transaction(new SqlEffect>() { - @Override - public List doInConnection(Connection c) throws SQLException { - return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW); - } - }); + public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException { + log.info("Consuming tasks: request={}", req); + + List tasks; + do { + tasks = sqlEffectExecutor.transaction(new SqlEffect>() { + @Override + public List doInConnection(Connection c) throws SQLException { + return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize); + } + }); + + log.info("Consuming chunk with {} tasks", tasks.size()); - applyTasks(req, effect, tasks); + applyTasks(req, effect, tasks); + } while (tasks.size() > 0); } public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List tasks) throws SQLException { diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java index ee74bf4..d97eaf0 100644 --- a/src/main/java/io/trygvis/queue/QueueService.java +++ b/src/main/java/io/trygvis/queue/QueueService.java @@ -12,11 +12,21 @@ public interface QueueService { void schedule(Queue queue, Date scheduled, List arguments) throws SQLException; public static class TaskExecutionRequest { + public final long chunkSize; public final boolean stopOnError; // TODO: saveExceptions - public TaskExecutionRequest(boolean stopOnError) { + public TaskExecutionRequest(long chunkSize, boolean stopOnError) { + this.chunkSize = chunkSize; this.stopOnError = stopOnError; } + + @Override + public String toString() { + return "TaskExecutionRequest{" + + "chunkSize=" + chunkSize + + ", stopOnError=" + stopOnError + + '}'; + } } } diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java new file mode 100644 index 0000000..5b048b3 --- /dev/null +++ b/src/main/java/io/trygvis/queue/QueueStats.java @@ -0,0 +1,20 @@ +package io.trygvis.queue; + +import java.util.EnumMap; + +public class QueueStats { + public final String name; + public final long totalTaskCount; + public final EnumMap states; + + public QueueStats(String name, EnumMap states) { + this.name = name; + this.states = states; + + long c = 0; + for (Long l : states.values()) { + c += l; + } + this.totalTaskCount = c; + } +} diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java index 9adec8f..5d77a41 100644 --- a/src/main/java/io/trygvis/queue/TaskDao.java +++ b/src/main/java/io/trygvis/queue/TaskDao.java @@ -9,12 +9,10 @@ import java.sql.Types; import java.util.ArrayList; import java.util.Collections; import java.util.Date; +import java.util.EnumMap; import java.util.List; -import static io.trygvis.queue.Task.TaskState; -import static io.trygvis.queue.Task.TaskState.valueOf; -import static io.trygvis.queue.Task.argumentsToString; -import static io.trygvis.queue.Task.stringToArguments; +import static io.trygvis.queue.Task.*; public class TaskDao { @@ -57,11 +55,12 @@ public class TaskDao { } } - public List findByQueueAndState(String queue, TaskState state) throws SQLException { - try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) { + public List findByQueueAndState(String queue, TaskState state, long limit) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) { int i = 1; stmt.setString(i++, queue); - stmt.setString(i, state.name()); + stmt.setString(i++, state.name()); + stmt.setLong(i, limit); ResultSet rs = stmt.executeQuery(); List list = new ArrayList<>(); while (rs.next()) { @@ -71,6 +70,19 @@ public class TaskDao { } } + public QueueStats findQueueStatsByName(String queue) throws SQLException { + try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) { + int i = 1; + stmt.setString(i, queue); + ResultSet rs = stmt.executeQuery(); + EnumMap states = new EnumMap<>(TaskState.class); + while (rs.next()) { + states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2)); + } + return new QueueStats(queue, states); + } + } + public int update(Task task) throws SQLException { try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) { int i = 1; @@ -108,7 +120,7 @@ public class TaskDao { rs.getLong(i++), rs.getLong(i++), rs.getString(i++), - TaskState.valueOf(rs.getString(i++).trim()), + TaskState.valueOf(rs.getString(i++)), rs.getTimestamp(i++), rs.getTimestamp(i++), rs.getInt(i++), diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql index 7c331fd..a0739c2 100644 --- a/src/main/resources/create-postgresql.sql +++ b/src/main/resources/create-postgresql.sql @@ -14,7 +14,7 @@ CREATE TABLE task ( id BIGINT NOT NULL, parent BIGINT, queue VARCHAR(100) NOT NULL, - state CHAR(10) NOT NULL, + state VARCHAR(10) NOT NULL, scheduled TIMESTAMP NOT NULL, last_run TIMESTAMP, run_count INT NOT NULL, diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java index 788d8a0..cad8559 100644 --- a/src/test/java/io/trygvis/test/PlainJavaExample.java +++ b/src/test/java/io/trygvis/test/PlainJavaExample.java @@ -5,8 +5,10 @@ import io.trygvis.async.SqlEffectExecutor; import io.trygvis.queue.JdbcQueueService; import io.trygvis.queue.Queue; import io.trygvis.queue.QueueService; +import io.trygvis.queue.QueueStats; import io.trygvis.queue.QueueSystem; import io.trygvis.queue.Task; +import io.trygvis.queue.TaskDao; import io.trygvis.queue.TaskEffect; import javax.sql.DataSource; @@ -14,8 +16,10 @@ import java.sql.Connection; import java.sql.SQLException; import java.util.Date; import java.util.List; +import java.util.Map; import java.util.Random; +import static io.trygvis.queue.Task.TaskState; import static io.trygvis.test.DbUtil.createDataSource; import static java.lang.System.currentTimeMillis; import static java.util.Arrays.asList; @@ -37,32 +41,41 @@ public class PlainJavaExample { SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); - QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); + final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); final JdbcQueueService queueService = queueSystem.queueService; Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect() { @Override public Queue[] doInConnection(Connection c) throws SQLException { - return new Queue[]{ + Queue[] queues = { queueService.lookupQueue(c, inputName, interval, true), queueService.lookupQueue(c, outputName, interval, true)}; + + TaskDao taskDao = queueSystem.createTaskDao(c); + + QueueStats stats = taskDao.findQueueStatsByName(inputName); + System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount); + for (Map.Entry entry : stats.states.entrySet()) { + System.out.println(entry.getKey() + " = " + entry.getValue()); + } + + return queues; } }); final Queue input = queues[0]; final Queue output = queues[1]; - QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(false); + QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false); queueService.consumeAll(input, req, new TaskEffect() { public List apply(Task task) throws Exception { - System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments); Long a = Long.valueOf(task.arguments.get(0)); Long b = Long.valueOf(task.arguments.get(1)); System.out.println("a + b = " + a + " + " + b + " = " + (a + b)); - if (r.nextInt(3) == 0) { + if (r.nextInt(3000) > 0) { return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b))); } @@ -77,18 +90,21 @@ public class PlainJavaExample { public static void main(String[] args) throws Exception { System.out.println("Starting producer"); - int chunks = 10; - final int chunk = 2000; + int chunks = 100; + final int chunk = 10000; DataSource ds = createDataSource(); - Connection c = ds.getConnection(); SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds); QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor); final JdbcQueueService queueService = queueSystem.queueService; - final Queue queue = queueService.lookupQueue(c, inputName, interval, true); + final Queue queue; + try (Connection c = ds.getConnection()) { + queue = queueService.lookupQueue(c, inputName, interval, true); + c.commit(); + } for (int i = 0; i < chunks; i++) { long start = currentTimeMillis(); @@ -105,8 +121,6 @@ public class PlainJavaExample { long time = end - start; System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second"); } - - c.commit(); } } } -- cgit v1.2.3