aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTrygve Laugstøl <trygvis@inamo.no>2013-06-16 12:07:43 +0200
committerTrygve Laugstøl <trygvis@inamo.no>2013-06-16 12:07:43 +0200
commit1ec4fae12c5e5363591013e5a759590d913d6782 (patch)
treebac64a4e0e86a3227ca21591b8efc34aee6223da
parent54d7b2ce520e57cc0ffb9582546b80a32fa00682 (diff)
downloadquartz-based-queue-1ec4fae12c5e5363591013e5a759590d913d6782.tar.gz
quartz-based-queue-1ec4fae12c5e5363591013e5a759590d913d6782.tar.bz2
quartz-based-queue-1ec4fae12c5e5363591013e5a759590d913d6782.tar.xz
quartz-based-queue-1ec4fae12c5e5363591013e5a759590d913d6782.zip
wip
-rwxr-xr-xsrc/main/java/io/trygvis/async/AsyncService.java12
-rw-r--r--src/main/java/io/trygvis/async/JdbcAsyncService.java61
-rw-r--r--src/main/java/io/trygvis/async/QueueController.java135
-rw-r--r--src/main/java/io/trygvis/async/QueueStats.java15
-rw-r--r--src/main/java/io/trygvis/async/QueueThread.java149
-rw-r--r--src/main/java/io/trygvis/queue/JdbcQueueService.java131
-rw-r--r--src/main/java/io/trygvis/queue/QueueExecutor.java184
-rw-r--r--src/main/java/io/trygvis/queue/QueueService.java4
-rw-r--r--src/main/java/io/trygvis/queue/QueueSystem.java2
-rw-r--r--src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java43
-rw-r--r--src/main/java/io/trygvis/spring/SpringQueueService.java19
-rwxr-xr-xsrc/test/java/io/trygvis/test/Main.java4
-rw-r--r--src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java52
-rw-r--r--src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java24
-rw-r--r--src/test/java/io/trygvis/test/spring/PlainSpringTest.java15
15 files changed, 467 insertions, 383 deletions
diff --git a/src/main/java/io/trygvis/async/AsyncService.java b/src/main/java/io/trygvis/async/AsyncService.java
index daf99e4..9332596 100755
--- a/src/main/java/io/trygvis/async/AsyncService.java
+++ b/src/main/java/io/trygvis/async/AsyncService.java
@@ -1,9 +1,13 @@
package io.trygvis.async;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
+import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
/**
@@ -11,13 +15,13 @@ import java.util.List;
*/
public interface AsyncService {
- void registerQueue(Queue queue, TaskEffect processor);
+ QueueController registerQueue(Queue queue, QueueService.TaskExecutionRequest req, TaskEffect processor) throws SQLException;
- Queue getQueue(String name);
+ QueueExecutor getQueue(String name);
- Task schedule(Queue queue, List<String> args);
+ Task schedule(Queue queue, Date scheduled, List<String> args);
- Task schedule(long parent, Queue queue, List<String> args);
+ Task schedule(Queue queue, long parent, Date scheduled, List<String> args);
/**
* Polls for a new state of the execution.
diff --git a/src/main/java/io/trygvis/async/JdbcAsyncService.java b/src/main/java/io/trygvis/async/JdbcAsyncService.java
index ddfa150..fd4b38b 100644
--- a/src/main/java/io/trygvis/async/JdbcAsyncService.java
+++ b/src/main/java/io/trygvis/async/JdbcAsyncService.java
@@ -1,7 +1,8 @@
package io.trygvis.async;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskDao;
@@ -11,11 +12,8 @@ import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import static java.lang.System.currentTimeMillis;
import static java.lang.Thread.sleep;
@@ -23,7 +21,7 @@ import static java.lang.Thread.sleep;
public class JdbcAsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final Map<String, QueueThread> queues = new HashMap<>();
+ private final Map<String, QueueController> queues = new HashMap<>();
private final QueueSystem queueSystem;
private final JdbcQueueService queueService;
@@ -33,49 +31,22 @@ public class JdbcAsyncService {
this.queueService = queueSystem.createQueueService();
}
- public synchronized void registerQueue(Queue queue, TaskEffect processor) {
- final QueueThread queueThread = new QueueThread(queueSystem, processor, queue);
-
- queues.put(queue.name, queueThread);
-
- log.info("registerQueue: LEAVE");
- }
-
- public synchronized void startQueue(Queue queue, ScheduledThreadPoolExecutor executor) {
- getQueueThread(queue.name).start(executor);
- }
-
- public synchronized void stopQueue(Queue queue) {
- QueueThread queueThread = queues.remove(queue.name);
-
- if (queueThread == null) {
- throw new RuntimeException("No such queue: '" + queue.name + "'.");
+ public synchronized QueueController registerQueue(QueueExecutor queue, QueueService.TaskExecutionRequest req, TaskEffect processor) {
+ if (queues.containsKey(queue.queue.name)) {
+ throw new IllegalArgumentException("Queue already exist.");
}
- queueThread.stop();
- }
-
- public Queue getQueue(String name) {
- QueueThread queueThread = getQueueThread(name);
+ QueueController queueController = new QueueController(queueSystem, req, processor, queue);
- return queueThread.queue;
- }
+ queues.put(queue.queue.name, queueController);
- public Task schedule(Connection c, final Queue queue, List<String> args) throws SQLException {
- return scheduleInner(c, null, queue, args);
- }
+ log.info("registerQueue: LEAVE");
- public Task schedule(Connection c, long parent, Queue queue, List<String> args) throws SQLException {
- return scheduleInner(c, parent, queue, args);
+ return queueController;
}
- private Task scheduleInner(Connection c, Long parent, final Queue queue, List<String> args) throws SQLException {
- Date scheduled = new Date();
-
- Task task = queueService.schedule(c, queue, parent, scheduled, args);
- log.info("Created task = {}", task);
-
- return task;
+ public QueueExecutor getQueue(String name) {
+ return getQueueThread(name).queue;
}
public Task await(Connection c, Task task, long timeout) throws SQLException {
@@ -105,12 +76,12 @@ public class JdbcAsyncService {
return taskDao.findById(ref.id());
}
- private QueueThread getQueueThread(String name) {
- QueueThread queueThread = queues.get(name);
+ private QueueController getQueueThread(String name) {
+ QueueController queueController = queues.get(name);
- if (queueThread == null) {
+ if (queueController == null) {
throw new RuntimeException("No such queue: '" + name + "'.");
}
- return queueThread;
+ return queueController;
}
}
diff --git a/src/main/java/io/trygvis/async/QueueController.java b/src/main/java/io/trygvis/async/QueueController.java
new file mode 100644
index 0000000..ea1c42d
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueController.java
@@ -0,0 +1,135 @@
+package io.trygvis.async;
+
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueSystem;
+import io.trygvis.queue.Task;
+import io.trygvis.queue.TaskEffect;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueService.TaskExecutionRequest;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueController {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ private final TaskEffect taskEffect;
+
+ private final TaskExecutionRequest req;
+
+ public final QueueExecutor queue;
+
+ private boolean shouldRun = true;
+
+ private boolean checkForNewTasks;
+
+ private boolean running;
+
+ private Thread thread;
+
+ private ScheduledThreadPoolExecutor executor;
+
+ public QueueController(QueueSystem queueSystem, TaskExecutionRequest req, TaskEffect taskEffect, QueueExecutor queue) {
+ this.queueSystem = queueSystem;
+ this.req = req;
+ this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
+ this.taskEffect = taskEffect;
+ this.queue = queue;
+ }
+
+ private class QueueThread implements Runnable {
+ public void run() {
+ while (shouldRun) {
+ List<Task> tasks = null;
+
+ try {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Found {} tasks on queue {}", tasks.size(), queue.queue.name);
+
+ if (tasks.size() > 0) {
+ queue.executeTasks(req, taskEffect, tasks, executor);
+ }
+ } catch (Throwable e) {
+ if (shouldRun) {
+ log.warn("Error while executing tasks.", e);
+ }
+ }
+
+ // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
+ if (tasks != null && tasks.size() == req.chunkSize) {
+ continue;
+ }
+
+ synchronized (this) {
+ if (checkForNewTasks) {
+ log.info("Ping received!");
+ checkForNewTasks = false;
+ } else {
+ try {
+ wait(queue.queue.interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ log.info("Thread for queue {} has stopped.", queue.queue.name);
+ running = false;
+ synchronized (this) {
+ this.notifyAll();
+ }
+ }
+ }
+
+ public synchronized void start(ScheduledThreadPoolExecutor executor) {
+ if (running) {
+ throw new IllegalStateException("Already running");
+ }
+
+ log.info("Starting thread for queue {} with poll interval = {}s", queue.queue.name, queue.queue.interval);
+
+ running = true;
+ this.executor = executor;
+
+ thread = new Thread(new QueueThread(), "queue: " + queue.queue.name);
+ thread.setDaemon(true);
+ thread.start();
+ }
+
+ public synchronized void stop() {
+ if (!running) {
+ return;
+ }
+
+ log.info("Stopping thread for queue {}", queue.queue.name);
+
+ shouldRun = false;
+
+ thread.interrupt();
+ while (running) {
+ try {
+ wait(1000);
+ } catch (InterruptedException e) {
+ // continue
+ }
+ thread.interrupt();
+ }
+ thread = null;
+ executor.shutdownNow();
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueStats.java b/src/main/java/io/trygvis/async/QueueStats.java
new file mode 100644
index 0000000..7c75149
--- /dev/null
+++ b/src/main/java/io/trygvis/async/QueueStats.java
@@ -0,0 +1,15 @@
+package io.trygvis.async;
+
+public class QueueStats {
+ public final int totalMessageCount;
+ public final int okMessageCount;
+ public final int failedMessageCount;
+ public final int scheduledMessageCount;
+
+ public QueueStats(int totalMessageCount, int okMessageCount, int failedMessageCount, int scheduledMessageCount) {
+ this.totalMessageCount = totalMessageCount;
+ this.okMessageCount = okMessageCount;
+ this.failedMessageCount = failedMessageCount;
+ this.scheduledMessageCount = scheduledMessageCount;
+ }
+}
diff --git a/src/main/java/io/trygvis/async/QueueThread.java b/src/main/java/io/trygvis/async/QueueThread.java
deleted file mode 100644
index 61196b6..0000000
--- a/src/main/java/io/trygvis/async/QueueThread.java
+++ /dev/null
@@ -1,149 +0,0 @@
-package io.trygvis.async;
-
-import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
-import io.trygvis.queue.QueueSystem;
-import io.trygvis.queue.Task;
-import io.trygvis.queue.TaskEffect;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.List;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-
-import static io.trygvis.queue.QueueService.TaskExecutionRequest;
-import static io.trygvis.queue.Task.TaskState.NEW;
-
-class QueueThread implements Runnable {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final QueueSystem queueSystem;
-
- private final JdbcQueueService queueService;
-
- private final SqlEffectExecutor sqlEffectExecutor;
-
- private final TaskEffect taskEffect;
-
- public final Queue queue;
-
- private boolean shouldRun = true;
-
- private boolean checkForNewTasks;
-
- private boolean available;
-
- private boolean running;
-
- private Thread thread;
-
- QueueThread(QueueSystem queueSystem, TaskEffect taskEffect, Queue queue) {
- this.queueSystem = queueSystem;
- this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
- this.queueService = queueSystem.createQueueService();
- this.taskEffect = taskEffect;
- this.queue = queue;
- }
-
- public void ping() {
- synchronized (this) {
- System.out.println("QueueThread.ping: available=" + available + ", checkForNewTasks=" + checkForNewTasks);
- if (available) {
- log.info("Sending ping to " + queue);
- notifyAll();
- } else {
- checkForNewTasks = true;
- }
- }
- }
-
- public void run() {
- while (shouldRun) {
- try {
- TaskExecutionRequest req = new TaskExecutionRequest(100, true);
-
- List<Task> tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, 100);
- }
- });
-
- log.info("Found {} tasks on queue {}", tasks.size(), queue.name);
-
- if (tasks.size() > 0) {
- queueService.executeTask(req, taskEffect, tasks);
- }
-
- // If we found exactly the same number of tasks that we asked for, there is most likely more to go.
- if (tasks.size() == req.chunkSize) {
- continue;
- }
- } catch (Throwable e) {
- if (shouldRun) {
- log.warn("Error while executing tasks.", e);
- }
- }
-
- synchronized (this) {
- available = true;
-
- if (checkForNewTasks) {
- log.info("Ping received!");
- checkForNewTasks = false;
- } else {
- try {
- wait(queue.interval);
- } catch (InterruptedException e) {
- // ignore
- }
- }
-
- available = false;
- }
- }
-
- log.info("Thread for queue {} has stopped.", queue.name);
- running = false;
- synchronized (this) {
- this.notifyAll();
- }
- }
-
- public synchronized void start(ScheduledThreadPoolExecutor executor) {
- if (running) {
- throw new IllegalStateException("Already running");
- }
-
- log.info("Starting thread for queue {} with poll interval = {}s", queue.name, queue.interval);
-
- running = true;
-
- thread = new Thread(this, queue.name);
- thread.setDaemon(true);
- thread.start();
- }
-
- public synchronized void stop() {
- if (!running) {
- return;
- }
-
- log.info("Stopping thread for queue {}", queue.name);
-
- shouldRun = false;
-
- thread.interrupt();
- while (running) {
- try {
- wait(1000);
- } catch (InterruptedException e) {
- // continue
- }
- thread.interrupt();
- }
- thread = null;
- }
-}
diff --git a/src/main/java/io/trygvis/queue/JdbcQueueService.java b/src/main/java/io/trygvis/queue/JdbcQueueService.java
index cb7af4b..a366838 100644
--- a/src/main/java/io/trygvis/queue/JdbcQueueService.java
+++ b/src/main/java/io/trygvis/queue/JdbcQueueService.java
@@ -1,121 +1,42 @@
package io.trygvis.queue;
-import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
-import java.util.Date;
-import java.util.List;
-
-import static io.trygvis.queue.QueueService.TaskExecutionRequest;
-import static io.trygvis.queue.Task.TaskState.NEW;
+import java.util.HashMap;
+import java.util.Map;
public class JdbcQueueService {
- private final Logger log = LoggerFactory.getLogger(getClass());
-
private final QueueSystem queueSystem;
private final SqlEffectExecutor sqlEffectExecutor;
+ private final Map<String, QueueExecutor> queues = new HashMap<>();
+
JdbcQueueService(QueueSystem queueSystem) {
this.queueSystem = queueSystem;
this.sqlEffectExecutor = queueSystem.sqlEffectExecutor;
}
- public void consumeAll(final Queue queue, final TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
- log.info("Consuming tasks: request={}", req);
-
- List<Task> tasks;
- do {
- tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
- @Override
- public List<Task> doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
- }
- });
-
- log.info("Consuming chunk with {} tasks", tasks.size());
-
- applyTasks(req, effect, tasks);
- } while (tasks.size() > 0);
- }
-
- public void executeTask(TaskExecutionRequest req, TaskEffect taskEffect, List<Task> tasks) throws SQLException {
- applyTasks(req, taskEffect, tasks);
- }
+ public synchronized QueueExecutor getQueue(String name) {
+ QueueExecutor queueExecutor = queues.get(name);
- /**
- * Tries to execute all the tasks on the connection. If it fails, it will execute an SQL effect.
- */
- private void applyTasks(TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) throws SQLException {
- for (Task task : tasks) {
- boolean ok = applyTask(effect, task);
-
- if (!ok && req.stopOnError) {
- throw new RuntimeException("Error while executing task, id=" + task.id());
- }
+ if (queueExecutor != null) {
+ return queueExecutor;
}
- }
-
- private boolean applyTask(TaskEffect effect, final Task task) throws SQLException {
- try {
- final Date run = new Date();
- Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
- @Override
- public Integer doInConnection(Connection c) throws SQLException {
- return queueSystem.createTaskDao(c).update(task.markProcessing());
- }
- });
-
- if (count == 1) {
- log.info("Executing task {}", task.id());
- } else {
- log.trace("Missed task {}", task.id());
- }
-
- final List<Task> newTasks = effect.apply(task);
-
- final Date now = new Date();
-
- log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- for (Task newTask : newTasks) {
- schedule(c, newTask);
- }
-
- queueSystem.createTaskDao(c).update(task.markOk(now));
- }
- });
-
- return true;
- } catch (Exception e) {
- final Date now = new Date();
- log.error("Unable to execute task, id=" + task.id(), e);
-
- sqlEffectExecutor.transaction(new SqlEffect.Void() {
- @Override
- public void doInConnection(Connection c) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
- taskDao.update(task.markFailed(now));
- }
- });
+ throw new IllegalArgumentException("No such queue: " + name);
+ }
- if (e instanceof SQLException) {
- throw ((SQLException) e);
- }
+ public synchronized QueueExecutor lookupQueue(Connection c, String name, long interval, boolean autoCreate) throws SQLException {
+ QueueExecutor queueExecutor = queues.get(name);
- return false;
+ if (queueExecutor != null) {
+ return queueExecutor;
}
- }
- public Queue lookupQueue(Connection c, String name, int interval, boolean autoCreate) throws SQLException {
QueueDao queueDao = queueSystem.createQueueDao(c);
Queue q = queueDao.findByName(name);
@@ -129,26 +50,8 @@ public class JdbcQueueService {
queueDao.insert(q);
}
- return q;
- }
-
- public void schedule(Connection c, Task task) throws SQLException {
- schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
- }
-
- public Task schedule(Connection c, Queue queue, Date scheduled, List<String> arguments) throws SQLException {
- return schedule(c, queue.name, null, scheduled, arguments);
- }
-
- public Task schedule(Connection c, Queue queue, long parent, Date scheduled, List<String> arguments) throws SQLException {
- return schedule(c, queue.name, parent, scheduled, arguments);
- }
-
- private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
- TaskDao taskDao = queueSystem.createTaskDao(c);
-
- long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
-
- return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ queueExecutor = new QueueExecutor(queueSystem, sqlEffectExecutor, q);
+ queues.put(name, queueExecutor);
+ return queueExecutor;
}
}
diff --git a/src/main/java/io/trygvis/queue/QueueExecutor.java b/src/main/java/io/trygvis/queue/QueueExecutor.java
new file mode 100644
index 0000000..a1eb3b7
--- /dev/null
+++ b/src/main/java/io/trygvis/queue/QueueExecutor.java
@@ -0,0 +1,184 @@
+package io.trygvis.queue;
+
+import io.trygvis.async.QueueStats;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.async.SqlEffectExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.FAILED;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.MISSED;
+import static io.trygvis.queue.QueueExecutor.TaskExecutionResult.OK;
+import static io.trygvis.queue.Task.TaskState.NEW;
+
+public class QueueExecutor {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private final QueueSystem queueSystem;
+
+ private final SqlEffectExecutor sqlEffectExecutor;
+
+ public final Queue queue;
+
+ private final Stats stats = new Stats();
+
+ public enum TaskExecutionResult {
+ OK,
+ FAILED,
+ MISSED
+ }
+
+ public QueueExecutor(QueueSystem queueSystem, SqlEffectExecutor sqlEffectExecutor, Queue queue) {
+ this.queueSystem = queueSystem;
+ this.sqlEffectExecutor = sqlEffectExecutor;
+ this.queue = queue;
+ }
+
+ private static class Stats {
+ public int total;
+ public int ok;
+ public int failed;
+ public int scheduled;
+
+ public QueueStats toStats() {
+ return new QueueStats(total, ok, failed, scheduled);
+ }
+ }
+
+ public void consumeAll(final QueueService.TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
+ log.info("Consuming tasks: request={}", req);
+
+ List<Task> tasks;
+ do {
+ tasks = sqlEffectExecutor.transaction(new SqlEffect<List<Task>>() {
+ @Override
+ public List<Task> doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).findByQueueAndState(queue.name, NEW, req.chunkSize);
+ }
+ });
+
+ log.info("Consuming chunk with {} tasks", tasks.size());
+
+ applyTasks(req, effect, tasks);
+ } while (tasks.size() > 0);
+ }
+
+ public void executeTasks(final QueueService.TaskExecutionRequest req, final TaskEffect taskEffect, final List<Task> tasks,
+ ScheduledThreadPoolExecutor executor) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ applyTasks(req, taskEffect, tasks);
+ }
+ });
+ }
+
+ private void applyTasks(QueueService.TaskExecutionRequest req, TaskEffect effect, List<Task> tasks) {
+ for (Task task : tasks) {
+ TaskExecutionResult result = applyTask(effect, task);
+
+ if (result == FAILED && req.stopOnError) {
+ throw new RuntimeException("Error while executing task, id=" + task.id());
+ }
+ }
+ }
+
+ /**
+ * Executed each task in its own transaction.
+ * <p/>
+ * If the task fails, the status is set to error in a separate transaction.
+ */
+ private TaskExecutionResult applyTask(TaskEffect effect, final Task task) {
+ try {
+ Integer count = sqlEffectExecutor.transaction(new SqlEffect<Integer>() {
+ @Override
+ public Integer doInConnection(Connection c) throws SQLException {
+ return queueSystem.createTaskDao(c).update(task.markProcessing());
+ }
+ });
+
+ if (count == 0) {
+ log.trace("Missed task {}", task.id());
+ return MISSED;
+ }
+
+ log.info("Executing task {}", task.id());
+
+ final List<Task> newTasks = effect.apply(task);
+
+ final Date now = new Date();
+
+ log.info("Executed task {} at {}, newTasks: {}", task.id(), now, newTasks.size());
+
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ for (Task newTask : newTasks) {
+ schedule(c, newTask);
+ }
+
+ queueSystem.createTaskDao(c).update(task.markOk(now));
+ }
+ });
+
+ synchronized (stats) {
+ stats.total++;
+ stats.ok++;
+ }
+
+ return OK;
+ } catch (Exception e) {
+ final Date now = new Date();
+ log.error("Unable to execute task, id=" + task.id(), e);
+
+ synchronized (stats) {
+ stats.total++;
+ stats.failed++;
+ }
+
+ try {
+ sqlEffectExecutor.transaction(new SqlEffect.Void() {
+ @Override
+ public void doInConnection(Connection c) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+ taskDao.update(task.markFailed(now));
+ }
+ });
+ } catch (SQLException e1) {
+ log.error("Error while marking task as failed.", e1);
+ }
+
+ return FAILED;
+ }
+ }
+
+ public void schedule(Connection c, Task task) throws SQLException {
+ schedule(c, task.queue, task.parent, task.scheduled, task.arguments);
+ }
+
+ public Task schedule(Connection c, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, null, scheduled, arguments);
+ }
+
+ public Task schedule(Connection c, long parent, Date scheduled, List<String> arguments) throws SQLException {
+ return schedule(c, queue.name, parent, scheduled, arguments);
+ }
+
+ private Task schedule(Connection c, String queue, Long parent, Date scheduled, List<String> arguments) throws SQLException {
+ TaskDao taskDao = queueSystem.createTaskDao(c);
+
+ long id = taskDao.insert(parent, queue, NEW, scheduled, arguments);
+
+ synchronized (stats) {
+ stats.scheduled++;
+ }
+
+ return new Task(id, parent, queue, NEW, scheduled, null, 0, null, arguments);
+ }
+}
diff --git a/src/main/java/io/trygvis/queue/QueueService.java b/src/main/java/io/trygvis/queue/QueueService.java
index d97eaf0..eee14ed 100644
--- a/src/main/java/io/trygvis/queue/QueueService.java
+++ b/src/main/java/io/trygvis/queue/QueueService.java
@@ -5,9 +5,7 @@ import java.util.Date;
import java.util.List;
public interface QueueService {
- void consume(Queue queue, TaskExecutionRequest req, TaskEffect effect) throws SQLException;
-
- Queue getQueue(String name, int interval, boolean autoCreate) throws SQLException;
+ QueueExecutor getQueue(String name, int interval, boolean autoCreate) throws SQLException;
void schedule(Queue queue, Date scheduled, List<String> arguments) throws SQLException;
diff --git a/src/main/java/io/trygvis/queue/QueueSystem.java b/src/main/java/io/trygvis/queue/QueueSystem.java
index 3b0c018..6710bf4 100644
--- a/src/main/java/io/trygvis/queue/QueueSystem.java
+++ b/src/main/java/io/trygvis/queue/QueueSystem.java
@@ -15,7 +15,7 @@ public class QueueSystem {
public final SqlEffectExecutor sqlEffectExecutor;
- public final JdbcQueueService queueService;
+ private final JdbcQueueService queueService;
private QueueSystem(SqlEffectExecutor sqlEffectExecutor) throws SQLException {
sqlEffectExecutor.transaction(new SqlEffect.Void() {
diff --git a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
index b27e94d..a1c9cda 100644
--- a/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
+++ b/src/main/java/io/trygvis/spring/SpringJdbcAsyncService.java
@@ -2,8 +2,12 @@ package io.trygvis.spring;
import io.trygvis.async.AsyncService;
import io.trygvis.async.JdbcAsyncService;
-import io.trygvis.async.SqlEffectExecutor;
+import io.trygvis.async.QueueController;
+import io.trygvis.async.SqlEffect;
+import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -17,6 +21,7 @@ import org.springframework.transaction.support.TransactionSynchronizationAdapter
import java.sql.Connection;
import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -27,52 +32,68 @@ import static org.springframework.transaction.support.TransactionSynchronization
public class SpringJdbcAsyncService implements AsyncService {
private final Logger log = LoggerFactory.getLogger(getClass());
- private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
+ private final ScheduledThreadPoolExecutor executor;
private final JdbcTemplate jdbcTemplate;
private final JdbcAsyncService jdbcAsyncService;
+ private final JdbcQueueService queueService;
+
+ private final QueueSystem queueSystem;
+
public SpringJdbcAsyncService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
+ this.queueSystem = queueSystem;
this.jdbcTemplate = jdbcTemplate;
jdbcAsyncService = new JdbcAsyncService(queueSystem);
+ queueService = queueSystem.createQueueService();
+ executor = new ScheduledThreadPoolExecutor(10, Executors.defaultThreadFactory());
}
@Transactional(propagation = REQUIRED)
- public void registerQueue(final Queue queue, final TaskEffect processor) {
- jdbcAsyncService.registerQueue(queue, processor);
+ public QueueController registerQueue(final Queue queue, final QueueService.TaskExecutionRequest req, final TaskEffect processor) throws SQLException {
+ QueueExecutor queueExecutor = queueSystem.sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor>() {
+ @Override
+ public QueueExecutor doInConnection(Connection c) throws SQLException {
+ return queueService.lookupQueue(c, queue.name, queue.interval, true);
+ }
+ });
+
+ final QueueController queueController = jdbcAsyncService.registerQueue(queueExecutor, req, processor);
registerSynchronization(new TransactionSynchronizationAdapter() {
public void afterCompletion(int status) {
log.info("Transaction completed with status = {}", status);
if (status == TransactionSynchronization.STATUS_COMMITTED) {
- jdbcAsyncService.startQueue(queue, executor);
+ queueController.start(executor);
}
}
});
- log.info("registerQueue: LEAVE");
+ return queueController;
}
- public Queue getQueue(String name) {
+ public QueueExecutor getQueue(String name) {
return jdbcAsyncService.getQueue(name);
}
@Transactional(propagation = REQUIRED)
- public Task schedule(final Queue queue, final List<String> args) {
+ public Task schedule(final Queue queue, final Date scheduled, final List<String> args) {
return jdbcTemplate.execute(new ConnectionCallback<Task>() {
@Override
public Task doInConnection(Connection c) throws SQLException {
- return jdbcAsyncService.schedule(c, queue, args);
+ QueueExecutor queueExecutor = queueService.getQueue(queue.name);
+ return queueExecutor.schedule(c, scheduled, args);
}
});
}
- public Task schedule(final long parent, final Queue queue, final List<String> args) {
+ public Task schedule(final Queue queue, final long parent, final Date scheduled, final List<String> args) {
return jdbcTemplate.execute(new ConnectionCallback<Task>() {
@Override
public Task doInConnection(Connection c) throws SQLException {
- return jdbcAsyncService.schedule(c, parent, queue, args);
+ QueueExecutor queueExecutor = queueService.getQueue(queue.name);
+ return queueExecutor.schedule(c, parent, scheduled, args);
}
});
}
diff --git a/src/main/java/io/trygvis/spring/SpringQueueService.java b/src/main/java/io/trygvis/spring/SpringQueueService.java
index 271e9bf..2027ab5 100644
--- a/src/main/java/io/trygvis/spring/SpringQueueService.java
+++ b/src/main/java/io/trygvis/spring/SpringQueueService.java
@@ -2,9 +2,9 @@ package io.trygvis.spring;
import io.trygvis.queue.JdbcQueueService;
import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
-import io.trygvis.queue.TaskEffect;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.ConnectionCallback;
import org.springframework.jdbc.core.JdbcTemplate;
@@ -23,21 +23,14 @@ public class SpringQueueService implements QueueService {
public SpringQueueService(QueueSystem queueSystem, JdbcTemplate jdbcTemplate) {
this.jdbcTemplate = jdbcTemplate;
- this.queueService = queueSystem.queueService;
- }
-
- /**
- * @see JdbcQueueService#consumeAll(io.trygvis.queue.Queue, io.trygvis.queue.QueueService.TaskExecutionRequest, io.trygvis.queue.TaskEffect)
- */
- public void consume(final Queue queue, TaskExecutionRequest req, final TaskEffect effect) throws SQLException {
- queueService.consumeAll(queue, req, effect);
+ this.queueService = queueSystem.createQueueService();
}
@Transactional
- public Queue getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException {
- return jdbcTemplate.execute(new ConnectionCallback<Queue>() {
+ public QueueExecutor getQueue(final String name, final int interval, final boolean autoCreate) throws SQLException {
+ return jdbcTemplate.execute(new ConnectionCallback<QueueExecutor>() {
@Override
- public Queue doInConnection(Connection c) throws SQLException, DataAccessException {
+ public QueueExecutor doInConnection(Connection c) throws SQLException, DataAccessException {
return queueService.lookupQueue(c, name, interval, autoCreate);
}
});
@@ -48,7 +41,7 @@ public class SpringQueueService implements QueueService {
jdbcTemplate.execute(new ConnectionCallback<Object>() {
@Override
public Object doInConnection(Connection c) throws SQLException, DataAccessException {
- queueService.schedule(c, queue, scheduled, arguments);
+ queueService.getQueue(queue.name).schedule(c, scheduled, arguments);
return null;
}
});
diff --git a/src/test/java/io/trygvis/test/Main.java b/src/test/java/io/trygvis/test/Main.java
index 0721ec9..f03d6fa 100755
--- a/src/test/java/io/trygvis/test/Main.java
+++ b/src/test/java/io/trygvis/test/Main.java
@@ -74,7 +74,9 @@ public class Main {
final Queue q = null; // queueService.lookupQueue(c, "create-article", 1);
- asyncService.registerQueue(q, createArticleCallable);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+
+ asyncService.registerQueue(q, req, createArticleCallable);
// log.info("queue registered: ref = {}", q);
// asyncService.registerQueue("update-queue", 1, updateArticleCallable);
diff --git a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
index 8d981f2..dd478d7 100644
--- a/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/AsyncConsumerExample.java
@@ -1,10 +1,12 @@
package io.trygvis.test.jdbc;
import io.trygvis.async.JdbcAsyncService;
+import io.trygvis.async.QueueController;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
+import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueSystem;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -38,36 +40,36 @@ public class AsyncConsumerExample {
}
};
- public static class Consumer {
- public static void main(String[] args) throws Exception {
- System.out.println("Starting consumer");
+ public static void main(String[] args) throws Exception {
+ System.out.println("Starting consumer");
- DataSource ds = createDataSource();
+ DataSource ds = createDataSource();
- SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
+ SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
- QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- JdbcAsyncService asyncService = queueSystem.createAsyncService();
- final JdbcQueueService queueService = queueSystem.queueService;
+ QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
+ JdbcAsyncService asyncService = queueSystem.createAsyncService();
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
- @Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- return new Queue[]{
- queueService.lookupQueue(c, inputName, interval, true),
- queueService.lookupQueue(c, outputName, interval, true)
- };
- }
- });
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
+ @Override
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ return new QueueExecutor[]{
+ queueService.lookupQueue(c, inputName, interval, true),
+ queueService.lookupQueue(c, outputName, interval, true)
+ };
+ }
+ });
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
- asyncService.registerQueue(input, adder);
+ QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
- asyncService.startQueue(input, new ScheduledThreadPoolExecutor(2));
- Thread.sleep(5 * 1000);
- asyncService.stopQueue(input);
- }
+ QueueController controller = asyncService.registerQueue(input, req, adder);
+
+ controller.start(new ScheduledThreadPoolExecutor(2));
+ Thread.sleep(60 * 1000);
+ controller.stop();
}
}
diff --git a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
index 994c310..0e11ab3 100644
--- a/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
+++ b/src/test/java/io/trygvis/test/jdbc/PlainJavaExample.java
@@ -3,7 +3,7 @@ package io.trygvis.test.jdbc;
import io.trygvis.async.SqlEffect;
import io.trygvis.async.SqlEffectExecutor;
import io.trygvis.queue.JdbcQueueService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.QueueStats;
import io.trygvis.queue.QueueSystem;
@@ -42,12 +42,12 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
final QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- final JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- Queue[] queues = sqlEffectExecutor.transaction(new SqlEffect<Queue[]>() {
+ QueueExecutor[] queues = sqlEffectExecutor.transaction(new SqlEffect<QueueExecutor[]>() {
@Override
- public Queue[] doInConnection(Connection c) throws SQLException {
- Queue[] queues = {
+ public QueueExecutor[] doInConnection(Connection c) throws SQLException {
+ QueueExecutor[] queues = {
queueService.lookupQueue(c, inputName, interval, true),
queueService.lookupQueue(c, outputName, interval, true)};
@@ -63,12 +63,12 @@ public class PlainJavaExample {
}
});
- final Queue input = queues[0];
- final Queue output = queues[1];
+ final QueueExecutor input = queues[0];
+ final QueueExecutor output = queues[1];
QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(1000, false);
- queueService.consumeAll(input, req, new TaskEffect() {
+ input.consumeAll(req, new TaskEffect() {
public List<Task> apply(Task task) throws Exception {
Long a = Long.valueOf(task.arguments.get(0));
Long b = Long.valueOf(task.arguments.get(1));
@@ -76,7 +76,7 @@ public class PlainJavaExample {
System.out.println("a + b = " + a + " + " + b + " = " + (a + b));
if (r.nextInt(3000) > 0) {
- return singletonList(task.childTask(output.name, new Date(), Long.toString(a + b)));
+ return singletonList(task.childTask(output.queue.name, new Date(), Long.toString(a + b)));
}
throw new RuntimeException("Simulated exception while processing task.");
@@ -98,9 +98,9 @@ public class PlainJavaExample {
SqlEffectExecutor sqlEffectExecutor = new SqlEffectExecutor(ds);
QueueSystem queueSystem = QueueSystem.initialize(sqlEffectExecutor);
- final JdbcQueueService queueService = queueSystem.queueService;
+ final JdbcQueueService queueService = queueSystem.createQueueService();
- final Queue queue;
+ final QueueExecutor queue;
try (Connection c = ds.getConnection()) {
queue = queueService.lookupQueue(c, inputName, interval, true);
c.commit();
@@ -112,7 +112,7 @@ public class PlainJavaExample {
@Override
public void doInConnection(Connection c) throws SQLException {
for (int j = 0; j < chunk; j++) {
- queueService.schedule(c, queue, new Date(), asList("10", "20"));
+ queue.schedule(c, new Date(), asList("10", "20"));
}
}
});
diff --git a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
index d06d8d6..38d3361 100644
--- a/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
+++ b/src/test/java/io/trygvis/test/spring/PlainSpringTest.java
@@ -1,7 +1,7 @@
package io.trygvis.test.spring;
import io.trygvis.async.AsyncService;
-import io.trygvis.queue.Queue;
+import io.trygvis.queue.QueueExecutor;
import io.trygvis.queue.QueueService;
import io.trygvis.queue.Task;
import io.trygvis.queue.TaskEffect;
@@ -13,6 +13,7 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.sql.SQLException;
+import java.util.Date;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
@@ -33,6 +34,8 @@ public class PlainSpringTest {
@Autowired
private QueueService queueService;
+ private final QueueService.TaskExecutionRequest req = new QueueService.TaskExecutionRequest(100, true);
+
static {
String username = getProperty("user.name");
setProperty("database.url", getProperty("jdbc.url", "jdbc:postgresql://localhost/" + username));
@@ -42,9 +45,9 @@ public class PlainSpringTest {
@Test
public void testBasic() throws SQLException, InterruptedException {
- Queue test = queueService.getQueue("test", 10, true);
+ QueueExecutor test = queueService.getQueue("test", 10, true);
final AtomicReference<List<String>> ref = new AtomicReference<>();
- asyncService.registerQueue(test, new TaskEffect() {
+ asyncService.registerQueue(test.queue, req, new TaskEffect() {
@Override
public List<Task> apply(Task task) throws Exception {
System.out.println("PlainSpringTest.run");
@@ -58,12 +61,14 @@ public class PlainSpringTest {
synchronized (ref) {
System.out.println("Scheduling task");
- asyncService.schedule(test, asList("hello", "world"));
- System.out.println("Waiting");
+ queueService.schedule(test.queue, new Date(), asList("hello", "world"));
+ System.out.println("Task scheduled, waiting");
ref.wait(1000);
+ System.out.println("Back!");
}
List<String> args = ref.get();
+ System.out.println("args = " + args);
assertNotNull(args);
assertThat(args).containsExactly("hello", "world");
}