aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-10 22:23:13 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-10 22:23:13 +0200
commit4b0bab9e722cf77ca0049c54515e8c93acefa355 (patch)
treee9d74a7cf999e48a9f143027784ef4e8e0c02c0a
parentabb0b2aaf4ee5e6f147987401c9b059e5a7679d2 (diff)
downloadquartz-based-queue-4b0bab9e722cf77ca0049c54515e8c93acefa355.tar.gz
quartz-based-queue-4b0bab9e722cf77ca0049c54515e8c93acefa355.tar.bz2
quartz-based-queue-4b0bab9e722cf77ca0049c54515e8c93acefa355.tar.xz
quartz-based-queue-4b0bab9e722cf77ca0049c54515e8c93acefa355.zip
wip
-rw-r--r--README.md0
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java6
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java24
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java12
-rw-r--r--src/main/java/io/trygvis/queue/QueueStats.java20
-rw-r--r--src/main/java/io/trygvis/queue/TaskDao.java28
-rw-r--r--src/main/resources/create-postgresql.sql2
-rw-r--r--src/test/java/io/trygvis/test/PlainJavaExample.java36
8 files changed, 96 insertions, 32 deletions
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/README.md
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
index 558e769..ea77911 100644
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ b/src/main/java/io/trygvis/async/QueueThread.java
@@ -56,17 +56,19 @@ class QueueThread implements Runnable {
public void run() {
while (shouldRun) {
try {
+ TaskExecutionRequest req = new TaskExecutionRequest(100, true);
+
List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
@Override
public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100);
}
});
log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
if (tasks.size() > 0) {
- queueService.executeTask(new TaskExecutionRequest(true), taskEffect, tasks);
+ queueService.executeTask(req, taskEffect, tasks);
}
} catch (Throwable e) {
log.warn("Error while executing tasks.", e);
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index edd6c80..cb7af4b 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -12,7 +12,6 @@ import java.util.List;
import static io.trygvis.queue.QueueService.TaskExecutionRequest;
import static io.trygvis.queue.Task.TaskState.NEW;
-import static io.trygvis.queue.Task.TaskState.PROCESSING;
public class JdbcQueueService {
@@ -27,15 +26,22 @@ public class JdbcQueueService {
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
- final List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW);
- }
- });
+ public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
- applyTasks(req, effect, tasks);
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
}
public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index ee74bf4..d97eaf0 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -12,11 +12,21 @@ public interface QueueService {
void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
public static class TaskExecutionRequest {
+ public final long chunkSize;
public final boolean stopOnError;
// TODO: saveExceptions
- public TaskExecutionRequest(boolean stopOnError) {
+ public TaskExecutionRequest(long chunkSize, boolean stopOnError) {
+ this.chunkSize = chunkSize;
this.stopOnError = stopOnError;
}
+
+ @Override
+ public String toString() {
+ return "TaskExecutionRequest{" +
+ "chunkSize=" + chunkSize +
+ ", stopOnError=" + stopOnError +
+ '}';
+ }
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueStats.java b/src/main/java/io/trygvis/queue/QueueStats.java
new file mode 100644
index 0000000..5b048b3
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueStats.java
@@ -0,0 +1,20 @@
+package io.trygvis.queue;
+
+import java.util.EnumMap;
+
+public class QueueStats {
+ public final String name;
+ public final long totalTaskCount;
+ public final EnumMap<Task.TaskState, Long> states;
+
+ public QueueStats(String name, EnumMap<Task.TaskState, Long> states) {
+ this.name = name;
+ this.states = states;
+
+ long c = 0;
+ for (Long l : states.values()) {
+ c += l;
+ }
+ this.totalTaskCount = c;
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/TaskDao.java b/src/main/java/io/trygvis/queue/TaskDao.java
index 9adec8f..5d77a41 100644
--- a/src/main/java/io/trygvis/queue/TaskDao.java
+++ b/src/main/java/io/trygvis/queue/TaskDao.java
@@ -9,12 +9,10 @@ import java.sql.Types;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
+import java.util.EnumMap;
import java.util.List;
-import static io.trygvis.queue.Task.TaskState;
-import static io.trygvis.queue.Task.TaskState.valueOf;
-import static io.trygvis.queue.Task.argumentsToString;
-import static io.trygvis.queue.Task.stringToArguments;
+import static io.trygvis.queue.Task.*;
public class TaskDao {
@@ -57,11 +55,12 @@ public class TaskDao {
}
}
- public List<Task> findByQueueAndState(String queue, TaskState state) throws SQLException {
- try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=?")) {
+ public List<Task> findByQueueAndState(String queue, TaskState state, long limit) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT " + fields + " FROM task WHERE queue=? AND state=? LIMIT ?")) {
int i = 1;
stmt.setString(i++, queue);
- stmt.setString(i, state.name());
+ stmt.setString(i++, state.name());
+ stmt.setLong(i, limit);
ResultSet rs = stmt.executeQuery();
List<Task> list = new ArrayList<>();
while (rs.next()) {
@@ -71,6 +70,19 @@ public class TaskDao {
}
}
+ public QueueStats findQueueStatsByName(String queue) throws SQLException {
+ try (PreparedStatement stmt = c.prepareStatement("SELECT state, COUNT(id) FROM task WHERE queue=? GROUP BY state")) {
+ int i = 1;
+ stmt.setString(i, queue);
+ ResultSet rs = stmt.executeQuery();
+ EnumMap<TaskState, Long> states = new EnumMap<>(TaskState.class);
+ while (rs.next()) {
+ states.put(TaskState.valueOf(rs.getString(1)), rs.getLong(2));
+ }
+ return new QueueStats(queue, states);
+ }
+ }
+
public int update(Task task) throws SQLException {
try (PreparedStatement stmt = c.prepareStatement("UPDATE task SET state=?, scheduled=?, last_run=?, run_count=?, completed=? WHERE id=?")) {
int i = 1;
@@ -108,7 +120,7 @@ public class TaskDao {
rs.getLong(i++),
rs.getLong(i++),
rs.getString(i++),
- TaskState.valueOf(rs.getString(i++).trim()),
+ TaskState.valueOf(rs.getString(i++)),
rs.getTimestamp(i++),
rs.getTimestamp(i++),
rs.getInt(i++),
diff --git a/src/main/resources/create-postgresql.sql b/src/main/resources/create-postgresql.sql
index 7c331fd..a0739c2 100644
--- a/src/main/resources/create-postgresql.sql
+++ b/src/main/resources/create-postgresql.sql
@@ -14,7 +14,7 @@ CREATE TABLE task (
id BIGINT NOT NULL,
parent BIGINT,
queue VARCHAR(100) NOT NULL,
- state CHAR(10) NOT NULL,
+ state VARCHAR(10) NOT NULL,
scheduled TIMESTAMP NOT NULL,
last_run TIMESTAMP,
run_count INT NOT NULL,
diff --git a/src/test/java/io/trygvis/test/PlainJavaExample.java b/src/test/java/io/trygvis/test/PlainJavaExample.java
index 788d8a0..cad8559 100644
--- a/src/test/java/io/trygvis/test/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/PlainJavaExample.java
@@ -5,8 +5,10 @@ import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
import io.trygvis.queue.QueueService;
+import io.trygvis.queue.QueueStats;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskDao;
import io.trygvis.queue.TaskEffect;
import javax.sql.DataSource;
@@ -14,8 +16,10 @@ import java.sql.Connection;
import java.sql.SQLException;
import java.util.Date;
import java.util.List;
+import java.util.Map;
import java.util.Random;
+import static io.trygvis.queue.Task.TaskState;
import static io.trygvis.test.DbUtil.createDataSource;
import static java.lang.System.currentTimeMillis;
import static java.util.Arrays.asList;
@@ -37,32 +41,41 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
- QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
final JdbcQueueService queueService = queueSystem.queueService;
Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
@Override
public Queue[] doInConnection(Connection c) throws SQLException {
- return new Queue[]{
+ Queue[] queues = {
queueService.lookupQueue(c, inputName, interval, true),
queueService.lookupQueue(c, outputName, interval, true)};
+
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ QueueStats stats = taskDao.findQueueStatsByName(inputName);
+ System.out.println("Queue stats for " + stats.name + ". Total number of tasks: " + stats.totalTaskCount);
+ for (Map.Entry<TaskState, Long> entry : stats.states.entrySet()) {
+ System.out.println(entry.getKey() + " = " + entry.getValue());
+ }
+
+ return queues;
}
});
final Queue input = queues[0];
final Queue output = queues[1];
- QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(false);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false);
queueService.consumeAll(input, req, new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
- System.out.println("PlainJavaExample$Consumer.consumeAll: arguments = " + task.arguments);
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
- if (r.nextInt(3) == 0) {
+ if (r.nextInt(3000) > 0) {
return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
}
@@ -77,18 +90,21 @@ public class PlainJavaExample {
public static void main(String[] args) throws Exception {
System.out.println("Starting producer");
- int chunks = 10;
- final int chunk = 2000;
+ int chunks = 100;
+ final int chunk = 10000;
DataSource ds = createDataSource();
- Connection c = ds.getConnection();
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
final JdbcQueueService queueService = queueSystem.queueService;
- final Queue queue = queueService.lookupQueue(c, inputName, interval, true);
+ final Queue queue;
+ try (Connection c = ds.getConnection()) {
+ queue = queueService.lookupQueue(c, inputName, interval, true);
+ c.commit();
+ }
for (int i = 0; i < chunks; i++) {
long start = currentTimeMillis();
@@ -105,8 +121,6 @@ public class PlainJavaExample {
long time = end - start;
System.out.println("Scheduled " + chunk + " tasks in " + time + "ms, " + (((double) chunk * 1000)) / ((double) time) + " chunks per second");
}
-
- c.commit();
}
}
}